Skip to content

Commit

Permalink
Where as interface
Browse files Browse the repository at this point in the history
  • Loading branch information
latolukasz committed Jan 8, 2024
1 parent 2929a54 commit 19d8a81
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions flush_async_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func ConsumeAsyncBuffer(orm ORM, errF func(err error)) (stop func()) {
func consumeAsyncTempEvent(orm ORM, schema *entitySchema, errF func(err error)) {
r := orm.Engine().Redis(schema.getForcedRedisCode())
buffer := make([]any, redisRPushPackSize)
var values []any
var ok bool
for {
res := func() bool {
defer func() {
Expand All @@ -75,12 +77,20 @@ func consumeAsyncTempEvent(orm ORM, schema *entitySchema, errF func(err error))
time.Sleep(time.Second * 3)
}
}()
e := schema.asyncTemporaryQueue.Dequeue()
if e == nil {
for {
values, ok = schema.asyncTemporaryQueue.TryDequeue()
if !ok {
time.Sleep(time.Millisecond * 200)
continue
}
break
}

if values == nil {
return false
}
rows := 1
asJSON, _ := jsoniter.ConfigFastest.MarshalToString(e)
asJSON, _ := jsoniter.ConfigFastest.MarshalToString(values)
buffer[0] = asJSON
breakMe := false
for i := 1; i < redisRPushPackSize; i++ {
Expand Down

0 comments on commit 19d8a81

Please sign in to comment.