Skip to content

Commit

Permalink
steaming api tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaiba committed Mar 27, 2024
1 parent 7e74326 commit 4367a13
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 54 deletions.
9 changes: 5 additions & 4 deletions internal/engine/cost/datasource/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (ds *CsvDataSource) load() error {
}

columnTypes := make([]string, len(header))
columnTypesInfered := false
columnTypesInferred := false

for {
columns, err := r.Read()
Expand All @@ -54,7 +54,7 @@ func (ds *CsvDataSource) load() error {
newRow := make(Row, len(header))
for i, col := range columns {
colType, colValue := colTypeCast(col)
if columnTypesInfered {
if columnTypesInferred {
// check if the column type is consistent
if columnTypes[i] != colType {
return fmt.Errorf("inconsistent column type at column %d, got %s, want %s",
Expand All @@ -69,15 +69,16 @@ func (ds *CsvDataSource) load() error {

ds.records = append(ds.records, newRow)

columnTypesInfered = true
columnTypesInferred = true
}

for i, name := range header {
ds.schema.Fields = append(ds.schema.Fields,
datatypes.Field{Name: name, Type: columnTypes[i]})
}

slices.Clip(ds.schema.Fields)
ds.records = slices.Clip(ds.records)
ds.schema.Fields = slices.Clip(ds.schema.Fields)

return nil
}
Expand Down
167 changes: 167 additions & 0 deletions internal/engine/cost/datasource/data.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datasource

import (
"context"
"fmt"
"strings"

Expand Down Expand Up @@ -102,3 +103,169 @@ func (r *Result) ToCsv() string {
}
return sb.String()
}

// tap converts an array to a channel(stream of data)
func tap[T any](ctx context.Context, in []T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for _, element := range in {
select {
case <-ctx.Done():
return
case out <- element:
}
}
}()
return out
}

// filter applies a function to each element in the input channel
func filter[T any](ctx context.Context, in <-chan T, fn func(T) bool) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for i := range in {
if !fn(i) {
continue
}

select {
case <-ctx.Done():
return
case out <- i:
}
}
}()
return out
}

// smap applies a function to each element in the input channel
func smap[T any](ctx context.Context, in <-chan T, fn func(T) T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for i := range in {
select {
case <-ctx.Done():
return
case out <- fn(i):
}
}
}()
return out
}

// transform applies a function to each element in the input channel
func transform[I any, O any](ctx context.Context, in <-chan I, fn func(I) O) <-chan O {
out := make(chan O)
go func() {
defer close(out)
for i := range in {
select {
case <-ctx.Done():
return
case out <- fn(i):
}
}
}()
return out
}

// collect collects all the elements in the input channel
func collect[T any](ctx context.Context, in <-chan T) []T {
out := make([]T, 0)
for element := range in {
select {
case <-ctx.Done():
return out
default:
out = append(out, element)
}
}
return out
}

// SteamingAPI defines a streaming API.
type SteamingAPI[T any] interface {
Transform(ctx context.Context, fn func(T) T) SteamingAPI[T]
Filter(ctx context.Context, fn func(T) bool) SteamingAPI[T]
Collect(ctx context.Context) []T
}

// SAI is a StreamingApi Implementation.
type SAI[T any] struct {
dataIn <-chan T
}

func Tap[T any](ctx context.Context, in []T) SteamingAPI[T] {
return &SAI[T]{dataIn: tap(ctx, in)}
}

func (s SAI[T]) Transform(ctx context.Context, fn func(T) T) SteamingAPI[T] {
return &SAI[T]{dataIn: transform(ctx, s.dataIn, fn)}
}

func (s SAI[T]) Filter(ctx context.Context, fn func(T) bool) SteamingAPI[T] {
return &SAI[T]{dataIn: filter(ctx, s.dataIn, fn)}
}

func (s SAI[T]) Collect(ctx context.Context) []T {
var out []T
for {
select {
case <-ctx.Done():
return out
case v, ok := <-s.dataIn:
if !ok {
return out
}
out = append(out, v)
}
}
}

//// SteamingAPI defines a streaming API.
//type SteamingAPI[I any, O any] interface {
// //Transform(ctx context.Context, fn func(I) O) SteamingAPI[I, O]
// //Map(ctx context.Context, fn func(I) I) SteamingAPI[I, O]
// //Filter(ctx context.Context, fn func(I) bool) SteamingAPI[I, O]
// Collect(ctx context.Context) []I
//}
//
//// SAI is a StreamingApi Implementation.
//type SAI[I any, O any] struct {
// dataIn <-chan I
//}
//
//func Tap[I any, O any](ctx context.Context, in []I) SteamingAPI[I, O] {
// return &SAI[I, O]{dataIn: tap(ctx, in)}
//}
//
////func (s *SAI[I any, O any]) Transform(ctx context.Context, fn func(I) O) SteamingAPI[I, O] {
//// return &SAI[I,O]{dataIn: transform(ctx, s.dataIn, fn)}
////}
////
////
////func (s *SAI[I any, O any]) Map(ctx context.Context, fn func(I) I) SteamingAPI[I, O] {
//// return &SAI[I,O]{dataIn: smap(ctx, s.dataIn, fn)}
////}
//
////func (s *SAI[I any, O any]) Filter(ctx context.Context, fn func(I) bool) SteamingAPI[I, O] {
//// return &SAI[I, O]{dataIn: filter(ctx, s.dataIn, fn)}
////}
//
//func (s SAI[I any, O any]) Collect(ctx context.Context) []I {
// var out []I
// for {
// select {
// case <-ctx.Done():
// return out
// case v, ok := <-s.dataIn:
// if !ok {
// return out
// }
// out = append(out, v)
// }
// }
//}
19 changes: 19 additions & 0 deletions internal/engine/cost/datasource/data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package datasource

import (
"context"
"fmt"
"testing"
)

func TestStreamingAPI(t *testing.T) {
data := []int{0, 1, 2, 3, 4, 5, 6, 8}
ctx := context.TODO()

newData := Tap(ctx, data).
Filter(ctx, func(x int) bool { return x%2 == 0 }).
Transform(ctx, func(x int) int { return x * 2 }).
Collect(ctx)

fmt.Println(newData)
}
48 changes: 48 additions & 0 deletions internal/engine/cost/logical_plan/dataframe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package logical_plan

import "github.com/kwilteam/kwil-db/internal/engine/cost/datatypes"

type DataFrameAPI interface {
// Project applies a projection
Project(expr ...LogicalExpr) DataFrameAPI

// Filter applies a filter
Filter(expr LogicalExpr) DataFrameAPI

// Aggregate applies an aggregation
Aggregate(groupBy []LogicalExpr, aggregateExpr []LogicalExpr) DataFrameAPI

// Schema returns the schema of the data that will be produced by this DataFrameAPI.
Schema() *datatypes.Schema

// LogicalPlan returns the logical plan
LogicalPlan() LogicalPlan
}

type DataFrame struct {
plan LogicalPlan
}

func (df *DataFrame) Project(exprs ...LogicalExpr) DataFrameAPI {
return &DataFrame{Projection(df.plan, exprs...)}
}

func (df *DataFrame) Filter(expr LogicalExpr) DataFrameAPI {
return &DataFrame{Filter(df.plan, expr)}
}

func (df *DataFrame) Aggregate(groupBy []LogicalExpr, aggregateExpr []LogicalExpr) DataFrameAPI {
return &DataFrame{Aggregate(df.plan, groupBy, aggregateExpr)}
}

func (df *DataFrame) Schema() *datatypes.Schema {
return df.plan.Schema()
}

func (df *DataFrame) LogicalPlan() LogicalPlan {
return df.plan
}

func NewDataFrame(plan LogicalPlan) *DataFrame {
return &DataFrame{plan: plan}
}
2 changes: 1 addition & 1 deletion internal/engine/cost/logical_plan/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ type LiteralTextExpr struct {
}

func (e *LiteralTextExpr) String() string {
return e.Value
return fmt.Sprintf("'%s'", e.Value)
}

func (e *LiteralTextExpr) Resolve(*dt.Schema) dt.Field {
Expand Down
45 changes: 0 additions & 45 deletions internal/engine/cost/logical_plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,51 +21,6 @@ type LogicalPlan interface {
//Accept(visitor LogicalOperatorVisitor) any
}

type DataFrameAPI interface {
// Project applies a projection
Project(expr ...LogicalExpr) DataFrameAPI

// Filter applies a filter
Filter(expr LogicalExpr) DataFrameAPI

// Aggregate applies an aggregation
Aggregate(groupBy []LogicalExpr, aggregateExpr []LogicalExpr) DataFrameAPI

// Schema returns the schema of the data that will be produced by this DataFrameAPI.
Schema() *datatypes.Schema

// LogicalPlan returns the logical plan
LogicalPlan() LogicalPlan
}

type DataFrame struct {
plan LogicalPlan
}

func (df *DataFrame) Project(exprs ...LogicalExpr) DataFrameAPI {
return &DataFrame{Projection(df.plan, exprs...)}
}

func (df *DataFrame) Filter(expr LogicalExpr) DataFrameAPI {
return &DataFrame{Filter(df.plan, expr)}
}

func (df *DataFrame) Aggregate(groupBy []LogicalExpr, aggregateExpr []LogicalExpr) DataFrameAPI {
return &DataFrame{Aggregate(df.plan, groupBy, aggregateExpr)}
}

func (df *DataFrame) Schema() *datatypes.Schema {
return df.plan.Schema()
}

func (df *DataFrame) LogicalPlan() LogicalPlan {
return df.plan
}

func NewDataFrame(plan LogicalPlan) *DataFrame {
return &DataFrame{plan: plan}
}

func Format(plan LogicalPlan, indent int) string {
var msg bytes.Buffer
for i := 0; i < indent; i++ {
Expand Down
6 changes: 3 additions & 3 deletions internal/engine/cost/virtual_plan/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ func (e *VLiteralStringExpr) evaluate(_ datasource.Row) datasource.ColumnValue {
return datasource.NewLiteralColumnValue(e.value)
}

type VLiteralNumericxpr struct {
type VLiteralNumericExpr struct {
value int64
}

func (e *VLiteralNumericxpr) Resolve(_ VirtualPlan) string {
func (e *VLiteralNumericExpr) Resolve(_ VirtualPlan) string {
return fmt.Sprintf("%d", e.value)
}

func (e *VLiteralNumericxpr) evaluate(_ datasource.Row) datasource.ColumnValue {
func (e *VLiteralNumericExpr) evaluate(_ datasource.Row) datasource.ColumnValue {
return datasource.NewLiteralColumnValue(e.value)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/engine/cost/virtual_plan/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (q *defaultVirtualPlanner) ToExpr(expr logical_plan.LogicalExpr,
input logical_plan.LogicalPlan) VirtualExpr {
switch e := expr.(type) {
case *logical_plan.LiteralNumericExpr:
return &VLiteralNumericxpr{e.Value}
return &VLiteralNumericExpr{e.Value}
case *logical_plan.LiteralTextExpr:
return &VLiteralStringExpr{e.Value}
case *logical_plan.AliasExpr:
Expand Down

0 comments on commit 4367a13

Please sign in to comment.