Skip to content

Commit

Permalink
Where as interface
Browse files Browse the repository at this point in the history
  • Loading branch information
latolukasz committed Jan 9, 2024
1 parent becfc88 commit 6ccfe5e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
6 changes: 3 additions & 3 deletions flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (orm *ormImplementation) handleDeletes(async bool, schema *entitySchema, op
}
asJSON, _ := jsoniter.ConfigFastest.MarshalToString(bind)
data[5] = asJSON
publishAsyncEvent(schema, data)
publishAsyncEvent(logTableSchema, data)
}
for _, p := range orm.engine.pluginFlush {
if bind == nil {
Expand Down Expand Up @@ -341,7 +341,7 @@ func (orm *ormImplementation) handleInserts(async bool, schema *entitySchema, op
}
asJSON, _ := jsoniter.ConfigFastest.MarshalToString(bind)
data[5] = asJSON
publishAsyncEvent(schema, data)
publishAsyncEvent(logTableSchema, data)
}
if hasLocalCache {
orm.flushPostActions = append(orm.flushPostActions, func(_ ORM) {
Expand Down Expand Up @@ -500,7 +500,7 @@ func (orm *ormImplementation) handleUpdates(async bool, schema *entitySchema, op
data[5] = asJSON
asJSON, _ = jsoniter.ConfigFastest.MarshalToString(newBind)
data[6] = asJSON
publishAsyncEvent(schema, data)
publishAsyncEvent(logTableSchema, data)
}

if update.getEntity() == nil {
Expand Down
31 changes: 27 additions & 4 deletions read_flush_async_events.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package beeorm

import jsoniter "github.com/json-iterator/go"
import (
"slices"

jsoniter "github.com/json-iterator/go"
)

type AsyncFlushEvents interface {
EntitySchemas() []EntitySchema
EventsCount() uint64
ErrorsCount() uint64
Events(total int) []FlushEvent
Errors(total int) []FlushEventWithError
Errors(total int, last bool) []FlushEventWithError
TrimEvents(total int)
TrimErrors(total int)
RedilPool() string
RedisList() string
}

type asyncFlushEvents struct {
Expand All @@ -33,6 +39,14 @@ func (s *asyncFlushEvents) EntitySchemas() []EntitySchema {
return s.schemas
}

func (s *asyncFlushEvents) RedilPool() string {
return s.redisPoolName
}

func (s *asyncFlushEvents) RedisList() string {
return s.listName
}

func (s *asyncFlushEvents) EventsCount() uint64 {
r := s.orm.Engine().Redis(s.redisPoolName)
return uint64(r.LLen(s.orm, s.listName))
Expand Down Expand Up @@ -60,9 +74,15 @@ func (s *asyncFlushEvents) Events(total int) []FlushEvent {
return results
}

func (s *asyncFlushEvents) Errors(total int) []FlushEventWithError {
func (s *asyncFlushEvents) Errors(total int, last bool) []FlushEventWithError {
r := s.orm.Engine().Redis(s.redisPoolName)
events := r.LRange(s.orm, s.listName+flushAsyncEventsListErrorSuffix, 0, int64(total*2-1))
var events []string

if last {
events = r.LRange(s.orm, s.listName+flushAsyncEventsListErrorSuffix, int64(-total)*2, -1)
} else {
events = r.LRange(s.orm, s.listName+flushAsyncEventsListErrorSuffix, 0, int64(total*2-1))
}
results := make([]FlushEventWithError, len(events)/2)
k := 0
for i, event := range events {
Expand All @@ -80,6 +100,9 @@ func (s *asyncFlushEvents) Errors(total int) []FlushEventWithError {
k++
}
}
if last {
slices.Reverse(results)
}
return results
}

Expand Down
6 changes: 3 additions & 3 deletions read_flush_async_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestAsync(t *testing.T) {
assert.Contains(t, events[0].QueryAttributes[1], "test 0")
assert.Len(t, events[1].QueryAttributes, 2)
assert.Contains(t, events[1].QueryAttributes[1], "test 1")
errors := stat.Errors(100)
errors := stat.Errors(100, false)
assert.Len(t, errors, 0)

stat.TrimEvents(1)
Expand All @@ -76,7 +76,7 @@ func TestAsync(t *testing.T) {

assert.Equal(t, uint64(0), stat.EventsCount())
assert.Equal(t, uint64(asyncConsumerPage+10), stat.ErrorsCount())
errors = stat.Errors(10)
errors = stat.Errors(10, false)
assert.Len(t, errors, 10)
assert.Len(t, errors[0].QueryAttributes, 2)
assert.Contains(t, errors[0].QueryAttributes[1], "test 0")
Expand All @@ -86,7 +86,7 @@ func TestAsync(t *testing.T) {
assert.Equal(t, "Error 1054 (42S22): Unknown column 'Name' in 'field list'", errors[1].Error)

stat.TrimErrors(1)
errors = stat.Errors(10)
errors = stat.Errors(10, false)
assert.Len(t, errors, 10)
assert.Contains(t, errors[0].QueryAttributes[1], "test 1")
}
Expand Down

0 comments on commit 6ccfe5e

Please sign in to comment.