Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AddTagTTL and AddEdgeTTL for Session Pool #323

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,16 @@ func (pool *SessionPool) CreateTag(tag LabelSchema) (*ResultSet, error) {
return rs, nil
}

func (pool *SessionPool) AddTagTTL(tagName string, colName string, duration uint) (*ResultSet, error) {
q := fmt.Sprintf(`ALTER TAG %s TTL_DURATION = %d, TTL_COL = "%s";`, tagName, duration, colName)
rs, err := pool.ExecuteAndCheck(q)
if err != nil {
return nil, err
}

return rs, nil
}

func (pool *SessionPool) DescTag(tagName string) ([]Label, error) {
q := fmt.Sprintf("DESC TAG %s;", tagName)
rs, err := pool.ExecuteAndCheck(q)
Expand Down Expand Up @@ -353,6 +363,16 @@ func (pool *SessionPool) CreateEdge(edge LabelSchema) (*ResultSet, error) {
return rs, nil
}

func (pool *SessionPool) AddEdgeTTL(tagName string, colName string, duration uint) (*ResultSet, error) {
q := fmt.Sprintf(`ALTER EDGE %s TTL_DURATION = %d, TTL_COL = "%s";`, tagName, duration, colName)
rs, err := pool.ExecuteAndCheck(q)
if err != nil {
return nil, err
}

return rs, nil
}

func (pool *SessionPool) DescEdge(edgeName string) ([]Label, error) {
q := fmt.Sprintf("DESC EDGE %s;", edgeName)
rs, err := pool.ExecuteAndCheck(q)
Expand Down
181 changes: 166 additions & 15 deletions session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestSessionPoolSpaceChange(t *testing.T) {
assert.Equal(t, resultSet.GetSpaceName(), "test_space_1", "space name should be test_space_1")
}

func TestSessionPoolApplySchema(t *testing.T) {
func TestSessionPoolCreateTagAndEdge(t *testing.T) {
spaceName := "test_space_schema"
err := prepareSpace(spaceName)
if err != nil {
Expand Down Expand Up @@ -415,27 +415,29 @@ func TestSessionPoolApplySchema(t *testing.T) {
},
},
}

_, err = sessionPool.CreateTag(tagSchema)
if err != nil {
t.Fatal(err)
}

tags, err := sessionPool.ShowTags()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(tags), "should have 1 tags")
assert.Equal(t, "account", tags[0].Name, "tag name should be account")
assert.Equal(t, 1, len(tags))
assert.Equal(t, "account", tags[0].Name)
labels, err := sessionPool.DescTag("account")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 3, len(labels), "should have 3 labels")
assert.Equal(t, "name", labels[0].Field, "field name should be name")
assert.Equal(t, "string", labels[0].Type, "field type should be string")
assert.Equal(t, "email", labels[1].Field, "field name should be email")
assert.Equal(t, "string", labels[1].Type, "field type should be string")
assert.Equal(t, "phone", labels[2].Field, "field name should be phone")
assert.Equal(t, "int64", labels[2].Type, "field type should be int64")
assert.Equal(t, 3, len(labels))
assert.Equal(t, "name", labels[0].Field)
assert.Equal(t, "string", labels[0].Type)
assert.Equal(t, "email", labels[1].Field)
assert.Equal(t, "string", labels[1].Type)
assert.Equal(t, "phone", labels[2].Field)
assert.Equal(t, "int64", labels[2].Type)

edgeSchema := LabelSchema{
Name: "account_email",
Expand All @@ -446,23 +448,172 @@ func TestSessionPoolApplySchema(t *testing.T) {
},
},
}

_, err = sessionPool.CreateEdge(edgeSchema)
if err != nil {
t.Fatal(err)
}

edges, err := sessionPool.ShowEdges()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(edges), "should have 1 edges")
assert.Equal(t, "account_email", edges[0].Name, "edge name should be account_email")
assert.Equal(t, 1, len(edges))
assert.Equal(t, "account_email", edges[0].Name)
labels, err = sessionPool.DescEdge("account_email")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels), "should have 1 labels")
assert.Equal(t, "email", labels[0].Field, "field name should be email")
assert.Equal(t, "string", labels[0].Type, "field type should be string")
assert.Equal(t, 1, len(labels))
assert.Equal(t, "email", labels[0].Field)
assert.Equal(t, "string", labels[0].Type)
}

