diff --git a/enterprise/cmd/repo-updater/main.go b/enterprise/cmd/repo-updater/main.go index f8718be58799..48b595315e82 100644 --- a/enterprise/cmd/repo-updater/main.go +++ b/enterprise/cmd/repo-updater/main.go @@ -16,6 +16,7 @@ import ( "github.com/sourcegraph/sourcegraph/enterprise/internal/campaigns" codemonitorsBackground "github.com/sourcegraph/sourcegraph/enterprise/internal/codemonitors/background" edb "github.com/sourcegraph/sourcegraph/enterprise/internal/database" + insightsBackground "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background" "github.com/sourcegraph/sourcegraph/internal/actor" ossAuthz "github.com/sourcegraph/sourcegraph/internal/authz" "github.com/sourcegraph/sourcegraph/internal/conf" @@ -48,6 +49,7 @@ func enterpriseInit( ctx := actor.WithInternalActor(context.Background()) codemonitorsBackground.StartBackgroundJobs(ctx, db) + insightsBackground.StartBackgroundJobs(ctx, db) campaigns.InitBackgroundJobs(ctx, db, cf, server) diff --git a/enterprise/internal/insights/background/background.go b/enterprise/internal/insights/background/background.go new file mode 100644 index 000000000000..02d92c95ae60 --- /dev/null +++ b/enterprise/internal/insights/background/background.go @@ -0,0 +1,103 @@ +package background + +import ( + "context" + "database/sql" + "log" + + "github.com/inconshreveable/log15" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights" + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/background/queryrunner" + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/goroutine" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/trace" + "github.com/sourcegraph/sourcegraph/internal/workerutil" + "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker" +) + +// StartBackgroundJobs is the main entrypoint which starts background jobs for code insights. It is +// called from the repo-updater service, currently. +func StartBackgroundJobs(ctx context.Context, mainAppDB *sql.DB) { + if !insights.IsEnabled() { + return + } + + // Create a connection to TimescaleDB, so we can record results. + timescale, err := insights.InitializeCodeInsightsDB() + if err != nil { + // e.g. migration failed, DB unavailable, etc. code insights is non-functional so we do not + // want to continue. + // + // In some situations (i.e. if the frontend is running migrations), this will be expected + // and we should restart until the frontend finishes - the exact same way repo updater would + // behave if the frontend had not yet migrated the main app DB. + log.Fatal("failed to initialize code insights (set DISABLE_CODE_INSIGHTS=true if needed)", err) + } + store := store.New(timescale) + + // Create a base store to be used for storing worker state. We store this in the main app Postgres + // DB, not the TimescaleDB (which we use only for storing insights data.) + workerBaseStore := basestore.NewWithDB(mainAppDB, sql.TxOptions{}) + + // Create basic metrics for recording information about background jobs. + observationContext := &observation.Context{ + Logger: log15.Root(), + Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()}, + Registerer: prometheus.DefaultRegisterer, + } + queryRunnerWorkerMetrics, queryRunnerResetterMetrics := newWorkerMetrics(observationContext, "query_runner_worker") + + // Start background goroutines for all of our workers. + go goroutine.MonitorBackgroundRoutines(ctx, []goroutine.BackgroundRoutine{ + // Register the background goroutine which discovers and enqueues insights work. + newInsightEnqueuer(ctx, workerBaseStore, observationContext), + + // Register the query-runner worker and resetter, which executes search queries and records + // results to TimescaleDB. + queryrunner.NewWorker(ctx, workerBaseStore, store, queryRunnerWorkerMetrics), + queryrunner.NewResetter(ctx, workerBaseStore, queryRunnerResetterMetrics), + + // TODO(slimsag): future: register another worker here for webhook querying. + }...) +} + +// newWorkerMetrics returns a basic set of metrics to be used for a worker and its resetter: +// +// * WorkerMetrics records worker operations & number of jobs. +// * ResetterMetrics records the number of jobs that got reset because workers timed out / took too +// long. +// +// Individual insights workers may then _also_ want to register their own metrics, if desired, in +// their NewWorker functions. +func newWorkerMetrics(observationContext *observation.Context, workerName string) (workerutil.WorkerMetrics, dbworker.ResetterMetrics) { + workerResets := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "src_insights_" + workerName + "_resets_total", + Help: "The number of times work took too long and was reset for retry later.", + }) + observationContext.Registerer.MustRegister(workerResets) + + workerResetFailures := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "src_insights_" + workerName + "_reset_failures_total", + Help: "The number of times work took too long so many times that retries will no longer happen.", + }) + observationContext.Registerer.MustRegister(workerResetFailures) + + workerErrors := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "src_insights_" + workerName + "_errors_total", + Help: "The number of errors that occurred during a worker job.", + }) + observationContext.Registerer.MustRegister(workerErrors) + + workerMetrics := workerutil.NewMetrics(observationContext, "insights_"+workerName, nil) + resetterMetrics := dbworker.ResetterMetrics{ + RecordResets: workerResets, + RecordResetFailures: workerResetFailures, + Errors: workerErrors, + } + return workerMetrics, resetterMetrics +} diff --git a/enterprise/internal/insights/background/insight_enqueuer.go b/enterprise/internal/insights/background/insight_enqueuer.go new file mode 100644 index 000000000000..17ce69869afe --- /dev/null +++ b/enterprise/internal/insights/background/insight_enqueuer.go @@ -0,0 +1,46 @@ +package background + +import ( + "context" + "fmt" + "time" + + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/goroutine" + "github.com/sourcegraph/sourcegraph/internal/metrics" + "github.com/sourcegraph/sourcegraph/internal/observation" +) + +// newInsightEnqueuer returns a background goroutine which will periodically find all of the search +// and webhook insights across all user settings, and enqueue work for the query runner and webhook +// runner workers to perform. +func newInsightEnqueuer(ctx context.Context, workerBaseStore *basestore.Store, observationContext *observation.Context) goroutine.BackgroundRoutine { + metrics := metrics.NewOperationMetrics( + observationContext.Registerer, + "insights_enqueuer", + metrics.WithCountHelp("Total number of insights enqueuer executions"), + ) + operation := observationContext.Operation(observation.Op{ + Name: fmt.Sprintf("Enqueuer.Run"), + Metrics: metrics, + }) + + // Note: We run this goroutine once every 10 minutes, and StalledMaxAge in queryrunner/ is + // set to 60s. If you change this, make sure the StalledMaxAge is less than this period + // otherwise there is a fair chance we could enqueue work faster than it can be completed. + // + // See also https://github.com/sourcegraph/sourcegraph/pull/17227#issuecomment-779515187 for some very rough + // data retention / scale concerns. + return goroutine.NewPeriodicGoroutineWithMetrics(ctx, 10*time.Minute, goroutine.NewHandlerWithErrorMessage( + "insights_enqueuer", + func(ctx context.Context) error { + // TODO(slimsag): future: discover insights from settings store and enqueue them here. + // _, err := queryrunner.EnqueueJob(ctx, workerBaseStore, &queryrunner.Job{ + // SeriesID: "abcdefg", // TODO(slimsag) + // SearchQuery: "errorf", // TODO(slimsag) + // State: "queued", + // }) + return nil + }, + ), operation) +} diff --git a/enterprise/internal/insights/background/queryrunner/graphql.go b/enterprise/internal/insights/background/queryrunner/graphql.go new file mode 100644 index 000000000000..a8fe2e11ee96 --- /dev/null +++ b/enterprise/internal/insights/background/queryrunner/graphql.go @@ -0,0 +1,109 @@ +package queryrunner + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/url" + + "github.com/sourcegraph/sourcegraph/internal/api" + + "golang.org/x/net/context/ctxhttp" + + "github.com/pkg/errors" +) + +// This file contains all the methods required to execute Sourcegraph searches using our GraphQL +// API and get results back. + +// graphQLQuery describes a general GraphQL query and its variables. +type graphQLQuery struct { + Query string `json:"query"` + Variables interface{} `json:"variables"` +} + +const gqlSearchQuery = `query Search( + $query: String!, +) { + search(query: $query, ) { + results { + limitHit + cloning { name } + missing { name } + timedout { name } + matchCount + alert { + title + description + } + } + } +}` + +type gqlSearchVars struct { + Query string `json:"query"` +} + +type gqlSearchResponse struct { + Data struct { + Search struct { + Results struct { + LimitHit bool + Cloning []*api.Repo + Missing []*api.Repo + Timedout []*api.Repo + MatchCount int + Alert *struct { + Title string + Description string + } + } + } + } + Errors []interface{} +} + +// search executes the given search query. +func search(ctx context.Context, query string) (*gqlSearchResponse, error) { + var buf bytes.Buffer + err := json.NewEncoder(&buf).Encode(graphQLQuery{ + Query: gqlSearchQuery, + Variables: gqlSearchVars{Query: query}, + }) + if err != nil { + return nil, errors.Wrap(err, "Encode") + } + + url, err := gqlURL("InsightsSearch") + if err != nil { + return nil, errors.Wrap(err, "constructing frontend URL") + } + + resp, err := ctxhttp.Post(ctx, nil, url, "application/json", &buf) + if err != nil { + return nil, errors.Wrap(err, "Post") + } + defer resp.Body.Close() + + var res *gqlSearchResponse + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return nil, errors.Wrap(err, "Decode") + } + if len(res.Errors) > 0 { + return res, fmt.Errorf("graphql: errors: %v", res.Errors) + } + return res, nil +} + +// gqlURL returns the frontend's internal GraphQL API URL, with the given ?queryName parameter +// which is used to keep track of the source and type of GraphQL queries. +func gqlURL(queryName string) (string, error) { + u, err := url.Parse(api.InternalClient.URL) + if err != nil { + return "", err + } + u.Path = "/.internal/graphql" + u.RawQuery = queryName + return u.String(), nil +} diff --git a/enterprise/internal/insights/background/queryrunner/work_handler.go b/enterprise/internal/insights/background/queryrunner/work_handler.go new file mode 100644 index 000000000000..163905caf4a6 --- /dev/null +++ b/enterprise/internal/insights/background/queryrunner/work_handler.go @@ -0,0 +1,93 @@ +package queryrunner + +import ( + "context" + "fmt" + "time" + + "github.com/inconshreveable/log15" + + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/workerutil" + "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker" + dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" +) + +var _ dbworker.Handler = &workHandler{} + +// workHandler implements the dbworker.Handler interface by executing search queries and +// inserting insights about them to the insights Timescale database. +type workHandler struct { + workerBaseStore *basestore.Store + insightsStore *store.Store +} + +func (r *workHandler) Handle(ctx context.Context, workerStore dbworkerstore.Store, record workerutil.Record) (err error) { + defer func() { + if err != nil { + log15.Error("insights.queryrunner.workHandler", "error", err) + } + }() + + // Dequeue the job to get information about it, like what search query to perform. + job, err := dequeueJob(ctx, r.workerBaseStore, record.RecordID()) + if err != nil { + return err + } + + // Actually perform the search query. + // + // 🚨 SECURITY: The request is performed without authentication, we get back results from every + // repository on Sourcegraph - so we must be careful to only record insightful information that + // is OK to expose to every user on Sourcegraph (e.g. total result counts are fine, exposing + // that a repository exists may or may not be fine, exposing individual results is definitely + // not, etc.) + var results *gqlSearchResponse + results, err = search(ctx, job.SearchQuery) + if err != nil { + return err + } + + // TODO(slimsag): future: Logs are not a good way to surface these errors to users. + if len(results.Errors) > 0 { + return fmt.Errorf("GraphQL errors: %v", results.Errors) + } + if alert := results.Data.Search.Results.Alert; alert != nil { + return fmt.Errorf("insights query issue: alert: %v query=%q", alert, job.SearchQuery) + } + if results.Data.Search.Results.LimitHit { + log15.Error("insights query issue", "problem", "limit hit", "query", job.SearchQuery) + } + if cloning := len(results.Data.Search.Results.Cloning); cloning > 0 { + log15.Error("insights query issue", "cloning_repos", cloning, "query", job.SearchQuery) + } + if missing := len(results.Data.Search.Results.Missing); missing > 0 { + log15.Error("insights query issue", "missing_repos", missing, "query", job.SearchQuery) + } + if timedout := len(results.Data.Search.Results.Timedout); timedout > 0 { + log15.Error("insights query issue", "timedout_repos", timedout, "query", job.SearchQuery) + } + + // Record the match count to the insights DB. + var matchCount int + if results != nil { + matchCount = results.Data.Search.Results.MatchCount + } + + log15.Info("insights query runner", "found matches", matchCount, "query", job.SearchQuery) + + // 🚨 SECURITY: The request is performed without authentication, we get back results from every + // repository on Sourcegraph - so we must be careful to only record insightful information that + // is OK to expose to every user on Sourcegraph (e.g. total result counts are fine, exposing + // that a repository exists may or may not be fine, exposing individual results is definitely + // not, etc.) + return r.insightsStore.RecordSeriesPoint(ctx, store.RecordSeriesPointArgs{ + SeriesID: job.SeriesID, + Point: store.SeriesPoint{ + Time: time.Now(), + Value: float64(matchCount), + }, + // TODO(slimsag): future: determine match count per repository, and store RepoID/RepoName (and maybe Metadata?) + }) +} diff --git a/enterprise/internal/insights/background/queryrunner/worker.go b/enterprise/internal/insights/background/queryrunner/worker.go new file mode 100644 index 000000000000..620a2f7bfb3a --- /dev/null +++ b/enterprise/internal/insights/background/queryrunner/worker.go @@ -0,0 +1,214 @@ +package queryrunner + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/keegancsmith/sqlf" + "github.com/lib/pq" + + "github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/workerutil" + "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker" + dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" +) + +// This file contains all the methods required to: +// +// 1. Create the query runner worker +// 2. Enqueue jobs for the query runner to execute. +// 3. Dequeue jobs from the query runner. +// 4. Serialize jobs for the query runner into the DB. +// + +// NewWorker returns a worker that will execute search queries and insert information about the +// results into the code insights database. +func NewWorker(ctx context.Context, workerBaseStore *basestore.Store, insightsStore *store.Store, metrics workerutil.WorkerMetrics) *workerutil.Worker { + workerStore := createDBWorkerStore(workerBaseStore) + options := workerutil.WorkerOptions{ + Name: "insights_query_runner_worker", + NumHandlers: 1, + Interval: 5 * time.Second, + Metrics: metrics, + } + return dbworker.NewWorker(ctx, workerStore, &workHandler{ + workerBaseStore: workerBaseStore, + insightsStore: insightsStore, + }, options) +} + +// NewResetter returns a resetter that will reset pending query runner jobs if they take too long +// to complete. +func NewResetter(ctx context.Context, workerBaseStore *basestore.Store, metrics dbworker.ResetterMetrics) *dbworker.Resetter { + workerStore := createDBWorkerStore(workerBaseStore) + options := dbworker.ResetterOptions{ + Name: "insights_query_runner_worker_resetter", + Interval: 1 * time.Minute, + Metrics: metrics, + } + return dbworker.NewResetter(workerStore, options) +} + +// createDBWorkerStore creates the dbworker store for the query runner worker. +// +// See internal/workerutil/dbworker for more information about dbworkers. +func createDBWorkerStore(s *basestore.Store) dbworkerstore.Store { + return dbworkerstore.New(s.Handle(), dbworkerstore.Options{ + Name: "insights_query_runner_jobs_store", + TableName: "insights_query_runner_jobs", + ColumnExpressions: jobsColumns, + Scan: scanJobs, + + // We will let a search query or webhook run for up to 60s. After that, it times out and + // retries in 10s. If 3 timeouts occur, it is not retried. + // + // If you change this, be sure to adjust the interval that work is enqueued in + // enterprise/internal/insights/background:newInsightEnqueuer. + StalledMaxAge: 60 * time.Second, + RetryAfter: 10 * time.Second, + MaxNumRetries: 3, + OrderByExpression: sqlf.Sprintf("id"), + }) +} + +// EnqueueJob enqueues a job for the query runner worker to execute later. +func EnqueueJob(ctx context.Context, workerBaseStore *basestore.Store, job *Job) (id int, err error) { + id, _, err = basestore.ScanFirstInt(workerBaseStore.Query( + ctx, + sqlf.Sprintf( + enqueueJobFmtStr, + job.SeriesID, + job.SearchQuery, + job.State, + job.ProcessAfter, + ), + )) + return +} + +const enqueueJobFmtStr = ` +-- source: enterprise/internal/insights/background/queryrunner/worker.go:EnqueueJob +INSERT INTO insights_query_runner_jobs ( + series_id, + search_query, + state, + process_after +) VALUES (%s, %s, %s, %s) +RETURNING id +` + +func dequeueJob(ctx context.Context, workerBaseStore *basestore.Store, recordID int) (*Job, error) { + rows, err := workerBaseStore.Query(ctx, sqlf.Sprintf(dequeueJobFmtStr, recordID)) + if err != nil { + return nil, err + } + jobs, err := doScanJobs(rows, nil) + if err != nil { + return nil, err + } + if len(jobs) != 1 { + return nil, fmt.Errorf("expected 1 job to dequeue, found %v", len(jobs)) + } + return jobs[0], nil +} + +const dequeueJobFmtStr = ` +-- source: enterprise/internal/insights/background/queryrunner/worker.go:dequeueJob +SELECT + series_id, + search_query, + id, + state, + failure_message, + started_at, + finished_at, + process_after, + num_resets, + num_failures, + execution_logs +FROM insights_query_runner_jobs +WHERE id = %s; +` + +// Job represents a single job for the query runner worker to perform. When enqueued, it is stored +// in the insights_query_runner_jobs table - then the worker dequeues it by reading it from that +// table. +// +// See internal/workerutil/dbworker for more information about dbworkers. +type Job struct { + // Query runner fields. + SeriesID string + SearchQuery string + + // Standard/required dbworker fields. If enqueuing a job, these may all be zero values except State. + // + // See https://sourcegraph.com/github.com/sourcegraph/sourcegraph@cd0b3904c674ee3568eb2ef5d7953395b6432d20/-/blob/internal/workerutil/dbworker/store/store.go#L114-134 + ID int + State string // If enqueing a job, set to "queued" + FailureMessage *string + StartedAt *time.Time + FinishedAt *time.Time + ProcessAfter *time.Time + NumResets int32 + NumFailures int32 + ExecutionLogs []workerutil.ExecutionLogEntry +} + +// Implements the internal/workerutil.Record interface, used by the work handler to locate the job +// once executing (see work_handler.go:Handle). +func (j *Job) RecordID() int { + return j.ID +} + +func scanJobs(rows *sql.Rows, err error) (workerutil.Record, bool, error) { + records, err := doScanJobs(rows, err) + if err != nil { + return &Job{}, false, err + } + return records[0], true, nil +} + +func doScanJobs(rows *sql.Rows, err error) ([]*Job, error) { + if err != nil { + return nil, err + } + defer func() { err = basestore.CloseRows(rows, err) }() + var jobs []*Job + for rows.Next() { + j := &Job{} + if err := rows.Scan( + // Query runner fields. + &j.SeriesID, + &j.SearchQuery, + + // Standard/required dbworker fields. + &j.ID, + &j.State, + &j.FailureMessage, + &j.StartedAt, + &j.FinishedAt, + &j.ProcessAfter, + &j.NumResets, + &j.NumFailures, + pq.Array(&j.ExecutionLogs), + ); err != nil { + return nil, err + } + jobs = append(jobs, j) + } + if err != nil { + return nil, err + } + // Rows.Err will report the last error encountered by Rows.Scan. + if err := rows.Err(); err != nil { + return nil, err + } + return jobs, nil +} + +var jobsColumns = append([]*sqlf.Query{ + sqlf.Sprintf("insights_query_runner_jobs.search_query"), +}, dbworkerstore.DefaultColumnExpressions()...) diff --git a/enterprise/internal/insights/background/queryrunner/worker_test.go b/enterprise/internal/insights/background/queryrunner/worker_test.go new file mode 100644 index 000000000000..60c058544781 --- /dev/null +++ b/enterprise/internal/insights/background/queryrunner/worker_test.go @@ -0,0 +1,68 @@ +package queryrunner + +import ( + "context" + "database/sql" + "fmt" + "testing" + + "github.com/hexops/autogold" + + "github.com/sourcegraph/sourcegraph/cmd/frontend/backend" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/dbtesting" +) + +func init() { + dbtesting.DBNameSuffix = "codeinsightsbackendqueryrunner" +} + +// TestJobQueue tests that EnqueueJob and dequeueJob work mutually to transfer jobs to/from the +// database. +func TestJobQueue(t *testing.T) { + if testing.Short() { + t.Skip() + } + //t.Parallel() // TODO: dbtesting.GetDB is not parallel-safe, yuck. + + ctx := backend.WithAuthzBypass(context.Background()) + + mainAppDB := dbtesting.GetDB(t) + workerBaseStore := basestore.NewWithDB(mainAppDB, sql.TxOptions{}) + + // Check we get no dequeued job first. + recordID := 0 + job, err := dequeueJob(ctx, workerBaseStore, recordID) + autogold.Want("0", (*Job)(nil)).Equal(t, job) + autogold.Want("1", "expected 1 job to dequeue, found 0").Equal(t, fmt.Sprint(err)) + + // Now enqueue two jobs. + firstJobID, err := EnqueueJob(ctx, workerBaseStore, &Job{ + SeriesID: "job 1", + SearchQuery: "our search 1", + }) + if err != nil { + t.Fatal(err) + } + secondJobID, err := EnqueueJob(ctx, workerBaseStore, &Job{ + SeriesID: "job 2", + SearchQuery: "our search 2", + }) + if err != nil { + t.Fatal(err) + } + + // Check the information we care about got transferred properly. + firstJob, err := dequeueJob(ctx, workerBaseStore, firstJobID) + autogold.Want("2", &Job{ + SeriesID: "job 1", SearchQuery: "our search 1", + ID: 1, + }).Equal(t, firstJob) + autogold.Want("3", "").Equal(t, fmt.Sprint(err)) + secondJob, err := dequeueJob(ctx, workerBaseStore, secondJobID) + autogold.Want("4", &Job{ + SeriesID: "job 2", SearchQuery: "our search 2", + ID: 2, + }).Equal(t, secondJob) + autogold.Want("5", "").Equal(t, fmt.Sprint(err)) +} diff --git a/enterprise/internal/insights/insights.go b/enterprise/internal/insights/insights.go index 5f3cdfa09138..ecc03639bdb3 100644 --- a/enterprise/internal/insights/insights.go +++ b/enterprise/internal/insights/insights.go @@ -15,23 +15,31 @@ import ( "github.com/sourcegraph/sourcegraph/internal/database/dbutil" ) -// Init initializes the given enterpriseServices to include the required resolvers for insights. -func Init(ctx context.Context, postgres dbutil.DB, enterpriseServices *enterprise.Services) error { +// IsEnabled tells if code insights are enabled or not. +func IsEnabled() bool { if !conf.IsDev(conf.DeployType()) { // Code Insights is not yet deployed to non-dev/testing instances. We don't yet have // TimescaleDB in those deployments. https://github.com/sourcegraph/sourcegraph/issues/17218 - return nil + return false } if conf.IsDeployTypeSingleDockerContainer(conf.DeployType()) { // Code insights is not supported in single-container Docker demo deployments. - return nil + return false } if v, _ := strconv.ParseBool(os.Getenv("DISABLE_CODE_INSIGHTS")); v { // Dev option for disabling code insights. Helpful if e.g. you have issues running the // codeinsights-db or don't want to spend resources on it. + return false + } + return true +} + +// Init initializes the given enterpriseServices to include the required resolvers for insights. +func Init(ctx context.Context, postgres dbutil.DB, enterpriseServices *enterprise.Services) error { + if !IsEnabled() { return nil } - timescale, err := initializeCodeInsightsDB() + timescale, err := InitializeCodeInsightsDB() if err != nil { return err } @@ -39,9 +47,11 @@ func Init(ctx context.Context, postgres dbutil.DB, enterpriseServices *enterpris return nil } -// initializeCodeInsightsDB connects to and initializes the Code Insights Timescale DB, running -// database migrations before returning. -func initializeCodeInsightsDB() (*sql.DB, error) { +// InitializeCodeInsightsDB connects to and initializes the Code Insights Timescale DB, running +// database migrations before returning. It is safe to call from multiple services/containers (in +// which case, one's migration will win and the other caller will receive an error and should exit +// and restart until the other finishes.) +func InitializeCodeInsightsDB() (*sql.DB, error) { timescaleDSN := conf.Get().ServiceConnections.CodeInsightsTimescaleDSN conf.Watch(func() { if newDSN := conf.Get().ServiceConnections.CodeInsightsTimescaleDSN; timescaleDSN != newDSN {