diff --git a/cmd/syntactic-code-intel-worker/shared/indexing_worker.go b/cmd/syntactic-code-intel-worker/shared/indexing_worker.go index 95e41f7db4f3..f4ce49eaec2e 100644 --- a/cmd/syntactic-code-intel-worker/shared/indexing_worker.go +++ b/cmd/syntactic-code-intel-worker/shared/indexing_worker.go @@ -38,6 +38,6 @@ func (i indexingHandler) Handle(ctx context.Context, logger log.Logger, record * logger.Info("Stub indexing worker handling record", log.Int("id", record.ID), log.String("repository name", record.RepositoryName), - log.String("commit", record.Commit)) + log.String("commit", string(record.Commit))) return nil } diff --git a/cmd/worker/shared/BUILD.bazel b/cmd/worker/shared/BUILD.bazel index d1708d9a9331..0692a468fcb5 100644 --- a/cmd/worker/shared/BUILD.bazel +++ b/cmd/worker/shared/BUILD.bazel @@ -41,6 +41,7 @@ go_library( "//internal/authz", "//internal/authz/providers", "//internal/authz/subrepoperms", + "//internal/codeintel/syntactic_indexing", "//internal/conf", "//internal/database", "//internal/debugserver", diff --git a/cmd/worker/shared/init/db/db.go b/cmd/worker/shared/init/db/db.go index fb1484ab59c9..5081ea0c6137 100644 --- a/cmd/worker/shared/init/db/db.go +++ b/cmd/worker/shared/init/db/db.go @@ -13,7 +13,7 @@ import ( ) func InitDB(observationCtx *observation.Context) (database.DB, error) { - rawDB, err := initDatabaseMemo.Init(observationCtx) + rawDB, err := InitRawDB(observationCtx) if err != nil { return nil, err } @@ -21,6 +21,10 @@ func InitDB(observationCtx *observation.Context) (database.DB, error) { return database.NewDB(observationCtx.Logger, rawDB), nil } +func InitRawDB(observationCtx *observation.Context) (*sql.DB, error) { + return initDatabaseMemo.Init(observationCtx) +} + var initDatabaseMemo = memo.NewMemoizedConstructorWithArg(func(observationCtx *observation.Context) (*sql.DB, error) { dsn := conf.GetServiceConnectionValueAndRestartOnChange(func(serviceConnections conftypes.ServiceConnections) string { return 
serviceConnections.PostgresDSN diff --git a/cmd/worker/shared/main.go b/cmd/worker/shared/main.go index 8bc48d42cf91..8d2d8c8cfa50 100644 --- a/cmd/worker/shared/main.go +++ b/cmd/worker/shared/main.go @@ -41,6 +41,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/authz" "github.com/sourcegraph/sourcegraph/internal/authz/providers" srp "github.com/sourcegraph/sourcegraph/internal/authz/subrepoperms" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing" "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/encryption/keyring" @@ -115,6 +116,8 @@ func LoadConfig(registerEnterpriseMigrators oobmigration.RegisterMigratorsFunc) "codeintel-uploadstore-expirer": codeintel.NewPreciseCodeIntelUploadExpirer(), "codeintel-package-filter-applicator": codeintel.NewPackagesFilterApplicatorJob(), + "codeintel-syntactic-indexing-scheduler": syntactic_indexing.NewSyntacticindexingSchedulerJob(), + "auth-sourcegraph-operator-cleaner": auth.NewSourcegraphOperatorCleaner(), "repo-embedding-janitor": repoembeddings.NewRepoEmbeddingJanitorJob(), diff --git a/internal/codeintel/policies/internal/store/configurations.go b/internal/codeintel/policies/internal/store/configurations.go index 2ea5256d68e8..7f336b68add6 100644 --- a/internal/codeintel/policies/internal/store/configurations.go +++ b/internal/codeintel/policies/internal/store/configurations.go @@ -107,35 +107,42 @@ func (s *store) GetConfigurationPolicies(ctx context.Context, opts shared.GetCon conds = append(conds, sqlf.Sprintf("TRUE")) } - var a []shared.ConfigurationPolicy - var b int + var policiesBatch []shared.ConfigurationPolicy + var totalCountResult int + err = s.db.WithTransact(ctx, func(tx *basestore.Store) error { // TODO - standardize counting techniques - totalCount, _, err = basestore.ScanFirstInt(tx.Query(ctx, sqlf.Sprintf( + totalCountQuery := sqlf.Sprintf( 
getConfigurationPoliciesCountQuery, sqlf.Join(conds, "AND"), - ))) + ) + + totalCount, _, err = basestore.ScanFirstInt(tx.Query(ctx, totalCountQuery)) if err != nil { return err } trace.AddEvent("TODO Domain Owner", attribute.Int("totalCount", totalCount)) - configurationPolicies, err := scanConfigurationPolicies(tx.Query(ctx, sqlf.Sprintf( + finalQuery := sqlf.Sprintf( getConfigurationPoliciesLimitedQuery, sqlf.Join(conds, "AND"), opts.Limit, opts.Offset, - ))) + ) + + configurationPolicies, err := scanConfigurationPolicies(tx.Query(ctx, finalQuery)) if err != nil { return err } + trace.AddEvent("TODO Domain Owner", attribute.Int("numConfigurationPolicies", len(configurationPolicies))) - a = configurationPolicies - b = totalCount + policiesBatch = configurationPolicies + totalCountResult = totalCount return nil }) - return a, b, err + + return policiesBatch, totalCountResult, err } const getConfigurationPoliciesCountQuery = ` diff --git a/internal/codeintel/policies/service.go b/internal/codeintel/policies/service.go index 21c742a3384a..6fd32b2959e0 100644 --- a/internal/codeintel/policies/service.go +++ b/internal/codeintel/policies/service.go @@ -59,14 +59,14 @@ func (s *Service) CreateConfigurationPolicy(ctx context.Context, configurationPo return policy, err } - if err := s.updateReposMatchingPolicyPatterns(ctx, policy); err != nil { + if err := s.UpdateReposMatchingPolicyPatterns(ctx, policy); err != nil { return policy, err } return policy, nil } -func (s *Service) updateReposMatchingPolicyPatterns(ctx context.Context, policy policiesshared.ConfigurationPolicy) error { +func (s *Service) UpdateReposMatchingPolicyPatterns(ctx context.Context, policy policiesshared.ConfigurationPolicy) error { var patterns []string if policy.RepositoryPatterns != nil { patterns = *policy.RepositoryPatterns @@ -96,7 +96,7 @@ func (s *Service) UpdateConfigurationPolicy(ctx context.Context, policy policies return err } - return s.updateReposMatchingPolicyPatterns(ctx, policy) + 
return s.UpdateReposMatchingPolicyPatterns(ctx, policy) } func (s *Service) DeleteConfigurationPolicyByID(ctx context.Context, id int) error { diff --git a/internal/codeintel/syntactic_indexing/BUILD.bazel b/internal/codeintel/syntactic_indexing/BUILD.bazel index e69de29bb2d1..3540ea6bdd42 100644 --- a/internal/codeintel/syntactic_indexing/BUILD.bazel +++ b/internal/codeintel/syntactic_indexing/BUILD.bazel @@ -0,0 +1,70 @@ +load("//dev:go_defs.bzl", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "syntactic_indexing", + srcs = [ + "enqueuer.go", + "scheduler.go", + "scheduler_config.go", + "scheduler_job.go", + ], + importpath = "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing", + visibility = ["//:__subpackages__"], + deps = [ + "//cmd/worker/job", + "//cmd/worker/shared/init/codeintel", + "//cmd/worker/shared/init/db", + "//internal/actor", + "//internal/api", + "//internal/codeintel/autoindexing", + "//internal/codeintel/policies", + "//internal/codeintel/policies/shared", + "//internal/codeintel/reposcheduler", + "//internal/codeintel/shared", + "//internal/codeintel/syntactic_indexing/internal", + "//internal/codeintel/syntactic_indexing/jobstore", + "//internal/codeintel/uploads", + "//internal/collections", + "//internal/conf", + "//internal/database", + "//internal/env", + "//internal/gitserver", + "//internal/goroutine", + "//internal/memo", + "//internal/metrics", + "//internal/observation", + "//lib/errors", + "@com_github_prometheus_client_golang//prometheus", + "@io_opentelemetry_go_otel//attribute", + ], +) + +go_test( + name = "syntactic_indexing_test", + srcs = [ + "enqueuer_test.go", + "scheduler_test.go", + ], + embed = [":syntactic_indexing"], + tags = [ + TAG_PLATFORM_GRAPH, + "requires-network", + ], + deps = [ + "//internal/api", + "//internal/codeintel/policies", + "//internal/codeintel/reposcheduler", + "//internal/codeintel/shared", + 
"//internal/codeintel/syntactic_indexing/internal/testutils", + "//internal/codeintel/syntactic_indexing/jobstore", + "//internal/codeintel/uploads", + "//internal/database", + "//internal/database/dbtest", + "//internal/gitserver", + "//internal/gitserver/gitdomain", + "//internal/observation", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_stretchr_testify//require", + ], +) diff --git a/internal/codeintel/syntactic_indexing/enqueuer.go b/internal/codeintel/syntactic_indexing/enqueuer.go new file mode 100644 index 000000000000..157dd3ef4fe4 --- /dev/null +++ b/internal/codeintel/syntactic_indexing/enqueuer.go @@ -0,0 +1,126 @@ +package syntactic_indexing + +import ( + "context" + "fmt" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/jobstore" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/memo" + "github.com/sourcegraph/sourcegraph/internal/metrics" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/lib/errors" + "go.opentelemetry.io/otel/attribute" +) + +type IndexEnqueuer interface { + QueueIndexingJobs(ctx context.Context, repositoryId api.RepoID, commitId api.CommitID, options EnqueueOptions) (_ []jobstore.SyntacticIndexingJob, err error) +} + +type EnqueueOptions struct { + // setting force=true will schedule the job for a + // given pair of (repo, commit) even if it already exists in the queue + force bool +} + +type indexEnqueuerImpl struct { + jobStore jobstore.SyntacticIndexingJobStore + repoSchedulingStore reposcheduler.RepositorySchedulingStore + repoStore database.RepoStore + operations *operations +} + +var _ IndexEnqueuer = &indexEnqueuerImpl{} + +func NewIndexEnqueuer( + observationCtx *observation.Context, + jobStore 
jobstore.SyntacticIndexingJobStore, + store reposcheduler.RepositorySchedulingStore, + repoStore database.RepoStore, +) IndexEnqueuer { + return &indexEnqueuerImpl{ + repoSchedulingStore: store, + repoStore: repoStore, + jobStore: jobStore, + operations: newOperations(observationCtx), + } +} + +type operations struct { + queueIndexingJobs *observation.Operation + indexingJobsSkippedCounter prometheus.Counter +} + +var ( + indexingJobsSkippedCounterMemo = memo.NewMemoizedConstructorWithArg(func(r prometheus.Registerer) (prometheus.Counter, error) { + indexesJobsSkippedCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "src_codeintel_dbstore_syntactic_indexing_jobs_skipped", + Help: "The number of codeintel syntactic indexing jobs skipped because they were already queued up", + }) + r.MustRegister(indexesJobsSkippedCounter) + return indexesJobsSkippedCounter, nil + }) + + m = new(metrics.SingletonREDMetrics) +) + +func newOperations(observationCtx *observation.Context) *operations { + m := m.Get(func() *metrics.REDMetrics { + return metrics.NewREDMetrics( + observationCtx.Registerer, + "codeintel_syntactic_indexing_enqueuer", + metrics.WithLabels("op"), + metrics.WithCountHelp("Total number of method invocations."), + ) + }) + + op := func(name string) *observation.Operation { + return observationCtx.Operation(observation.Op{ + Name: fmt.Sprintf("codeintel.syntactic_indexing.enqueuer.%s", name), + MetricLabelValues: []string{name}, + Metrics: m, + }) + } + + indexingJobsSkippedCounter, _ := indexingJobsSkippedCounterMemo.Init(observationCtx.Registerer) + + return &operations{ + queueIndexingJobs: op("QueueIndexingJobs"), + indexingJobsSkippedCounter: indexingJobsSkippedCounter, + } +} + +// QueueIndexingJobs schedules a syntactic indexing job for a given repositoryID at given revision. +// If options.force = true, then the job will be scheduled even if the same one already exists in the queue. 
+// This method will return an array of jobs that were actually successfully scheduled. +// The result can be nil iff the same job is already queued AND options.force is false. +func (s *indexEnqueuerImpl) QueueIndexingJobs(ctx context.Context, repositoryID api.RepoID, commitID api.CommitID, options EnqueueOptions) (_ []jobstore.SyntacticIndexingJob, err error) { + ctx, _, endObservation := s.operations.queueIndexingJobs.With(ctx, &err, observation.Args{Attrs: []attribute.KeyValue{ + attribute.Int("repositoryID", int(repositoryID)), + attribute.String("commitID", string(commitID)), + }}) + defer endObservation(1, observation.Args{}) + + shouldInsert := true + if !options.force { + isQueued, err := s.jobStore.IsQueued(ctx, repositoryID, commitID) + if err != nil { + return nil, errors.Wrap(err, "dbstore.IsQueued") + } + if isQueued { + s.operations.indexingJobsSkippedCounter.Add(float64(1)) + } + shouldInsert = !isQueued + } + if shouldInsert { + return s.jobStore.InsertIndexingJobs(ctx, []jobstore.SyntacticIndexingJob{{ + State: jobstore.Queued, + Commit: commitID, + RepositoryID: repositoryID, + }}) + } + return nil, nil +} diff --git a/internal/codeintel/syntactic_indexing/enqueuer_test.go b/internal/codeintel/syntactic_indexing/enqueuer_test.go new file mode 100644 index 000000000000..7d1f2a03be8c --- /dev/null +++ b/internal/codeintel/syntactic_indexing/enqueuer_test.go @@ -0,0 +1,74 @@ +package syntactic_indexing + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/internal/testutils" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/jobstore" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + 
// TestSyntacticIndexingStoreEnqueue exercises IndexEnqueuer.QueueIndexingJobs
// against a real test database: first insert, duplicate suppression, and
// forced re-insertion of the same (repo, commit) pair.
func TestSyntacticIndexingStoreEnqueue(t *testing.T) {
	/*
		The purpose of this test is to verify that QueueIndexingJobs and IsQueued
		correctly interact with each other: a pair that is not queued gets inserted,
		a pair that is already queued is skipped, and force=true bypasses the check.
	*/
	observationCtx := observation.TestContextTB(t)
	sqlDB := dbtest.NewDB(t)
	db := database.NewDB(observationCtx.Logger, sqlDB)
	ctx := context.Background()

	jobStore, err := jobstore.NewStoreWithDB(observationCtx, sqlDB)
	require.NoError(t, err, "unexpected error creating dbworker stores")

	repoSchedulingStore := reposcheduler.NewSyntacticStore(observationCtx, db)

	repoStore := db.Repos()

	enqueuer := NewIndexEnqueuer(observationCtx, jobStore, repoSchedulingStore, repoStore)

	// Only the tacos repository is enqueued below; the other two repositories
	// are inserted but no jobs are scheduled for them in this test.
	tacosRepoId, tacosRepoName, tacosCommit := api.RepoID(1), "tangy/tacos", testutils.MakeCommit(1)
	empanadasRepoId, empanadasRepoName := api.RepoID(2), "salty/empanadas"
	mangosRepoId, mangosRepoName := api.RepoID(3), "juicy/mangos"

	testutils.InsertRepo(t, db, tacosRepoId, tacosRepoName)
	testutils.InsertRepo(t, db, empanadasRepoId, empanadasRepoName)
	testutils.InsertRepo(t, db, mangosRepoId, mangosRepoName)

	// Precondition: nothing is queued for the pair before the first enqueue.
	isQueued, err := jobStore.IsQueued(ctx, tacosRepoId, tacosCommit)
	require.False(t, isQueued)
	require.NoError(t, err)

	// Happy path
	scheduled, err := enqueuer.QueueIndexingJobs(ctx, tacosRepoId, tacosCommit, EnqueueOptions{})

	require.NoError(t, err)
	require.Equal(t, 1, len(scheduled))
	require.Equal(t, scheduled[0].Commit, tacosCommit)
	require.Equal(t, scheduled[0].RepositoryID, tacosRepoId)
	require.Equal(t, scheduled[0].State, jobstore.Queued)
	require.Equal(t, scheduled[0].RepositoryName, tacosRepoName)

	// scheduling the same (repo, revision) twice doesn't return an error,
	// but also doesn't insert a new job
	// NOTE(review): unwrap is defined outside this view; presumably it fails
	// the test when the wrapped call returned an error — confirm.
	result := unwrap(enqueuer.QueueIndexingJobs(ctx, tacosRepoId, tacosCommit, EnqueueOptions{}))(t)
	require.Empty(t, result)

	// force: true in EnqueueOptions allows scheduling the same (repo, revision) twice
	reinserted := unwrap(enqueuer.QueueIndexingJobs(ctx, tacosRepoId, tacosCommit, EnqueueOptions{force: true}))(t)
	require.Equal(t, 1, len(reinserted))
	require.NotEqual(t, reinserted[0].ID, scheduled[0].ID) // ensure it's actually a new job
	require.Equal(t, reinserted[0].Commit, tacosCommit)
	require.Equal(t, reinserted[0].RepositoryID, tacosRepoId)
	require.Equal(t, reinserted[0].State, jobstore.Queued)
	require.Equal(t, reinserted[0].RepositoryName, tacosRepoName)

}
+// This iterator abstracts away the pagination logic for retrieving policies batches, +// propagating any errors +type PolicyIterator interface { + // Iterate over all matching policies in batches. The `handle` function is NEVER + // invoked with an empty policies list + ForEachPoliciesBatch(ctx context.Context, handle func([]policiesshared.ConfigurationPolicy) error) error +} + +type policyIterator struct { + Service policies.Service + RepositoryID int + PolicyType PolicyType + BatchSize int +} + +func (p policyIterator) ForEachPoliciesBatch(ctx context.Context, handle func([]policiesshared.ConfigurationPolicy) error) error { + forSyntacticIndexing := pointers.Ptr(p.PolicyType == SyntacticIndexing) + forPreciseIndexing := pointers.Ptr(p.PolicyType == PreciseIndexing) + + options := policiesshared.GetConfigurationPoliciesOptions{ + RepositoryID: p.RepositoryID, + ForSyntacticIndexing: forSyntacticIndexing, + ForPreciseIndexing: forPreciseIndexing, + Limit: p.BatchSize, + } + + for offset := 0; ; { + options.Offset = 0 + policiesBatch, totalCount, err := p.Service.GetConfigurationPolicies(ctx, options) + if err != nil { + return err + } + if len(policiesBatch) == 0 { + break + } + if handlerError := handle(policiesBatch); handlerError != nil { + return handlerError + } + if offset = offset + len(policiesBatch); offset >= totalCount { + break + } + } + + return nil +} + +var _ PolicyIterator = policyIterator{} + +func NewPolicyIterator(service policies.Service, repositoryId int, policyType PolicyType, batchSize int) PolicyIterator { + return policyIterator{ + Service: service, + RepositoryID: repositoryId, + PolicyType: policyType, + BatchSize: batchSize, + } +} diff --git a/internal/codeintel/syntactic_indexing/internal/testutils/BUILD.bazel b/internal/codeintel/syntactic_indexing/internal/testutils/BUILD.bazel new file mode 100644 index 000000000000..374be99a9ca9 --- /dev/null +++ b/internal/codeintel/syntactic_indexing/internal/testutils/BUILD.bazel @@ -0,0 +1,14 
@@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "testutils", + srcs = ["testutils.go"], + importpath = "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/internal/testutils", + visibility = ["//:__subpackages__"], + deps = [ + "//internal/api", + "//internal/database", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_stretchr_testify//require", + ], +) diff --git a/internal/codeintel/syntactic_indexing/internal/testutils/testutils.go b/internal/codeintel/syntactic_indexing/internal/testutils/testutils.go new file mode 100644 index 000000000000..9e6e33497315 --- /dev/null +++ b/internal/codeintel/syntactic_indexing/internal/testutils/testutils.go @@ -0,0 +1,50 @@ +package testutils + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/keegancsmith/sqlf" + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/stretchr/testify/require" +) + +func InsertRepo(t testing.TB, db database.DB, id api.RepoID, name string) { + if name == "" { + name = fmt.Sprintf("n-%d", id) + } + + deletedAt := sqlf.Sprintf("NULL") + if strings.HasPrefix(name, "DELETED-") { + deletedAt = sqlf.Sprintf("%s", "2024-02-08 15:06:50.973329+00") + } + insertRepoQuery := sqlf.Sprintf( + `INSERT INTO repo (id, name, deleted_at, private) VALUES (%s, %s, %s, %s) ON CONFLICT (id) DO NOTHING`, + id, + name, + deletedAt, + false, + ) + _, err := db.ExecContext(context.Background(), insertRepoQuery.Query(sqlf.PostgresBindVar), insertRepoQuery.Args()...) 
+ require.NoError(t, err, "unexpected error while upserting repository") + + status := "cloned" + if strings.HasPrefix(name, "DELETED-") { + status = "not_cloned" + } + updateGitserverRepoQuery := sqlf.Sprintf( + `UPDATE gitserver_repos SET clone_status = %s WHERE repo_id = %s`, + status, + id, + ) + + _, err = db.ExecContext(context.Background(), updateGitserverRepoQuery.Query(sqlf.PostgresBindVar), updateGitserverRepoQuery.Args()...) + require.NoError(t, err, "unexpected error while upserting gitserver repository") +} + +func MakeCommit(i int) api.CommitID { + return api.CommitID(fmt.Sprintf("%040d", i)) +} diff --git a/internal/codeintel/syntactic_indexing/jobstore/BUILD.bazel b/internal/codeintel/syntactic_indexing/jobstore/BUILD.bazel index 4e8eab396062..d780c3aa0fbd 100644 --- a/internal/codeintel/syntactic_indexing/jobstore/BUILD.bazel +++ b/internal/codeintel/syntactic_indexing/jobstore/BUILD.bazel @@ -6,17 +6,26 @@ go_library( srcs = [ "job.go", "store.go", + "store_observability.go", ], importpath = "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/jobstore", tags = [TAG_PLATFORM_GRAPH], visibility = ["//:__subpackages__"], deps = [ + "//internal/actor", + "//internal/api", + "//internal/database", "//internal/database/basestore", "//internal/database/dbutil", + "//internal/memo", + "//internal/metrics", "//internal/observation", "//internal/workerutil", "//internal/workerutil/dbworker/store", "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_prometheus_client_golang//prometheus", + "@com_github_sourcegraph_log//:log", + "@io_opentelemetry_go_otel//attribute", ], ) @@ -32,6 +41,8 @@ go_test( "requires-network", ], deps = [ + "//internal/api", + "//internal/codeintel/syntactic_indexing/internal/testutils", "//internal/database", "//internal/database/dbtest", "//internal/observation", diff --git a/internal/codeintel/syntactic_indexing/jobstore/job.go b/internal/codeintel/syntactic_indexing/jobstore/job.go index 
d826c27139c5..8407fa3a6b00 100644 --- a/internal/codeintel/syntactic_indexing/jobstore/job.go +++ b/internal/codeintel/syntactic_indexing/jobstore/job.go @@ -4,6 +4,7 @@ import ( "strconv" "time" + "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/database/dbutil" "github.com/sourcegraph/sourcegraph/internal/workerutil" ) @@ -37,9 +38,9 @@ type SyntacticIndexingJob struct { // The fields below are not part of the standard dbworker fields // Which commit to index - Commit string `json:"commit"` + Commit api.CommitID `json:"commit"` // Which repository id to index - RepositoryID int `json:"repositoryId"` + RepositoryID api.RepoID `json:"repositoryId"` // Name of repository being indexed RepositoryName string `json:"repositoryName"` // Which user scheduled this job diff --git a/internal/codeintel/syntactic_indexing/jobstore/store.go b/internal/codeintel/syntactic_indexing/jobstore/store.go index 347ee1b627df..d4f9b2dc6cc1 100644 --- a/internal/codeintel/syntactic_indexing/jobstore/store.go +++ b/internal/codeintel/syntactic_indexing/jobstore/store.go @@ -1,22 +1,33 @@ package jobstore import ( + "context" "database/sql" "github.com/keegancsmith/sqlf" + "go.opentelemetry.io/otel/attribute" + "github.com/sourcegraph/log" + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/dbutil" "github.com/sourcegraph/sourcegraph/internal/observation" dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" ) type SyntacticIndexingJobStore interface { DBWorkerStore() dbworkerstore.Store[*SyntacticIndexingJob] + InsertIndexingJobs(ctx context.Context, indexingJobs []SyntacticIndexingJob) ([]SyntacticIndexingJob, error) + IsQueued(ctx context.Context, repositoryID api.RepoID, commitID 
api.CommitID) (bool, error) } type syntacticIndexingJobStoreImpl struct { - store dbworkerstore.Store[*SyntacticIndexingJob] - db *basestore.Store + store dbworkerstore.Store[*SyntacticIndexingJob] + db *basestore.Store + operations *operations + logger log.Logger } var _ SyntacticIndexingJobStore = &syntacticIndexingJobStoreImpl{} @@ -57,7 +68,132 @@ func NewStoreWithDB(observationCtx *observation.Context, db *sql.DB) (SyntacticI handle := basestore.NewHandleWithDB(observationCtx.Logger, db, sql.TxOptions{}) return &syntacticIndexingJobStoreImpl{ - store: dbworkerstore.New(observationCtx, handle, storeOptions), - db: basestore.NewWithHandle(handle), + store: dbworkerstore.New(observationCtx, handle, storeOptions), + db: basestore.NewWithHandle(handle), + operations: newOperations(observationCtx), + logger: observationCtx.Logger.Scoped("syntactic_indexing.store"), }, nil } + +func (s *syntacticIndexingJobStoreImpl) InsertIndexingJobs(ctx context.Context, indexingJobs []SyntacticIndexingJob) (_ []SyntacticIndexingJob, err error) { + ctx, _, endObservation := s.operations.insertIndexingJobs.With(ctx, &err, observation.Args{Attrs: []attribute.KeyValue{ + attribute.Int("numIndexingJobs", len(indexingJobs)), + }}) + endObservation(1, observation.Args{}) + + if len(indexingJobs) == 0 { + return nil, nil + } + + indexingJobsValues := make([]*sqlf.Query, 0, len(indexingJobs)) + for _, index := range indexingJobs { + indexingJobsValues = append(indexingJobsValues, sqlf.Sprintf( + "(%s, %s, %s, %s)", + index.State, + index.Commit, + index.RepositoryID, + actor.FromContext(ctx).UID, + )) + } + + indexingJobs = []SyntacticIndexingJob{} + err = s.db.WithTransact(ctx, func(tx *basestore.Store) error { + insertedJobIds, err := basestore.ScanInts(tx.Query(ctx, sqlf.Sprintf(insertIndexQuery, sqlf.Join(indexingJobsValues, ",")))) + if err != nil { + return err + } + s.operations.indexingJobsInserted.Add(float64(len(insertedJobIds))) + + authzConds, err := 
database.AuthzQueryConds(ctx, database.NewDBWith(s.logger, s.db)) + if err != nil { + return err + } + + jobLookupQueries := make([]*sqlf.Query, 0, len(insertedJobIds)) + for _, id := range insertedJobIds { + jobLookupQueries = append(jobLookupQueries, sqlf.Sprintf("%d", id)) + } + indexingJobs, err = scanIndexes(tx.Query(ctx, sqlf.Sprintf(getIndexesByIDsQuery, sqlf.Join(jobLookupQueries, ", "), authzConds))) + return err + }) + + return indexingJobs, err +} + +func (s *syntacticIndexingJobStoreImpl) IsQueued(ctx context.Context, repositoryID api.RepoID, commitID api.CommitID) (bool, error) { + isQueued, _, err := basestore.ScanFirstBool(s.db.Query(ctx, sqlf.Sprintf( + isQueuedQuery, + repositoryID, + commitID, + ))) + return isQueued, err +} + +const insertIndexQuery = ` +INSERT INTO syntactic_scip_indexing_jobs ( + state, + commit, + repository_id, + enqueuer_user_id +) +VALUES %s +RETURNING id +` + +const isQueuedQuery = ` +SELECT EXISTS( + SELECT queued_at + FROM syntactic_scip_indexing_jobs + WHERE + repository_id = %s AND + commit = %s + ORDER BY queued_at DESC + LIMIT 1 +) +` + +const getIndexesByIDsQuery = ` +SELECT + u.id, + u.commit, + u.queued_at, + u.state, + u.failure_message, + u.started_at, + u.finished_at, + u.process_after, + u.num_resets, + u.num_failures, + u.repository_id, + u.repository_name, + u.should_reindex, + u.enqueuer_user_id +FROM syntactic_scip_indexing_jobs_with_repository_name u +WHERE u.id IN (%s) and %s +ORDER BY u.id +` + +func scanIndex(s dbutil.Scanner) (index SyntacticIndexingJob, err error) { + if err := s.Scan( + &index.ID, + &index.Commit, + &index.QueuedAt, + &index.State, + &index.FailureMessage, + &index.StartedAt, + &index.FinishedAt, + &index.ProcessAfter, + &index.NumResets, + &index.NumFailures, + &index.RepositoryID, + &index.RepositoryName, + &index.ShouldReindex, + &index.EnqueuerUserID, + ); err != nil { + return index, err + } + + return index, nil +} + +var scanIndexes = basestore.NewSliceScanner(scanIndex) diff 
--git a/internal/codeintel/syntactic_indexing/jobstore/store_helpers_test.go b/internal/codeintel/syntactic_indexing/jobstore/store_helpers_test.go index a691058f7a5c..080a5a1b378d 100644 --- a/internal/codeintel/syntactic_indexing/jobstore/store_helpers_test.go +++ b/internal/codeintel/syntactic_indexing/jobstore/store_helpers_test.go @@ -2,11 +2,10 @@ package jobstore import ( "context" - "fmt" - "strings" "testing" "github.com/keegancsmith/sqlf" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/internal/testutils" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/stretchr/testify/require" ) @@ -14,7 +13,7 @@ import ( func insertIndexRecords(t testing.TB, db database.DB, records ...SyntacticIndexingJob) { for _, index := range records { if index.Commit == "" { - index.Commit = makeCommit(index.ID) + index.Commit = testutils.MakeCommit(index.ID) } if index.State == "" { index.State = Completed @@ -23,7 +22,7 @@ func insertIndexRecords(t testing.TB, db database.DB, records ...SyntacticIndexi index.RepositoryID = 50 } // Ensure we have a repo for the inner join in select queries - insertRepo(t, db, index.RepositoryID, index.RepositoryName) + testutils.InsertRepo(t, db, index.RepositoryID, index.RepositoryName) query := sqlf.Sprintf(` INSERT INTO syntactic_scip_indexing_jobs ( @@ -61,40 +60,3 @@ func insertIndexRecords(t testing.TB, db database.DB, records ...SyntacticIndexi require.NoError(t, err, "unexpected error while inserting index") } } - -func insertRepo(t testing.TB, db database.DB, id int, name string) { - if name == "" { - name = fmt.Sprintf("n-%d", id) - } - - deletedAt := sqlf.Sprintf("NULL") - if strings.HasPrefix(name, "DELETED-") { - deletedAt = sqlf.Sprintf("%s", "2024-02-08 15:06:50.973329+00") - } - insertRepoQuery := sqlf.Sprintf( - `INSERT INTO repo (id, name, deleted_at, private) VALUES (%s, %s, %s, %s) ON CONFLICT (id) DO NOTHING`, - id, - name, - deletedAt, - false, - ) - _, err := 
db.ExecContext(context.Background(), insertRepoQuery.Query(sqlf.PostgresBindVar), insertRepoQuery.Args()...) - require.NoError(t, err, "unexpected error while upserting repository") - - status := "cloned" - if strings.HasPrefix(name, "DELETED-") { - status = "not_cloned" - } - updateGitserverRepoQuery := sqlf.Sprintf( - `UPDATE gitserver_repos SET clone_status = %s WHERE repo_id = %s`, - status, - id, - ) - - _, err = db.ExecContext(context.Background(), updateGitserverRepoQuery.Query(sqlf.PostgresBindVar), updateGitserverRepoQuery.Args()...) - require.NoError(t, err, "unexpected error while upserting gitserver repository") -} - -func makeCommit(i int) string { - return fmt.Sprintf("%040d", i) -} diff --git a/internal/codeintel/syntactic_indexing/jobstore/store_observability.go b/internal/codeintel/syntactic_indexing/jobstore/store_observability.go new file mode 100644 index 000000000000..e4852376344a --- /dev/null +++ b/internal/codeintel/syntactic_indexing/jobstore/store_observability.go @@ -0,0 +1,55 @@ +package jobstore + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/sourcegraph/sourcegraph/internal/memo" + "github.com/sourcegraph/sourcegraph/internal/metrics" + "github.com/sourcegraph/sourcegraph/internal/observation" +) + +type operations struct { + isQueued *observation.Operation + insertIndexingJobs *observation.Operation + indexingJobsInserted prometheus.Counter +} + +var ( + indexesInsertedCounterMemo = memo.NewMemoizedConstructorWithArg(func(r prometheus.Registerer) (prometheus.Counter, error) { + indexesInsertedCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "src_codeintel_dbstore_syntactic_indexing_jobs_inserted", + Help: "The number of codeintel syntactic indexing jobs inserted.", + }) + r.MustRegister(indexesInsertedCounter) + return indexesInsertedCounter, nil + }) + m = new(metrics.SingletonREDMetrics) +) + +func newOperations(observationCtx *observation.Context) *operations { + m := 
m.Get(func() *metrics.REDMetrics { + return metrics.NewREDMetrics( + observationCtx.Registerer, + "codeintel_syntactic_indexing_jobs_store", + metrics.WithLabels("op"), + metrics.WithCountHelp("Total number of method invocations."), + ) + }) + + op := func(name string) *observation.Operation { + return observationCtx.Operation(observation.Op{ + Name: fmt.Sprintf("codeintel.syntacticindexing.store.%s", name), + MetricLabelValues: []string{name}, + Metrics: m, + }) + } + + indexesInsertedCounter, _ := indexesInsertedCounterMemo.Init(observationCtx.Registerer) + return &operations{ + isQueued: op("IsQueued"), + insertIndexingJobs: op("InsertIndexingJobs"), + indexingJobsInserted: indexesInsertedCounter, + } +} diff --git a/internal/codeintel/syntactic_indexing/jobstore/store_test.go b/internal/codeintel/syntactic_indexing/jobstore/store_test.go index 021aa940d808..c6f25f874915 100644 --- a/internal/codeintel/syntactic_indexing/jobstore/store_test.go +++ b/internal/codeintel/syntactic_indexing/jobstore/store_test.go @@ -5,13 +5,15 @@ import ( "testing" "time" + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/internal/testutils" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/database/dbtest" "github.com/sourcegraph/sourcegraph/internal/observation" "github.com/stretchr/testify/require" ) -func TestIndexingWorkerStore(t *testing.T) { +func TestSyntacticIndexingStoreDequeue(t *testing.T) { /* The purpose of this test is to verify that the DB schema we're using for the syntactic code intel work matches @@ -38,6 +40,8 @@ func TestIndexingWorkerStore(t *testing.T) { require.Equal(t, 0, initCount) + commit1, commit2, commit3 := testutils.MakeCommit(1), testutils.MakeCommit(2), testutils.MakeCommit(3) + insertIndexRecords(t, db, // Even though this record is the oldest in the queue, // it is associated with a deleted repository. 
@@ -45,7 +49,7 @@ func TestIndexingWorkerStore(t *testing.T) { // record at all, and the first one should still be the record with ID=1 SyntacticIndexingJob{ ID: 500, - Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead3333", + Commit: commit3, RepositoryID: 4, RepositoryName: "DELETED-org/repo", State: Queued, @@ -53,7 +57,7 @@ func TestIndexingWorkerStore(t *testing.T) { }, SyntacticIndexingJob{ ID: 1, - Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead1111", + Commit: commit1, RepositoryID: 1, RepositoryName: "tangy/tacos", State: Queued, @@ -61,7 +65,7 @@ func TestIndexingWorkerStore(t *testing.T) { }, SyntacticIndexingJob{ ID: 2, - Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead2222", + Commit: commit2, RepositoryID: 2, RepositoryName: "salty/empanadas", State: Queued, @@ -69,7 +73,7 @@ func TestIndexingWorkerStore(t *testing.T) { }, SyntacticIndexingJob{ ID: 3, - Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead3333", + Commit: commit3, RepositoryID: 3, RepositoryName: "juicy/mangoes", State: Processing, @@ -87,7 +91,7 @@ func TestIndexingWorkerStore(t *testing.T) { require.True(t, hasRecord) require.Equal(t, 1, record1.ID) require.Equal(t, "tangy/tacos", record1.RepositoryName) - require.Equal(t, "deadbeefdeadbeefdeadbeefdeadbeefdead1111", record1.Commit) + require.Equal(t, commit1, record1.Commit) record2, hasRecord, err := store.Dequeue(ctx, "worker2", nil) @@ -95,10 +99,93 @@ func TestIndexingWorkerStore(t *testing.T) { require.True(t, hasRecord) require.Equal(t, 2, record2.ID) require.Equal(t, "salty/empanadas", record2.RepositoryName) - require.Equal(t, "deadbeefdeadbeefdeadbeefdeadbeefdead2222", record2.Commit) + require.Equal(t, commit2, record2.Commit) _, hasRecord, err = store.Dequeue(ctx, "worker2", nil) require.NoError(t, err) require.False(t, hasRecord) } + +func TestSyntacticIndexingStoreEnqueue(t *testing.T) { + /* + The purpose of this test is to verify that methods InsertIndexingJobs and IsQueued + correctly interact with each other, and that the records 
inserted using those methods + are valid from the point of view of the DB worker interface + */ + observationContext := observation.TestContextTB(t) + sqlDB := dbtest.NewDB(t) + db := database.NewDB(observationContext.Logger, sqlDB) + ctx := context.Background() + + jobStore, err := NewStoreWithDB(observationContext, sqlDB) + require.NoError(t, err, "unexpected error creating dbworker stores") + store := jobStore.DBWorkerStore() + + tacosRepoId, tacosRepoName, tacosCommit := api.RepoID(1), "tangy/tacos", testutils.MakeCommit(1) + empanadasRepoId, empanadasRepoName, empanadasCommit := api.RepoID(2), "salty/empanadas", testutils.MakeCommit(2) + mangosRepoId, mangosRepoName, mangosCommit := api.RepoID(2), "juicy/mangos", testutils.MakeCommit(2) + + testutils.InsertRepo(t, db, tacosRepoId, tacosRepoName) + testutils.InsertRepo(t, db, empanadasRepoId, empanadasRepoName) + testutils.InsertRepo(t, db, mangosRepoId, mangosRepoName) + + jobStore.InsertIndexingJobs(ctx, []SyntacticIndexingJob{ + { + ID: 1, + Commit: tacosCommit, + RepositoryID: tacosRepoId, + RepositoryName: tacosRepoName, + State: Queued, + QueuedAt: time.Now().Add(time.Second * -5), + }, + { + ID: 2, + Commit: empanadasCommit, + RepositoryID: empanadasRepoId, + RepositoryName: empanadasRepoName, + State: Queued, + QueuedAt: time.Now().Add(time.Second * -2), + }, + }) + + // Assertions below verify the interactions between InsertIndexes and IsQueued + tacosIsQueued, err := jobStore.IsQueued(ctx, tacosRepoId, tacosCommit) + require.NoError(t, err) + require.True(t, tacosIsQueued) + + empanadasIsQueued, err := jobStore.IsQueued(ctx, empanadasRepoId, empanadasCommit) + require.NoError(t, err) + require.True(t, empanadasIsQueued) + + mangosIsQueued, err := jobStore.IsQueued(ctx, mangosRepoId, mangosCommit) + require.NoError(t, err) + require.True(t, mangosIsQueued) + + // Assertions below verify that records inserted by InsertIndexes are + // still visible by DB Worker interface + afterCount, _ := 
store.QueuedCount(ctx, true) + + require.Equal(t, 2, afterCount) + + record1, hasRecord, err := store.Dequeue(ctx, "worker1", nil) + + require.NoError(t, err) + require.True(t, hasRecord) + require.Equal(t, 1, record1.ID) + require.Equal(t, tacosRepoName, record1.RepositoryName) + require.Equal(t, tacosCommit, record1.Commit) + + record2, hasRecord, err := store.Dequeue(ctx, "worker2", nil) + + require.NoError(t, err) + require.True(t, hasRecord) + require.Equal(t, 2, record2.ID) + require.Equal(t, empanadasRepoName, record2.RepositoryName) + require.Equal(t, empanadasCommit, record2.Commit) + + _, hasRecord, err = store.Dequeue(ctx, "worker2", nil) + require.False(t, hasRecord) + require.NoError(t, err) + +} diff --git a/internal/codeintel/syntactic_indexing/scheduler.go b/internal/codeintel/syntactic_indexing/scheduler.go new file mode 100644 index 000000000000..637ff44d1030 --- /dev/null +++ b/internal/codeintel/syntactic_indexing/scheduler.go @@ -0,0 +1,144 @@ +package syntactic_indexing + +import ( + "context" + "database/sql" + "time" + + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing" + "github.com/sourcegraph/sourcegraph/internal/codeintel/policies" + policiesshared "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared" + "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler" + codeintelshared "github.com/sourcegraph/sourcegraph/internal/codeintel/shared" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/internal" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/jobstore" + "github.com/sourcegraph/sourcegraph/internal/codeintel/uploads" + "github.com/sourcegraph/sourcegraph/internal/collections" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/gitserver" + "github.com/sourcegraph/sourcegraph/internal/observation" + 
"github.com/sourcegraph/sourcegraph/lib/errors" +) + +type SyntacticJobScheduler interface { + Schedule(observationCtx *observation.Context, ctx context.Context, currentTime time.Time) error + GetConfig() *SchedulerConfig +} + +type syntacticJobScheduler struct { + RepositorySchedulingService reposcheduler.RepositorySchedulingService + PolicyMatcher autoindexing.PolicyMatcher + PoliciesService policies.Service + RepoStore database.RepoStore + Enqueuer IndexEnqueuer + Config *SchedulerConfig +} + +var _ SyntacticJobScheduler = &syntacticJobScheduler{} + +func NewSyntacticJobScheduler(repoSchedulingSvc reposcheduler.RepositorySchedulingService, + policyMatcher policies.Matcher, policiesSvc policies.Service, + repoStore database.RepoStore, enqueuer IndexEnqueuer, config SchedulerConfig) (SyntacticJobScheduler, error) { + + return &syntacticJobScheduler{ + RepositorySchedulingService: repoSchedulingSvc, + PolicyMatcher: &policyMatcher, + PoliciesService: policiesSvc, + RepoStore: repoStore, + Enqueuer: enqueuer, + Config: &config, + }, nil +} + +func BootstrapSyntacticJobScheduler(observationCtx *observation.Context, frontendSQLDB *sql.DB, codeintelSQLDB *sql.DB) (SyntacticJobScheduler, error) { + frontendDB := database.NewDB(observationCtx.Logger, frontendSQLDB) + + gitserverClient := gitserver.NewClient("codeintel-syntactic-indexing") + + codeIntelDB := codeintelshared.NewCodeIntelDB(observationCtx.Logger, codeintelSQLDB) + + uploadsSvc := uploads.NewService(observationCtx, frontendDB, codeIntelDB, gitserverClient.Scoped("uploads")) + policiesSvc := policies.NewService(observationCtx, frontendDB, uploadsSvc, gitserverClient.Scoped("policies")) + + matcher := policies.NewMatcher( + gitserverClient, + policies.IndexingExtractor, + true, + true, + ) + + repoSchedulingStore := reposcheduler.NewSyntacticStore(observationCtx, frontendDB) + repoSchedulingSvc := reposcheduler.NewService(repoSchedulingStore) + + jobStore, err := jobstore.NewStoreWithDB(observationCtx, 
frontendSQLDB) + if err != nil { + return nil, err + } + + repoStore := frontendDB.Repos() + + enqueuer := NewIndexEnqueuer(observationCtx, jobStore, repoSchedulingStore, repoStore) + + return NewSyntacticJobScheduler(repoSchedulingSvc, *matcher, *policiesSvc, repoStore, enqueuer, *schedulerConfig) +} + +// GetConfig implements SyntacticJobScheduler. +func (s *syntacticJobScheduler) GetConfig() *SchedulerConfig { + return s.Config +} + +func (s *syntacticJobScheduler) Schedule(observationCtx *observation.Context, ctx context.Context, currentTime time.Time) error { + batchOptions := reposcheduler.NewBatchOptions(schedulerConfig.RepositoryProcessDelay, true, &schedulerConfig.PolicyBatchSize, schedulerConfig.RepositoryBatchSize) + + repos, err := s.RepositorySchedulingService.GetRepositoriesForIndexScan(ctx, + batchOptions, currentTime) + + if err != nil { + return err + } + + commitsToSchedule := make(map[api.RepoID]collections.Set[api.CommitID]) + enqueueOptions := EnqueueOptions{force: false} + + var allErrors error + + for _, repoToIndex := range repos { + repo, _ := s.RepoStore.Get(ctx, api.RepoID(repoToIndex.ID)) + policyIterator := internal.NewPolicyIterator(s.PoliciesService, repoToIndex.ID, internal.SyntacticIndexing, schedulerConfig.PolicyBatchSize) + err := policyIterator.ForEachPoliciesBatch(ctx, func(policies []policiesshared.ConfigurationPolicy) error { + commitMap, err := s.PolicyMatcher.CommitsDescribedByPolicy(ctx, int(repoToIndex.ID), repo.Name, policies, currentTime) + + if err != nil { + return err + } + + for commit, policyMatches := range commitMap { + if len(policyMatches) == 0 { + continue + } + if commits := commitsToSchedule[repo.ID]; commits != nil { + commits.Add(api.CommitID(commit)) + } else { + commitsToSchedule[repo.ID] = collections.NewSet(api.CommitID(commit)) + } + } + + return nil + }) + + if err != nil { + allErrors = errors.Append(allErrors, errors.Newf("Failed to discover commits eligible for syntactic indexing for repo [%s]: 
%v", repo.Name, err)) + } + } + + for repoId, commits := range commitsToSchedule { + for _, commitId := range commits.Values() { + if _, err := s.Enqueuer.QueueIndexingJobs(ctx, repoId, commitId, enqueueOptions); err != nil { + allErrors = errors.Append(allErrors, errors.Newf("Failed to schedule syntactic indexing of repo [ID=%s], commit [%s]: %v", repoId, commitId, err)) + } + } + } + + return allErrors +} diff --git a/internal/codeintel/syntactic_indexing/scheduler_config.go b/internal/codeintel/syntactic_indexing/scheduler_config.go new file mode 100644 index 000000000000..e7c1bfb9a83e --- /dev/null +++ b/internal/codeintel/syntactic_indexing/scheduler_config.go @@ -0,0 +1,28 @@ +package syntactic_indexing + +import ( + "time" + + "github.com/sourcegraph/sourcegraph/internal/env" +) + +type SchedulerConfig struct { + env.BaseConfig + + SchedulerInterval time.Duration + RepositoryProcessDelay time.Duration + RepositoryBatchSize int + PolicyBatchSize int +} + +func (c *SchedulerConfig) Load() { + intervalName := env.ChooseFallbackVariableName("CODEINTEL_SYNTACTIC_INDEXING_SCHEDULER_INTERVAL") + repositoryProcessDelayName := env.ChooseFallbackVariableName("CODEINTEL_SYNTACTIC_INDEXING_SCHEDULER_REPOSITORY_PROCESS_DELAY") + repositoryBatchSizeName := env.ChooseFallbackVariableName("CODEINTEL_SYNTACTIC_INDEXING_SCHEDULER_REPOSITORY_BATCH_SIZE") + policyBatchSizeName := env.ChooseFallbackVariableName("CODEINTEL_SYNTACTIC_INDEXING_SCHEDULER_POLICY_BATCH_SIZE") + + c.SchedulerInterval = c.GetInterval(intervalName, "2m", "How frequently to run the auto-indexing scheduling routine.") + c.RepositoryProcessDelay = c.GetInterval(repositoryProcessDelayName, "24h", "The minimum frequency that the same repository can be considered for auto-index scheduling.") + c.RepositoryBatchSize = c.GetInt(repositoryBatchSizeName, "2500", "The number of repositories to consider for auto-indexing scheduling at a time.") + c.PolicyBatchSize = c.GetInt(policyBatchSizeName, "100", "The number 
of policies to consider for auto-indexing scheduling at a time.") +} diff --git a/internal/codeintel/syntactic_indexing/scheduler_job.go b/internal/codeintel/syntactic_indexing/scheduler_job.go new file mode 100644 index 000000000000..f55f9530621b --- /dev/null +++ b/internal/codeintel/syntactic_indexing/scheduler_job.go @@ -0,0 +1,103 @@ +package syntactic_indexing + +import ( + "context" + "time" + + "github.com/sourcegraph/sourcegraph/cmd/worker/job" + codeinteldb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/codeintel" + workerdb "github.com/sourcegraph/sourcegraph/cmd/worker/shared/init/db" + "github.com/sourcegraph/sourcegraph/internal/actor" + "github.com/sourcegraph/sourcegraph/internal/conf" + "github.com/sourcegraph/sourcegraph/internal/env" + "github.com/sourcegraph/sourcegraph/internal/goroutine" + "github.com/sourcegraph/sourcegraph/internal/metrics" + "github.com/sourcegraph/sourcegraph/internal/observation" +) + +type syntacticIndexingSchedulerJob struct{} + +var _ job.Job = &syntacticIndexingSchedulerJob{} + +var schedulerConfig *SchedulerConfig = &SchedulerConfig{} + +func NewSyntacticindexingSchedulerJob() job.Job { + return &syntacticIndexingSchedulerJob{} +} + +func (job *syntacticIndexingSchedulerJob) Description() string { + return "" +} + +func (job *syntacticIndexingSchedulerJob) Config() []env.Config { + return []env.Config{ + schedulerConfig, + } +} + +func (job *syntacticIndexingSchedulerJob) Routines(_ context.Context, observationCtx *observation.Context) ([]goroutine.BackgroundRoutine, error) { + frontendDB, err := workerdb.InitRawDB(observationCtx) + if err != nil { + return nil, err + } + + codeintelDB, err := codeinteldb.InitRawDB(observationCtx) + if err != nil { + return nil, err + } + + scheduler, err := BootstrapSyntacticJobScheduler(observationCtx, frontendDB, codeintelDB) + if err != nil { + return nil, err + } + + return []goroutine.BackgroundRoutine{ + newSchedulerJob( + observationCtx, + scheduler, + ), + }, nil 
+ +} + +func newSchedulerJob( + observationCtx *observation.Context, + scheduler SyntacticJobScheduler, +) goroutine.BackgroundRoutine { + + m := new(metrics.SingletonREDMetrics) + + redMetrics := m.Get(func() *metrics.REDMetrics { + return metrics.NewREDMetrics( + observationCtx.Registerer, + "codeintel_syntactic_indexing_background", + metrics.WithLabels("op"), + metrics.WithCountHelp("Total number of method invocations."), + ) + }) + + return goroutine.NewPeriodicGoroutine( + actor.WithInternalActor(context.Background()), + goroutine.HandlerFunc(func(ctx context.Context) error { + config := conf.Get().ExperimentalFeatures + + if config != nil && config.CodeintelSyntacticIndexingEnabled { + return scheduler.Schedule(observationCtx, ctx, time.Now()) + } else { + observationCtx.Logger.Info("Syntactic indexing is disabled") + return nil + } + }), + goroutine.WithName("codeintel.syntactic-indexing-background-scheduler"), + goroutine.WithDescription("schedule syntactic indexing jobs in the background"), + goroutine.WithInterval(time.Second*5), + goroutine.WithOperation(observationCtx.Operation(observation.Op{ + Name: "codeintel.syntactic_indexing.HandleIndexSchedule", + MetricLabelValues: []string{"HandleIndexSchedule"}, + Metrics: redMetrics, + ErrorFilter: func(err error) observation.ErrorFilterBehaviour { + return observation.EmitForDefault + }, + })), + ) +} diff --git a/internal/codeintel/syntactic_indexing/scheduler_test.go b/internal/codeintel/syntactic_indexing/scheduler_test.go new file mode 100644 index 000000000000..23fafda60b6b --- /dev/null +++ b/internal/codeintel/syntactic_indexing/scheduler_test.go @@ -0,0 +1,203 @@ +package syntactic_indexing + +import ( + "context" + "database/sql" + "errors" + "fmt" + "testing" + "time" + + "github.com/keegancsmith/sqlf" + "github.com/stretchr/testify/require" + + "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/codeintel/policies" + 
"github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler" + codeintelshared "github.com/sourcegraph/sourcegraph/internal/codeintel/shared" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/internal/testutils" + "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/jobstore" + "github.com/sourcegraph/sourcegraph/internal/codeintel/uploads" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + "github.com/sourcegraph/sourcegraph/internal/gitserver" + "github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain" + "github.com/sourcegraph/sourcegraph/internal/observation" +) + +func TestSyntacticIndexingScheduler(t *testing.T) { + observationCtx := observation.TestContextTB(t) + + // Bootstrap scheduler + frontendRawDB := dbtest.NewDB(t) + codeintelRawDB := dbtest.NewCodeintelDB(t) + db := database.NewDB(observationCtx.Logger, frontendRawDB) + config := &SchedulerConfig{ + PolicyBatchSize: 100, + RepositoryBatchSize: 2500, + } + gitserverClient := gitserver.NewMockClient() + scheduler, jobStore, policiesSvc := bootstrapScheduler(t, observationCtx, frontendRawDB, codeintelRawDB, gitserverClient, config) + + ctx := context.Background() + + // Setup repositories + tacosRepoId, tacosRepoName, tacosCommit := api.RepoID(1), "github.com/tangy/tacos", testutils.MakeCommit(1) + empanadasRepoId, empanadasRepoName, empanadasCommit := api.RepoID(2), "github.com/salty/empanadas", testutils.MakeCommit(2) + mangosRepoId, mangosRepoName, _ := api.RepoID(3), "gitlab.com/juicy/mangos", testutils.MakeCommit(3) + testutils.InsertRepo(t, db, tacosRepoId, tacosRepoName) + testutils.InsertRepo(t, db, empanadasRepoId, empanadasRepoName) + testutils.InsertRepo(t, db, mangosRepoId, mangosRepoName) + + setupRepoPolicies(t, ctx, db, policiesSvc) + + gitserverClient.ResolveRevisionFunc.SetDefaultHook(func(ctx context.Context, repo api.RepoName, rev string, options 
gitserver.ResolveRevisionOptions) (api.CommitID, error) { + isTacos := repo == api.RepoName(tacosRepoName) && rev == string(tacosCommit) + isEmpanadas := repo == api.RepoName(empanadasRepoName) && rev == string(empanadasCommit) + + if isTacos || isEmpanadas { + return api.CommitID(rev), nil + } else { + return api.CommitID("what"), errors.New(fmt.Sprintf("Unexpected repo (`%s`) and revision (`%s`) requested from gitserver: ", repo, rev)) + } + }) + + gitserverClient.ListRefsFunc.SetDefaultHook(func(ctx context.Context, repoName api.RepoName, opts gitserver.ListRefsOpts) ([]gitdomain.Ref, error) { + ref := gitdomain.Ref{ + Name: "refs/head/main", + Type: gitdomain.RefTypeBranch, + IsHead: true, + } + switch string(repoName) { + case empanadasRepoName: + ref.CommitID = api.CommitID(empanadasCommit) + case tacosRepoName: + ref.CommitID = api.CommitID(tacosCommit) + default: + return nil, errors.New(fmt.Sprintf("Unexpected repo (`%s`) requested from gitserver's ListRef", repoName)) + } + return []gitdomain.Ref{ref}, nil + }) + + err := scheduler.Schedule(observationCtx, ctx, time.Now()) + require.NoError(t, err) + require.Equal(t, 2, unwrap(jobStore.DBWorkerStore().QueuedCount(ctx, false))(t)) + + job1, recordReturned, err := jobStore.DBWorkerStore().Dequeue(ctx, "worker-1", []*sqlf.Query{}) + require.NoError(t, err) + require.True(t, recordReturned) + + job2, recordReturned, err := jobStore.DBWorkerStore().Dequeue(ctx, "worker-1", []*sqlf.Query{}) + require.NoError(t, err) + require.True(t, recordReturned) + + // There are only two records because in our policies setup we have + // explicitly disabled syntactic indexing for the last remaining repository + job3, recordReturned, err := jobStore.DBWorkerStore().Dequeue(ctx, "worker-1", []*sqlf.Query{}) + require.Nil(t, job3) + require.False(t, recordReturned) + require.NoError(t, err) + + // Ensure the test is resilient to order changes + tacosJob := &jobstore.SyntacticIndexingJob{} + empanadasJob := 
&jobstore.SyntacticIndexingJob{} + + if job1.RepositoryName == tacosRepoName { + tacosJob = job1 + empanadasJob = job2 + } else { + require.Equal(t, empanadasRepoName, job1.RepositoryName) + tacosJob = job2 + empanadasJob = job1 + } + + require.Equal(t, tacosRepoName, tacosJob.RepositoryName) + require.Equal(t, tacosCommit, tacosJob.Commit) + + require.Equal(t, empanadasRepoName, empanadasJob.RepositoryName) + require.Equal(t, empanadasCommit, empanadasJob.Commit) + +} + +func unwrap[T any](v T, err error) func(*testing.T) T { + return func(t *testing.T) T { + require.NoError(t, err) + return v + } +} + +func bootstrapScheduler(t *testing.T, observationCtx *observation.Context, + frontendRawDB *sql.DB, codeintelDB *sql.DB, gitserverClient gitserver.Client, + config *SchedulerConfig) (SyntacticJobScheduler, jobstore.SyntacticIndexingJobStore, *policies.Service) { + frontendDB := database.NewDB(observationCtx.Logger, frontendRawDB) + codeIntelDB := codeintelshared.NewCodeIntelDB(observationCtx.Logger, codeintelDB) + uploadsSvc := uploads.NewService(observationCtx, frontendDB, codeIntelDB, gitserverClient.Scoped("uploads")) + policiesSvc := policies.NewService(observationCtx, frontendDB, uploadsSvc, gitserverClient.Scoped("policies")) + + schedulerConfig.Load() + matcher := policies.NewMatcher( + gitserverClient, + policies.IndexingExtractor, + true, + true, + ) + repoSchedulingStore := reposcheduler.NewSyntacticStore(observationCtx, frontendDB) + repoSchedulingSvc := reposcheduler.NewService(repoSchedulingStore) + jobStore := unwrap(jobstore.NewStoreWithDB(observationCtx, frontendRawDB))(t) + + repoStore := frontendDB.Repos() + enqueuer := NewIndexEnqueuer(observationCtx, jobStore, repoSchedulingStore, repoStore) + scheduler := unwrap(NewSyntacticJobScheduler(repoSchedulingSvc, *matcher, *policiesSvc, repoStore, enqueuer, *config))(t) + return scheduler, jobStore, policiesSvc +} + +func setupRepoPolicies(t *testing.T, ctx context.Context, db database.DB, policies 
*policies.Service) { + + if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil { + t.Fatalf("unexpected error while inserting configuration policies: %s", err) + } + + query := ` + INSERT INTO lsif_configuration_policies ( + id, + repository_id, + name, + type, + pattern, + repository_patterns, + retention_enabled, + retention_duration_hours, + retain_intermediate_commits, + syntactic_indexing_enabled, + indexing_enabled, + index_commit_max_age_hours, + index_intermediate_commits + ) VALUES + -- ↙ retention_enabled + -- | ↙ retention_duration_hours + -- | | ↙ retain_intermediate_commits + -- | | | ↙ syntactic_indexing_enabled + -- | | | | ↙ indexing_enabled + -- | | | | | ↙ index_commit_max_age_hours + -- | | | | | | ↙ index_intermediate_commits + (1000, 2, 'policy 1 abc', 'GIT_TREE', '', null, false, 0, false, true, false, 0, false), + -- Policy below specifically disables syntactic indexing for repo with ID=3 + (1003, 3, 'policy 3 bcd', 'GIT_TREE', '', null, false, 0, false, false, false, 0, false), + -- Policy below enables syntactic indexing for all repositories starting with 'github.com' + (1100, NULL, 'policy 10 def', 'GIT_TREE', '', '{github.com/*}', false, 0, false, true, false, 0, false) + ` + unwrap(db.ExecContext(ctx, query))(t) + + // Policy 1100 is the only one that contains repository patterns. + // For it to be matched against our repository, we need to update + // an extra bit of database state - a lookup table identifying + // policies and repositories that were matched by them + // + // The other two policies (1000 and 1003) have explicit repository_id set + // and don't need any extra database state to be returned by policy matcher. 
+ for _, policyID := range []int{1100} { + policy, _, err := policies.GetConfigurationPolicyByID(ctx, policyID) + require.NoError(t, err) + require.NoError(t, policies.UpdateReposMatchingPolicyPatterns(ctx, policy)) + } +} diff --git a/internal/database/dbtest/dbtest.go b/internal/database/dbtest/dbtest.go index 70e4666c21fa..657aebdf1d48 100644 --- a/internal/database/dbtest/dbtest.go +++ b/internal/database/dbtest/dbtest.go @@ -69,6 +69,13 @@ func NewDB(t testing.TB) *sql.DB { return newDB(logger, t, "migrated", schemas.Frontend, schemas.CodeIntel) } +// NewCodeintelDB returns a connection to a new clean temporary testing database +// with only the codeintel schema applied +func NewCodeintelDB(t testing.TB) *sql.DB { + logger := logtest.Scoped(t) + return newDB(logger, t, "migrated-codeintel", schemas.CodeIntel) +} + // NewDBAtRev returns a connection to a clean, new temporary testing database with // the same schema as Sourcegraph's production Postgres database at the given revision. func NewDBAtRev(logger log.Logger, t testing.TB, rev string) *sql.DB {