Skip to content

Commit

Permalink
Merge pull request #15 from ischenkx/KAN-51
Browse files Browse the repository at this point in the history
KAN-51 - instant deps introduced
  • Loading branch information
ischenkx committed Jun 20, 2023
2 parents ab50a15 + ae9a715 commit 1cec900
Show file tree
Hide file tree
Showing 14 changed files with 706 additions and 165 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
testing/tmp
.idea
unused/stand/*/bin/*
unused/stand/*/bin/*
/tmp/
61 changes: 58 additions & 3 deletions common/data/pool/pool.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,71 @@
package pool

import "context"
import (
"context"
"fmt"
"kantoku/common/data/transactional"
)

//type Reader[Item any] interface {
// Read(ctx context.Context) (<-chan Item, error)
//}
//
//type Writer[Item any] interface {
// Write(ctx context.Context, item Item) error
//}
//
//type Pool[Item any] interface {
// Reader[Item]
// Writer[Item]
//}

type Reader[Item any] interface {
Read(ctx context.Context) (<-chan Item, error)
Read(ctx context.Context) (<-chan transactional.Object[Item], error)
//pool.Reader[Transaction[Item]]
}

// Writer probably should have NewTransaction method
type Writer[Item any] interface {
Write(ctx context.Context, item Item) error
// Write *must* write all items in a transaction!
Write(ctx context.Context, items ...Item) error
//pool.Writer[Item]
}

type Pool[Item any] interface {
Reader[Item]
Writer[Item]
}

func ReadAutoCommit[Item any](ctx context.Context, reader Reader[Item],
f func(ctx context.Context, item Item) error) error {
ch, err := reader.Read(ctx)
if err != nil {
return fmt.Errorf("failed to open read channel: %s", err)
}
loop:
for {
select {
case <-ctx.Done():
break loop
case tx := <-ch:
err = func(tx transactional.Object[Item]) error {
item, err := tx.Get(ctx)
defer tx.Rollback(ctx)
if err != nil {
return fmt.Errorf("failed to get item from transaction: %s", err)
}
if err := f(ctx, item); err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit: %s", err)
}
return nil
}(tx)
if err != nil {
return err
}
}
}
return nil
}
9 changes: 9 additions & 0 deletions common/data/transactional/object.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package transactional

import "context"

type Object[Item any] interface {
Get(ctx context.Context) (Item, error)
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}
67 changes: 45 additions & 22 deletions impl/common/data/pool/mem/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,46 @@ package mempool

import (
"context"
"go/types"
"kantoku/common/data/pool"
"kantoku/common/data/transactional"
"math/rand"
"sync"
"time"
)

func New[T any]() *Pool[T] {
pool := &Pool[T]{}
go pool.runFlusher()
return pool
func New[T any](config Config) *Pool[T] {
p := &Pool[T]{config: config}
go p.runFlusher()
return p
}

var _ pool.Pool[types.Object] = &Pool[types.Object]{}

type Pool[T any] struct {
buffer []T
readers []chan T
readers []chan transactional.Object[T]
perm []int
closed bool
mu sync.RWMutex
config Config
}

type Config struct {
BufferSize int
FlushPeriod time.Duration
}

var DefaultConfig = Config{
BufferSize: 0,
FlushPeriod: time.Second,
}

func (p *Pool[T]) Read(ctx context.Context) (<-chan T, error) {
func (p *Pool[T]) Read(ctx context.Context) (<-chan transactional.Object[T], error) {
p.mu.Lock()
defer p.mu.Unlock()

channel := make(chan T, 128)
channel := make(chan transactional.Object[T], p.config.BufferSize)

go func() {
<-ctx.Done()
Expand All @@ -46,14 +62,12 @@ func (p *Pool[T]) Read(ctx context.Context) (<-chan T, error) {
return channel, nil
}

func (p *Pool[T]) Write(ctx context.Context, item T) error {
func (p *Pool[T]) Write(_ context.Context, items ...T) error {
p.mu.Lock()
defer p.mu.Unlock()
p.buffer = append(p.buffer, items...)

if !p.write(item) {
p.buffer = append(p.buffer, item)
}

p.lockedFlush() // doing that to guarantee no delay if someone is ready to read
return nil
}

Expand All @@ -70,7 +84,7 @@ func (p *Pool[T]) Close() {
}

func (p *Pool[T]) runFlusher() {
ticker := time.NewTicker(time.Second * 1)
ticker := time.NewTicker(p.config.FlushPeriod)
defer ticker.Stop()

for range ticker.C {
Expand All @@ -80,30 +94,39 @@ func (p *Pool[T]) runFlusher() {
}
}

func (p *Pool[T]) flush() bool {
p.mu.Lock()
defer p.mu.Unlock()

func (p *Pool[T]) lockedFlush() bool {
if p.closed {
return false
}

for len(p.buffer) != 0 {
index := 0
if !p.write(p.buffer[index]) {
if !p.write() {
break
}
p.buffer = p.buffer[1:]
}

return true
}

func (p *Pool[T]) write(item T) bool {
func (p *Pool[T]) flush() bool {
p.mu.Lock()
defer p.mu.Unlock()

return p.lockedFlush()
}

func (p *Pool[T]) write() bool {
for _, index := range p.permute() {
reader := p.readers[index]
statusChan := make(chan bool)
select {
case reader <- item:
case reader <- &Transaction[T]{data: p.buffer[0], success: statusChan}:
// might want to add context with deadline
success := <-statusChan
if success {
p.buffer = p.buffer[1:]
}

return true
default:
}
Expand Down
28 changes: 28 additions & 0 deletions impl/common/data/pool/mem/transactional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package mempool

import (
"context"
"go/types"
"kantoku/common/data/transactional"
)

var _ transactional.Object[types.Object] = &Transaction[types.Object]{}

type Transaction[T any] struct {
data T
success chan bool
}

func (t *Transaction[T]) Get(ctx context.Context) (T, error) {
return t.data, nil
}

func (t *Transaction[T]) Commit(ctx context.Context) error {
t.success <- true
return nil
}

func (t *Transaction[T]) Rollback(ctx context.Context) error {
t.success <- false
return nil
}
Loading

0 comments on commit 1cec900

Please sign in to comment.