diff --git a/extensions/tn_local/create_stream_test.go b/extensions/tn_local/create_stream_test.go new file mode 100644 index 00000000..58336407 --- /dev/null +++ b/extensions/tn_local/create_stream_test.go @@ -0,0 +1,194 @@ +package tn_local + +import ( + "context" + "fmt" + "testing" + + "github.com/jackc/pgx/v5/pgconn" + "github.com/stretchr/testify/require" + jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" + kwilsql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/node/tests/utils" +) + +func TestCreateStream_NilRequest(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + resp, rpcErr := ext.CreateStream(context.Background(), nil) + require.Nil(t, resp) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "missing request") +} + +func TestCreateStream_Success(t *testing.T) { + var capturedStmt string + var capturedArgs []any + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + capturedStmt = stmt + capturedArgs = args + return &kwilsql.ResultSet{}, nil + }, + } + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + + require.Nil(t, rpcErr, "expected no error") + require.NotNil(t, resp) + require.Contains(t, capturedStmt, "INSERT INTO "+SchemaName+".streams") + require.Len(t, capturedArgs, 4, "INSERT should have 4 parameters") + // data_provider should be lowercased (matching consensus behavior) + require.Equal(t, "0xec36224a679218ae28fcece8d3c68595b87dd832", capturedArgs[0]) + require.Equal(t, "st00000000000000000000000000test", capturedArgs[1]) + require.Equal(t, "primitive", capturedArgs[2]) + // created_at should be a non-zero unix timestamp + createdAt, ok := capturedArgs[3].(int64) + require.True(t, ok, "created_at should be int64") + require.NotZero(t, createdAt, "created_at should be non-zero") +} + +func TestCreateStream_ComposedType(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return &kwilsql.ResultSet{}, nil + }, + } + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "composed", + }) + + require.Nil(t, rpcErr) + require.NotNil(t, resp) +} + +func TestCreateStream_InvalidStreamID(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + tests := []struct { + name string + streamID string + wantMsg string + }{ + {"too short", "st00", "must be exactly 32 characters"}, + {"too long", "st000000000000000000000000000test1", "must be exactly 32 characters"}, + {"wrong prefix", "xx00000000000000000000000000test", "must start with 'st'"}, + {"empty", "", "must be exactly 32 characters"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: tt.streamID, + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, tt.wantMsg) + }) + } +} + +func TestCreateStream_InvalidStreamType(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "invalid", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "must be 'primitive' or 'composed'") +} + +func TestCreateStream_InvalidDataProvider(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + tests := []struct { + name string + dataProvider string + }{ + {"no 0x prefix", "EC36224A679218Ae28FCeCe8d3c68595B87Dd832"}, + {"too short", "0xEC36224A679218Ae28"}, + {"too long", "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832FF"}, + {"invalid chars", "0xGG36224A679218Ae28FCeCe8d3c68595B87Dd832"}, + {"empty", ""}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: tt.dataProvider, + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "data_provider must be a valid Ethereum address") + }) + } +} + +func TestCreateStream_DuplicateStream(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("duplicate key value violates unique constraint") + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "stream already exists") +} + +func TestCreateStream_DuplicateStream_PgError(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, &pgconn.PgError{Code: pgUniqueViolation, Message: "unique_violation"} + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "stream already exists") +} + +func TestCreateStream_DBError(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("connection refused") + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) + require.Contains(t, rpcErr.Message, "failed to create stream") +} diff --git a/extensions/tn_local/db_ops.go b/extensions/tn_local/db_ops.go index e173e330..1fae2ae0 100644 --- a/extensions/tn_local/db_ops.go +++ b/extensions/tn_local/db_ops.go @@ -29,6 +29,53 @@ func (ext *Extension) dbCreateStream(ctx context.Context, dataProvider, streamID return err } +// dbLookupStreamRef looks up a stream by data_provider and stream_id. +// Returns (id, stream_type, nil) if found, or (0, "", nil) if not found. +func (ext *Extension) dbLookupStreamRef(ctx context.Context, dataProvider, streamID string) (int64, string, error) { + rs, err := ext.db.Execute(ctx, fmt.Sprintf( + `SELECT id, stream_type FROM %s.streams WHERE data_provider = $1 AND stream_id = $2`, SchemaName), + dataProvider, streamID) + if err != nil { + return 0, "", err + } + if len(rs.Rows) == 0 { + return 0, "", nil + } + id, ok := rs.Rows[0][0].(int64) + if !ok { + return 0, "", fmt.Errorf("unexpected id type: %T", rs.Rows[0][0]) + } + streamType, ok := rs.Rows[0][1].(string) + if !ok { + return 0, "", fmt.Errorf("unexpected stream_type type: %T", rs.Rows[0][1]) + } + return id, streamType, nil +} + +// dbInsertRecords batch-inserts resolved records into ext_tn_local.primitive_events +// within a transaction. Mirrors the consensus INSERT in 003-primitive-insertion.sql. +func (ext *Extension) dbInsertRecords(ctx context.Context, streamRefs []int64, eventTimes []int64, values []string) error { + createdAt := time.Now().Unix() + + tx, err := ext.db.BeginTx(ctx) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + for i := range streamRefs { + _, err := tx.Execute(ctx, fmt.Sprintf( + `INSERT INTO %s.primitive_events (stream_ref, event_time, value, created_at) + VALUES ($1, $2, $3, $4)`, SchemaName), + streamRefs[i], eventTimes[i], values[i], createdAt) + if err != nil { + return err + } + } + + return tx.Commit(ctx) +} + // SetupSchema creates the ext_tn_local schema and all tables within a single transaction. func (l *LocalDB) SetupSchema(ctx context.Context) error { l.logger.Info("setting up local storage schema") diff --git a/extensions/tn_local/handlers.go b/extensions/tn_local/handlers.go index 28c6f919..57130623 100644 --- a/extensions/tn_local/handlers.go +++ b/extensions/tn_local/handlers.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "math" "regexp" + "strconv" "strings" "github.com/jackc/pgx/v5/pgconn" @@ -88,9 +90,92 @@ func (ext *Extension) CreateStream(ctx context.Context, req *CreateStreamRequest return &CreateStreamResponse{}, nil } -// InsertRecords inserts records into a local primitive stream. (Task 4) +// InsertRecords inserts records into local primitive streams. +// Mirrors the consensus insert_records action (003-primitive-insertion.sql): +// - Parallel arrays: data_provider[], stream_id[], event_time[], value[] +// - Zero values are silently filtered (WHERE value != 0) +// - Multiple rows per (stream_ref, event_time) allowed (created_at versioning) +// - Returns empty response (consensus returns nothing) func (ext *Extension) InsertRecords(ctx context.Context, req *InsertRecordsRequest) (*InsertRecordsResponse, *jsonrpc.Error) { - return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "not implemented", nil) + if req == nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "missing request", nil) + } + + n := len(req.DataProvider) + if n == 0 { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "records must not be empty", nil) + } + if n != len(req.StreamID) || n != len(req.EventTime) || n != len(req.Value) { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "array lengths mismatch", nil) + } + + // Normalize data_providers to lowercase (consensus uses LOWER() in 001-common-actions.sql). + for i := range req.DataProvider { + req.DataProvider[i] = strings.ToLower(req.DataProvider[i]) + } + + // Validate all inputs upfront. + for i := 0; i < n; i++ { + if err := validateDataProvider(req.DataProvider[i]); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("record %d: %v", i, err), nil) + } + if err := validateStreamID(req.StreamID[i]); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("record %d: %v", i, err), nil) + } + f, parseErr := strconv.ParseFloat(req.Value[i], 64) + if parseErr != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("invalid record value at index %d: %v", i, parseErr), nil) + } + if math.IsNaN(f) || math.IsInf(f, 0) { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("invalid record value at index %d: must be a finite number", i), nil) + } + } + + // Resolve stream refs for unique (data_provider, stream_id) pairs. + type streamKey struct{ dp, sid string } + streamRefMap := make(map[streamKey]int64) + for i := 0; i < n; i++ { + key := streamKey{req.DataProvider[i], req.StreamID[i]} + if _, ok := streamRefMap[key]; ok { + continue + } + ref, stype, err := ext.dbLookupStreamRef(ctx, key.dp, key.sid) + if err != nil { + ext.logger.Error("failed to look up stream", "error", err) + return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to look up stream", nil) + } + if ref == 0 { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream not found: %s/%s", key.dp, key.sid), nil) + } + if stype != "primitive" { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream %s/%s is not a primitive stream", key.dp, key.sid), nil) + } + streamRefMap[key] = ref + } + + // Build resolved records, filtering zero values (mirrors consensus WHERE value != 0). + streamRefs := make([]int64, 0, n) + eventTimes := make([]int64, 0, n) + values := make([]string, 0, n) + for i := 0; i < n; i++ { + f, _ := strconv.ParseFloat(req.Value[i], 64) + if f == 0 { + continue + } + key := streamKey{req.DataProvider[i], req.StreamID[i]} + streamRefs = append(streamRefs, streamRefMap[key]) + eventTimes = append(eventTimes, req.EventTime[i]) + values = append(values, req.Value[i]) + } + + if len(streamRefs) > 0 { + if err := ext.dbInsertRecords(ctx, streamRefs, eventTimes, values); err != nil { + ext.logger.Error("failed to insert records", "error", err) + return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to insert records", nil) + } + } + + return &InsertRecordsResponse{}, nil } // InsertTaxonomy adds a taxonomy entry to a local composed stream. (Task 5) diff --git a/extensions/tn_local/insert_records_test.go b/extensions/tn_local/insert_records_test.go new file mode 100644 index 00000000..78c2de09 --- /dev/null +++ b/extensions/tn_local/insert_records_test.go @@ -0,0 +1,311 @@ +package tn_local + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" + jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" + kwilsql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/node/tests/utils" +) + +// mockDBWithStream returns a MockDB that simulates a stream lookup returning the given +// streamRef and streamType, and captures INSERT statements via executeFn. +func mockDBWithStream(streamRef int64, streamType string, executeFn func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error)) *utils.MockDB { + return &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + if strings.Contains(stmt, "SELECT") { + if streamRef == 0 { + return &kwilsql.ResultSet{Rows: [][]any{}}, nil + } + return &kwilsql.ResultSet{ + Columns: []string{"id", "stream_type"}, + Rows: [][]any{{streamRef, streamType}}, + }, nil + } + if executeFn != nil { + return executeFn(ctx, stmt, args...) + } + return &kwilsql.ResultSet{}, nil + }, + BeginTxFn: func(ctx context.Context) (kwilsql.Tx, error) { + return &utils.MockTx{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + if executeFn != nil { + return executeFn(ctx, stmt, args...) + } + return &kwilsql.ResultSet{}, nil + }, + }, nil + }, + } +} + +const testDP = "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832" +const testSID = "st00000000000000000000000000test" + +func TestInsertRecords_NilRequest(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + resp, rpcErr := ext.InsertRecords(context.Background(), nil) + require.Nil(t, resp) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "missing request") +} + +func TestInsertRecords_EmptyArrays(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{}, + StreamID: []string{}, + EventTime: []int64{}, + Value: []string{}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "records must not be empty") +} + +func TestInsertRecords_ArrayLengthMismatch(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP, testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "array lengths mismatch") +} + +func TestInsertRecords_Success(t *testing.T) { + var capturedStmts []string + var capturedArgs [][]any + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + capturedStmts = append(capturedStmts, stmt) + capturedArgs = append(capturedArgs, args) + return &kwilsql.ResultSet{}, nil + }) + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP, testDP}, + StreamID: []string{testSID, testSID}, + EventTime: []int64{1000, 2000}, + Value: []string{"123.456", "789.012"}, + }) + + require.Nil(t, rpcErr, "expected no error") + require.NotNil(t, resp) + + // Two INSERT statements (one per record) + require.Len(t, capturedStmts, 2) + for _, stmt := range capturedStmts { + require.Contains(t, stmt, "INSERT INTO "+SchemaName+".primitive_events") + } + + // Each INSERT has 4 args: stream_ref, event_time, value, created_at + require.Len(t, capturedArgs[0], 4) + require.Equal(t, int64(42), capturedArgs[0][0]) // stream_ref + require.Equal(t, int64(1000), capturedArgs[0][1]) + require.Equal(t, "123.456", capturedArgs[0][2]) + createdAt, ok := capturedArgs[0][3].(int64) + require.True(t, ok, "created_at should be int64") + require.NotZero(t, createdAt) + + require.Equal(t, int64(42), capturedArgs[1][0]) + require.Equal(t, int64(2000), capturedArgs[1][1]) + require.Equal(t, "789.012", capturedArgs[1][2]) +} + +func TestInsertRecords_ZeroValuesFiltered(t *testing.T) { + var capturedArgs [][]any + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + capturedArgs = append(capturedArgs, args) + return &kwilsql.ResultSet{}, nil + }) + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP, testDP, testDP}, + StreamID: []string{testSID, testSID, testSID}, + EventTime: []int64{1000, 2000, 3000}, + Value: []string{"1.5", "0", "2.5"}, + }) + + require.Nil(t, rpcErr) + require.NotNil(t, resp) + + // Only 2 inserts — the "0" record is filtered like consensus + require.Len(t, capturedArgs, 2) + require.Equal(t, int64(1000), capturedArgs[0][1]) + require.Equal(t, "1.5", capturedArgs[0][2]) + require.Equal(t, int64(3000), capturedArgs[1][1]) + require.Equal(t, "2.5", capturedArgs[1][2]) +} + +func TestInsertRecords_AllZerosNoInsert(t *testing.T) { + insertCalled := false + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + insertCalled = true + return &kwilsql.ResultSet{}, nil + }) + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"0"}, + }) + + require.Nil(t, rpcErr) + require.NotNil(t, resp) + require.False(t, insertCalled, "should not call dbInsertRecords when all values are zero") +} + +func TestInsertRecords_StreamNotFound(t *testing.T) { + mockDB := mockDBWithStream(0, "", nil) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "stream not found") +} + +func TestInsertRecords_ComposedStreamRejected(t *testing.T) { + mockDB := mockDBWithStream(42, "composed", nil) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "is not a primitive stream") +} + +func TestInsertRecords_DBError(t *testing.T) { + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("connection refused") + }) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) + require.Contains(t, rpcErr.Message, "failed to insert records") +} + +func TestInsertRecords_InvalidDataProvider(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{"invalid"}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "data_provider must be a valid Ethereum address") +} + +func TestInsertRecords_InvalidStreamID(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP}, + StreamID: []string{"bad"}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "stream_id must be exactly 32 characters") +} + +func TestInsertRecords_InvalidValue(t *testing.T) { + mockDB := mockDBWithStream(42, "primitive", nil) + ext := newTestExtension(mockDB) + + tests := []struct { + name string + value string + wantMsg string + }{ + {"non-numeric", "hello", "invalid record value at index 0"}, + {"empty", "", "invalid record value at index 0"}, + {"NaN", "NaN", "must be a finite number"}, + {"Inf", "Inf", "must be a finite number"}, + {"-Inf", "-Inf", "must be a finite number"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{tt.value}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, tt.wantMsg) + }) + } +} + +func TestInsertRecords_InvalidValueAtIndex(t *testing.T) { + mockDB := mockDBWithStream(42, "primitive", nil) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP, testDP}, + StreamID: []string{testSID, testSID}, + EventTime: []int64{1000, 2000}, + Value: []string{"1.0", "bad"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "invalid record value at index 1") +} + +func TestInsertRecords_LookupDBError(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("connection refused") + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) + require.Contains(t, rpcErr.Message, "failed to look up stream") +} diff --git a/extensions/tn_local/schema.go b/extensions/tn_local/schema.go index d043a277..03d27ae9 100644 --- a/extensions/tn_local/schema.go +++ b/extensions/tn_local/schema.go @@ -65,6 +65,8 @@ func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error { return fmt.Errorf("create primitive_events table: %w", err) } + // Non-unique index mirrors consensus primitive_events_query_idx (017-normalize-tables.sql:105). + // Multiple rows per (stream_ref, event_time) are allowed — created_at provides versioning. createIndex := fmt.Sprintf(` CREATE INDEX IF NOT EXISTS local_pe_stream_time_idx ON %s.primitive_events (stream_ref, event_time)`, SchemaName) diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index ffd5eb4e..2f784afd 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -2,13 +2,11 @@ package tn_local import ( "context" - "fmt" "io" "strings" "sync" "testing" - "github.com/jackc/pgx/v5/pgconn" "github.com/stretchr/testify/require" "github.com/trufnetwork/kwil-db/core/log" jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" @@ -146,187 +144,6 @@ func newTestExtension(db kwilsql.DB) *Extension { return ext } -func TestCreateStream_NilRequest(t *testing.T) { - ext := newTestExtension(&utils.MockDB{}) - - resp, rpcErr := ext.CreateStream(context.Background(), nil) - require.Nil(t, resp) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "missing request") -} - -func TestCreateStream_Success(t *testing.T) { - var capturedStmt string - var capturedArgs []any - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - capturedStmt = stmt - capturedArgs = args - return &kwilsql.ResultSet{}, nil - }, - } - ext := newTestExtension(mockDB) - - resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - - require.Nil(t, rpcErr, "expected no error") - require.NotNil(t, resp) - require.Contains(t, capturedStmt, "INSERT INTO "+SchemaName+".streams") - require.Len(t, capturedArgs, 4, "INSERT should have 4 parameters") - // data_provider should be lowercased (matching consensus behavior) - require.Equal(t, "0xec36224a679218ae28fcece8d3c68595b87dd832", capturedArgs[0]) - require.Equal(t, "st00000000000000000000000000test", capturedArgs[1]) - require.Equal(t, "primitive", capturedArgs[2]) - // created_at should be a non-zero unix timestamp - createdAt, ok := capturedArgs[3].(int64) - require.True(t, ok, "created_at should be int64") - require.NotZero(t, createdAt, "created_at should be non-zero") -} - -func TestCreateStream_ComposedType(t *testing.T) { - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return &kwilsql.ResultSet{}, nil - }, - } - ext := newTestExtension(mockDB) - - resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "composed", - }) - - require.Nil(t, rpcErr) - require.NotNil(t, resp) -} - -func TestCreateStream_InvalidStreamID(t *testing.T) { - ext := newTestExtension(&utils.MockDB{}) - - tests := []struct { - name string - streamID string - wantMsg string - }{ - {"too short", "st00", "must be exactly 32 characters"}, - {"too long", "st000000000000000000000000000test1", "must be exactly 32 characters"}, - {"wrong prefix", "xx00000000000000000000000000test", "must start with 'st'"}, - {"empty", "", "must be exactly 32 characters"}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: tt.streamID, - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, tt.wantMsg) - }) - } -} - -func TestCreateStream_InvalidStreamType(t *testing.T) { - ext := newTestExtension(&utils.MockDB{}) - - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "invalid", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "must be 'primitive' or 'composed'") -} - -func TestCreateStream_InvalidDataProvider(t *testing.T) { - ext := newTestExtension(&utils.MockDB{}) - - tests := []struct { - name string - dataProvider string - }{ - {"no 0x prefix", "EC36224A679218Ae28FCeCe8d3c68595B87Dd832"}, - {"too short", "0xEC36224A679218Ae28"}, - {"too long", "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832FF"}, - {"invalid chars", "0xGG36224A679218Ae28FCeCe8d3c68595B87Dd832"}, - {"empty", ""}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: tt.dataProvider, - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "data_provider must be a valid Ethereum address") - }) - } -} - -func TestCreateStream_DuplicateStream(t *testing.T) { - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return nil, fmt.Errorf("duplicate key value violates unique constraint") - }, - } - ext := newTestExtension(mockDB) - - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "stream already exists") -} - -func TestCreateStream_DuplicateStream_PgError(t *testing.T) { - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return nil, &pgconn.PgError{Code: pgUniqueViolation, Message: "unique_violation"} - }, - } - ext := newTestExtension(mockDB) - - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "stream already exists") -} - -func TestCreateStream_DBError(t *testing.T) { - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return nil, fmt.Errorf("connection refused") - }, - } - ext := newTestExtension(mockDB) - - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) - require.Contains(t, rpcErr.Message, "failed to create stream") -} - func containsSQL(statements []string, substr string) bool { for _, s := range statements { if strings.Contains(s, substr) { diff --git a/extensions/tn_local/types.go b/extensions/tn_local/types.go index 6d10be55..d0695510 100644 --- a/extensions/tn_local/types.go +++ b/extensions/tn_local/types.go @@ -11,22 +11,18 @@ type CreateStreamRequest struct { type CreateStreamResponse struct{} // InsertRecordsRequest is the JSON-RPC request for local.insert_records. +// Mirrors the consensus insert_records($data_provider TEXT[], $stream_id TEXT[], +// $event_time INT8[], $value NUMERIC(36,18)[]) signature — parallel arrays. type InsertRecordsRequest struct { - DataProvider string `json:"data_provider"` - StreamID string `json:"stream_id"` - Records []RecordInput `json:"records"` -} - -// RecordInput is a single record to insert. -type RecordInput struct { - EventTime int64 `json:"event_time"` - Value string `json:"value"` + DataProvider []string `json:"data_provider"` + StreamID []string `json:"stream_id"` + EventTime []int64 `json:"event_time"` + Value []string `json:"value"` } // InsertRecordsResponse is the JSON-RPC response for local.insert_records. -type InsertRecordsResponse struct { - Count int `json:"count"` -} +// Mirrors consensus which returns nothing. +type InsertRecordsResponse struct{} // InsertTaxonomyRequest is the JSON-RPC request for local.insert_taxonomy. type InsertTaxonomyRequest struct {