-
Notifications
You must be signed in to change notification settings - Fork 1.3k
insights: add background workers which execute search queries and store insights #18267
Changes from all commits
4e1193a
54f637d
375e50a
100f9e5
a6e95ba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| package background | ||
|
|
||
| import ( | ||
| "context" | ||
| "database/sql" | ||
| "log" | ||
|
|
||
| "github.com/inconshreveable/log15" | ||
| "github.com/opentracing/opentracing-go" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
|
|
||
| "github.com/sourcegraph/sourcegraph/enterprise/internal/insights" | ||
| "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/queryrunner" | ||
| "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store" | ||
| "github.com/sourcegraph/sourcegraph/internal/database/basestore" | ||
| "github.com/sourcegraph/sourcegraph/internal/goroutine" | ||
| "github.com/sourcegraph/sourcegraph/internal/observation" | ||
| "github.com/sourcegraph/sourcegraph/internal/trace" | ||
| "github.com/sourcegraph/sourcegraph/internal/workerutil" | ||
| "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker" | ||
| ) | ||
|
|
||
| // StartBackgroundJobs is the main entrypoint which starts background jobs for code insights. It is | ||
| // called from the repo-updater service, currently. | ||
| func StartBackgroundJobs(ctx context.Context, mainAppDB *sql.DB) { | ||
| if !insights.IsEnabled() { | ||
| return | ||
| } | ||
|
|
||
| // Create a connection to TimescaleDB, so we can record results. | ||
| timescale, err := insights.InitializeCodeInsightsDB() | ||
| if err != nil { | ||
| // e.g. migration failed, DB unavailable, etc. code insights is non-functional so we do not | ||
| // want to continue. | ||
| // | ||
| // In some situations (i.e. if the frontend is running migrations), this will be expected | ||
| // and we should restart until the frontend finishes - the exact same way repo updater would | ||
| // behave if the frontend had not yet migrated the main app DB. | ||
| log.Fatal("failed to initialize code insights (set DISABLE_CODE_INSIGHTS=true if needed)", err) | ||
| } | ||
| store := store.New(timescale) | ||
|
|
||
| // Create a base store to be used for storing worker state. We store this in the main app Postgres | ||
| // DB, not the TimescaleDB (which we use only for storing insights data.) | ||
| workerBaseStore := basestore.NewWithDB(mainAppDB, sql.TxOptions{}) | ||
|
|
||
| // Create basic metrics for recording information about background jobs. | ||
| observationContext := &observation.Context{ | ||
| Logger: log15.Root(), | ||
| Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()}, | ||
| Registerer: prometheus.DefaultRegisterer, | ||
| } | ||
| queryRunnerWorkerMetrics, queryRunnerResetterMetrics := newWorkerMetrics(observationContext, "query_runner_worker") | ||
|
|
||
| // Start background goroutines for all of our workers. | ||
| go goroutine.MonitorBackgroundRoutines(ctx, []goroutine.BackgroundRoutine{ | ||
| // Register the background goroutine which discovers and enqueues insights work. | ||
| newInsightEnqueuer(ctx, workerBaseStore, observationContext), | ||
|
|
||
| // Register the query-runner worker and resetter, which executes search queries and records | ||
| // results to TimescaleDB. | ||
| queryrunner.NewWorker(ctx, workerBaseStore, store, queryRunnerWorkerMetrics), | ||
| queryrunner.NewResetter(ctx, workerBaseStore, queryRunnerResetterMetrics), | ||
|
|
||
| // TODO(slimsag): future: register another worker here for webhook querying. | ||
| }...) | ||
| } | ||
|
|
||
| // newWorkerMetrics returns a basic set of metrics to be used for a worker and its resetter: | ||
| // | ||
| // * WorkerMetrics records worker operations & number of jobs. | ||
| // * ResetterMetrics records the number of jobs that got reset because workers timed out / took too | ||
| // long. | ||
| // | ||
| // Individual insights workers may then _also_ want to register their own metrics, if desired, in | ||
| // their NewWorker functions. | ||
| func newWorkerMetrics(observationContext *observation.Context, workerName string) (workerutil.WorkerMetrics, dbworker.ResetterMetrics) { | ||
| workerResets := prometheus.NewCounter(prometheus.CounterOpts{ | ||
| Name: "src_insights_" + workerName + "_resets_total", | ||
| Help: "The number of times work took too long and was reset for retry later.", | ||
| }) | ||
| observationContext.Registerer.MustRegister(workerResets) | ||
|
|
||
| workerResetFailures := prometheus.NewCounter(prometheus.CounterOpts{ | ||
| Name: "src_insights_" + workerName + "_reset_failures_total", | ||
| Help: "The number of times work took too long so many times that retries will no longer happen.", | ||
| }) | ||
| observationContext.Registerer.MustRegister(workerResetFailures) | ||
|
|
||
| workerErrors := prometheus.NewCounter(prometheus.CounterOpts{ | ||
| Name: "src_insights_" + workerName + "_errors_total", | ||
| Help: "The number of errors that occurred during a worker job.", | ||
| }) | ||
| observationContext.Registerer.MustRegister(workerErrors) | ||
|
|
||
| workerMetrics := workerutil.NewMetrics(observationContext, "insights_"+workerName, nil) | ||
| resetterMetrics := dbworker.ResetterMetrics{ | ||
| RecordResets: workerResets, | ||
| RecordResetFailures: workerResetFailures, | ||
| Errors: workerErrors, | ||
| } | ||
| return workerMetrics, resetterMetrics | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package background | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/sourcegraph/sourcegraph/internal/database/basestore" | ||
| "github.com/sourcegraph/sourcegraph/internal/goroutine" | ||
| "github.com/sourcegraph/sourcegraph/internal/metrics" | ||
| "github.com/sourcegraph/sourcegraph/internal/observation" | ||
| ) | ||
|
|
||
| // newInsightEnqueuer returns a background goroutine which will periodically find all of the search | ||
| // and webhook insights across all user settings, and enqueue work for the query runner and webhook | ||
| // runner workers to perform. | ||
| func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, observationContext *observation.Context) goroutine.BackgroundRoutine { | ||
| metrics := metrics.NewOperationMetrics( | ||
| observationContext.Registerer, | ||
| "insights_enqueuer", | ||
| metrics.WithCountHelp("Total number of insights enqueuer executions"), | ||
| ) | ||
| operation := observationContext.Operation(observation.Op{ | ||
| Name: fmt.Sprintf("Enqueuer.Run"), | ||
| Metrics: metrics, | ||
| }) | ||
|
|
||
| // Note: We run this goroutine once every 10 minutes, and StalledMaxAge in queryrunner/ is | ||
| // set to 60s. If you change this, make sure the StalledMaxAge is less than this period | ||
| // otherwise there is a fair chance we could enqueue work faster than it can be completed. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like this comment is asking you to implement some form of backpressure. Plans or issues for that?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Definitely, I've filed https://github.com/sourcegraph/sourcegraph/issues/18308 just now and may address this myself soon, but also plan to file a more extensive set of follow-up issues in general around the insights backend for the next person who starts work on this. |
||
| // | ||
| // See also https://github.com/sourcegraph/sourcegraph/pull/17227#issuecomment-779515187 for some very rough | ||
| // data retention / scale concerns. | ||
| return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 10*time.Minute, goroutine.NewHandlerWithErrorMessage( | ||
| "insights_enqueuer", | ||
| func(ctx context.Context) error { | ||
| // TODO(slimsag): future: discover insights from settings store and enqueue them here. | ||
| // _, err := queryrunner.EnqueueJob(ctx, workerBaseStore, &queryrunner.Job{ | ||
| // SeriesID: "abcdefg", // TODO(slimsag) | ||
| // SearchQuery: "errorf", // TODO(slimsag) | ||
| // State: "queued", | ||
| // }) | ||
| return nil | ||
| }, | ||
| ), operation) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| package queryrunner | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "net/url" | ||
|
|
||
| "github.com/sourcegraph/sourcegraph/internal/api" | ||
|
|
||
| "golang.org/x/net/context/ctxhttp" | ||
|
|
||
| "github.com/pkg/errors" | ||
| ) | ||
|
|
||
| // This file contains all the methods required to execute Sourcegraph searches using our GraphQL | ||
| // API and get results back. | ||
|
|
||
// graphQLQuery describes a general GraphQL query and its variables. It is the value that search
// JSON-encodes as the request body.
type graphQLQuery struct {
	// Query is the GraphQL query text, encoded under the "query" key.
	Query string `json:"query"`
	// Variables holds the query's variables, encoded under the "variables" key.
	Variables interface{} `json:"variables"`
}
|
|
||
// gqlSearchQuery is the GraphQL query text sent by search. It takes a single $query variable and
// selects the fields that gqlSearchResponse decodes.
const gqlSearchQuery = `query Search(
$query: String!,
) {
search(query: $query, ) {
results {
limitHit
cloning { name }
missing { name }
timedout { name }
matchCount
alert {
title
description
}
}
}
}`
|
|
||
// gqlSearchVars holds the variables for gqlSearchQuery.
type gqlSearchVars struct {
	// Query is the search query string, encoded under the "query" key.
	Query string `json:"query"`
}
|
|
||
// gqlSearchResponse is the decoded JSON response for gqlSearchQuery. Its nested fields mirror
// the fields selected by that query.
type gqlSearchResponse struct {
	Data struct {
		Search struct {
			Results struct {
				LimitHit   bool
				Cloning    []*api.Repo
				Missing    []*api.Repo
				Timedout   []*api.Repo
				MatchCount int
				// Alert is a pointer so that a response without an alert can be told apart
				// from one with an alert; callers compare it against nil.
				Alert *struct {
					Title       string
					Description string
				}
			}
		}
	}
	// Errors holds the top-level errors of the response; search returns an error when this
	// list is non-empty.
	Errors []interface{}
}
|
|
||
| // search executes the given search query. | ||
| func search(ctx context.Context, query string) (*gqlSearchResponse, error) { | ||
| var buf bytes.Buffer | ||
| err := json.NewEncoder(&buf).Encode(graphQLQuery{ | ||
| Query: gqlSearchQuery, | ||
| Variables: gqlSearchVars{Query: query}, | ||
| }) | ||
| if err != nil { | ||
| return nil, errors.Wrap(err, "Encode") | ||
| } | ||
|
|
||
| url, err := gqlURL("InsightsSearch") | ||
| if err != nil { | ||
| return nil, errors.Wrap(err, "constructing frontend URL") | ||
| } | ||
|
|
||
| resp, err := ctxhttp.Post(ctx, nil, url, "application/json", &buf) | ||
| if err != nil { | ||
| return nil, errors.Wrap(err, "Post") | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| var res *gqlSearchResponse | ||
| if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { | ||
| return nil, errors.Wrap(err, "Decode") | ||
| } | ||
| if len(res.Errors) > 0 { | ||
| return res, fmt.Errorf("graphql: errors: %v", res.Errors) | ||
| } | ||
| return res, nil | ||
| } | ||
|
|
||
| // gqlURL returns the frontend's internal GraphQL API URL, with the given ?queryName parameter | ||
| // which is used to keep track of the source and type of GraphQL queries. | ||
| func gqlURL(queryName string) (string, error) { | ||
| u, err := url.Parse(api.InternalClient.URL) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| u.Path = "/.internal/graphql" | ||
| u.RawQuery = queryName | ||
| return u.String(), nil | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| package queryrunner | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/inconshreveable/log15" | ||
|
|
||
| "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store" | ||
| "github.com/sourcegraph/sourcegraph/internal/database/basestore" | ||
| "github.com/sourcegraph/sourcegraph/internal/workerutil" | ||
| "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker" | ||
| dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" | ||
| ) | ||
|
|
||
| var _ dbworker.Handler = &workHandler{} | ||
|
|
||
| // workHandler implements the dbworker.Handler interface by executing search queries and | ||
| // inserting insights about them to the insights Timescale database. | ||
| type workHandler struct { | ||
| workerBaseStore *basestore.Store | ||
| insightsStore *store.Store | ||
| } | ||
|
|
||
| func (r *workHandler) Handle(ctx context.Context, workerStore dbworkerstore.Store, record workerutil.Record) (err error) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the most reasonable way to test this mocking out every store (
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I try to do both in a particular way: make the handler a dumb enough driver that you can mock everything it calls and ensure it calls thing in the correct order. Then you can test each part individually. If you deconstruct it right you should have confidence in this approach (in the LSIF processor we test reading, correlation, and db writes independently and ensure that mocks are called in the correct order, not called if earlier errors occur, etc). |
||
| defer func() { | ||
| if err != nil { | ||
| log15.Error("insights.queryrunner.workHandler", "error", err) | ||
| } | ||
| }() | ||
|
|
||
| // Dequeue the job to get information about it, like what search query to perform. | ||
| job, err := dequeueJob(ctx, r.workerBaseStore, record.RecordID()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Actually perform the search query. | ||
| // | ||
| // 🚨 SECURITY: The request is performed without authentication, we get back results from every | ||
| // repository on Sourcegraph - so we must be careful to only record insightful information that | ||
| // is OK to expose to every user on Sourcegraph (e.g. total result counts are fine, exposing | ||
| // that a repository exists may or may not be fine, exposing individual results is definitely | ||
| // not, etc.) | ||
| var results *gqlSearchResponse | ||
| results, err = search(ctx, job.SearchQuery) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // TODO(slimsag): future: Logs are not a good way to surface these errors to users. | ||
| if len(results.Errors) > 0 { | ||
| return fmt.Errorf("GraphQL errors: %v", results.Errors) | ||
| } | ||
| if alert := results.Data.Search.Results.Alert; alert != nil { | ||
| return fmt.Errorf("insights query issue: alert: %v query=%q", alert, job.SearchQuery) | ||
| } | ||
| if results.Data.Search.Results.LimitHit { | ||
| log15.Error("insights query issue", "problem", "limit hit", "query", job.SearchQuery) | ||
| } | ||
| if cloning := len(results.Data.Search.Results.Cloning); cloning > 0 { | ||
| log15.Error("insights query issue", "cloning_repos", cloning, "query", job.SearchQuery) | ||
| } | ||
| if missing := len(results.Data.Search.Results.Missing); missing > 0 { | ||
| log15.Error("insights query issue", "missing_repos", missing, "query", job.SearchQuery) | ||
| } | ||
| if timedout := len(results.Data.Search.Results.Timedout); timedout > 0 { | ||
| log15.Error("insights query issue", "timedout_repos", timedout, "query", job.SearchQuery) | ||
| } | ||
|
|
||
| // Record the match count to the insights DB. | ||
| var matchCount int | ||
| if results != nil { | ||
| matchCount = results.Data.Search.Results.MatchCount | ||
| } | ||
|
|
||
| log15.Info("insights query runner", "found matches", matchCount, "query", job.SearchQuery) | ||
|
|
||
| // 🚨 SECURITY: The request is performed without authentication, we get back results from every | ||
| // repository on Sourcegraph - so we must be careful to only record insightful information that | ||
| // is OK to expose to every user on Sourcegraph (e.g. total result counts are fine, exposing | ||
| // that a repository exists may or may not be fine, exposing individual results is definitely | ||
| // not, etc.) | ||
| return r.insightsStore.RecordSeriesPoint(ctx, store.RecordSeriesPointArgs{ | ||
| SeriesID: job.SeriesID, | ||
| Point: store.SeriesPoint{ | ||
| Time: time.Now(), | ||
| Value: float64(matchCount), | ||
| }, | ||
| // TODO(slimsag): future: determine match count per repository, and store RepoID/RepoName (and maybe Metadata?) | ||
| }) | ||
| } | ||
Example of a function I don't think can be easily tested or refactored in a way to be easily tested.
This is what CI/integration tests are for!
Good point, although this does introduce a new challenge because we won't be supporting code insights in the single-container deployments. Interesting. I'll think about this some more. I wonder if we have plans to replace the single-container deployment used for CI tests with docker compose or similar.