Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syntactic indexing: enqueuer and scheduler #62485

Merged
merged 56 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
e1b62e8
WIP: Syntactic enqueuer
keynmol May 6, 2024
ebbd351
Add observability
keynmol May 7, 2024
d120724
Tests for syntactic indexing enqueuer
keynmol May 7, 2024
81f16d2
Syntactic indexing scheduler and enqueuer
keynmol May 9, 2024
5c244af
Separate scheduler into a testable interface
keynmol May 10, 2024
656ec9d
Allow injecting raw database to aid with testing
keynmol May 10, 2024
5f26d6e
Wire in metrics
keynmol May 10, 2024
03414ce
WIP
keynmol May 10, 2024
26a138b
Wire scheduling job and check site config
keynmol May 13, 2024
eb8e860
Enqueuer test and refactor test helpers
keynmol May 14, 2024
58102a0
Refactor config passing to ensure we don't use default values
keynmol May 15, 2024
d92c644
Remove usage of global InitServices
keynmol May 20, 2024
6797dbc
Cleanup scheduler bootstrap and move towards working tests
keynmol May 22, 2024
c5a2f87
Make policies store a non-internal module to use from tests
keynmol May 22, 2024
07e823f
Add more tests to scheduler
keynmol May 22, 2024
bc31f76
Restore policy store to internal package
keynmol May 23, 2024
42022f2
Rejig Policy service interface to allow using it in tests
keynmol May 23, 2024
7c4030e
Merge branch 'main' into syntactic-indexing-enqueuer
keynmol May 23, 2024
51f023d
PR comments
keynmol May 24, 2024
fb7e2f8
Regenerate Bazel build files
keynmol May 24, 2024
9bcd50e
Fix CI
keynmol May 24, 2024
e720b50
remove redundant error handling
keynmol May 28, 2024
04005cb
Clean up "insert indexes" operation
keynmol May 28, 2024
b1ca27b
Don't set up a commit if we never return it in tests
keynmol May 28, 2024
0bbddfd
Use consistent naming (indexing jobs vs indexes)
keynmol May 29, 2024
52987ee
Add test for force enqueueing
keynmol May 29, 2024
0fad383
Clean up
keynmol May 29, 2024
6b3685a
Be careful about separate frontend and codeintel databases
keynmol May 29, 2024
f51aff7
Adjust comments
keynmol May 29, 2024
de325f0
Add networking tags to bazel tests
keynmol May 29, 2024
4c973aa
Remove explicit config loading
keynmol May 29, 2024
72e6ce7
Reduce number of exit paths through function
varungandhi-src May 30, 2024
fc68b73
Simplify function bit more
varungandhi-src May 30, 2024
e78fada
Rearrange imports
varungandhi-src May 30, 2024
9f1598c
Remove spurious whitespace
varungandhi-src May 30, 2024
8048062
PR comments.
keynmol May 30, 2024
ac6f802
Whitespace / naming fixes
varungandhi-src May 30, 2024
dbb0d11
Whitespace / naming fixes - avoid codeintelDB & codeIntelDB
varungandhi-src May 30, 2024
10d8d97
Reduce blank lines
varungandhi-src May 30, 2024
61af571
Rearrange imports
varungandhi-src May 30, 2024
56812e5
Rename policies -> policiesSvc for clarity
varungandhi-src May 30, 2024
4d8db78
Simplify test code a bit
varungandhi-src May 30, 2024
0eb2992
Whitespace + add defensive check in test
varungandhi-src May 30, 2024
37ad21c
Make whitespace meaningful
varungandhi-src May 30, 2024
f7579eb
Whitespace/renaming
varungandhi-src May 30, 2024
669bf89
Remove redundant revision checks and extraneous gitserver calls
keynmol May 30, 2024
57a6035
Add clarifying comment in the test
keynmol May 30, 2024
cc953ad
Fixup comments
keynmol May 30, 2024
53a35b6
Better names
keynmol May 30, 2024
0a978d3
remove unused import
keynmol May 30, 2024
a25480b
Remove env fallback
keynmol May 30, 2024
821e175
Restore changes from bad rebase.
keynmol May 30, 2024
78c8f0e
Remove redundant private function
keynmol May 30, 2024
6080331
Remove gitserver dependency from enqueuer
keynmol May 30, 2024
e300c49
Fix tests
keynmol May 30, 2024
3e932f1
Add counter for jobs that were skipped
keynmol May 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/syntactic-code-intel-worker/shared/indexing_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func (i indexingHandler) Handle(ctx context.Context, logger log.Logger, record *
logger.Info("Stub indexing worker handling record",
log.Int("id", record.ID),
log.String("repository name", record.RepositoryName),
log.String("commit", record.Commit))
log.String("commit", string(record.Commit)))
return nil
}
1 change: 1 addition & 0 deletions cmd/worker/shared/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_library(
"//internal/authz",
"//internal/authz/providers",
"//internal/authz/subrepoperms",
"//internal/codeintel/syntactic_indexing",
"//internal/conf",
"//internal/database",
"//internal/debugserver",
Expand Down
6 changes: 5 additions & 1 deletion cmd/worker/shared/init/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ import (
)

func InitDB(observationCtx *observation.Context) (database.DB, error) {
rawDB, err := initDatabaseMemo.Init(observationCtx)
rawDB, err := InitRawDB(observationCtx)
if err != nil {
return nil, err
}

return database.NewDB(observationCtx.Logger, rawDB), nil
}

func InitRawDB(observationCtx *observation.Context) (*sql.DB, error) {
return initDatabaseMemo.Init(observationCtx)
}

var initDatabaseMemo = memo.NewMemoizedConstructorWithArg(func(observationCtx *observation.Context) (*sql.DB, error) {
dsn := conf.GetServiceConnectionValueAndRestartOnChange(func(serviceConnections conftypes.ServiceConnections) string {
return serviceConnections.PostgresDSN
Expand Down
3 changes: 3 additions & 0 deletions cmd/worker/shared/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/sourcegraph/sourcegraph/internal/authz"
"github.com/sourcegraph/sourcegraph/internal/authz/providers"
srp "github.com/sourcegraph/sourcegraph/internal/authz/subrepoperms"
"github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/encryption/keyring"
Expand Down Expand Up @@ -115,6 +116,8 @@ func LoadConfig(registerEnterpriseMigrators oobmigration.RegisterMigratorsFunc)
"codeintel-uploadstore-expirer": codeintel.NewPreciseCodeIntelUploadExpirer(),
"codeintel-package-filter-applicator": codeintel.NewPackagesFilterApplicatorJob(),

"codeintel-syntactic-indexing-scheduler": syntactic_indexing.NewSyntacticindexingSchedulerJob(),

"auth-sourcegraph-operator-cleaner": auth.NewSourcegraphOperatorCleaner(),

"repo-embedding-janitor": repoembeddings.NewRepoEmbeddingJanitorJob(),
Expand Down
25 changes: 16 additions & 9 deletions internal/codeintel/policies/internal/store/configurations.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,35 +107,42 @@ func (s *store) GetConfigurationPolicies(ctx context.Context, opts shared.GetCon
conds = append(conds, sqlf.Sprintf("TRUE"))
}

var a []shared.ConfigurationPolicy
var b int
var policiesBatch []shared.ConfigurationPolicy
var totalCountResult int

err = s.db.WithTransact(ctx, func(tx *basestore.Store) error {
// TODO - standardize counting techniques
totalCount, _, err = basestore.ScanFirstInt(tx.Query(ctx, sqlf.Sprintf(
totalCountQuery := sqlf.Sprintf(
getConfigurationPoliciesCountQuery,
sqlf.Join(conds, "AND"),
)))
)

totalCount, _, err = basestore.ScanFirstInt(tx.Query(ctx, totalCountQuery))
if err != nil {
return err
}
trace.AddEvent("TODO Domain Owner", attribute.Int("totalCount", totalCount))

configurationPolicies, err := scanConfigurationPolicies(tx.Query(ctx, sqlf.Sprintf(
finalQuery := sqlf.Sprintf(
getConfigurationPoliciesLimitedQuery,
sqlf.Join(conds, "AND"),
opts.Limit,
opts.Offset,
)))
)

configurationPolicies, err := scanConfigurationPolicies(tx.Query(ctx, finalQuery))
if err != nil {
return err
}

trace.AddEvent("TODO Domain Owner", attribute.Int("numConfigurationPolicies", len(configurationPolicies)))

a = configurationPolicies
b = totalCount
policiesBatch = configurationPolicies
totalCountResult = totalCount
return nil
})
return a, b, err

return policiesBatch, totalCountResult, err
}

const getConfigurationPoliciesCountQuery = `
Expand Down
6 changes: 3 additions & 3 deletions internal/codeintel/policies/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func (s *Service) CreateConfigurationPolicy(ctx context.Context, configurationPo
return policy, err
}

if err := s.updateReposMatchingPolicyPatterns(ctx, policy); err != nil {
if err := s.UpdateReposMatchingPolicyPatterns(ctx, policy); err != nil {
return policy, err
}

return policy, nil
}

func (s *Service) updateReposMatchingPolicyPatterns(ctx context.Context, policy policiesshared.ConfigurationPolicy) error {
func (s *Service) UpdateReposMatchingPolicyPatterns(ctx context.Context, policy policiesshared.ConfigurationPolicy) error {
var patterns []string
if policy.RepositoryPatterns != nil {
patterns = *policy.RepositoryPatterns
Expand Down Expand Up @@ -96,7 +96,7 @@ func (s *Service) UpdateConfigurationPolicy(ctx context.Context, policy policies
return err
}

return s.updateReposMatchingPolicyPatterns(ctx, policy)
return s.UpdateReposMatchingPolicyPatterns(ctx, policy)
}

func (s *Service) DeleteConfigurationPolicyByID(ctx context.Context, id int) error {
Expand Down
70 changes: 70 additions & 0 deletions internal/codeintel/syntactic_indexing/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
load("//dev:go_defs.bzl", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "syntactic_indexing",
srcs = [
"enqueuer.go",
"scheduler.go",
"scheduler_config.go",
"scheduler_job.go",
],
importpath = "github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing",
visibility = ["//:__subpackages__"],
deps = [
"//cmd/worker/job",
"//cmd/worker/shared/init/codeintel",
"//cmd/worker/shared/init/db",
"//internal/actor",
"//internal/api",
"//internal/codeintel/autoindexing",
"//internal/codeintel/policies",
"//internal/codeintel/policies/shared",
"//internal/codeintel/reposcheduler",
"//internal/codeintel/shared",
"//internal/codeintel/syntactic_indexing/internal",
"//internal/codeintel/syntactic_indexing/jobstore",
"//internal/codeintel/uploads",
"//internal/collections",
"//internal/conf",
"//internal/database",
"//internal/env",
"//internal/gitserver",
"//internal/goroutine",
"//internal/memo",
"//internal/metrics",
"//internal/observation",
"//lib/errors",
"@com_github_prometheus_client_golang//prometheus",
"@io_opentelemetry_go_otel//attribute",
],
)

go_test(
name = "syntactic_indexing_test",
srcs = [
"enqueuer_test.go",
"scheduler_test.go",
],
embed = [":syntactic_indexing"],
tags = [
TAG_PLATFORM_GRAPH,
"requires-network",
],
deps = [
"//internal/api",
"//internal/codeintel/policies",
"//internal/codeintel/reposcheduler",
"//internal/codeintel/shared",
"//internal/codeintel/syntactic_indexing/internal/testutils",
"//internal/codeintel/syntactic_indexing/jobstore",
"//internal/codeintel/uploads",
"//internal/database",
"//internal/database/dbtest",
"//internal/gitserver",
"//internal/gitserver/gitdomain",
"//internal/observation",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_stretchr_testify//require",
],
)
126 changes: 126 additions & 0 deletions internal/codeintel/syntactic_indexing/enqueuer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package syntactic_indexing

import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler"
"github.com/sourcegraph/sourcegraph/internal/codeintel/syntactic_indexing/jobstore"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/memo"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/lib/errors"
"go.opentelemetry.io/otel/attribute"
)

type IndexEnqueuer interface {
QueueIndexingJobs(ctx context.Context, repositoryId api.RepoID, commitId api.CommitID, options EnqueueOptions) (_ []jobstore.SyntacticIndexingJob, err error)
}

type EnqueueOptions struct {
// setting force=true will schedule the job for a
// given pair of (repo, commit) even if it already exists in the queue
force bool
}

type indexEnqueuerImpl struct {
jobStore jobstore.SyntacticIndexingJobStore
repoSchedulingStore reposcheduler.RepositorySchedulingStore
repoStore database.RepoStore
operations *operations
}

var _ IndexEnqueuer = &indexEnqueuerImpl{}

func NewIndexEnqueuer(
observationCtx *observation.Context,
jobStore jobstore.SyntacticIndexingJobStore,
store reposcheduler.RepositorySchedulingStore,
repoStore database.RepoStore,
) IndexEnqueuer {
return &indexEnqueuerImpl{
repoSchedulingStore: store,
repoStore: repoStore,
jobStore: jobStore,
operations: newOperations(observationCtx),
}
}

type operations struct {
queueIndexingJobs *observation.Operation
indexingJobsSkippedCounter prometheus.Counter
}

var (
indexingJobsSkippedCounterMemo = memo.NewMemoizedConstructorWithArg(func(r prometheus.Registerer) (prometheus.Counter, error) {
indexesJobsSkippedCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "src_codeintel_dbstore_syntactic_indexing_jobs_skipped",
Help: "The number of codeintel syntactic indexing jobs skipped because they were already queued up",
})
r.MustRegister(indexesJobsSkippedCounter)
return indexesJobsSkippedCounter, nil
})

m = new(metrics.SingletonREDMetrics)
)

func newOperations(observationCtx *observation.Context) *operations {
m := m.Get(func() *metrics.REDMetrics {
return metrics.NewREDMetrics(
observationCtx.Registerer,
"codeintel_syntactic_indexing_enqueuer",
metrics.WithLabels("op"),
metrics.WithCountHelp("Total number of method invocations."),
)
})

op := func(name string) *observation.Operation {
return observationCtx.Operation(observation.Op{
Name: fmt.Sprintf("codeintel.syntactic_indexing.enqueuer.%s", name),
MetricLabelValues: []string{name},
Metrics: m,
})
}

indexingJobsSkippedCounter, _ := indexingJobsSkippedCounterMemo.Init(observationCtx.Registerer)

return &operations{
queueIndexingJobs: op("QueueIndexingJobs"),
indexingJobsSkippedCounter: indexingJobsSkippedCounter,
}
}

// QueueIndexingJobs schedules a syntactic indexing job for a given repositoryID at given revision.
// If options.force = true, then the job will be scheduled even if the same one already exists in the queue.
// This method will return an array of jobs that were actually successfully scheduled.
// The result can be nil iff the same job is already queued AND options.force is false.
func (s *indexEnqueuerImpl) QueueIndexingJobs(ctx context.Context, repositoryID api.RepoID, commitID api.CommitID, options EnqueueOptions) (_ []jobstore.SyntacticIndexingJob, err error) {
ctx, _, endObservation := s.operations.queueIndexingJobs.With(ctx, &err, observation.Args{Attrs: []attribute.KeyValue{
attribute.Int("repositoryID", int(repositoryID)),
attribute.String("commitID", string(commitID)),
}})
defer endObservation(1, observation.Args{})

shouldInsert := true
if !options.force {
isQueued, err := s.jobStore.IsQueued(ctx, repositoryID, commitID)
if err != nil {
return nil, errors.Wrap(err, "dbstore.IsQueued")
}
if isQueued {
s.operations.indexingJobsSkippedCounter.Add(float64(1))
}
shouldInsert = !isQueued
}
if shouldInsert {
return s.jobStore.InsertIndexingJobs(ctx, []jobstore.SyntacticIndexingJob{{
State: jobstore.Queued,
Commit: commitID,
RepositoryID: repositoryID,
}})
}
return nil, nil
}