Skip to content

Commit

Permalink
KAN-53 - future deps (#16)
Browse files Browse the repository at this point in the history
* partial changes for review

* KAN-53 - some refactoring

* KAN-54 - rider job manager

* KAN-53 - bug fix

* KAN-53 - some changes
  • Loading branch information
ischenkx committed Jun 21, 2023
1 parent 1cec900 commit 026f374
Show file tree
Hide file tree
Showing 119 changed files with 1,954 additions and 1,003 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
testing/tmp
.idea
unused/stand/*/bin/*
backend/stand/*/bin/*
/tmp/
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Kantoku

A platform for distributed task execution

# Kantoku Framework Components
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package evexec

import (
"kantoku/platform"
"kantoku/unused/backend/executor/common"
"kantoku/backend/executor/common"
"kantoku/kernel/platform"
)

// Builder is a cosmetic structure to create executor with named args
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package evexec

import (
"context"
"kantoku/platform"
"kantoku/unused/backend/executor/common"
"kantoku/backend/executor/common"
"kantoku/kernel/platform"
"log"
)

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
echo Hello World
echo Building Test Stand Binaries...
set GOOS=linux
go build -o "client/bin/entry" client/main.go
go build -o "executor/bin/entry" executor/main.go
go build -o "depot/bin/entry" depot/main.go
go build -o "deps/bin/entry" deps/main.go
go build -o "taskdep/bin/entry" taskdep/main.go
go build -o "futdep/bin/entry" futdep/main.go
5 changes: 5 additions & 0 deletions backend/stand/client/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM golang:1.20

COPY bin/entry entry

CMD ./entry
104 changes: 104 additions & 0 deletions backend/stand/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package main

import (
"context"
"fmt"
"kantoku"
"kantoku/backend/stand/common"
"log"
"math/rand"
"strconv"
"time"
)

func fact(x int) int {
if x <= 1 {
return 1
}
return x * fact(x-1)
}

func main() {
rand.Seed(time.Now().UnixNano())
kan := common.MakeKantoku()

x, err := kan.Futures().Make(context.Background(), "resource", nil)
if err != nil {
log.Fatal("failed to make a future:", err)
}
y, err := kan.Futures().Make(context.Background(), "resource", nil)
if err != nil {
log.Fatal("failed to make a future:", err)
}
z, err := kan.Futures().Make(context.Background(), "resource", nil)
if err != nil {
log.Fatal("failed to make a future:", err)
}
p, err := kan.Futures().Make(context.Background(), "resource", nil)
if err != nil {
log.Fatal("failed to make a future:", err)
}
q, err := kan.Futures().Make(context.Background(), "resource", nil)
if err != nil {
log.Fatal("failed to make a future:", err)
}
r, err := kan.Futures().Make(context.Background(), "resource", nil)
if err != nil {
log.Fatal("failed to make a future:", err)
}

mulTask, err := kan.Spawn(context.Background(),
kantoku.Describe("mul").WithInputs(y.ID, z.ID).WithOutputs(p.ID),
)
if err != nil {
log.Fatal("failed to spawn a task:", err)
}

factorialTask, err := kan.Spawn(context.Background(),
kantoku.Describe("factorial").WithInputs(x.ID).WithOutputs(y.ID),
)
if err != nil {
log.Fatal("failed to spawn a task:", err)
}

mulTask1, err := kan.Spawn(context.Background(),
kantoku.Describe("mul").WithInputs(p.ID, q.ID).WithOutputs(r.ID),
)
if err != nil {
log.Fatal("failed to spawn a task:", err)
}

fmt.Println("Factorial task:", factorialTask.Task)
fmt.Println("Mul task:", mulTask.Task)
fmt.Println("Mul task 1:", mulTask1.Task)
fmt.Println("Resolving inputs")

xVal := rand.Intn(20)
zVal := rand.Intn(40)
qVal := rand.Intn(40)

fmt.Println("X:", xVal)
fmt.Println("Z:", zVal)
fmt.Println("EXPECTED:", fact(xVal)*zVal*qVal)

if err := kan.Futures().Resolve(context.Background(), x.ID, []byte(strconv.Itoa(xVal))); err != nil {
log.Fatal("failed to resolve x:", err)
}
if err := kan.Futures().Resolve(context.Background(), z.ID, []byte(strconv.Itoa(zVal))); err != nil {
log.Fatal("failed to resolve z:", err)
}
if err := kan.Futures().Resolve(context.Background(), q.ID, []byte(strconv.Itoa(zVal))); err != nil {
log.Fatal("failed to resolve q:", err)
}
for {
time.Sleep(time.Second * 2)
fmt.Println("Fetching results...")
resolution, err := kan.Futures().Load(context.Background(), p.ID)
if err != nil {
fmt.Println("failed to load resolution:", err)
continue
}
fmt.Println("Resolution:", string(resolution.Resource))
break
}
}
167 changes: 167 additions & 0 deletions backend/stand/common/platform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package common

import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/redis/go-redis/v9"
"kantoku"
"kantoku/common/data/bimap"
"kantoku/common/data/kv"
"kantoku/common/data/pool"
"kantoku/framework/future"
"kantoku/framework/plugins/depot"
"kantoku/impl/common/codec/jsoncodec"
"kantoku/impl/common/codec/strcodec"
rebimap "kantoku/impl/common/data/bimap/redis"
redikv "kantoku/impl/common/data/kv/redis"
redipool "kantoku/impl/common/data/pool/redis"
redivent "kantoku/impl/platform/event/redis"
"kantoku/impl/plugins/deps/postgres"
redismeta "kantoku/impl/plugins/meta/redis"
"kantoku/kernel"
"kantoku/kernel/platform"
"log"
)

type futureRunner struct {
queue pool.Writer[future.ID]
}

func (f futureRunner) Run(ctx context.Context, resolution future.Resolution) {
if err := f.queue.Write(ctx, resolution.Future.ID); err != nil {
log.Println("failed to put a resolution in the queue:", err)
}
}

var redisClient redis.UniversalClient = nil

func MakeRedisClient() redis.UniversalClient {
if redisClient != nil {
return redisClient
}
client := redis.NewClient(&redis.Options{
Addr: "redis:6379", // Redis server address
Password: "", // Redis server password (leave empty if not set)
DB: 0, // Redis database index
})

if cmd := client.Ping(context.Background()); cmd.Err() != nil {
panic("failed to ping the redis client: " + cmd.Err().Error())
}

redisClient = client

return client
}

func MakePostgresClient(ctx context.Context) *pgxpool.Pool {
client, err := pgxpool.New(ctx, "postgres://postgres:51413@postgres:5432/")

if err != nil {
panic("failed to create postgres deps: " + err.Error())
}

if err := client.Ping(ctx); err != nil {
panic("failed to make ping postgres: " + err.Error())
}

return client
}

func MakeDeps() *postgredeps.Deps {
pg := MakePostgresClient(context.Background())
deps := postgredeps.New(
pg,
redipool.New[string](MakeRedisClient(), strcodec.Codec{}, "depot_groups"),
)

if err := deps.InitTables(context.Background()); err != nil {
log.Println("failed to init tables:", err)
}

return deps
}

func MakeDepotBimap() bimap.Bimap[string, string] {
return rebimap.NewBimap[string, string](
"keys___",
"values___",
strcodec.Codec{},
strcodec.Codec{},
MakeRedisClient(),
)
}

func MakeInputs() *depot.Depot {
return depot.New(
MakeDeps(),
MakeDepotBimap(),
redipool.New[string](MakeRedisClient(), strcodec.Codec{}, "TEST_STAND_INPUTS"),
)
}

func MakeOutputs() platform.Outputs {
return redikv.New[platform.Result](MakeRedisClient(), jsoncodec.New[platform.Result](), "TEST_STAND_OUTPUTS")
}

func MakeBroker() platform.Broker {
return redivent.New(jsoncodec.New[platform.Event](), MakeRedisClient())
}

func MakeDB() platform.DB[kernel.Task] {
return redikv.New[kernel.Task](MakeRedisClient(), jsoncodec.New[kernel.Task](), "TEST_STAND_TASKS_DB")
}

func MakePlatform() platform.Platform[kernel.Task] {
return platform.New[kernel.Task](
MakeDB(),
MakeInputs(),
MakeOutputs(),
MakeBroker(),
)
}

func MakeFutureResolutionQueue() pool.Pool[future.ID] {
return redipool.New[future.ID](MakeRedisClient(), strcodec.Codec{}, "TEST_STAND_FUTURE_RESOLUTIONS_QUEUE")
}

func MakeFuturesManager() *future.Manager {
return future.NewManager(
redikv.New[future.Future](MakeRedisClient(), jsoncodec.New[future.Future](), "TEST_STAND_FUTURES"),
redikv.New[future.Resource](MakeRedisClient(), jsoncodec.New[future.Resource](), "TEST_STAND_FUTURE_RESOLUTIONS"),
futureRunner{queue: MakeFutureResolutionQueue()},
)
}

func MakeTaskDepDB() kv.Database[string, string] {
return redikv.New[string](
MakeRedisClient(),
strcodec.Codec{},
"TEST_STAND_TASK_DEPS",
)
}

func MakeFutDepDB() kv.Database[string, string] {
return redikv.New[string](
MakeRedisClient(),
strcodec.Codec{},
"TEST_STAND_FUT_DEPS",
)
}

func MakeKantoku() *kantoku.Kantoku {
return kantoku.NewBuilder().
ConfigureParametrizationCodec(jsoncodec.New[kantoku.Parametrization]()).
ConfigureSettings(
kantoku.Settings{AutoInputDependencies: true},
).
ConfigurePlatform(MakePlatform()).
//ConfigureContexts().
ConfigureFutures(MakeFuturesManager()).
ConfigureTaskdep(MakeTaskDepDB()).
ConfigureFutdep(MakeFutDepDB()).
ConfigureDepot(MakeDepotBimap()).
ConfigureDeps(MakeDeps()).
ConfigureMeta(redismeta.NewDB("META", MakeRedisClient()), jsoncodec.Dynamic{}).
Build()
}
5 changes: 5 additions & 0 deletions backend/stand/depot/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM ubuntu:22.04

COPY bin/entry ./entry

CMD ./entry
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"context"
"kantoku/unused/backend/stand/common"
"kantoku/backend/stand/common"
"log"
)

Expand Down
5 changes: 5 additions & 0 deletions backend/stand/deps/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM ubuntu:22.04

COPY bin/entry entry

CMD ./entry
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"context"
"kantoku/unused/backend/stand/common"
"kantoku/backend/stand/common"
)

func main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ services:
- postgres
- redis
- depot
taskdep:
build:
context: taskdep
dockerfile: Dockerfile
depends_on:
- postgres
- redis
- deps
futdep:
build:
context: futdep
dockerfile: Dockerfile
depends_on:
- postgres
- redis
- deps

depot:
build:
Expand Down
5 changes: 5 additions & 0 deletions backend/stand/executor/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM ubuntu:22.04

COPY bin/entry entry

CMD ./entry
Loading

0 comments on commit 026f374

Please sign in to comment.