Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion enterprise/internal/insights/background/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights"
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/queryrunner"
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/observation"
Expand Down Expand Up @@ -43,6 +44,7 @@ func StartBackgroundJobs(ctx context.Context, mainAppDB *sql.DB) {
// Create a base store to be used for storing worker state. We store this in the main app Postgres
// DB, not the TimescaleDB (which we use only for storing insights data.)
workerBaseStore := basestore.NewWithDB(mainAppDB, sql.TxOptions{})
settingStore := database.Settings(mainAppDB)

// Create basic metrics for recording information about background jobs.
observationContext := &observation.Context{
Expand All @@ -55,7 +57,7 @@ func StartBackgroundJobs(ctx context.Context, mainAppDB *sql.DB) {
// Start background goroutines for all of our workers.
go goroutine.MonitorBackgroundRoutines(ctx, []goroutine.BackgroundRoutine{
// Register the background goroutine which discovers and enqueues insights work.
newInsightEnqueuer(ctx, workerBaseStore, observationContext),
newInsightEnqueuer(ctx, workerBaseStore, settingStore, observationContext),

// Register the query-runner worker and resetter, which executes search queries and records
// results to TimescaleDB.
Expand Down
78 changes: 69 additions & 9 deletions enterprise/internal/insights/background/insight_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@ import (
"fmt"
"time"

"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"

"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/queryrunner"
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/discovery"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/schema"
)

// newInsightEnqueuer returns a background goroutine which will periodically find all of the search
// and webhook insights across all user settings, and enqueue work for the query runner and webhook
// runner workers to perform.
func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, observationContext *observation.Context) goroutine.BackgroundRoutine {
func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, settingStore discovery.SettingStore, observationContext *observation.Context) goroutine.BackgroundRoutine {
metrics := metrics.NewOperationMetrics(
observationContext.Registerer,
"insights_enqueuer",
Expand All @@ -31,16 +37,70 @@ func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, o
//
// See also https://github.com/sourcegraph/sourcegraph/pull/17227#issuecomment-779515187 for some very rough
// data retention / scale concerns.
return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 10*time.Minute, goroutine.NewHandlerWithErrorMessage(
return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 12*time.Hour, goroutine.NewHandlerWithErrorMessage(
"insights_enqueuer",
func(ctx context.Context) error {
// TODO(slimsag): future: discover insights from settings store and enqueue them here.
// _, err := queryrunner.EnqueueJob(ctx, workerBaseStore, &queryrunner.Job{
// SeriesID: "abcdefg", // TODO(slimsag)
// SearchQuery: "errorf", // TODO(slimsag)
// State: "queued",
// })
return nil
queryRunnerEnqueueJob := func(ctx context.Context, job *queryrunner.Job) error {
_, err := queryrunner.EnqueueJob(ctx, workerBaseStore, job)
return err
}
return discoverAndEnqueueInsights(ctx, time.Now, settingStore, queryRunnerEnqueueJob)
},
), operation)
}

// queryJobOffsetTime is the spacing between the scheduled execution times of
// successive query-runner jobs, so that enqueued searches do not all execute
// at once and degrade overall search performance.
const queryJobOffsetTime = 30 * time.Second

// discoverAndEnqueueInsights discovers insights defined in the given setting store from
// user/org/global settings and enqueues query-runner jobs to execute them and record
// their results.
//
// Per-series failures (bad series definitions, enqueue errors) are accumulated and
// returned together rather than aborting the whole run.
func discoverAndEnqueueInsights(
	ctx context.Context,
	now func() time.Time,
	settingStore discovery.SettingStore,
	enqueueQueryRunnerJob func(ctx context.Context, job *queryrunner.Job) error,
) error {
	found, err := discovery.Discover(ctx, settingStore)
	if err != nil {
		return errors.Wrap(err, "Discover")
	}

	// The same underlying series may appear in several insights (different
	// name/description but an identical search query or webhook URL). Only one
	// job per unique series ID is enqueued.
	seen := make(map[string]*schema.InsightSeries)
	var (
		errs  error
		delay time.Duration
	)
	for _, insight := range found {
		for _, series := range insight.Series {
			id, idErr := discovery.EncodeSeriesID(series)
			if idErr != nil {
				errs = multierror.Append(errs, idErr)
				continue
			}
			if _, dup := seen[id]; dup {
				continue
			}
			seen[id] = series

			if series.Webhook != "" {
				continue // TODO(slimsag): future: add support for webhook insights
			}

			// Stagger each job's execution by queryJobOffsetTime so we don't run
			// all queries at once and harm search performance in general.
			runAt := now().Add(delay)
			delay += queryJobOffsetTime
			if enqueueErr := enqueueQueryRunnerJob(ctx, &queryrunner.Job{
				SeriesID:     id,
				SearchQuery:  series.Search,
				ProcessAfter: &runAt,
				State:        "queued",
			}); enqueueErr != nil {
				errs = multierror.Append(errs, enqueueErr)
			}
		}
	}
	return errs
}
146 changes: 146 additions & 0 deletions enterprise/internal/insights/background/insight_enqueuer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package background

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/hexops/autogold"

"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/queryrunner"
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/discovery"
"github.com/sourcegraph/sourcegraph/internal/api"
)

// testRealGlobalSettings is a realistic global settings payload declaring two
// insights. Each insight includes a "duplicate" series that repeats an earlier
// series' search query, which exercises series deduplication in
// discoverAndEnqueueInsights. NOTE(review): the contents use trailing commas,
// so this presumably flows through a JSONC-tolerant settings parser — confirm.
var testRealGlobalSettings = &api.Settings{ID: 1, Contents: `{
"insights": [
{
"title": "fmt usage",
"description": "fmt.Errorf/fmt.Printf usage",
"series": [
{
"label": "fmt.Errorf",
"search": "errorf",
},
{
"label": "printf",
"search": "fmt.Printf",
},
{
"label": "duplicate",
"search": "errorf",
}
]
},
{
"title": "gitserver usage",
"description": "gitserver exec & close usage",
"series": [
{
"label": "exec",
"search": "gitserver.Exec",
},
{
"label": "close",
"search": "gitserver.Close",
},
{
"label": "duplicate",
"search": "gitserver.Close",
}
]
}
]
}
`}

// Test_discoverAndEnqueueInsights tests that insight discovery and job enqueueing works and
// adheres to a few properties:
//
// 1. Webhook insights are not enqueued (not yet supported.)
// 2. Duplicate insights are deduplicated / do not submit multiple jobs.
// 3. Jobs are scheduled not to all run at the same time.
//
func Test_discoverAndEnqueueInsights(t *testing.T) {
	// Setup the setting store and job enqueuer mocks.
	ctx := context.Background()
	settingStore := discovery.NewMockSettingStore()
	settingStore.GetLatestFunc.SetDefaultReturn(testRealGlobalSettings, nil)
	// Record every job handed to the enqueuer so the full set can be asserted below.
	var enqueued []*queryrunner.Job
	enqueueQueryRunnerJob := func(ctx context.Context, job *queryrunner.Job) error {
		enqueued = append(enqueued, job)
		return nil
	}

	// Create a fake clock so the times reported in our test data do not change and can be easily verified.
	now, err := time.Parse(time.RFC3339, "2020-03-01T00:00:00Z")
	if err != nil {
		t.Fatal(err)
	}
	clock := func() time.Time { return now }

	if err := discoverAndEnqueueInsights(ctx, clock, settingStore, enqueueQueryRunnerJob); err != nil {
		t.Fatal(err)
	}

	// JSON marshal to keep times formatted nicely.
	enqueuedJSON, err := json.MarshalIndent(enqueued, "", " ")
	if err != nil {
		t.Fatal(err)
	}
	// The golden value pins all three properties at once: only 4 jobs appear
	// (duplicate series dropped), none are webhook jobs, and ProcessAfter
	// advances by 30s per job.
	autogold.Want("0", `[
{
"SeriesID": "s:087855E6A24440837303FD8A252E9893E8ABDFECA55B61AC83DA1B521906626E",
"SearchQuery": "errorf",
"ID": 0,
"State": "queued",
"FailureMessage": null,
"StartedAt": null,
"FinishedAt": null,
"ProcessAfter": "2020-03-01T00:00:00Z",
"NumResets": 0,
"NumFailures": 0,
"ExecutionLogs": null
},
{
"SeriesID": "s:7FBD292BF97936C4B6397688CFFB05DEA95E650C3D5B653AAEA8F77BBD25CE93",
"SearchQuery": "fmt.Printf",
"ID": 0,
"State": "queued",
"FailureMessage": null,
"StartedAt": null,
"FinishedAt": null,
"ProcessAfter": "2020-03-01T00:00:30Z",
"NumResets": 0,
"NumFailures": 0,
"ExecutionLogs": null
},
{
"SeriesID": "s:FB8CFBB7C7C28834957FBE1B830EDD79C5E710FD55B0ACF246C0D7267C5462B4",
"SearchQuery": "gitserver.Exec",
"ID": 0,
"State": "queued",
"FailureMessage": null,
"StartedAt": null,
"FinishedAt": null,
"ProcessAfter": "2020-03-01T00:01:00Z",
"NumResets": 0,
"NumFailures": 0,
"ExecutionLogs": null
},
{
"SeriesID": "s:2B55C7CE2EB30BFFAF1F0276E525B36BB71908E3893A27F416F62A3E23542566",
"SearchQuery": "gitserver.Close",
"ID": 0,
"State": "queued",
"FailureMessage": null,
"StartedAt": null,
"FinishedAt": null,
"ProcessAfter": "2020-03-01T00:01:30Z",
"NumResets": 0,
"NumFailures": 0,
"ExecutionLogs": null
}
]`).Equal(t, string(enqueuedJSON))
}
3 changes: 3 additions & 0 deletions enterprise/internal/insights/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func Discover(ctx context.Context, settingStore SettingStore) ([]*schema.Insight
return nil, err
}
globalSettings, err := parseUserSettings(globalSettingsRaw)
if err != nil {
return nil, err
}
return globalSettings.Insights, nil
}

Expand Down
30 changes: 30 additions & 0 deletions enterprise/internal/insights/discovery/series_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package discovery

import (
"crypto/sha256"
"fmt"

"github.com/sourcegraph/sourcegraph/schema"
)

// EncodeSeriesID hashes the input series to return a string which uniquely
// identifies the data series being described. It is possible the same series is
// described in multiple users' settings, e.g. if multiple users declare an
// insight with the same search query - in which case we have an opportunity to
// deduplicate them.
//
// Note that since the series ID hash is stored in the database, it must remain
// stable or else past data will not be queryable.
func EncodeSeriesID(series *schema.InsightSeries) (string, error) {
	if series.Search != "" {
		return "s:" + sha256String(series.Search), nil
	}
	if series.Webhook != "" {
		return "w:" + sha256String(series.Webhook), nil
	}
	// A series with neither a search query nor a webhook has no data source.
	return "", fmt.Errorf("invalid series %+v", series)
}

// sha256String returns the uppercase hexadecimal encoding of the SHA-256
// digest of s. The uppercase %X form is part of the stored series-ID format
// and must not change.
func sha256String(s string) string {
	digest := sha256.Sum256([]byte(s))
	return fmt.Sprintf("%X", digest)
}
42 changes: 42 additions & 0 deletions enterprise/internal/insights/discovery/series_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package discovery

import (
"fmt"
"testing"

"github.com/hexops/autogold"

"github.com/sourcegraph/sourcegraph/schema"
)

// TestEncodeSeriesID verifies the stable series-ID encoding for search-based
// ("s:" prefix), webhook-based ("w:" prefix), and invalid (empty) series.
func TestEncodeSeriesID(t *testing.T) {
	testCases := []struct {
		input *schema.InsightSeries
		want  autogold.Value
	}{
		{
			input: &schema.InsightSeries{Search: "fmt.Errorf repo:github.com/golang/go"},
			want: autogold.Want("basic_search", [2]interface{}{
				"s:6CB26B840C8EEBFB03DDB44A23FFBD4D7AD864B47D9AA1E975E69FCF0EE2A67E",
				"<nil>",
			}),
		},
		{
			input: &schema.InsightSeries{Webhook: "https://example.com/getData?foo=bar"},
			want: autogold.Want("basic_webhook", [2]interface{}{
				"w:CDAA477D902F8572B92EAC827A0E5FC27537BFC825EE358427E0DC04D22E0E25",
				"<nil>",
			}),
		},
		{
			input: &schema.InsightSeries{},
			want:  autogold.Want("invalid", [2]interface{}{"", "invalid series &{Label: RepositoriesList:[] Search: Webhook:}"}),
		},
	}
	for _, tc := range testCases {
		t.Run(tc.want.Name(), func(t *testing.T) {
			got, err := EncodeSeriesID(tc.input)
			// The error is stringified so both the ID and error can be pinned
			// in a single golden value.
			tc.want.Equal(t, [2]interface{}{got, fmt.Sprint(err)})
		})
	}
}