From a5e4044e370a2a042eaada408fe2fbfccf750bcc Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Mon, 15 Feb 2021 19:24:17 -0700 Subject: [PATCH 1/6] insights: discovery: add series ID hashing function Signed-off-by: Stephen Gutekanst --- .../internal/insights/discovery/series_id.go | 30 +++++++++++++ .../insights/discovery/series_id_test.go | 42 +++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 enterprise/internal/insights/discovery/series_id.go create mode 100644 enterprise/internal/insights/discovery/series_id_test.go diff --git a/enterprise/internal/insights/discovery/series_id.go b/enterprise/internal/insights/discovery/series_id.go new file mode 100644 index 000000000000..b6249f7c461d --- /dev/null +++ b/enterprise/internal/insights/discovery/series_id.go @@ -0,0 +1,30 @@ +package discovery + +import ( + "crypto/sha256" + "fmt" + + "github.com/sourcegraph/sourcegraph/schema" +) + +// EncodeSeriesID hashes the input series to return a string which uniquely identifies +// the data series being described. It is possible the same series is described in multiple users' +// settings, e.g. if multiple users declare an insight with the same search query - in which case +// we have an opportunity to deduplicate them. +// +// Note that since the series ID hash is stored in the database, it must remain stable or else past +// data will not be queryable. 
+func EncodeSeriesID(series *schema.InsightSeries) (string, error) { + switch { + case series.Search != "": + return fmt.Sprintf("s:%s", sha256String(series.Search)), nil + case series.Webhook != "": + return fmt.Sprintf("w:%s", sha256String(series.Webhook)), nil + default: + return "", fmt.Errorf("invalid series %+v", series) + } +} + +func sha256String(s string) string { + return fmt.Sprintf("%X", sha256.Sum256([]byte(s))) +} diff --git a/enterprise/internal/insights/discovery/series_id_test.go b/enterprise/internal/insights/discovery/series_id_test.go new file mode 100644 index 000000000000..a902b6a17bb0 --- /dev/null +++ b/enterprise/internal/insights/discovery/series_id_test.go @@ -0,0 +1,42 @@ +package discovery + +import ( + "fmt" + "testing" + + "github.com/hexops/autogold" + + "github.com/sourcegraph/sourcegraph/schema" +) + +func TestEncodeSeriesID(t *testing.T) { + testCases := []struct { + input *schema.InsightSeries + want autogold.Value + }{ + { + input: &schema.InsightSeries{Search: "fmt.Errorf repo:github.com/golang/go"}, + want: autogold.Want("basic_search", [2]interface{}{ + "s:6CB26B840C8EEBFB03DDB44A23FFBD4D7AD864B47D9AA1E975E69FCF0EE2A67E", + "", + }), + }, + { + input: &schema.InsightSeries{Webhook: "https://example.com/getData?foo=bar"}, + want: autogold.Want("basic_webhook", [2]interface{}{ + "w:CDAA477D902F8572B92EAC827A0E5FC27537BFC825EE358427E0DC04D22E0E25", + "", + }), + }, + { + input: &schema.InsightSeries{}, + want: autogold.Want("invalid", [2]interface{}{"", "invalid series &{Label: RepositoriesList:[] Search: Webhook:}"}), + }, + } + for _, tc := range testCases { + t.Run(tc.want.Name(), func(t *testing.T) { + got, err := EncodeSeriesID(tc.input) + tc.want.Equal(t, [2]interface{}{got, fmt.Sprint(err)}) + }) + } +} From ee8de4b17165bc132f36875688aa0610472a2057 Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Mon, 15 Feb 2021 19:48:11 -0700 Subject: [PATCH 2/6] insights: discovery: fix settings parsing bug caught by test 
Signed-off-by: Stephen Gutekanst --- enterprise/internal/insights/discovery/discovery.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/enterprise/internal/insights/discovery/discovery.go b/enterprise/internal/insights/discovery/discovery.go index 350a5cb54663..1d13f781eea3 100644 --- a/enterprise/internal/insights/discovery/discovery.go +++ b/enterprise/internal/insights/discovery/discovery.go @@ -25,6 +25,9 @@ func Discover(ctx context.Context, settingStore SettingStore) ([]*schema.Insight return nil, err } globalSettings, err := parseUserSettings(globalSettingsRaw) + if err != nil { + return nil, err + } return globalSettings.Insights, nil } From d803906b99cd91142254264ea66a40031b19e12f Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Mon, 15 Feb 2021 19:49:11 -0700 Subject: [PATCH 3/6] insights: background: enqueue insights from global settings Signed-off-by: Stephen Gutekanst --- .../insights/background/background.go | 4 +- .../insights/background/insight_enqueuer.go | 74 ++++++++- .../background/insight_enqueuer_test.go | 146 ++++++++++++++++++ 3 files changed, 215 insertions(+), 9 deletions(-) create mode 100644 enterprise/internal/insights/background/insight_enqueuer_test.go diff --git a/enterprise/internal/insights/background/background.go b/enterprise/internal/insights/background/background.go index 02d92c95ae60..cd046eeaf44c 100644 --- a/enterprise/internal/insights/background/background.go +++ b/enterprise/internal/insights/background/background.go @@ -12,6 +12,7 @@ import ( "github.com/sourcegraph/sourcegraph/enterprise/internal/insights" "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/queryrunner" "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store" + "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/database/basestore" "github.com/sourcegraph/sourcegraph/internal/goroutine" "github.com/sourcegraph/sourcegraph/internal/observation" @@ -43,6 
+44,7 @@ func StartBackgroundJobs(ctx context.Context, mainAppDB *sql.DB) { // Create a base store to be used for storing worker state. We store this in the main app Postgres // DB, not the TimescaleDB (which we use only for storing insights data.) workerBaseStore := basestore.NewWithDB(mainAppDB, sql.TxOptions{}) + settingStore := database.Settings(mainAppDB) // Create basic metrics for recording information about background jobs. observationContext := &observation.Context{ @@ -55,7 +57,7 @@ func StartBackgroundJobs(ctx context.Context, mainAppDB *sql.DB) { // Start background goroutines for all of our workers. go goroutine.MonitorBackgroundRoutines(ctx, []goroutine.BackgroundRoutine{ // Register the background goroutine which discovers and enqueues insights work. - newInsightEnqueuer(ctx, workerBaseStore, observationContext), + newInsightEnqueuer(ctx, workerBaseStore, settingStore, observationContext), // Register the query-runner worker and resetter, which executes search queries and records // results to TimescaleDB. 
diff --git a/enterprise/internal/insights/background/insight_enqueuer.go b/enterprise/internal/insights/background/insight_enqueuer.go index 17ce69869afe..0576d2407707 100644 --- a/enterprise/internal/insights/background/insight_enqueuer.go +++ b/enterprise/internal/insights/background/insight_enqueuer.go @@ -5,16 +5,22 @@ import ( "fmt" "time" + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/queryrunner" + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/discovery" "github.com/sourcegraph/sourcegraph/internal/database/basestore" "github.com/sourcegraph/sourcegraph/internal/goroutine" "github.com/sourcegraph/sourcegraph/internal/metrics" "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/schema" ) // newInsightEnqueuer returns a background goroutine which will periodically find all of the search // and webhook insights across all user settings, and enqueue work for the query runner and webhook // runner workers to perform. -func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, observationContext *observation.Context) goroutine.BackgroundRoutine { +func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, settingStore discovery.SettingStore, observationContext *observation.Context) goroutine.BackgroundRoutine { metrics := metrics.NewOperationMetrics( observationContext.Registerer, "insights_enqueuer", @@ -34,13 +40,65 @@ func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, o return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 10*time.Minute, goroutine.NewHandlerWithErrorMessage( "insights_enqueuer", func(ctx context.Context) error { - // TODO(slimsag): future: discover insights from settings store and enqueue them here. 
- // _, err := queryrunner.EnqueueJob(ctx, workerBaseStore, &queryrunner.Job{ - // SeriesID: "abcdefg", // TODO(slimsag) - // SearchQuery: "errorf", // TODO(slimsag) - // State: "queued", - // }) - return nil + queryRunnerEnqueueJob := func(ctx context.Context, job *queryrunner.Job) error { + _, err := queryrunner.EnqueueJob(ctx, workerBaseStore, job) + return err + } + return discoverAndEnqueueInsights(ctx, time.Now, settingStore, queryRunnerEnqueueJob) }, ), operation) } + +// discoverAndEnqueueInsights discovers insights defined in the given setting store from user/org/global +// settings and enqueues them to be executed and have insights recorded. +func discoverAndEnqueueInsights( + ctx context.Context, + now func() time.Time, + settingStore discovery.SettingStore, + enqueueQueryRunnerJob func(ctx context.Context, job *queryrunner.Job) error, +) error { + insights, err := discovery.Discover(ctx, settingStore) + if err != nil { + return errors.Wrap(err, "Discover") + } + + // Deduplicate series that may be unique (e.g. different name/description) but do not have + // unique data (i.e. use the same exact search query or webhook URL.) + var ( + uniqueSeries = map[string]*schema.InsightSeries{} + multi error + offset time.Duration + ) + for _, insight := range insights { + for _, series := range insight.Series { + seriesID, err := discovery.EncodeSeriesID(series) + if err != nil { + multi = multierror.Append(multi, err) + continue + } + _, enqueuedAlready := uniqueSeries[seriesID] + if enqueuedAlready { + continue + } + uniqueSeries[seriesID] = series + + // Enqueue jobs for each unique series, offsetting each job execution by 30 seconds so we + // don't execute all queries at once and harm search performance in general. 
+ if series.Webhook != "" { + continue // TODO(slimsag): future: add support for webhook insights + } + processAfter := now().Add(offset) + offset += 30 * time.Second + err = enqueueQueryRunnerJob(ctx, &queryrunner.Job{ + SeriesID: seriesID, + SearchQuery: series.Search, + ProcessAfter: &processAfter, + State: "queued", + }) + if err != nil { + multi = multierror.Append(multi, err) + } + } + } + return multi +} diff --git a/enterprise/internal/insights/background/insight_enqueuer_test.go b/enterprise/internal/insights/background/insight_enqueuer_test.go new file mode 100644 index 000000000000..701f61b90e37 --- /dev/null +++ b/enterprise/internal/insights/background/insight_enqueuer_test.go @@ -0,0 +1,146 @@ +package background + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/hexops/autogold" + + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/queryrunner" + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/discovery" + "github.com/sourcegraph/sourcegraph/internal/api" +) + +var testRealGlobalSettings = &api.Settings{ID: 1, Contents: `{ + "insights": [ + { + "title": "fmt usage", + "description": "fmt.Errorf/fmt.Printf usage", + "series": [ + { + "label": "fmt.Errorf", + "search": "errorf", + }, + { + "label": "printf", + "search": "fmt.Printf", + }, + { + "label": "duplicate", + "search": "errorf", + } + ] + }, + { + "title": "gitserver usage", + "description": "gitserver exec & close usage", + "series": [ + { + "label": "exec", + "search": "gitserver.Exec", + }, + { + "label": "close", + "search": "gitserver.Close", + }, + { + "label": "duplicate", + "search": "gitserver.Close", + } + ] + } + ] + } +`} + +// Test_discoverAndEnqueueInsights tests that insight discovery and job enqueueing works and +// adheres to a few properties: +// +// 1. Webhook insights are not enqueued (not yet supported.) +// 2. Duplicate insights are deduplicated / do not submit multiple jobs. +// 3. 
Jobs are scheduled not to all run at the same time. +// +func Test_discoverAndEnqueueInsights(t *testing.T) { + // Setup the setting store and job enqueuer mocks. + ctx := context.Background() + settingStore := discovery.NewMockSettingStore() + settingStore.GetLatestFunc.SetDefaultReturn(testRealGlobalSettings, nil) + var enqueued []*queryrunner.Job + enqueueQueryRunnerJob := func(ctx context.Context, job *queryrunner.Job) error { + enqueued = append(enqueued, job) + return nil + } + + // Create a fake clock so the times reported in our test data do not change and can be easily verified. + now, err := time.Parse(time.RFC3339, "2020-03-01T00:00:00Z") + if err != nil { + t.Fatal(err) + } + clock := func() time.Time { return now } + + if err := discoverAndEnqueueInsights(ctx, clock, settingStore, enqueueQueryRunnerJob); err != nil { + t.Fatal(err) + } + + // JSON marshal to keep times formatted nicely. + enqueuedJSON, err := json.MarshalIndent(enqueued, "", " ") + if err != nil { + t.Fatal(err) + } + autogold.Want("0", `[ + { + "SeriesID": "s:087855E6A24440837303FD8A252E9893E8ABDFECA55B61AC83DA1B521906626E", + "SearchQuery": "errorf", + "ID": 0, + "State": "queued", + "FailureMessage": null, + "StartedAt": null, + "FinishedAt": null, + "ProcessAfter": "2020-03-01T00:00:00Z", + "NumResets": 0, + "NumFailures": 0, + "ExecutionLogs": null + }, + { + "SeriesID": "s:7FBD292BF97936C4B6397688CFFB05DEA95E650C3D5B653AAEA8F77BBD25CE93", + "SearchQuery": "fmt.Printf", + "ID": 0, + "State": "queued", + "FailureMessage": null, + "StartedAt": null, + "FinishedAt": null, + "ProcessAfter": "2020-03-01T00:00:30Z", + "NumResets": 0, + "NumFailures": 0, + "ExecutionLogs": null + }, + { + "SeriesID": "s:FB8CFBB7C7C28834957FBE1B830EDD79C5E710FD55B0ACF246C0D7267C5462B4", + "SearchQuery": "gitserver.Exec", + "ID": 0, + "State": "queued", + "FailureMessage": null, + "StartedAt": null, + "FinishedAt": null, + "ProcessAfter": "2020-03-01T00:01:00Z", + "NumResets": 0, + "NumFailures": 0, + 
"ExecutionLogs": null + }, + { + "SeriesID": "s:2B55C7CE2EB30BFFAF1F0276E525B36BB71908E3893A27F416F62A3E23542566", + "SearchQuery": "gitserver.Close", + "ID": 0, + "State": "queued", + "FailureMessage": null, + "StartedAt": null, + "FinishedAt": null, + "ProcessAfter": "2020-03-01T00:01:30Z", + "NumResets": 0, + "NumFailures": 0, + "ExecutionLogs": null + } +]`).Equal(t, string(enqueuedJSON)) +} From ea788d4a3598c42309f01ff28d9637190119c97a Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Tue, 16 Feb 2021 17:47:04 -0700 Subject: [PATCH 4/6] use constant Signed-off-by: Stephen Gutekanst --- enterprise/internal/insights/background/insight_enqueuer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/enterprise/internal/insights/background/insight_enqueuer.go b/enterprise/internal/insights/background/insight_enqueuer.go index 0576d2407707..1837a052cca6 100644 --- a/enterprise/internal/insights/background/insight_enqueuer.go +++ b/enterprise/internal/insights/background/insight_enqueuer.go @@ -49,6 +49,8 @@ func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, s ), operation) } +const queryJobOffsetTime = 30 * time.Second + // discoverAndEnqueueInsights discovers insights defined in the given setting store from user/org/global // settings and enqueues them to be executed and have insights recorded. 
func discoverAndEnqueueInsights( @@ -88,7 +90,7 @@ func discoverAndEnqueueInsights( continue // TODO(slimsag): future: add support for webhook insights } processAfter := now().Add(offset) - offset += 30 * time.Second + offset += queryJobOffsetTime err = enqueueQueryRunnerJob(ctx, &queryrunner.Job{ SeriesID: seriesID, SearchQuery: series.Search, From 72d21553391325dcf58d875944c4be43fd4baf57 Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Tue, 16 Feb 2021 17:47:34 -0700 Subject: [PATCH 5/6] raise enqueuement time to 3h https://sourcegraph.slack.com/archives/C014ZCKMCAV/p1613521223145400 Signed-off-by: Stephen Gutekanst --- enterprise/internal/insights/background/insight_enqueuer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise/internal/insights/background/insight_enqueuer.go b/enterprise/internal/insights/background/insight_enqueuer.go index 1837a052cca6..9cda11b6dece 100644 --- a/enterprise/internal/insights/background/insight_enqueuer.go +++ b/enterprise/internal/insights/background/insight_enqueuer.go @@ -37,7 +37,7 @@ func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, s // // See also https://github.com/sourcegraph/sourcegraph/pull/17227#issuecomment-779515187 for some very rough // data retention / scale concerns. 
- return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 10*time.Minute, goroutine.NewHandlerWithErrorMessage( + return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 3*time.Hour, goroutine.NewHandlerWithErrorMessage( "insights_enqueuer", func(ctx context.Context) error { queryRunnerEnqueueJob := func(ctx context.Context, job *queryrunner.Job) error { From 13cdc5610120fd581385ee830376ecca49a36a90 Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Tue, 16 Feb 2021 17:49:20 -0700 Subject: [PATCH 6/6] one data point every 12h Relaxing further based on https://sourcegraph.slack.com/archives/C014ZCKMCAV/p1613522873146200?thread_ts=1613521223.145400&cid=C014ZCKMCAV Signed-off-by: Stephen Gutekanst --- enterprise/internal/insights/background/insight_enqueuer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise/internal/insights/background/insight_enqueuer.go b/enterprise/internal/insights/background/insight_enqueuer.go index 9cda11b6dece..bc983d2bb7ae 100644 --- a/enterprise/internal/insights/background/insight_enqueuer.go +++ b/enterprise/internal/insights/background/insight_enqueuer.go @@ -37,7 +37,7 @@ func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, s // // See also https://github.com/sourcegraph/sourcegraph/pull/17227#issuecomment-779515187 for some very rough // data retention / scale concerns. - return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 3*time.Hour, goroutine.NewHandlerWithErrorMessage( + return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 12*time.Hour, goroutine.NewHandlerWithErrorMessage( "insights_enqueuer", func(ctx context.Context) error { queryRunnerEnqueueJob := func(ctx context.Context, job *queryrunner.Job) error {