Skip to content

Commit

Permalink
feat(eventstore): order by creation_date and sequence (#5568)
Browse files Browse the repository at this point in the history
* feat(eventstore): order by `creation_date` and `sequence`

* fix(logstore): use correct event type

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
  • Loading branch information
adlerhurst and livio-a committed Apr 5, 2023
1 parent cf9d74f commit 4c1169b
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 20 deletions.
4 changes: 2 additions & 2 deletions internal/eventstore/repository/sql/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ func (db *CRDB) db() *sql.DB {

func (db *CRDB) orderByEventSequence(desc bool) string {
if desc {
return " ORDER BY event_sequence DESC"
return " ORDER BY creation_date DESC, event_sequence DESC"
}

return " ORDER BY event_sequence"
return " ORDER BY creation_date, event_sequence"
}

func (db *CRDB) eventQuery() string {
Expand Down
2 changes: 1 addition & 1 deletion internal/eventstore/repository/sql/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,7 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
}
}

rows, err := testCRDBClient.Query("SELECT resource_owner FROM eventstore.events WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY event_sequence", tt.fields.aggregateType, tt.fields.aggregateIDs)
rows, err := testCRDBClient.Query("SELECT resource_owner FROM eventstore.events WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY creation_date, event_sequence", tt.fields.aggregateType, tt.fields.aggregateIDs)
if err != nil {
t.Error("unable to query inserted rows: ", err)
return
Expand Down
14 changes: 7 additions & 7 deletions internal/eventstore/repository/sql/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC`,
[]driver.Value{repository.AggregateType("user")},
),
},
Expand Down Expand Up @@ -624,7 +624,7 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence LIMIT \$2`,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date, event_sequence LIMIT \$2`,
[]driver.Value{repository.AggregateType("user"), uint64(5)},
),
},
Expand Down Expand Up @@ -653,7 +653,7 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC LIMIT \$2`,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC LIMIT \$2`,
[]driver.Value{repository.AggregateType("user"), uint64(5)},
),
},
Expand Down Expand Up @@ -683,7 +683,7 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC LIMIT \$2`,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC LIMIT \$2`,
[]driver.Value{repository.AggregateType("user"), uint64(5)},
),
},
Expand Down Expand Up @@ -712,7 +712,7 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQueryErr(t,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC`,
[]driver.Value{repository.AggregateType("user")},
sql.ErrConnDone),
},
Expand Down Expand Up @@ -741,7 +741,7 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC`,
[]driver.Value{repository.AggregateType("user")},
&repository.Event{Sequence: 100}),
},
Expand Down Expand Up @@ -809,7 +809,7 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) OR \( aggregate_type = \$2 AND aggregate_id = \$3 \) ORDER BY event_sequence DESC LIMIT \$4`,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) OR \( aggregate_type = \$2 AND aggregate_id = \$3 \) ORDER BY creation_date DESC, event_sequence DESC LIMIT \$4`,
[]driver.Value{repository.AggregateType("user"), repository.AggregateType("org"), "asdf42", uint64(5)},
),
},
Expand Down
10 changes: 5 additions & 5 deletions internal/eventstore/v1/internal/repository/sql/db_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ const (

var (
eventColumns = []string{"creation_date", "event_type", "event_sequence", "previous_aggregate_sequence", "event_data", "editor_service", "editor_user", "resource_owner", "instance_id", "aggregate_type", "aggregate_id", "aggregate_version"}
expectedFilterEventsLimitFormat = regexp.MustCompile(selectEscaped + ` \) ORDER BY event_sequence LIMIT \$2`).String()
expectedFilterEventsDescFormat = regexp.MustCompile(selectEscaped + ` \) ORDER BY event_sequence DESC`).String()
expectedFilterEventsAggregateIDLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 \) ORDER BY event_sequence LIMIT \$3`).String()
expectedFilterEventsAggregateIDTypeLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 \) ORDER BY event_sequence LIMIT \$3`).String()
expectedGetAllEvents = regexp.MustCompile(selectEscaped + ` \) ORDER BY event_sequence`).String()
expectedFilterEventsLimitFormat = regexp.MustCompile(selectEscaped + ` \) ORDER BY creation_date, event_sequence LIMIT \$2`).String()
expectedFilterEventsDescFormat = regexp.MustCompile(selectEscaped + ` \) ORDER BY creation_date DESC, event_sequence DESC`).String()
expectedFilterEventsAggregateIDLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 \) ORDER BY creation_date, event_sequence LIMIT \$3`).String()
expectedFilterEventsAggregateIDTypeLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 \) ORDER BY creation_date, event_sequence LIMIT \$3`).String()
expectedGetAllEvents = regexp.MustCompile(selectEscaped + ` \) ORDER BY creation_date, event_sequence`).String()

expectedInsertStatement = regexp.MustCompile(`INSERT INTO eventstore\.events ` +
`\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, instance_id, previous_aggregate_sequence, previous_aggregate_type_sequence\) ` +
Expand Down
5 changes: 3 additions & 2 deletions internal/eventstore/v1/internal/repository/sql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ func buildQuery(ctx context.Context, db dialect.Database, queryFactory *es_model
query += where

if searchQuery.Columns == es_models.Columns_Event {
query += " ORDER BY event_sequence"
order := " ORDER BY creation_date, event_sequence"
if searchQuery.Desc {
query += " DESC"
order = " ORDER BY creation_date DESC, event_sequence DESC"
}
query += order
}

if searchQuery.Limit > 0 {
Expand Down
6 changes: 3 additions & 3 deletions internal/eventstore/v1/internal/repository/sql/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func Test_buildQuery(t *testing.T) {
queryFactory: es_models.NewSearchQueryFactory().OrderDesc().AddQuery().AggregateTypes("user").Factory(),
},
res: res{
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY event_sequence DESC",
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY creation_date DESC, event_sequence DESC",
rowScanner: true,
values: []interface{}{es_models.AggregateType("user")},
},
Expand All @@ -447,7 +447,7 @@ func Test_buildQuery(t *testing.T) {
queryFactory: es_models.NewSearchQueryFactory().Limit(5).AddQuery().AggregateTypes("user").Factory(),
},
res: res{
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY event_sequence LIMIT $2",
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY creation_date, event_sequence LIMIT $2",
rowScanner: true,
values: []interface{}{es_models.AggregateType("user"), uint64(5)},
limit: 5,
Expand All @@ -459,7 +459,7 @@ func Test_buildQuery(t *testing.T) {
queryFactory: es_models.NewSearchQueryFactory().Limit(5).OrderDesc().AddQuery().AggregateTypes("user").Factory(),
},
res: res{
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY event_sequence DESC LIMIT $2",
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY creation_date DESC, event_sequence DESC LIMIT $2",
rowScanner: true,
values: []interface{}{es_models.AggregateType("user"), uint64(5)},
limit: 5,
Expand Down

1 comment on commit 4c1169b

@vercel
Copy link

@vercel vercel bot commented on 4c1169b Apr 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

docs – ./

docs-zitadel.vercel.app
zitadel-docs.vercel.app
docs-git-main-zitadel.vercel.app

Please sign in to comment.