Skip to content

Commit

Permalink
Merge pull request #21 from ischenkx/KAN-60
Browse files Browse the repository at this point in the history
KAN-60 - Fix the inconsistency in Depot
  • Loading branch information
ischenkx committed Jul 3, 2023
2 parents 19ceda5 + 8365ea9 commit d5a78bc
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 202 deletions.
61 changes: 43 additions & 18 deletions framework/plugins/depot/depot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"kantoku/common/data/bimap"
"kantoku/common/data/transactional"
"kantoku/framework/plugins/depot/deps"
"kantoku/kernel"
"kantoku/kernel/platform"
Expand Down Expand Up @@ -36,24 +37,37 @@ func (depot *Depot) GroupTaskBimap() bimap.Bimap[string, string] {
return depot.groupTaskBimap
}

func (depot *Depot) Write(ctx context.Context, id string) error {
func (depot *Depot) Write(ctx context.Context, ids ...string) error {
// actually it breaks interface
if len(ids) != 1 {
return fmt.Errorf("multiple ids are not supported")
}
taskId := ids[0]

data := kernel.GetPluginData(ctx).GetWithDefault("dependencies", &PluginData{}).(*PluginData)

group, err := depot.Deps().MakeGroup(ctx, data.Dependencies...)
group, err := depot.Deps().NewGroup(ctx)
if err != nil {
return fmt.Errorf("failed to make a dependency group: %s", err)
return fmt.Errorf("failed to make group id: %w", err)
}

// TODO: possible inconsistency
// (if the task dependency group had been resolved and processed before the following line was executed)
if err := depot.groupTaskBimap.Save(ctx, group, id); err != nil {
return fmt.Errorf("failed to save the (group, task) pair in the bimap: %s", err)
if err := depot.groupTaskBimap.Save(ctx, group, taskId); err != nil {
return fmt.Errorf("failed to save the (group, task) pair in the bimap: %w", err)
}

if err := depot.Deps().InitGroup(ctx, group, data.Dependencies...); err != nil {
returningErr := fmt.Errorf("failed to make a dependency group: %s", err)
if err := depot.groupTaskBimap.DeleteByKey(ctx, group); err != nil {
return fmt.Errorf("%w\nalso failed to remove (group, task) pair from the bimap: %w",
returningErr, err)
}
return returningErr
}

return nil
}

func (depot *Depot) Read(ctx context.Context) (<-chan string, error) {
func (depot *Depot) Read(ctx context.Context) (<-chan transactional.Object[string], error) {
return depot.inputs.Read(ctx)
}

Expand All @@ -69,16 +83,27 @@ loop:
select {
case <-ctx.Done():
break loop
case id := <-ready:
taskID, err := depot.groupTaskBimap.ByKey(ctx, id)
if err != nil {
log.Println("failed to get a task assigned to the group:", err)
continue
}
if err := depot.inputs.Write(ctx, taskID); err != nil {
log.Println("failed to schedule a task:", err)
continue
}
case tx := <-ready:
func() {
defer tx.Rollback(ctx)
id, err := tx.Get(ctx)
if err != nil {
log.Println("failed to get an id of a group:", err)
return
}
taskID, err := depot.groupTaskBimap.ByKey(ctx, id)
if err != nil {
log.Println("failed to get a task assigned to the group:", err)
return
}
if err := depot.inputs.Write(ctx, taskID); err != nil {
log.Println("failed to schedule a task:", err)
return
}
if err := tx.Commit(ctx); err != nil {
log.Println("INCONSISTENCY! failed to commit reading group:", err)
}
}()
}
}

Expand Down
8 changes: 6 additions & 2 deletions framework/plugins/depot/deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ type Deps interface {
Dependency(ctx context.Context, id string) (Dependency, error)
Resolve(ctx context.Context, id string) error
Group(ctx context.Context, id string) (Group, error)
Make(ctx context.Context) (Dependency, error) // creates a single dependency
MakeGroup(ctx context.Context, ids ...string) (string, error) // creates a group from a set of dependencies
NewDependency(ctx context.Context) (Dependency, error) // creates a single dependency

// NewGroup generates id for a group, which then can be passed to SaveGroup
NewGroup(ctx context.Context) (string, error)
// InitGroup saves group with given id and dependencies to Deps
InitGroup(ctx context.Context, groupId string, depIds ...string) error
Ready(ctx context.Context) (<-chan transactional.Object[string], error)
}
2 changes: 1 addition & 1 deletion framework/plugins/futdep/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (manager *Manager) Make(ctx context.Context, id future.ID) (string, error)
return "", fmt.Errorf("failed to get data from fut2dep: %s", err)
}

dep, err := manager.deps.Make(ctx)
dep, err := manager.deps.NewDependency(ctx)
if err != nil {
return "", fmt.Errorf("failed to make a dependency: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion framework/plugins/taskdep/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (manager *Manager) SubtaskDependency(ctx context.Context, task string) (str
return "", err
}

dep, err := manager.deps.Make(ctx)
dep, err := manager.deps.NewDependency(ctx)
if err != nil {
return "", err
}
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ module kantoku
go 1.19

require (
github.com/go-co-op/gocron v1.19.0
github.com/google/uuid v1.3.0
github.com/jackc/pgx/v5 v5.3.0
github.com/redis/go-redis/v9 v9.0.2
github.com/samber/lo v1.37.0
github.com/satori/go.uuid v1.2.0
github.com/stretchr/testify v1.8.2
)

Expand All @@ -20,10 +18,10 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -26,20 +27,17 @@ github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQ
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/samber/lo v1.37.0 h1:XjVcB8g6tgUp8rsPsJ2CvhClfImrpL04YpQHXeHPhRw=
github.com/samber/lo v1.37.0/go.mod h1:9vaz2O4o8oOnK23pd2TrXufcbdbJIa3b6cstBWKpopA=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
File renamed without changes.
68 changes: 51 additions & 17 deletions impl/common/data/pool/redis/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@ package redipool

import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"github.com/samber/lo"
"go/types"
"kantoku/common/chutil"
"kantoku/common/codec"
"kantoku/common/data/pool"
"kantoku/common/data/transactional"
"log"
"time"
)

var _ pool.Pool[types.Object] = &Pool[types.Object]{}

// Pool with redis. This implementation does not guarantee FIFO because it never blocks queue.
type Pool[T any] struct {
client redis.UniversalClient
codec codec.Codec[T, []byte]
topicName string
}

const PollPeriod = time.Millisecond * 100

func New[T any](client redis.UniversalClient, codec codec.Codec[T, []byte], topicName string) *Pool[T] {
return &Pool[T]{
client: client,
Expand All @@ -22,50 +33,73 @@ func New[T any](client redis.UniversalClient, codec codec.Codec[T, []byte], topi
}
}

func (pool *Pool[T]) Write(ctx context.Context, item T) error {
data, err := pool.codec.Encode(item)
func (pool *Pool[T]) Write(ctx context.Context, items ...T) error {
var err error
data := make([][]byte, len(items))
for i, item := range items {
data[i], err = pool.codec.Encode(item)
if err != nil {
break
}
}
if err != nil {
return err
return fmt.Errorf("failed to encode items: %w", err)
}

cmd := pool.client.RPush(ctx, pool.topicName, data)
cmd := pool.client.RPush(ctx, pool.topicName,
lo.Map(data, func(item []byte, _ int) interface{} { return item })...)

if cmd.Err() != nil {
return cmd.Err()
}
return nil
}

func (pool *Pool[T]) Read(ctx context.Context) (<-chan T, error) {
channel := make(chan T, 1024)
func (pool *Pool[T]) Read(ctx context.Context) (<-chan transactional.Object[T], error) {
channel := make(chan transactional.Object[T], 0)
chutil.CloseWithContext(ctx, channel)

go func(ctx context.Context, outputs chan<- T) {
go func(ctx context.Context, outputs chan<- transactional.Object[T]) {
loop:
for {
select {
case <-ctx.Done():
break loop
default:
}
result, err := pool.client.BLPop(ctx, 0, pool.topicName).Result()
if err != nil {
log.Println("failed to pop a task from the queue:", err)

result, err := pool.client.LPop(ctx, pool.topicName).Result()

if err == redis.Nil {
// nothing in queue
time.Sleep(PollPeriod)
continue
}

if len(result) != 2 {
log.Println("length of the result is not equal 2")
if err != nil {
log.Println("failed to pop a task from the queue:", err)
continue
}

data := result[1]

output, err := pool.codec.Decode([]byte(data))
data := []byte(result)
output, err := pool.codec.Decode(data)
if err != nil {
log.Println("failed to decode the incoming message:", err)
continue
}
outputs <- output

select {
case <-ctx.Done():
// push results back value if context is closed
if err := pool.client.LPush(context.Background(), pool.topicName, data).Err(); err != nil {
log.Println("failed to push back value which was read after context was closed:", err)
}
break loop
case outputs <- &Transaction[T]{
data: output,
pool: pool,
}:
}

}
}(ctx, channel)

Expand Down
26 changes: 26 additions & 0 deletions impl/common/data/pool/redis/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package redipool

import (
"context"
"go/types"
"kantoku/common/data/transactional"
)

var _ transactional.Object[types.Object] = &Transaction[types.Object]{}

type Transaction[T any] struct {
data T
pool *Pool[T]
}

func (t *Transaction[T]) Get(ctx context.Context) (T, error) {
return t.data, nil
}

func (t *Transaction[T]) Commit(ctx context.Context) error {
return nil
}

func (t *Transaction[T]) Rollback(ctx context.Context) error {
return t.pool.client.LPush(ctx, t.pool.topicName, t.data).Err()
}
33 changes: 15 additions & 18 deletions impl/deps/postgres/batched/batched.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func (d *Deps) Group(ctx context.Context, group string) (deps.Group, error) {
return result, nil
}

// Make generates a new dependency (but it does not store the information about it in the database
// NewDependency generates a new dependency (but it does not store the information about it in the database
//
// Generation algorithm: UUID
func (d *Deps) Make(ctx context.Context) (deps.Dependency, error) {
func (d *Deps) NewDependency(ctx context.Context) (deps.Dependency, error) {
id := uuid.New().String()

return deps.Dependency{
Expand All @@ -112,38 +112,35 @@ func (d *Deps) Make(ctx context.Context) (deps.Dependency, error) {
}, nil
}

// MakeGroup creates a new dependency group
//
// NOTE: the new group's id is generated via a UUID algorithm
func (d *Deps) MakeGroup(ctx context.Context, ids ...string) (string, error) {
id := uuid.New().String()
status := InitializingStatus
func (d *Deps) NewGroup(_ context.Context) (string, error) {
return uuid.New().String(), nil
}

func (d *Deps) InitGroup(ctx context.Context, groupId string, depIds ...string) error {
status := InitializingStatus
tx, err := d.client.Begin(ctx)
if err != nil {
return "", fmt.Errorf("failed to begin a postgres transaction: %s", tx)
return fmt.Errorf("failed to begin a postgres transaction: %s", tx)
}
defer tx.Rollback(ctx)

sql := `INSERT INTO BatchedGroups (id, pending, status) VALUES ($1, $2, $3)`
_, err = tx.Exec(ctx, sql, id, -1, status)
if err != nil {
return "", err
if _, err := tx.Exec(ctx, sql, groupId, -1, status); err != nil {
return err
}

for _, dep := range ids {
for _, dep := range depIds {
sql := `INSERT INTO BatchedGroupDependencies (dependency_id, group_id) VALUES ($1, $2)`
_, err := tx.Exec(ctx, sql, dep, id)
if err != nil {
return "", err
if _, err := tx.Exec(ctx, sql, dep, groupId); err != nil {
return err
}
}

if err := tx.Commit(ctx); err != nil {
return "", fmt.Errorf("failed to commit the transaction: %s", err)
return fmt.Errorf("failed to commit the transaction: %s", err)
}

return id, nil
return nil
}

func (d *Deps) Resolve(ctx context.Context, dep string) error {
Expand Down
Loading

0 comments on commit d5a78bc

Please sign in to comment.