Skip to content

Commit

Permalink
plan select: build aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaiba committed Mar 19, 2024
1 parent 22b5a25 commit 9d41a51
Show file tree
Hide file tree
Showing 21 changed files with 1,978 additions and 340 deletions.
34 changes: 18 additions & 16 deletions internal/engine/cost/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"os"
"strconv"
"strings"

"github.com/kwilteam/kwil-db/internal/engine/cost/datatypes"
)

// ColumnValue
Expand Down Expand Up @@ -57,7 +59,7 @@ func newRowPipeline(rows []Row) RowPipeline {
}

type Result struct {
schema *Schema
schema *datatypes.Schema
stream RowPipeline
}

Expand Down Expand Up @@ -88,16 +90,16 @@ func (r *Result) ToCsv() string {
return sb.String()
}

func ResultFromStream(s *Schema, rows RowPipeline) *Result {
func ResultFromStream(s *datatypes.Schema, rows RowPipeline) *Result {
return &Result{schema: s, stream: rows}
}

func ResultFromRaw(s *Schema, rows []Row) *Result {
func ResultFromRaw(s *datatypes.Schema, rows []Row) *Result {
// TODO: use RowPipeline all the way
return &Result{schema: s, stream: newRowPipeline(rows)}
}

func (r *Result) Schema() *Schema {
func (r *Result) Schema() *datatypes.Schema {
return r.schema
}

Expand All @@ -111,7 +113,7 @@ type SourceType string
// DataSource represents a data source.
type DataSource interface {
// Schema returns the schema for the underlying data source
Schema() *Schema
Schema() *datatypes.Schema

// SourceType returns the type of the data source.
SourceType() SourceType
Expand All @@ -129,7 +131,7 @@ type DataSource interface {

// dsScan reads the data source and returns the selected columns.
// TODO: use channel to return the result, e.g. iterator model.
func dsScan(dsSchema *Schema, dsRecords []Row, projection []string) *Result {
func dsScan(dsSchema *datatypes.Schema, dsRecords []Row, projection []string) *Result {
if len(projection) == 0 {
return ResultFromRaw(dsSchema, dsRecords)
}
Expand All @@ -148,14 +150,14 @@ func dsScan(dsSchema *Schema, dsRecords []Row, projection []string) *Result {
// }
//}

fieldIndex := dsSchema.mapProjection(projection)
fieldIndex := dsSchema.MapProjection(projection)

newFields := make([]Field, len(projection))
newFields := make([]datatypes.Field, len(projection))
for i, idx := range fieldIndex {
newFields[i] = dsSchema.Fields[idx]
}

newSchema := NewSchema(newFields...)
newSchema := datatypes.NewSchema(newFields...)

out := make(RowPipeline)
go func() {
Expand All @@ -175,15 +177,15 @@ func dsScan(dsSchema *Schema, dsRecords []Row, projection []string) *Result {

// memDataSource is a data source that reads data from memory.
type memDataSource struct {
schema *Schema
schema *datatypes.Schema
records []Row
}

func NewMemDataSource(s *Schema, data []Row) *memDataSource {
func NewMemDataSource(s *datatypes.Schema, data []Row) *memDataSource {
return &memDataSource{schema: s, records: data}
}

func (ds *memDataSource) Schema() *Schema {
func (ds *memDataSource) Schema() *datatypes.Schema {
return ds.schema
}

Expand All @@ -199,11 +201,11 @@ func (ds *memDataSource) SourceType() SourceType {
type csvDataSource struct {
path string
records []Row
schema *Schema
schema *datatypes.Schema
}

func NewCSVDataSource(path string) (*csvDataSource, error) {
ds := &csvDataSource{path: path, schema: &Schema{}}
ds := &csvDataSource{path: path, schema: &datatypes.Schema{}}
if err := ds.load(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -270,13 +272,13 @@ func (ds *csvDataSource) load() error {

for i, name := range header {
ds.schema.Fields = append(ds.schema.Fields,
Field{Name: name, Type: columnTypes[i]})
datatypes.Field{Name: name, Type: columnTypes[i]})
}

return nil
}

func (ds *csvDataSource) Schema() *Schema {
func (ds *csvDataSource) Schema() *datatypes.Schema {
return ds.schema
}

Expand Down
29 changes: 15 additions & 14 deletions internal/engine/cost/datasource/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,31 @@ package datasource

import (
"fmt"
"github.com/kwilteam/kwil-db/internal/engine/cost/datatypes"
"testing"

"github.com/stretchr/testify/assert"
)

// testSchemaUsers is the same as the first line of ../../testdata/users.csv
var testSchemaUsers = NewSchema(
Field{
var testSchemaUsers = datatypes.NewSchema(
datatypes.Field{
Name: "id",
Type: "int",
},
Field{
datatypes.Field{
Name: "username",
Type: "string",
},
Field{
datatypes.Field{
Name: "age",
Type: "int",
},
Field{
datatypes.Field{
Name: "state",
Type: "string",
},
Field{
datatypes.Field{
Name: "wallet",
Type: "string",
},
Expand Down Expand Up @@ -70,7 +71,7 @@ var testDataUsers = []Row{
},
}

func checkRecords(t *testing.T, result *Result, expectedSchema *Schema, expectedData []Row) {
func checkRecords(t *testing.T, result *Result, expectedSchema *datatypes.Schema, expectedData []Row) {
t.Helper()

s := result.Schema()
Expand Down Expand Up @@ -113,12 +114,12 @@ func TestMemDataSource_scanWithProjection(t *testing.T) {
ds := NewMemDataSource(testSchemaUsers, testDataUsers)

// Test filtered result
expectedSchema := NewSchema(
Field{
expectedSchema := datatypes.NewSchema(
datatypes.Field{
Name: "username",
Type: "string",
},
Field{
datatypes.Field{
Name: "age",
Type: "int",
})
Expand Down Expand Up @@ -191,16 +192,16 @@ func TestCSVDataSource_scanWithProjection(t *testing.T) {
assert.NoError(t, err)

// Test filtered result
expectedSchema := NewSchema(
Field{
expectedSchema := datatypes.NewSchema(
datatypes.Field{
Name: "id",
Type: "int",
},
Field{
datatypes.Field{
Name: "username",
Type: "string",
},
Field{
datatypes.Field{
Name: "state",
Type: "string",
},
Expand Down
61 changes: 0 additions & 61 deletions internal/engine/cost/datasource/schema.go

This file was deleted.

14 changes: 14 additions & 0 deletions internal/engine/cost/datatypes/column.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package datatypes

// ColumnDef describes a column reference: a column name together with an
// optional pointer to the relation it is qualified by.
type ColumnDef struct {
// Relation is the table reference qualifying the column. It is left nil
// when the value is built through ColumnUnqualified.
Relation *TableRef
// Name is the column name.
Name string
}

// ColumnUnqualified builds a ColumnDef that carries only the given column
// name; its Relation is left nil.
func ColumnUnqualified(name string) *ColumnDef {
	col := new(ColumnDef)
	col.Name = name
	return col
}

// Column builds a ColumnDef for the column called name, qualified by the
// supplied table reference. The table pointer is stored as-is (not copied).
func Column(table *TableRef, name string) *ColumnDef {
	col := &ColumnDef{
		Relation: table,
		Name:     name,
	}
	return col
}
Loading

0 comments on commit 9d41a51

Please sign in to comment.