func TestSessionPoolAddTTL(t *testing.T) {
spaceName := "test_space_ttl"
err := prepareSpace(spaceName)
if err != nil {
t.Fatal(err)
}
defer dropSpace(spaceName)

hostAddress := HostAddress{Host: address, Port: port}
config, err := NewSessionPoolConf(
"root",
"nebula",
[]HostAddress{hostAddress},
spaceName)
if err != nil {
t.Errorf("failed to create session pool config, %s", err.Error())
}

// allow only one session in the pool so it is easier to test
config.maxSize = 1

// create session pool
sessionPool, err := NewSessionPool(*config, DefaultLogger{})
if err != nil {
t.Fatal(err)
}
defer sessionPool.Close()

spaces, err := sessionPool.ShowSpaces()
if err != nil {
t.Fatal(err)
}
assert.LessOrEqual(t, 1, len(spaces), "should have at least 1 space")
var spaceNames []string
for _, space := range spaces {
spaceNames = append(spaceNames, space.Name)
}
assert.Contains(t, spaceNames, spaceName)

tagSchema := LabelSchema{
Name: "user",
Fields: []LabelFieldSchema{
{
Field: "created_at",
Type: "int64",
Nullable: false,
},
},
}

_, err = sessionPool.CreateTag(tagSchema)
if err != nil {
t.Fatal(err)
}

// Add TTL to tag
_, err = sessionPool.AddTagTTL("user", "created_at", 5)
if err != nil {
t.Fatal(err)
}

tags, err := sessionPool.ShowTags()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(tags))
assert.Equal(t, "user", tags[0].Name)
labels, err := sessionPool.DescTag("user")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels))
assert.Equal(t, "created_at", labels[0].Field)
assert.Equal(t, "int64", labels[0].Type)

edgeSchema := LabelSchema{
Name: "friend",
Fields: []LabelFieldSchema{
{
Field: "created_at",
Type: "int64",
Nullable: false,
},
},
}

_, err = sessionPool.CreateEdge(edgeSchema)
if err != nil {
t.Fatal(err)
}

// Add TTL to edge
_, err = sessionPool.AddEdgeTTL("friend", "created_at", 3)
if err != nil {
t.Fatal(err)
}

edges, err := sessionPool.ShowEdges()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(edges))
assert.Equal(t, "friend", edges[0].Name)
labels, err = sessionPool.DescEdge("friend")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels))
assert.Equal(t, "created_at", labels[0].Field)
assert.Equal(t, "int64", labels[0].Type)

// Wait for 5 seconds to wait for the schema is ready
time.Sleep(5 * time.Second)

now := time.Now().Unix()
// Insert vertices and edges
q := fmt.Sprintf(`INSERT VERTEX user(created_at) VALUES "test1":(%d);`, now)
_, err = sessionPool.ExecuteAndCheck(q)
if err != nil {
t.Fatal(err)
}
q = fmt.Sprintf(`INSERT VERTEX user(created_at) VALUES "test2":(%d);`, now)
_, err = sessionPool.ExecuteAndCheck(q)
if err != nil {
t.Fatal(err)

}
q = fmt.Sprintf(`INSERT EDGE friend(created_at) VALUES "test1" -> "test2":(%d);`, now)
_, err = sessionPool.ExecuteAndCheck(q)
if err != nil {
t.Fatal(err)
}

rs, err := sessionPool.ExecuteAndCheck(`FETCH PROP ON friend "test1" -> "test2" YIELD edge AS e;`)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(rs.GetRows()))
// Sleep for 5 seconds to wait for the tag to be expired
time.Sleep(5 * time.Second)
rs, err = sessionPool.ExecuteAndCheck(`FETCH PROP ON friend "test1" -> "test2" YIELD edge AS e;`)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 0, len(rs.GetRows()))
}

func TestIdleSessionCleaner(t *testing.T) {
Expand Down