diff --git a/enterprise/internal/insights/resolvers/insight_series_resolver_test.go b/enterprise/internal/insights/resolvers/insight_series_resolver_test.go index 2c143b9be85b..b8100f9cfafe 100644 --- a/enterprise/internal/insights/resolvers/insight_series_resolver_test.go +++ b/enterprise/internal/insights/resolvers/insight_series_resolver_test.go @@ -110,6 +110,6 @@ func TestResolver_InsightSeries(t *testing.T) { if err != nil { t.Fatal(err) } - autogold.Want("insights[0][0].Points mocked", "[{p:{Time:{wall:0 ext:63271811045 loc:} Value:1}} {p:{Time:{wall:0 ext:63271811045 loc:} Value:2}} {p:{Time:{wall:0 ext:63271811045 loc:} Value:3}}]").Equal(t, fmt.Sprintf("%+v", points)) + autogold.Want("insights[0][0].Points mocked", "[{p:{Time:{wall:0 ext:63271811045 loc:} Value:1 Metadata:[]}} {p:{Time:{wall:0 ext:63271811045 loc:} Value:2 Metadata:[]}} {p:{Time:{wall:0 ext:63271811045 loc:} Value:3 Metadata:[]}}]").Equal(t, fmt.Sprintf("%+v", points)) }) } diff --git a/enterprise/internal/insights/store/mock_store_interface.go b/enterprise/internal/insights/store/mock_store_interface.go index 0ef935976bd6..8af2c0e0777e 100644 --- a/enterprise/internal/insights/store/mock_store_interface.go +++ b/enterprise/internal/insights/store/mock_store_interface.go @@ -12,6 +12,9 @@ import ( // github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store) // used for unit testing. type MockInterface struct { + // RecordSeriesPointFunc is an instance of a mock function object + // controlling the behavior of the method RecordSeriesPoint. + RecordSeriesPointFunc *InterfaceRecordSeriesPointFunc // SeriesPointsFunc is an instance of a mock function object controlling // the behavior of the method SeriesPoints. SeriesPointsFunc *InterfaceSeriesPointsFunc @@ -21,6 +24,11 @@ type MockInterface struct { // methods return zero values for all results, unless overwritten. 
func NewMockInterface() *MockInterface { return &MockInterface{ + RecordSeriesPointFunc: &InterfaceRecordSeriesPointFunc{ + defaultHook: func(context.Context, RecordSeriesPointArgs) error { + return nil + }, + }, SeriesPointsFunc: &InterfaceSeriesPointsFunc{ defaultHook: func(context.Context, SeriesPointsOpts) ([]SeriesPoint, error) { return nil, nil @@ -33,12 +41,121 @@ func NewMockInterface() *MockInterface { // All methods delegate to the given implementation, unless overwritten. func NewMockInterfaceFrom(i Interface) *MockInterface { return &MockInterface{ + RecordSeriesPointFunc: &InterfaceRecordSeriesPointFunc{ + defaultHook: i.RecordSeriesPoint, + }, SeriesPointsFunc: &InterfaceSeriesPointsFunc{ defaultHook: i.SeriesPoints, }, } } +// InterfaceRecordSeriesPointFunc describes the behavior when the +// RecordSeriesPoint method of the parent MockInterface instance is invoked. +type InterfaceRecordSeriesPointFunc struct { + defaultHook func(context.Context, RecordSeriesPointArgs) error + hooks []func(context.Context, RecordSeriesPointArgs) error + history []InterfaceRecordSeriesPointFuncCall + mutex sync.Mutex +} + +// RecordSeriesPoint delegates to the next hook function in the queue and +// stores the parameter and result values of this invocation. +func (m *MockInterface) RecordSeriesPoint(v0 context.Context, v1 RecordSeriesPointArgs) error { + r0 := m.RecordSeriesPointFunc.nextHook()(v0, v1) + m.RecordSeriesPointFunc.appendCall(InterfaceRecordSeriesPointFuncCall{v0, v1, r0}) + return r0 +} + +// SetDefaultHook sets function that is called when the RecordSeriesPoint +// method of the parent MockInterface instance is invoked and the hook queue +// is empty. +func (f *InterfaceRecordSeriesPointFunc) SetDefaultHook(hook func(context.Context, RecordSeriesPointArgs) error) { + f.defaultHook = hook +} + +// PushHook adds a function to the end of hook queue. 
Each invocation of the +// RecordSeriesPoint method of the parent MockInterface instance invokes the +// hook at the front of the queue and discards it. After the queue is empty, +// the default hook function is invoked for any future action. +func (f *InterfaceRecordSeriesPointFunc) PushHook(hook func(context.Context, RecordSeriesPointArgs) error) { + f.mutex.Lock() + f.hooks = append(f.hooks, hook) + f.mutex.Unlock() +} + +// SetDefaultReturn calls SetDefaultDefaultHook with a function that returns +// the given values. +func (f *InterfaceRecordSeriesPointFunc) SetDefaultReturn(r0 error) { + f.SetDefaultHook(func(context.Context, RecordSeriesPointArgs) error { + return r0 + }) +} + +// PushReturn calls PushDefaultHook with a function that returns the given +// values. +func (f *InterfaceRecordSeriesPointFunc) PushReturn(r0 error) { + f.PushHook(func(context.Context, RecordSeriesPointArgs) error { + return r0 + }) +} + +func (f *InterfaceRecordSeriesPointFunc) nextHook() func(context.Context, RecordSeriesPointArgs) error { + f.mutex.Lock() + defer f.mutex.Unlock() + + if len(f.hooks) == 0 { + return f.defaultHook + } + + hook := f.hooks[0] + f.hooks = f.hooks[1:] + return hook +} + +func (f *InterfaceRecordSeriesPointFunc) appendCall(r0 InterfaceRecordSeriesPointFuncCall) { + f.mutex.Lock() + f.history = append(f.history, r0) + f.mutex.Unlock() +} + +// History returns a sequence of InterfaceRecordSeriesPointFuncCall objects +// describing the invocations of this function. +func (f *InterfaceRecordSeriesPointFunc) History() []InterfaceRecordSeriesPointFuncCall { + f.mutex.Lock() + history := make([]InterfaceRecordSeriesPointFuncCall, len(f.history)) + copy(history, f.history) + f.mutex.Unlock() + + return history +} + +// InterfaceRecordSeriesPointFuncCall is an object that describes an +// invocation of method RecordSeriesPoint on an instance of MockInterface. 
+type InterfaceRecordSeriesPointFuncCall struct { + // Arg0 is the value of the 1st argument passed to this method + // invocation. + Arg0 context.Context + // Arg1 is the value of the 2nd argument passed to this method + // invocation. + Arg1 RecordSeriesPointArgs + // Result0 is the value of the 1st result returned from this method + // invocation. + Result0 error +} + +// Args returns an interface slice containing the arguments of this +// invocation. +func (c InterfaceRecordSeriesPointFuncCall) Args() []interface{} { + return []interface{}{c.Arg0, c.Arg1} +} + +// Results returns an interface slice containing the results of this +// invocation. +func (c InterfaceRecordSeriesPointFuncCall) Results() []interface{} { + return []interface{}{c.Result0} +} + // InterfaceSeriesPointsFunc describes the behavior when the SeriesPoints // method of the parent MockInterface instance is invoked. type InterfaceSeriesPointsFunc struct { diff --git a/enterprise/internal/insights/store/store.go b/enterprise/internal/insights/store/store.go index 6662d77c0e61..1f05fbe2509c 100644 --- a/enterprise/internal/insights/store/store.go +++ b/enterprise/internal/insights/store/store.go @@ -3,11 +3,14 @@ package store import ( "context" "database/sql" + "encoding/json" "fmt" "time" "github.com/keegancsmith/sqlf" + "github.com/pkg/errors" + "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/database/basestore" "github.com/sourcegraph/sourcegraph/internal/database/dbutil" "github.com/sourcegraph/sourcegraph/internal/timeutil" @@ -17,6 +20,7 @@ import ( // for actual API usage. 
type Interface interface { SeriesPoints(ctx context.Context, opts SeriesPointsOpts) ([]SeriesPoint, error) + RecordSeriesPoint(ctx context.Context, v RecordSeriesPointArgs) error } var _ Interface = &Store{} @@ -55,15 +59,27 @@ func (s *Store) With(other basestore.ShareableStore) *Store { var _ Interface = &Store{} // SeriesPoint describes a single insights' series data point. +// +// Some fields that could be queried (series ID, repo ID/names) are omitted as they are primarily +// only useful for filtering the data you get back, and would inflate the data size considerably +// otherwise. type SeriesPoint struct { - Time time.Time - Value float64 + Time time.Time + Value float64 + Metadata []byte +} + +func (s *SeriesPoint) String() string { + return fmt.Sprintf("SeriesPoint{Time: %q, Value: %v, Metadata: %s}", s.Time, s.Value, s.Metadata) } // SeriesPointsOpts describes options for querying insights' series data points. type SeriesPointsOpts struct { // SeriesID is the unique series ID to query, if non-nil. - SeriesID *int32 + SeriesID *string + + // TODO(slimsag): Add ability to filter based on repo ID, name, original name. + // TODO(slimsag): Add ability to do limited filtering based on metadata. // Time ranges to query from/to, if non-nil. 
From, To *time.Time @@ -80,6 +96,7 @@ func (s *Store) SeriesPoints(ctx context.Context, opts SeriesPointsOpts) ([]Seri err := sc.Scan( &point.Time, &point.Value, + &point.Metadata, ) if err != nil { return err @@ -91,8 +108,12 @@ func (s *Store) SeriesPoints(ctx context.Context, opts SeriesPointsOpts) ([]Seri } var seriesPointsQueryFmtstr = ` --- source: enterprise/internal/insights/store/series_points.go -SELECT time, value FROM series_points +-- source: enterprise/internal/insights/store/store.go:SeriesPoints +SELECT time, + value, + m.metadata +FROM series_points p +LEFT JOIN metadata m ON p.metadata_id = m.id WHERE %s ORDER BY time DESC ` @@ -123,6 +144,127 @@ func seriesPointsQuery(opts SeriesPointsOpts) *sqlf.Query { ) } +// RecordSeriesPointArgs describes arguments for the RecordSeriesPoint method. +type RecordSeriesPointArgs struct { + // SeriesID is the unique series ID to record the point under. It should describe the series of data uniquely, + // but is not a DB table primary key ID. + SeriesID string + + // Point is the actual data point recorded and at what time. + Point SeriesPoint + + // Repository name and DB ID to associate with this data point, if any. + // + // Both must be specified if one is specified. + RepoName *string + RepoID *api.RepoID + + // Metadata contains arbitrary JSON metadata to associate with the data point, if any. + // + // See the DB schema comments for intended use cases. This should generally be small, + // low-cardinality data to avoid inflating the table. + Metadata interface{} +} + +// RecordSeriesPoint records a data point for the specified series ID (which is a unique ID for the +// series, not a DB table primary key ID). +func (s *Store) RecordSeriesPoint(ctx context.Context, v RecordSeriesPointArgs) (err error) { + // Start transaction. 
+ var txStore *basestore.Store + txStore, err = s.Transact(ctx) + if err != nil { + return err + } + defer func() { err = txStore.Done(err) }() + + if (v.RepoName != nil && v.RepoID == nil) || (v.RepoID != nil && v.RepoName == nil) { + return errors.New("RepoName and RepoID must be mutually specified") + } + + // Upsert the repository name into a separate table, so we get a small ID we can reference + // many times from the series_points table without storing the repo name multiple times. + var repoNameID *int + if v.RepoName != nil { + repoNameIDValue, ok, err := basestore.ScanFirstInt(txStore.Query(ctx, sqlf.Sprintf(upsertRepoNameFmtStr, *v.RepoName, *v.RepoName))) + if err != nil { + return errors.Wrap(err, "upserting repo name ID") + } + if !ok { + return errors.New("repo name ID not found (this should never happen)") + } + repoNameID = &repoNameIDValue + } + + // Upsert the metadata into a separate table, so we get a small ID we can reference many times + // from the series_points table without storing the metadata multiple times. + var metadataID *int + if v.Metadata != nil { + jsonMetadata, err := json.Marshal(v.Metadata) + if err != nil { + return errors.Wrap(err, "upserting: encoding metadata") + } + metadataIDValue, ok, err := basestore.ScanFirstInt(txStore.Query(ctx, sqlf.Sprintf(upsertMetadataFmtStr, jsonMetadata, jsonMetadata))) + if err != nil { + return errors.Wrap(err, "upserting metadata ID") + } + if !ok { + return errors.New("metadata ID not found (this should never happen)") + } + metadataID = &metadataIDValue + } + + // Insert the actual data point. 
+ return txStore.Exec(ctx, sqlf.Sprintf( + recordSeriesPointFmtstr, + v.SeriesID, // series_id + v.Point.Time, // time + v.Point.Value, // value + metadataID, // metadata_id + v.RepoID, // repo_id + repoNameID, // repo_name_id + repoNameID, // original_repo_name_id + )) +} + +const upsertRepoNameFmtStr = ` +-- source: enterprise/internal/insights/store/store.go:RecordSeriesPoint +WITH e AS( + INSERT INTO repo_names(name) + VALUES (%s) + ON CONFLICT DO NOTHING + RETURNING id +) +SELECT * FROM e +UNION + SELECT id FROM repo_names WHERE name = %s; +` + +const upsertMetadataFmtStr = ` +-- source: enterprise/internal/insights/store/store.go:RecordSeriesPoint +WITH e AS( + INSERT INTO metadata(metadata) + VALUES (%s) + ON CONFLICT DO NOTHING + RETURNING id +) +SELECT * FROM e +UNION + SELECT id FROM metadata WHERE metadata = %s; +` + +const recordSeriesPointFmtstr = ` +-- source: enterprise/internal/insights/store/store.go:RecordSeriesPoint +INSERT INTO series_points( + series_id, + time, + value, + metadata_id, + repo_id, + repo_name_id, + original_repo_name_id) +VALUES (%s, %s, %s, %s, %s, %s, %s); +` + func (s *Store) query(ctx context.Context, q *sqlf.Query, sc scanFunc) error { rows, err := s.Store.Query(ctx, q) if err != nil { diff --git a/enterprise/internal/insights/store/store_test.go b/enterprise/internal/insights/store/store_test.go index bff028c047cb..8f5219f9f42e 100644 --- a/enterprise/internal/insights/store/store_test.go +++ b/enterprise/internal/insights/store/store_test.go @@ -2,13 +2,13 @@ package store import ( "context" - "fmt" "testing" "time" "github.com/hexops/autogold" "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/dbtesting" + "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/timeutil" ) @@ -73,8 +73,8 @@ SELECT time, t.Fatal(err) } autogold.Want("SeriesPoints(2).len", int(913)).Equal(t, len(points)) - autogold.Want("SeriesPoints(2)[len()-1]", "{Time:2020-01-01 00:00:00 +0000 UTC 
Value:-20.00716650672132}").Equal(t, fmt.Sprintf("%+v", points[len(points)-1])) - autogold.Want("SeriesPoints(2)[0]", "{Time:2020-06-01 00:00:00 +0000 UTC Value:-37.8750440811433}").Equal(t, fmt.Sprintf("%+v", points[0])) + autogold.Want("SeriesPoints(2)[len()-1].String()", `SeriesPoint{Time: "2020-01-01 00:00:00 +0000 UTC", Value: -20.00716650672132, Metadata: {"hello": "world", "languages": ["Go", "Python", "Java"]}}`).Equal(t, points[len(points)-1].String()) + autogold.Want("SeriesPoints(2)[0].String()", `SeriesPoint{Time: "2020-06-01 00:00:00 +0000 UTC", Value: -37.8750440811433, Metadata: {"hello": "world", "languages": ["Go", "Python", "Java"]}}`).Equal(t, points[0].String()) }) t.Run("subset of data", func(t *testing.T) { @@ -87,8 +87,8 @@ SELECT time, t.Fatal(err) } autogold.Want("SeriesPoints(3).len", int(551)).Equal(t, len(points)) - autogold.Want("SeriesPoints(3)[0]", "{Time:2020-05-31 20:00:00 +0000 UTC Value:-11.269436460802638}").Equal(t, fmt.Sprintf("%+v", points[0])) - autogold.Want("SeriesPoints(3)[len()-1]", "{Time:2020-03-01 04:00:00 +0000 UTC Value:35.85710033014749}").Equal(t, fmt.Sprintf("%+v", points[len(points)-1])) + autogold.Want("SeriesPoints(3)[0].String()", `SeriesPoint{Time: "2020-05-31 20:00:00 +0000 UTC", Value: -11.269436460802638, Metadata: {"hello": "world", "languages": ["Go", "Python", "Java"]}}`).Equal(t, points[0].String()) + autogold.Want("SeriesPoints(3)[len()-1].String()", `SeriesPoint{Time: "2020-03-01 04:00:00 +0000 UTC", Value: 35.85710033014749, Metadata: {"hello": "world", "languages": ["Go", "Python", "Java"]}}`).Equal(t, points[len(points)-1].String()) }) t.Run("latest 3 points", func(t *testing.T) { @@ -100,9 +100,64 @@ SELECT time, t.Fatal(err) } autogold.Want("SeriesPoints(4).len", int(3)).Equal(t, len(points)) - autogold.Want("SeriesPoints(4)[0]", "{Time:2020-06-01 00:00:00 +0000 UTC Value:-37.8750440811433}").Equal(t, fmt.Sprintf("%+v", points[0])) - autogold.Want("SeriesPoints(4)[1]", "{Time:2020-05-31 20:00:00 
+0000 UTC Value:-11.269436460802638}").Equal(t, fmt.Sprintf("%+v", points[1])) - autogold.Want("SeriesPoints(4)[2]", "{Time:2020-05-31 16:00:00 +0000 UTC Value:17.838503552871998}").Equal(t, fmt.Sprintf("%+v", points[2])) + autogold.Want("SeriesPoints(4)[0].String()", `SeriesPoint{Time: "2020-06-01 00:00:00 +0000 UTC", Value: -37.8750440811433, Metadata: {"hello": "world", "languages": ["Go", "Python", "Java"]}}`).Equal(t, points[0].String()) + autogold.Want("SeriesPoints(4)[1].String()", `SeriesPoint{Time: "2020-05-31 20:00:00 +0000 UTC", Value: -11.269436460802638, Metadata: {"hello": "world", "languages": ["Go", "Python", "Java"]}}`).Equal(t, points[1].String()) + autogold.Want("SeriesPoints(4)[2].String()", `SeriesPoint{Time: "2020-05-31 16:00:00 +0000 UTC", Value: 17.838503552871998, Metadata: {"hello": "world", "languages": ["Go", "Python", "Java"]}}`).Equal(t, points[2].String()) }) } + +func TestRecordSeriesPoints(t *testing.T) { + if testing.Short() { + t.Skip() + } + t.Parallel() + + ctx := context.Background() + clock := timeutil.Now + timescale, cleanup := dbtesting.TimescaleDB(t) + defer cleanup() + store := NewWithClock(timescale, clock) + + time := func(s string) time.Time { + v, err := time.Parse(time.RFC3339, s) + if err != nil { + t.Fatal(err) + } + return v + } + optionalString := func(v string) *string { return &v } + optionalRepoID := func(v api.RepoID) *api.RepoID { return &v } + + // Record some data points. 
+ for _, record := range []RecordSeriesPointArgs{ + { + SeriesID: "one", + Point: SeriesPoint{Time: time("2020-03-01T00:00:00Z"), Value: 1.1}, + RepoName: optionalString("repo1"), + RepoID: optionalRepoID(3), + Metadata: map[string]interface{}{"some": "data"}, + }, + { + SeriesID: "two", + Point: SeriesPoint{Time: time("2020-03-02T00:00:00Z"), Value: 2.2}, + Metadata: []interface{}{"some", "data", "two"}, + }, + } { + if err := store.RecordSeriesPoint(ctx, record); err != nil { + t.Fatal(err) + } + } + + // Confirm we get the expected data back. + points, err := store.SeriesPoints(ctx, SeriesPointsOpts{}) + if err != nil { + t.Fatal(err) + } + autogold.Want("len(points)", int(2)).Equal(t, len(points)) + autogold.Want("points[0].String()", `SeriesPoint{Time: "2020-03-02 00:00:00 +0000 UTC", Value: 2.2, Metadata: ["some", "data", "two"]}`).Equal(t, points[0].String()) + autogold.Want("points[1].String()", `SeriesPoint{Time: "2020-03-01 00:00:00 +0000 UTC", Value: 1.1, Metadata: {"some": "data"}}`).Equal(t, points[1].String()) + + // Confirm the data point with repository name got recorded correctly. + // TODO(slimsag): future: once we support querying by repo ID/names, add tests to ensure that information is inserted properly here. +}