From d00157ae47393b4f7fc25e93d314b70b953cf84d Mon Sep 17 00:00:00 2001 From: Ryan Slade Date: Wed, 9 Sep 2020 14:22:05 +0200 Subject: [PATCH] Sync single external service (#13483) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * repo-updater: Sync single external service at a time using workerutil Tests are still failing * repoupdater: Fix server_test * testSyncerSyncWithErrors uses SyncExternalService Also removes a test case that tests a sync of muktiple external services as we no longer support that. * Add EnqueueJobs tests * Removed old references to syncer.Sync Code now compiles but a lot of tests still fail. Added a cleanup function that needs to be called when we stop a worker to unregister prometheus metrics so that it doesn't panic when we start another worker in a subsequent test. * Fix repo-updater tests * Delete orphaned repos * Avoid constraints by changing upsert order * Fix minSyncInterval * WIP test sync * Add a test that syncs multiple external services in sequence * WIP test for orphaned repos * Only delete orphaned repos * repo-updater: Remove stale jobs on startup If a job is processing when repo-updater dies that job would never complete. We would then not requeue the associated external service for syncing. To be safe we delete non locked processing rows on startup. * Typo * db: Mark next_synced_at on service save This will cause a sync to be triggered ASAP for the saved external service. On save, we trigger a call to repo-updater which causes us to enqueue any pending sync jobs. As we've just upated next_sync_at the job for the newly saved repo will be queued. * Rename trigger We no longer trigger a full sync but instead trigger enqueueing pending sync jobs. * Remove orphaned repo code We now have a trigger that does the same thing * Improve orphaned repo test * Test external service deletion * Remove orphaned repo code from syncSubset * Remove leftover debug line * Fix deleting of repos when called via syncSubset * Improve EnqueueSyncJobs docstring Co-authored-by: ᴜɴᴋɴᴡᴏɴ * Improve error message Co-authored-by: ᴜɴᴋɴᴡᴏɴ * Check rows.Err() after iteration * Change log line to debug Co-authored-by: ᴜɴᴋɴᴡᴏɴ * Remove leftover debug line * Add stalled jobs resetter * Pass prometheus register to the SyncWorker * Create ResetterMetrics inline * Remove unused field * Fix docstring * Typo * Missing period * Typo * Add failing test * Resolve name conflicts deterministically * Cleanup name conflicts resolution * Resolve name conflicts during renames * Don't wrap nil error * Don't queue jobs for Phabricator services * Handle large name lists in StoreListReposArgs.Names * Trigger rebuild * Remove stale job cleanup code We use the workerutil resetter now instead * Add failing test * Fix predicate on sourcegraphcom mode * Don't sync deleted external services * When in cloud mode DON'T sync admin added external services Co-authored-by: Asdine El Hrychy Co-authored-by: ᴜɴᴋɴᴡᴏɴ --- cmd/repo-updater/repos/integration_test.go | 12 +- cmd/repo-updater/repos/observability.go | 26 + cmd/repo-updater/repos/store.go | 116 ++- cmd/repo-updater/repos/store_test.go | 164 ++- cmd/repo-updater/repos/sync_worker.go | 107 +- cmd/repo-updater/repos/sync_worker_test.go | 11 +- cmd/repo-updater/repos/syncer.go | 425 ++++++-- cmd/repo-updater/repos/syncer_test.go | 955 ++++++++++++++++-- cmd/repo-updater/repos/types.go | 13 +- cmd/repo-updater/repoupdater/server.go | 6 +- cmd/repo-updater/repoupdater/server_test.go | 15 +- cmd/repo-updater/shared/main.go | 24 +- .../repo-updater/authz/perms_syncer_test.go | 4 + internal/db/dbutil/dbutil.go | 5 +- internal/db/external_services.go | 2 +- 15 files changed, 1582 insertions(+), 303 deletions(-) diff --git a/cmd/repo-updater/repos/integration_test.go b/cmd/repo-updater/repos/integration_test.go index 72056f7398c0..1bb949e79072 100644 --- a/cmd/repo-updater/repos/integration_test.go +++ b/cmd/repo-updater/repos/integration_test.go @@ -58,6 +58,7 @@ func TestIntegration(t *testing.T) { {"DBStore/DeleteRepos", testStoreDeleteRepos}, {"DBStore/UpsertRepos", testStoreUpsertRepos}, {"DBStore/UpsertSources", testStoreUpsertSources}, + {"DBStore/EnqueueSyncJobs", testStoreEnqueueSyncJobs(db, dbstore)}, {"DBStore/ListRepos", testStoreListRepos}, {"DBStore/ListRepos/Pagination", testStoreListReposPagination}, {"DBStore/ListExternalRepoSpecs", testStoreListExternalRepoSpecs(db)}, @@ -67,10 +68,19 @@ func TestIntegration(t *testing.T) { {"DBStore/Syncer/SyncWithErrors", testSyncerSyncWithErrors}, {"DBStore/Syncer/SyncSubset", testSyncSubset}, {"DBStore/Syncer/SyncWorker", testSyncWorkerPlumbing(db)}, - // {"DBStore/Syncer/Run", testSyncRun}, + {"DBStore/Syncer/Run", testSyncRun(db)}, + {"DBStore/Syncer/MultipleServices", testSyncer(db)}, + {"DBStore/Syncer/OrphanedRepos", testOrphanedRepo(db)}, + {"DBStore/Syncer/DeleteExternalService", testDeleteExternalService(db)}, + {"DBStore/Syncer/NameConflictDiscardOld", testNameOnConflictDiscardOld(db)}, + {"DBStore/Syncer/NameConflictDiscardNew", testNameOnConflictDiscardNew(db)}, + {"DBStore/Syncer/NameConflictOnRename", testNameOnConflictOnRename(db)}, } { t.Run(tc.name, func(t *testing.T) { t.Cleanup(func() { + if t.Failed() { + return + } if _, err := db.Exec(` DELETE FROM external_service_sync_jobs; DELETE FROM external_service_repos; diff --git a/cmd/repo-updater/repos/observability.go b/cmd/repo-updater/repos/observability.go index 2ae3b647dde3..0725f26efeb3 100644 --- a/cmd/repo-updater/repos/observability.go +++ b/cmd/repo-updater/repos/observability.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/sourcegraph/sourcegraph/internal/api" + "github.com/sourcegraph/sourcegraph/internal/db/dbutil" "github.com/sourcegraph/sourcegraph/internal/logging" "github.com/sourcegraph/sourcegraph/internal/metrics" "github.com/sourcegraph/sourcegraph/internal/trace" @@ -139,6 +140,7 @@ type StoreMetrics struct { ListExternalServices *metrics.OperationMetrics SetClonedRepos *metrics.OperationMetrics CountNotClonedRepos *metrics.OperationMetrics + EnqueueSyncJobs *metrics.OperationMetrics } // MustRegister registers all metrics in StoreMetrics in the given @@ -369,6 +371,17 @@ func (o *ObservedStore) Transact(ctx context.Context) (s TxStore, err error) { }, nil } +// With calls the With method of the underlying store if exists, +// otherwise it returns the store unchanged. +// It implements the WithStore interface. +func (o *ObservedStore) With(db dbutil.DB) Store { + if ws, ok := o.store.(WithStore); ok { + return ws.With(db) + } + + return o.store +} + // Done calls into the inner Store Done method. func (o *ObservedStore) Done(errs ...*error) { tr := o.txtrace @@ -613,6 +626,19 @@ func (o *ObservedStore) CountNotClonedRepos(ctx context.Context) (count uint64, return o.store.CountNotClonedRepos(ctx) } +func (o *ObservedStore) EnqueueSyncJobs(ctx context.Context, ignoreSiteAdmin bool) (err error) { + tr, ctx := o.trace(ctx, "Store.EnqueueSyncJobs") + + defer func(began time.Time) { + secs := time.Since(began).Seconds() + o.metrics.EnqueueSyncJobs.Observe(secs, 0, &err) + tr.SetError(err) + tr.Finish() + }(time.Now()) + + return o.store.EnqueueSyncJobs(ctx, ignoreSiteAdmin) +} + func (o *ObservedStore) trace(ctx context.Context, family string) (*trace.Trace, context.Context) { txctx := o.txctx if txctx == nil { diff --git a/cmd/repo-updater/repos/store.go b/cmd/repo-updater/repos/store.go index af7a6b90d8c3..f331677386a0 100644 --- a/cmd/repo-updater/repos/store.go +++ b/cmd/repo-updater/repos/store.go @@ -27,14 +27,21 @@ type Store interface { ListExternalServices(context.Context, StoreListExternalServicesArgs) ([]*ExternalService, error) UpsertExternalServices(ctx context.Context, svcs ...*ExternalService) error - InsertRepos(context.Context, ...*Repo) error ListRepos(context.Context, StoreListReposArgs) ([]*Repo, error) - ListExternalRepoSpecs(context.Context) (map[api.ExternalRepoSpec]struct{}, error) - DeleteRepos(ctx context.Context, ids ...api.RepoID) error UpsertRepos(ctx context.Context, repos ...*Repo) error + ListExternalRepoSpecs(context.Context) (map[api.ExternalRepoSpec]struct{}, error) UpsertSources(ctx context.Context, inserts, updates, deletes map[api.RepoID][]SourceInfo) error SetClonedRepos(ctx context.Context, repoNames ...string) error CountNotClonedRepos(ctx context.Context) (uint64, error) + + // EnqueueSyncJobs enqueues sync jobs per external service where their next_sync_at is due. + // If ignoreSiteAdmin is true then we only sync user added external services. + EnqueueSyncJobs(ctx context.Context, ignoreSiteAdmin bool) error + + // TODO: These two methods should not be used in production, move them to + // an extension interface that's explicitly for testing. + InsertRepos(context.Context, ...*Repo) error + DeleteRepos(ctx context.Context, ids ...api.RepoID) error } // StoreListReposArgs is a query arguments type used by @@ -205,6 +212,20 @@ func (s *DBStore) Transact(ctx context.Context) (TxStore, error) { }, nil } +// WithStore is a store that can take a db handle and return +// a new Store implementation that uses it. +type WithStore interface { + With(dbutil.DB) Store +} + +// With returns a new store using the given db handle. +// It implements the WithStore interface. +func (s *DBStore) With(db dbutil.DB) Store { + return &DBStore{ + db: db, + } +} + // Done terminates the underlying Tx in a DBStore either by committing or rolling // back based on the value pointed to by the first given error pointer. // It's a no-op if the `DBStore` is not operating within a transaction, @@ -551,7 +572,7 @@ func (s DBStore) DeleteRepos(ctx context.Context, ids ...api.RepoID) error { return nil } - // the number of deleted repos can potentially be higher + // The number of deleted repos can potentially be higher // than the maximum number of arguments we can pass to postgres. // We pass them as a json array instead to overcome this limitation. encodedIds, err := json.Marshal(ids) @@ -584,7 +605,11 @@ AND repo.id = repo_ids.id::int // ListRepos lists all stored repos that match the given arguments. func (s DBStore) ListRepos(ctx context.Context, args StoreListReposArgs) (repos []*Repo, _ error) { - return repos, s.paginate(ctx, args.Limit, args.PerPage, 0, listReposQuery(args), + listQuery, err := listReposQuery(args) + if err != nil { + return nil, errors.Wrap(err, "creating list repos query function") + } + return repos, s.paginate(ctx, args.Limit, args.PerPage, 0, listQuery, func(sc scanner) (last, count int64, err error) { var r Repo if err := scanRepo(&r, sc); err != nil { @@ -642,15 +667,15 @@ AND %s ORDER BY id ASC LIMIT %s ` -func listReposQuery(args StoreListReposArgs) paginatedQuery { +func listReposQuery(args StoreListReposArgs) (paginatedQuery, error) { var preds []*sqlf.Query if len(args.Names) > 0 { - ns := make([]*sqlf.Query, 0, len(args.Names)) - for _, name := range args.Names { - ns = append(ns, sqlf.Sprintf("%s", name)) + encodedNames, err := json.Marshal(args.Names) + if err != nil { + return nil, errors.Wrap(err, "marshalling name args") } - preds = append(preds, sqlf.Sprintf("name IN (%s)", sqlf.Join(ns, ","))) + preds = append(preds, sqlf.Sprintf("name IN (SELECT jsonb_array_elements_text(%s) AS id)", encodedNames)) } if len(args.IDs) > 0 { @@ -715,7 +740,7 @@ func listReposQuery(args StoreListReposArgs) paginatedQuery { joinFilter, limit, ) - } + }, nil } func (s DBStore) ListExternalRepoSpecs(ctx context.Context) (map[api.ExternalRepoSpec]struct{}, error) { @@ -1061,6 +1086,75 @@ func (s *DBStore) UpsertRepos(ctx context.Context, repos ...*Repo) (err error) { return nil } +func (s *DBStore) EnqueueSyncJobs(ctx context.Context, ignoreSiteAdmin bool) error { + filter := "TRUE" + if ignoreSiteAdmin { + filter = "namespace_user_id IS NOT NULL" + } + q := sqlf.Sprintf(enqueueSyncJobsQueryFmtstr, sqlf.Sprintf(filter)) + _, err := s.db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...) + return err +} + +// We ignore Phabricator repos here as they are currently synced using +// RunPhabricatorRepositorySyncWorker +const enqueueSyncJobsQueryFmtstr = ` +WITH due AS ( + SELECT id + FROM external_services + WHERE (next_sync_at <= clock_timestamp() OR next_sync_at IS NULL) + AND deleted_at IS NULL + AND LOWER(kind) != 'phabricator' + AND %s +), +busy AS ( + SELECT DISTINCT external_service_id id FROM external_service_sync_jobs + WHERE state = 'queued' + OR state = 'processing' +) +INSERT INTO external_service_sync_jobs (external_service_id) +SELECT id from due EXCEPT SELECT id from busy +` + +// ListSyncJobs returns all sync jobs. +func (s *DBStore) ListSyncJobs(ctx context.Context) ([]SyncJob, error) { + rows, err := s.db.QueryContext(ctx, "SELECT * FROM external_service_sync_jobs_with_next_sync_at") + if err != nil { + return nil, err + } + defer rows.Close() + return scanJobs(rows) +} + +func scanJobs(rows *sql.Rows) ([]SyncJob, error) { + var jobs []SyncJob + + for rows.Next() { + var job SyncJob + if err := rows.Scan( + &job.ID, + &job.State, + &job.FailureMessage, + &job.StartedAt, + &job.FinishedAt, + &job.ProcessAfter, + &job.NumResets, + &job.NumFailures, + &job.ExternalServiceID, + &job.NextSyncAt, + ); err != nil { + return nil, err + } + + jobs = append(jobs, job) + } + if err := rows.Err(); err != nil { + return nil, err + } + + return jobs, nil +} + func batchReposQuery(fmtstr string, repos []*Repo) (_ *sqlf.Query, err error) { records := make([]*repoRecord, 0, len(repos)) for _, r := range repos { diff --git a/cmd/repo-updater/repos/store_test.go b/cmd/repo-updater/repos/store_test.go index 027832261fc3..8274aaac047b 100644 --- a/cmd/repo-updater/repos/store_test.go +++ b/cmd/repo-updater/repos/store_test.go @@ -227,7 +227,7 @@ func testStoreListExternalServices(userID int32) func(*testing.T, repos.Store) f }, testCase{ name: "results are in ascending order by id", - stored: mkExternalServices(7, svcs...), + stored: generateExternalServices(7, svcs...), assert: repos.Assert.ExternalServicesOrderedBy( func(a, b *repos.ExternalService) bool { return a.ID < b.ID @@ -423,7 +423,7 @@ func testStoreUpsertExternalServices(t *testing.T, store repos.Store) func(*test }) t.Run("many external services", transact(ctx, store, func(t testing.TB, tx repos.Store) { - want := mkExternalServices(7, svcs...) + want := generateExternalServices(7, svcs...) if err := tx.UpsertExternalServices(ctx, want...); err != nil { t.Fatalf("UpsertExternalServices error: %s", err) @@ -1897,6 +1897,124 @@ func testSyncRateLimiters(t *testing.T, store repos.Store) func(*testing.T) { } } +func testStoreEnqueueSyncJobs(db *sql.DB, store *repos.DBStore) func(t *testing.T, store repos.Store) func(*testing.T) { + return func(t *testing.T, _ repos.Store) func(*testing.T) { + t.Helper() + + clock := repos.NewFakeClock(time.Now(), 0) + now := clock.Now() + + services := generateExternalServices(10, mkExternalServices(now)...) + + type testCase struct { + name string + stored repos.ExternalServices + queued func(repos.ExternalServices) []int64 + ignoreSiteAdmin bool + err error + } + + var testCases []testCase + + testCases = append(testCases, testCase{ + name: "enqueue everything", + stored: services.With(func(s *repos.ExternalService) { + s.NextSyncAt = now.Add(-10 * time.Second) + }), + queued: func(svcs repos.ExternalServices) []int64 { return svcs.IDs() }, + }) + + testCases = append(testCases, testCase{ + name: "nothing to enqueue", + stored: services.With(func(s *repos.ExternalService) { + s.NextSyncAt = now.Add(10 * time.Second) + }), + queued: func(svcs repos.ExternalServices) []int64 { return []int64{} }, + }) + + testCases = append(testCases, testCase{ + name: "ignore siteadmin repos", + stored: services.With(func(s *repos.ExternalService) { + s.NextSyncAt = now.Add(10 * time.Second) + }), + ignoreSiteAdmin: true, + queued: func(svcs repos.ExternalServices) []int64 { return []int64{} }, + }) + + { + i := 0 + testCases = append(testCases, testCase{ + name: "some to enqueue", + stored: services.With(func(s *repos.ExternalService) { + if i%2 == 0 { + s.NextSyncAt = now.Add(10 * time.Second) + } else { + s.NextSyncAt = now.Add(-10 * time.Second) + } + i++ + }), + queued: func(svcs repos.ExternalServices) []int64 { + var ids []int64 + for i := range svcs { + if i%2 != 0 { + ids = append(ids, svcs[i].ID) + } + } + return ids + }, + }) + } + + return func(t *testing.T) { + ctx := context.Background() + + for _, tc := range testCases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Cleanup(func() { + if _, err := db.ExecContext(ctx, "DELETE FROM external_service_sync_jobs;DELETE FROM external_services"); err != nil { + t.Fatal(err) + } + }) + stored := tc.stored.Clone() + + if err := store.UpsertExternalServices(ctx, stored...); err != nil { + t.Fatalf("failed to setup store: %v", err) + } + + err := store.EnqueueSyncJobs(ctx, tc.ignoreSiteAdmin) + if have, want := fmt.Sprint(err), fmt.Sprint(tc.err); have != want { + t.Errorf("error:\nhave: %v\nwant: %v", have, want) + } + + jobs, err := store.ListSyncJobs(ctx) + if err != nil { + t.Fatal(err) + } + + gotIDs := make([]int64, 0, len(jobs)) + for _, job := range jobs { + gotIDs = append(gotIDs, job.ExternalServiceID) + } + + want := tc.queued(stored) + sort.Slice(gotIDs, func(i, j int) bool { + return gotIDs[i] < gotIDs[j] + }) + sort.Slice(want, func(i, j int) bool { + return want[i] < want[j] + }) + + if diff := cmp.Diff(want, gotIDs); diff != "" { + t.Fatal(diff) + } + }) + } + } + } +} + func testDBStoreTransact(store *repos.DBStore) func(*testing.T) { return func(t *testing.T) { ctx := context.Background() @@ -1932,7 +2050,7 @@ func mkRepos(n int, base ...*repos.Repo) repos.Repos { return rs } -func mkExternalServices(n int, base ...*repos.ExternalService) repos.ExternalServices { +func generateExternalServices(n int, base ...*repos.ExternalService) repos.ExternalServices { if len(base) == 0 { return nil } @@ -1996,6 +2114,27 @@ func createExternalServices(t *testing.T, store repos.Store) map[string]*repos.E clock := repos.NewFakeClock(time.Now(), 0) now := clock.Now() + svcs := mkExternalServices(now) + + // create a few external services + if err := store.UpsertExternalServices(context.Background(), svcs...); err != nil { + t.Fatalf("failed to insert external services: %v", err) + } + + services, err := store.ListExternalServices(context.Background(), repos.StoreListExternalServicesArgs{}) + if err != nil { + t.Fatal("failed to list external services") + } + + servicesPerKind := make(map[string]*repos.ExternalService) + for _, svc := range services { + servicesPerKind[svc.Kind] = svc + } + + return servicesPerKind +} + +func mkExternalServices(now time.Time) repos.ExternalServices { githubSvc := repos.ExternalService{ Kind: extsvc.KindGitHub, DisplayName: "Github - Test", @@ -2052,7 +2191,7 @@ func createExternalServices(t *testing.T, store repos.Store) map[string]*repos.E UpdatedAt: now, } - svcs := []*repos.ExternalService{ + return []*repos.ExternalService{ &githubSvc, &gitlabSvc, &bitbucketServerSvc, @@ -2061,21 +2200,4 @@ func createExternalServices(t *testing.T, store repos.Store) map[string]*repos.E &otherSvc, &gitoliteSvc, } - - // create a few external services - if err := store.UpsertExternalServices(context.Background(), svcs...); err != nil { - t.Fatalf("failed to insert external services: %v", err) - } - - services, err := store.ListExternalServices(context.Background(), repos.StoreListExternalServicesArgs{}) - if err != nil { - t.Fatal("failed to list external services") - } - - servicesPerKind := make(map[string]*repos.ExternalService) - for _, svc := range services { - servicesPerKind[svc.Kind] = svc - } - - return servicesPerKind } diff --git a/cmd/repo-updater/repos/sync_worker.go b/cmd/repo-updater/repos/sync_worker.go index 80ad2855cf8d..a67c6d0a3c1f 100644 --- a/cmd/repo-updater/repos/sync_worker.go +++ b/cmd/repo-updater/repos/sync_worker.go @@ -3,7 +3,7 @@ package repos import ( "context" "database/sql" - "errors" + "github.com/prometheus/client_golang/prometheus/promauto" "time" "github.com/inconshreveable/log15" @@ -21,8 +21,21 @@ import ( "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" ) +type SyncWorkerOptions struct { + NumHandlers int // defaults to 3 + WorkerInterval time.Duration // defaults to 10s + PrometheusRegisterer prometheus.Registerer // if non-nil, metrics will be collected +} + // NewSyncWorker creates a new external service sync worker. -func NewSyncWorker(ctx context.Context, db dbutil.DB, handler dbworker.Handler, numHandlers int) *workerutil.Worker { +func NewSyncWorker(ctx context.Context, db dbutil.DB, handler dbworker.Handler, opts SyncWorkerOptions) (*workerutil.Worker, *dbworker.Resetter) { + if opts.NumHandlers == 0 { + opts.NumHandlers = 3 + } + if opts.WorkerInterval == 0 { + opts.WorkerInterval = 10 * time.Second + } + dbHandle := basestore.NewHandleWithDB(db) syncJobColumns := append(store.DefaultColumnExpressions(), []*sqlf.Query{ @@ -33,31 +46,44 @@ func NewSyncWorker(ctx context.Context, db dbutil.DB, handler dbworker.Handler, store := store.NewStore(dbHandle, store.StoreOptions{ TableName: "external_service_sync_jobs", ViewName: "external_service_sync_jobs_with_next_sync_at", - Scan: scanSyncJob, + Scan: scanSingleJob, OrderByExpression: sqlf.Sprintf("next_sync_at"), ColumnExpressions: syncJobColumns, StalledMaxAge: 30 * time.Second, - // Zero for now as we expect errors to be transient - MaxNumResets: 0, - MaxNumRetries: 0, + MaxNumResets: 5, + MaxNumRetries: 0, }) - return dbworker.NewWorker(ctx, store, dbworker.WorkerOptions{ - Name: "sync_worker", + worker := dbworker.NewWorker(ctx, store, dbworker.WorkerOptions{ + Name: "repo_sync_worker", Handler: handler, - NumHandlers: numHandlers, - Interval: 1 * time.Minute, + NumHandlers: opts.NumHandlers, + Interval: opts.WorkerInterval, Metrics: workerutil.WorkerMetrics{ - HandleOperation: newObservationOperation(), + HandleOperation: newObservationOperation(opts.PrometheusRegisterer), }, }) + + resetter := dbworker.NewResetter(store, dbworker.ResetterOptions{ + Name: "sync-worker", + Interval: 5 * time.Minute, + Metrics: newResetterMetrics(opts.PrometheusRegisterer), + }) + + return worker, resetter } -func newObservationOperation() *observation.Operation { - observationContext := &observation.Context{ - Logger: log15.Root(), - Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()}, - Registerer: prometheus.DefaultRegisterer, +func newObservationOperation(r prometheus.Registerer) *observation.Operation { + var observationContext *observation.Context + + if r == nil { + observationContext = &observation.TestContext + } else { + observationContext = &observation.Context{ + Logger: log15.Root(), + Tracer: &trace.Tracer{Tracer: opentracing.GlobalTracer()}, + Registerer: r, + } } m := metrics.NewOperationMetrics( @@ -74,28 +100,37 @@ func newObservationOperation() *observation.Operation { }) } -func scanSyncJob(rows *sql.Rows, err error) (workerutil.Record, bool, error) { +func newResetterMetrics(r prometheus.Registerer) dbworker.ResetterMetrics { + return dbworker.ResetterMetrics{ + RecordResets: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "src_external_service_queue_resets_total", + Help: "Total number of external services put back into queued state", + }), + RecordResetFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "src_external_service_queue_max_resets_total", + Help: "Total number of external services that exceed the max number of resets", + }), + Errors: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "src_external_service_queue_reset_errors_total", + Help: "Total number of errors when running the external service resetter", + }), + } +} + +func scanSingleJob(rows *sql.Rows, err error) (workerutil.Record, bool, error) { + if err != nil { + return nil, false, err + } + + jobs, err := scanJobs(rows) if err != nil { return nil, false, err } var job SyncJob - for rows.Next() { - if err := rows.Scan( - &job.ID, - &job.State, - &job.FailureMessage, - &job.StartedAt, - &job.FinishedAt, - &job.ProcessAfter, - &job.NumResets, - &job.NumFailures, - &job.ExternalServiceID, - &job.NextSyncAt, - ); err != nil { - return nil, false, err - } + if len(jobs) > 0 { + job = jobs[0] } return &job, true, nil @@ -119,11 +154,3 @@ type SyncJob struct { func (s *SyncJob) RecordID() int { return s.ID } - -type syncHandler struct{} - -func (h *syncHandler) Handle(ctx context.Context, tx workerutil.Store, record workerutil.Record) error { - // Temporary handler which will be implemented once we have implemented code to sync a single - // external service - return errors.New("TODO") -} diff --git a/cmd/repo-updater/repos/sync_worker_test.go b/cmd/repo-updater/repos/sync_worker_test.go index dbd09967a8c4..d1ed005a4ffd 100644 --- a/cmd/repo-updater/repos/sync_worker_test.go +++ b/cmd/repo-updater/repos/sync_worker_test.go @@ -50,14 +50,19 @@ func testSyncWorkerPlumbing(db *sql.DB) func(t *testing.T, repoStore repos.Store h := &fakeRepoSyncHandler{ jobChan: jobChan, } - worker := repos.NewSyncWorker(ctx, db, h, 1) + worker, resetter := repos.NewSyncWorker(ctx, db, h, repos.SyncWorkerOptions{ + NumHandlers: 1, + WorkerInterval: 1 * time.Millisecond, + }) go worker.Start() + go resetter.Start() // There is a race between the worker being stopped and the worker util // finalising the row which means that when running tests in verbose mode we'll // see "sql: transaction has already been committed or rolled back". These // errors can be ignored. defer worker.Stop() + defer resetter.Stop() var job *repos.SyncJob select { @@ -79,14 +84,14 @@ type fakeRepoSyncHandler struct { } func (h *fakeRepoSyncHandler) Handle(ctx context.Context, tx dbws.Store, record workerutil.Record) error { - r, ok := record.(*repos.SyncJob) + sj, ok := record.(*repos.SyncJob) if !ok { return fmt.Errorf("expected repos.SyncJob, got %T", record) } select { case <-ctx.Done(): return ctx.Err() - case h.jobChan <- r: + case h.jobChan <- sj: return nil } } diff --git a/cmd/repo-updater/repos/syncer.go b/cmd/repo-updater/repos/syncer.go index 77832c78ac1d..f455a67e5c3e 100644 --- a/cmd/repo-updater/repos/syncer.go +++ b/cmd/repo-updater/repos/syncer.go @@ -2,6 +2,8 @@ package repos import ( "context" + "database/sql" + "fmt" "sort" "strconv" "strings" @@ -11,19 +13,18 @@ import ( "github.com/inconshreveable/log15" otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/trace" + "github.com/sourcegraph/sourcegraph/internal/workerutil" + dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" ) // A Syncer periodically synchronizes available repositories from all its given Sources // with the stored Repositories in Sourcegraph. type Syncer struct { - Store Store Sourcer Sourcer - - // FailFullSync prevents Sync from running. This should only be true for - // Sourcegraph.com - FailFullSync bool + Worker *workerutil.Worker // Synced is sent a collection of Repos that were synced by Sync (only if Synced is non-nil) Synced chan Diff @@ -37,27 +38,63 @@ type Syncer struct { // Now is time.Now. Can be set by tests to get deterministic output. Now func() time.Time - // lastSyncErr contains the last error returned by the Sourcer in each - // Sync. It's reset with each Sync and if the sync produced no error, it's + // syncErrors contains the last error returned by the Sourcer in each + // Sync per external service. It's reset with each service Sync and if the sync produced no error, it's // set to nil. - lastSyncErr error - lastSyncErrMu sync.Mutex + syncErrors map[int64]error + syncErrorsMu sync.Mutex + + enqueueSignal signal +} - syncSignal signal +// RunOptions contains options customizing Run behaviour. +type RunOptions struct { + EnqueueInterval func() time.Duration // Defaults to 1 minute + IsCloud bool // Defaults to false + MinSyncInterval time.Duration // Defaults to 1 minute + DequeueInterval time.Duration // Default to 10 seconds + PrometheusRegisterer prometheus.Registerer // if non-nil, metrics will be collected } // Run runs the Sync at the specified interval. -func (s *Syncer) Run(pctx context.Context, interval func() time.Duration) error { - s.initialUnmodifiedDiffFromStore(pctx) +func (s *Syncer) Run(pctx context.Context, db *sql.DB, store Store, opts RunOptions) error { + if opts.EnqueueInterval == nil { + opts.EnqueueInterval = func() time.Duration { return time.Minute } + } + if opts.MinSyncInterval == 0 { + opts.MinSyncInterval = time.Minute + } + if opts.DequeueInterval == 0 { + opts.DequeueInterval = 10 * time.Second + } + + s.initialUnmodifiedDiffFromStore(pctx, store) + + // TODO: Make numHandlers configurable + worker, resetter := NewSyncWorker(pctx, db, &syncHandler{ + syncer: s, + store: store, + minSyncInterval: opts.MinSyncInterval, + }, SyncWorkerOptions{ + WorkerInterval: opts.DequeueInterval, + NumHandlers: 3, + PrometheusRegisterer: opts.PrometheusRegisterer, + }) + + go worker.Start() + defer worker.Stop() + + go resetter.Start() + defer resetter.Stop() for pctx.Err() == nil { - ctx, cancel := contextWithSignalCancel(pctx, s.syncSignal.Watch()) + ctx, cancel := contextWithSignalCancel(pctx, s.enqueueSignal.Watch()) - if err := s.Sync(ctx); err != nil && s.Logger != nil { - s.Logger.Error("Syncer", "error", err) + if err := store.EnqueueSyncJobs(ctx, opts.IsCloud); err != nil { + s.Logger.Error("Enqueuing sync jobs", "error", err) } - sleep(ctx, interval()) + sleep(ctx, opts.EnqueueInterval()) cancel() } @@ -65,6 +102,26 @@ func (s *Syncer) Run(pctx context.Context, interval func() time.Duration) error return pctx.Err() } +type syncHandler struct { + syncer *Syncer + store Store + minSyncInterval time.Duration +} + +func (s *syncHandler) Handle(ctx context.Context, tx dbworkerstore.Store, record workerutil.Record) error { + sj, ok := record.(*SyncJob) + if !ok { + return fmt.Errorf("expected repos.SyncJob, got %T", record) + } + + store := s.store + if ws, ok := s.store.(WithStore); ok { + store = ws.With(tx.Handle().DB()) + } + + return s.syncer.SyncExternalService(ctx, store, sj.ExternalServiceID, s.minSyncInterval) +} + // contextWithSignalCancel will return a context which will be cancelled if // signal fires. Callers need to call cancel when done. func contextWithSignalCancel(ctx context.Context, signal <-chan struct{}) (context.Context, context.CancelFunc) { @@ -89,69 +146,113 @@ func sleep(ctx context.Context, d time.Duration) { } } -// TriggerSync will run Sync now. If a sync is currently running it is -// cancelled. -func (s *Syncer) TriggerSync() { - s.syncSignal.Trigger() +// TriggerEnqueueSyncJobs will enqueue any pending sync jobs now. +func (s *Syncer) TriggerEnqueueSyncJobs() { + s.enqueueSignal.Trigger() } -// Sync synchronizes the repositories. -func (s *Syncer) Sync(ctx context.Context) (err error) { +// SyncExternalService syncs repos using the supplied external service. +func (s *Syncer) SyncExternalService(ctx context.Context, store Store, externalServiceID int64, minSyncInterval time.Duration) (err error) { var diff Diff - ctx, save := s.observe(ctx, "Syncer.Sync", "") + log15.Debug("Syncing external service", "serviceID", externalServiceID) + ctx, save := s.observe(ctx, "Syncer.SyncExternalService", "") defer save(&diff, &err) - defer s.setOrResetLastSyncErr(&err) - - if s.FailFullSync { - return errors.New("Syncer is not enabled") - } + defer s.setOrResetLastSyncErr(externalServiceID, &err) var streamingInserter func(*Repo) if s.SubsetSynced == nil { streamingInserter = func(*Repo) {} //noop } else { - streamingInserter, err = s.makeNewRepoInserter(ctx) + streamingInserter, err = s.makeNewRepoInserter(ctx, store) if err != nil { return errors.Wrap(err, "syncer.sync.streaming") } } + ids := []int64{externalServiceID} + svcs, err := store.ListExternalServices(ctx, StoreListExternalServicesArgs{IDs: ids}) + if err != nil { + return errors.Wrap(err, "fetching external services") + } + + if len(svcs) != 1 { + return errors.Errorf("want 1 external service but got %d", len(svcs)) + } + svc := svcs[0] + + // Fetch repos from the source var sourced Repos - if sourced, err = s.sourced(ctx, streamingInserter); err != nil { + if sourced, err = s.sourced(ctx, svcs, streamingInserter); err != nil { return errors.Wrap(err, "syncer.sync.sourced") } - store := s.Store - if tr, ok := s.Store.(Transactor); ok { - var txs TxStore - if txs, err = tr.Transact(ctx); err != nil { - return errors.Wrap(err, "syncer.sync.transact") - } - defer txs.Done(&err) - store = txs + var serviceRepos Repos + // Fetch repos from our DB related to externalServiceID + if serviceRepos, err = store.ListRepos(ctx, StoreListReposArgs{ExternalServiceID: externalServiceID}); err != nil { + return errors.Wrap(err, "syncer.sync.store.list-repos") } - var stored Repos - if stored, err = store.ListRepos(ctx, StoreListReposArgs{}); err != nil { + // Now fetch any possible name conflicts. + // Repo names must be globally unique, if there's conflict we need to deterministically choose one. + var conflicting Repos + if conflicting, err = store.ListRepos(ctx, StoreListReposArgs{Names: sourced.Names()}); err != nil { return errors.Wrap(err, "syncer.sync.store.list-repos") } + conflicting = conflicting.Filter(func(r *Repo) bool { + for _, id := range r.ExternalServiceIDs() { + if id == externalServiceID { + return false + } + } - // NewDiff modifies the stored slice so we clone it before passing it - storedCopy := stored.Clone() + return true + }) + + // Add the conflicts to the list of repos fetched from the db. + // NewDiff modifies the serviceRepos slice so we clone it before passing it + serviceReposAndConflicting := append(serviceRepos.Clone(), conflicting...) - diff = NewDiff(sourced, stored) + // Find the diff associated with only the currently syncing external service. + diff = newDiff(svc, sourced, serviceRepos) + resolveNameConflicts(&diff, conflicting) upserts := s.upserts(diff) + // Delete from external_service_repos only. Deletes need to happen first so that we don't end up with + // constraint violations later. + // The trigger 'trig_soft_delete_orphan_repo_by_external_service_repo' will run + // and remove any repos that no longer have any rows in the external_service_repos + // table. + sdiff := s.sourcesUpserts(&diff, serviceReposAndConflicting) + if err = store.UpsertSources(ctx, nil, nil, sdiff.Deleted); err != nil { + return errors.Wrap(err, "syncer.sync.store.delete-sources") + } + + // Next, insert or modify existing repos. This is needed so that the next call + // to UpsertSources has valid repo ids if err = store.UpsertRepos(ctx, upserts...); err != nil { return errors.Wrap(err, "syncer.sync.store.upsert-repos") } - sdiff := s.sourcesUpserts(&diff, storedCopy) - if err = store.UpsertSources(ctx, sdiff.Added, sdiff.Modified, sdiff.Deleted); err != nil { + // Only modify added and modified relationships in external_service_repos, deleted was + // handled above + // Recalculate sdiff so that we have foreign keys + sdiff = s.sourcesUpserts(&diff, serviceReposAndConflicting) + if err = store.UpsertSources(ctx, sdiff.Added, sdiff.Modified, nil); err != nil { return errors.Wrap(err, "syncer.sync.store.upsert-sources") } + now := s.Now() + interval := calcSyncInterval(now, svc.LastSyncAt, minSyncInterval, diff) + log15.Info("Synced external service", "id", externalServiceID, "backoff duration", interval) + svc.NextSyncAt = now.Add(interval) + svc.LastSyncAt = now + + err = store.UpsertExternalServices(ctx, svc) + if err != nil { + return errors.Wrap(err, "upserting external service") + } + if s.Synced != nil { select { case s.Synced <- diff: @@ -162,10 +263,87 @@ func (s *Syncer) Sync(ctx context.Context) (err error) { return nil } +// We need to resolve name conflicts by deciding whether to keep the newly added repo +// or the repo that already exists in the db. +// If the new repo wins, then the old repo is added to the diff.Deleted slice. +// If the old repo wins, then the new repo is no longer inserted and is filtered out from +// the diff.Added slice. +func resolveNameConflicts(diff *Diff, conflicting Repos) { + var toDelete Repos + diff.Added = diff.Added.Filter(func(r *Repo) bool { + for _, cr := range conflicting { + if cr.Name == r.Name { + // The repos are conflicting, we deterministically choose the one + // that has the smallest external repo spec. + switch cr.ExternalRepo.Compare(r.ExternalRepo) { + case -1: + // the repo that is currently existing in the database wins + // causing the new one to be filtered out + return false + case 1: + // the new repo wins so the old repo is deleted along with all of its relationships. + toDelete = append(toDelete, cr.With(func(r *Repo) { r.Sources = nil })) + } + + return true + } + } + + return true + }) + diff.Modified = diff.Modified.Filter(func(r *Repo) bool { + for _, cr := range conflicting { + if cr.Name == r.Name { + // The repos are conflicting, we deterministically choose the one + // that has the smallest external repo spec. + switch cr.ExternalRepo.Compare(r.ExternalRepo) { + case -1: + // the repo that is currently existing in the database wins + // causing the new one to be filtered out + toDelete = append(toDelete, r.With(func(r *Repo) { r.Sources = nil })) + return false + case 1: + // the new repo wins so the old repo is deleted along with all of its relationships. + toDelete = append(toDelete, cr.With(func(r *Repo) { r.Sources = nil })) + } + + return true + } + } + + return true + }) + diff.Deleted = append(diff.Deleted, toDelete...) +} + +func calcSyncInterval(now time.Time, lastSync time.Time, minSyncInterval time.Duration, diff Diff) time.Duration { + const maxSyncInterval = 8 * time.Hour + + // Special case, we've never synced + if lastSync.IsZero() { + return minSyncInterval + } + + // If there is any change, sync again shortly + if len(diff.Added) > 0 || len(diff.Deleted) > 0 || len(diff.Modified) > 0 { + return minSyncInterval + } + + // No change, back off + interval := now.Sub(lastSync) * 2 + if interval < minSyncInterval { + return minSyncInterval + } + if interval > maxSyncInterval { + return maxSyncInterval + } + return interval +} + // SyncSubset runs the syncer on a subset of the stored repositories. It will // only sync the repositories with the same name or external service spec as // sourcedSubset repositories. -func (s *Syncer) SyncSubset(ctx context.Context, sourcedSubset ...*Repo) (err error) { +func (s *Syncer) SyncSubset(ctx context.Context, store Store, sourcedSubset ...*Repo) (err error) { var diff Diff ctx, save := s.observe(ctx, "Syncer.SyncSubset", strings.Join(Repos(sourcedSubset).Names(), " ")) @@ -175,37 +353,36 @@ func (s *Syncer) SyncSubset(ctx context.Context, sourcedSubset ...*Repo) (err er return nil } - diff, err = s.syncSubset(ctx, false, sourcedSubset...) + if tr, ok := store.(Transactor); ok { + var txs TxStore + if txs, err = tr.Transact(ctx); err != nil { + return errors.Wrap(err, "Syncer.SyncSubset.transact") + } + defer txs.Done(&err) + store = txs + } + + diff, err = s.syncSubset(ctx, store, false, sourcedSubset...) return err } // insertIfNew is a specialization of SyncSubset. It will insert sourcedRepo // if there are no related repositories, otherwise does nothing. -func (s *Syncer) insertIfNew(ctx context.Context, sourcedRepo *Repo) (err error) { +func (s *Syncer) insertIfNew(ctx context.Context, store Store, sourcedRepo *Repo) (err error) { var diff Diff ctx, save := s.observe(ctx, "Syncer.InsertIfNew", sourcedRepo.Name) defer save(&diff, &err) - diff, err = s.syncSubset(ctx, true, sourcedRepo) + diff, err = s.syncSubset(ctx, store, true, sourcedRepo) return err } -func (s *Syncer) syncSubset(ctx context.Context, insertOnly bool, sourcedSubset ...*Repo) (diff Diff, err error) { +func (s *Syncer) syncSubset(ctx context.Context, store Store, insertOnly bool, sourcedSubset ...*Repo) (diff Diff, err error) { if insertOnly && len(sourcedSubset) != 1 { return Diff{}, errors.Errorf("syncer.syncsubset.insertOnly can only handle one sourced repo, given %d repos", len(sourcedSubset)) } - store := s.Store - if tr, ok := s.Store.(Transactor); ok { - var txs TxStore - if txs, err = tr.Transact(ctx); err != nil { - return Diff{}, errors.Wrap(err, "syncer.syncsubset.transact") - } - defer txs.Done(&err) - store = txs - } - var storedSubset Repos args := StoreListReposArgs{ Names: Repos(sourcedSubset).Names(), @@ -224,14 +401,36 @@ func (s *Syncer) syncSubset(ctx context.Context, insertOnly bool, sourcedSubset storedCopy := storedSubset.Clone() diff = NewDiff(sourcedSubset, storedSubset) - upserts := s.upserts(diff) + // We trust that if we determine that a repo needs to be deleted it should be deleted + // from all external services. By setting sources to nil this is forced when we call + // UpsertSources below. + for i := range diff.Deleted { + diff.Deleted[i].Sources = nil + } + + // Delete from external_service_repos only. Deletes need to happen first so that we don't end up with + // constraint violations later. + // The trigger 'trig_soft_delete_orphan_repo_by_external_service_repo' will run + // and remove any repos that no longer have any rows in the external_service_repos + // table. + sdiff := s.sourcesUpserts(&diff, storedCopy) + if err = store.UpsertSources(ctx, nil, nil, sdiff.Deleted); err != nil { + return Diff{}, errors.Wrap(err, "syncer.syncsubset.store.delete-sources") + } + + // Next, insert or modify existing repos. This is needed so that the next call + // to UpsertSources has valid repo ids + upserts := s.upserts(diff) if err = store.UpsertRepos(ctx, upserts...); err != nil { return Diff{}, errors.Wrap(err, "syncer.syncsubset.store.upsert-repos") } - sdiff := s.sourcesUpserts(&diff, storedCopy) - if err = store.UpsertSources(ctx, sdiff.Added, sdiff.Modified, sdiff.Deleted); err != nil { + // Only modify added and modified relationships in external_service_repos, deleted was + // handled above. + // Recalculate sdiff so that we have foreign keys + sdiff = s.sourcesUpserts(&diff, storedCopy) + if err = store.UpsertSources(ctx, sdiff.Added, sdiff.Modified, nil); err != nil { return Diff{}, errors.Wrap(err, "syncer.syncsubset.store.upsert-sources") } @@ -247,13 +446,7 @@ func (s *Syncer) syncSubset(ctx context.Context, insertOnly bool, sourcedSubset func (s *Syncer) upserts(diff Diff) []*Repo { now := s.Now() - upserts := make([]*Repo, 0, len(diff.Added)+len(diff.Deleted)+len(diff.Modified)) - - for _, repo := range diff.Deleted { - repo.UpdatedAt, repo.DeletedAt = now, now - repo.Sources = map[string]*SourceInfo{} - upserts = append(upserts, repo) - } + upserts := make([]*Repo, 0, len(diff.Added)+len(diff.Modified)) for _, repo := range diff.Modified { repo.UpdatedAt, repo.DeletedAt = now, time.Time{} @@ -303,10 +496,17 @@ func (s *Syncer) sourcesUpserts(diff *Diff, stored []*Repo) *sourceDiff { } } - // When a repository is deleted, a Postgres function is - // triggered to automatically to delete the source, - // we don't need to do anything here. - // See the trigger `trig_soft_delete_repo_reference_on_external_service_repos` defined in `external_services` table. + // When a repository is deleted, check if its source map + // has been modified, and if so compute the diff. + for _, repo := range diff.Deleted { + for _, storedRepo := range stored { + if storedRepo.ID == repo.ID { + s.sourceDiff(repo.ID, &sdiff, storedRepo.Sources, repo.Sources) + break + } + } + } + return &sdiff } @@ -340,12 +540,12 @@ func (s *Syncer) sourceDiff(repoID api.RepoID, diff *sourceDiff, oldSources, new // of s.Synced will receive a list of repos. In particular this is so that the // git update scheduler can start working straight away on existing // repositories. -func (s *Syncer) initialUnmodifiedDiffFromStore(ctx context.Context) { +func (s *Syncer) initialUnmodifiedDiffFromStore(ctx context.Context, store Store) { if s.Synced == nil { return } - stored, err := s.Store.ListRepos(ctx, StoreListReposArgs{}) + stored, err := store.ListRepos(ctx, StoreListReposArgs{}) if err != nil { s.Logger.Warn("initialUnmodifiedDiffFromStore store.ListRepos", "error", err) return @@ -402,6 +602,10 @@ func (d Diff) Repos() Repos { // NewDiff returns a diff from the given sourced and stored repos. func NewDiff(sourced, stored []*Repo) (diff Diff) { + return newDiff(nil, sourced, stored) +} + +func newDiff(svc *ExternalService, sourced, stored []*Repo) (diff Diff) { // Sort sourced so we merge deterministically sort.Sort(Repos(sourced)) @@ -431,12 +635,21 @@ func NewDiff(sourced, stored []*Repo) (diff Diff) { } seenID := make(map[api.ExternalRepoSpec]bool, len(stored)) - seenName := make(map[string]bool, len(stored)) for _, old := range stored { src := byID[old.ExternalRepo] + // if the repo hasn't been found in the sourced repo list + // we add it to the Deleted slice and, if the service is provided + // we remove the service from its source map. if src == nil { + if svc != nil { + if _, ok := old.Sources[svc.URN()]; ok { + old = old.Clone() + delete(old.Sources, svc.URN()) + } + } + diff.Deleted = append(diff.Deleted, old) } else if old.Update(src) { diff.Modified = append(diff.Modified, old) @@ -445,7 +658,6 @@ func NewDiff(sourced, stored []*Repo) (diff Diff) { } seenID[old.ExternalRepo] = true - seenName[old.Name] = true } for _, r := range byID { @@ -464,12 +676,7 @@ func merge(o, n *Repo) { o.Update(n) } -func (s *Syncer) sourced(ctx context.Context, observe ...func(*Repo)) ([]*Repo, error) { - svcs, err := s.Store.ListExternalServices(ctx, StoreListExternalServicesArgs{}) - if err != nil { - return nil, err - } - +func (s *Syncer) sourced(ctx context.Context, svcs []*ExternalService, observe ...func(*Repo)) ([]*Repo, error) { srcs, err := s.Sourcer(svcs...) if err != nil { return nil, err @@ -478,13 +685,13 @@ func (s *Syncer) sourced(ctx context.Context, observe ...func(*Repo)) ([]*Repo, return listAll(ctx, srcs, observe...) } -func (s *Syncer) makeNewRepoInserter(ctx context.Context) (func(*Repo), error) { +func (s *Syncer) makeNewRepoInserter(ctx context.Context, store Store) (func(*Repo), error) { // syncSubset requires querying the store for related repositories, and // will do nothing if `insertOnly` is set and there are any related repositories. Most // repositories will already have related repos, so to avoid that cost we - // ask the store for all repositories and only do syncsubset if it might + // ask the store for all repositories and only do syncSubset if it might // be an insert. - ids, err := s.Store.ListExternalRepoSpecs(ctx) + ids, err := store.ListExternalRepoSpecs(ctx) if err != nil { return nil, err } @@ -495,7 +702,7 @@ func (s *Syncer) makeNewRepoInserter(ctx context.Context) (func(*Repo), error) { return } - err := s.insertIfNew(ctx, r) + err := s.insertIfNew(ctx, store, r) if err != nil && s.Logger != nil { // Best-effort, final syncer will handle this repo if this failed. s.Logger.Warn("streaming insert failed", "external_id", r.ExternalRepo, "error", err) @@ -503,24 +710,45 @@ func (s *Syncer) makeNewRepoInserter(ctx context.Context) (func(*Repo), error) { }, nil } -func (s *Syncer) setOrResetLastSyncErr(perr *error) { +func (s *Syncer) setOrResetLastSyncErr(serviceID int64, perr *error) { var err error if perr != nil { err = *perr } - s.lastSyncErrMu.Lock() - s.lastSyncErr = err - s.lastSyncErrMu.Unlock() + s.syncErrorsMu.Lock() + defer s.syncErrorsMu.Unlock() + if s.syncErrors == nil { + s.syncErrors = make(map[int64]error) + } + + if err == nil { + delete(s.syncErrors, serviceID) + return + } + s.syncErrors[serviceID] = err } -// LastSyncError returns the error that was produced in the last Sync run. If -// no error was produced, this returns nil. -func (s *Syncer) LastSyncError() error { - s.lastSyncErrMu.Lock() - defer s.lastSyncErrMu.Unlock() +// SyncErrors returns all errors that was produced in the last Sync run per external sevice. If +// no error was produced, this returns an empty slice. +// Errors are sorted by external service id. +func (s *Syncer) SyncErrors() []error { + s.syncErrorsMu.Lock() + defer s.syncErrorsMu.Unlock() + + ids := make([]int, 0, len(s.syncErrors)) + + for id := range s.syncErrors { + ids = append(ids, int(id)) + } + sort.Ints(ids) - return s.lastSyncErr + sorted := make([]error, len(ids)) + for i, id := range ids { + sorted[i] = s.syncErrors[int64(id)] + } + + return sorted } func (s *Syncer) observe(ctx context.Context, family, title string) (context.Context, func(*Diff, *error)) { @@ -545,6 +773,7 @@ func (s *Syncer) observe(ctx context.Context, family, title string) (context.Con if len(repos) > 0 && s.Logger != nil { s.Logger.Debug(family, "diff."+state, repos.NamesSummary()) + s.Logger.Debug(family, "diff."+state, repos.NamesSummary()) } } syncedTotal.WithLabelValues(state).Add(float64(len(repos))) diff --git a/cmd/repo-updater/repos/syncer_test.go b/cmd/repo-updater/repos/syncer_test.go index 9a3a694065d9..ade3cdd50be1 100644 --- a/cmd/repo-updater/repos/syncer_test.go +++ b/cmd/repo-updater/repos/syncer_test.go @@ -2,6 +2,7 @@ package repos_test import ( "context" + "database/sql" "errors" "fmt" "sort" @@ -12,6 +13,7 @@ import ( "github.com/gitchander/permutation" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/sourcegraph/sourcegraph/cmd/repo-updater/repos" "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/extsvc" @@ -26,16 +28,13 @@ import ( func testSyncerSyncWithErrors(t *testing.T, store repos.Store) func(t *testing.T) { return func(t *testing.T) { ctx := context.Background() - github := repos.ExternalService{ + + githubService := repos.ExternalService{ Kind: extsvc.KindGitHub, Config: `{}`, } - gitlab := repos.ExternalService{ - Kind: extsvc.KindGitLab, - Config: `{}`, - } - if err := store.UpsertExternalServices(ctx, &github, &gitlab); err != nil { + if err := store.UpsertExternalServices(ctx, &githubService); err != nil { t.Fatal(err) } @@ -49,20 +48,11 @@ func testSyncerSyncWithErrors(t *testing.T, store repos.Store) func(t *testing.T name: "sourcer error aborts sync", sourcer: repos.NewFakeSourcer(errors.New("boom")), store: store, - err: "syncer.sync.sourced: 2 errors occurred:\n\t* boom\n\t* boom\n\n", - }, - { - name: "sources partial errors aborts sync", - sourcer: repos.NewFakeSourcer(nil, - repos.NewFakeSource(&github, nil), - repos.NewFakeSource(&gitlab, errors.New("boom")), - ), - store: store, - err: "syncer.sync.sourced: 1 error occurred:\n\t* boom\n\n", + err: "syncer.sync.sourced: 1 error occurred:\n\t* boom\n\n", }, { name: "store list error aborts sync", - sourcer: repos.NewFakeSourcer(nil, repos.NewFakeSource(&github, nil)), + sourcer: repos.NewFakeSourcer(nil, repos.NewFakeSource(&githubService, nil)), store: &storeWithErrors{ Store: store, ListReposErr: errors.New("boom"), @@ -71,7 +61,7 @@ func testSyncerSyncWithErrors(t *testing.T, store repos.Store) func(t *testing.T }, { name: "store upsert error aborts sync", - sourcer: repos.NewFakeSourcer(nil, repos.NewFakeSource(&github, nil)), + sourcer: repos.NewFakeSourcer(nil, repos.NewFakeSource(&githubService, nil)), store: &storeWithErrors{ Store: store, UpsertReposErr: errors.New("booya"), @@ -86,18 +76,21 @@ func testSyncerSyncWithErrors(t *testing.T, store repos.Store) func(t *testing.T ctx := context.Background() syncer := &repos.Syncer{ - Store: tc.store, Sourcer: tc.sourcer, Now: now, } - err := syncer.Sync(ctx) + err := syncer.SyncExternalService(ctx, tc.store, githubService.ID, time.Millisecond) - if have, want := fmt.Sprint(err), tc.err; have != want { + if have, want := err.Error(), tc.err; have != want { t.Errorf("have error %q, want %q", have, want) } - if have, want := fmt.Sprint(syncer.LastSyncError()), tc.err; have != want { - t.Errorf("have LastSyncError %q, want %q", have, want) + if len(syncer.SyncErrors()) != 1 { + t.Fatal("expected 1 error") + } + + if have, want := syncer.SyncErrors(), tc.err; have[0].Error() != want { + t.Errorf("have SyncErrors %q, want %q", have, want) } }) } @@ -250,6 +243,7 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { sourcer repos.Sourcer store repos.Store stored repos.Repos + svcs []*repos.ExternalService ctx context.Context now func() time.Time diff repos.Diff @@ -282,7 +276,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { repos.Opt.RepoCreatedAt(clock.Time(1)), repos.Opt.RepoSources(tc.svc.Clone().URN()), )}}, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/new repo sources", @@ -297,7 +292,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { repos.Opt.RepoModifiedAt(clock.Time(1)), repos.Opt.RepoSources(tc.svc.URN(), svcdup.URN()), )}}, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/deleted repo source", @@ -312,7 +308,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { diff: repos.Diff{Modified: repos.Repos{tc.repo.With( repos.Opt.RepoModifiedAt(clock.Time(1)), )}}, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/deleted ALL repo sources", @@ -325,7 +322,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { diff: repos.Diff{Deleted: repos.Repos{tc.repo.With( repos.Opt.RepoDeletedAt(clock.Time(1)), )}}, - err: "", + svcs: []*repos.ExternalService{tc.svc, &svcdup}, + err: "", }, testCase{ name: tc.repo.Name + "/renamed repo is detected via external_id", @@ -339,7 +337,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { tc.repo.With( repos.Opt.RepoModifiedAt(clock.Time(1))), }}, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/repo got renamed to another repo that gets deleted", @@ -372,7 +371,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { ), }, }, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/repo inserted with same name as another repo that gets deleted", @@ -402,7 +402,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { }), }, }, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/repo inserted with same name as repo without id", @@ -434,7 +435,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { }), }, }, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/renamed repo which was deleted is detected and added", @@ -450,7 +452,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { tc.repo.With( repos.Opt.RepoCreatedAt(clock.Time(1))), }}, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/repos have their names swapped", @@ -490,7 +493,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { }), }, }, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", }, testCase{ name: tc.repo.Name + "/case insensitive name", @@ -502,6 +506,7 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { stored: repos.Repos{tc.repo.With(repos.Opt.RepoName(strings.ToUpper(tc.repo.Name)))}, now: clock.Now, diff: repos.Diff{Modified: repos.Repos{tc.repo.With(repos.Opt.RepoModifiedAt(clock.Time(0)))}}, + svcs: []*repos.ExternalService{tc.svc}, err: "", }, func() testCase { @@ -540,7 +545,8 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { repos.Opt.RepoModifiedAt(clock.Time(1)), repos.Opt.RepoMetadata(update), )}}, - err: "", + svcs: []*repos.ExternalService{tc.svc}, + err: "", } }(), ) @@ -583,18 +589,20 @@ func testSyncerSync(t *testing.T, s repos.Store) func(*testing.T) { } syncer := &repos.Syncer{ - Store: st, Sourcer: tc.sourcer, Now: now, } - err := syncer.Sync(ctx) - if have, want := fmt.Sprint(err), tc.err; have != want { - t.Errorf("have error %q, want %q", have, want) - } + for _, svc := range tc.svcs { + err := syncer.SyncExternalService(ctx, st, svc.ID, time.Millisecond) - if err != nil { - return + if have, want := fmt.Sprint(err), tc.err; have != want { + t.Errorf("have error %q, want %q", have, want) + } + + if err != nil { + return + } } if st != nil { @@ -724,10 +732,9 @@ func testSyncSubset(t *testing.T, s repos.Store) func(*testing.T) { clock := clock syncer := &repos.Syncer{ - Store: st, - Now: clock.Now, + Now: clock.Now, } - err := syncer.SyncSubset(ctx, tc.sourced.Clone()...) + err := syncer.SyncSubset(ctx, st, tc.sourced.Clone()...) if err != nil { t.Fatal(err) } @@ -1004,93 +1011,835 @@ func TestDiff(t *testing.T) { diff.Sort() tc.diff.Sort() if cDiff := cmp.Diff(diff, tc.diff); cDiff != "" { - // t.Logf("have: %s\nwant: %s\n", pp.Sprint(diff), pp.Sprint(tc.diff)) t.Fatalf("unexpected diff:\n%s", cDiff) } }) } } -func testSyncRun(t *testing.T, store repos.Store) func(t *testing.T) { - return func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) +func testSyncRun(db *sql.DB) func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // some ceremony to setup metadata on our test repos - svc := &repos.ExternalService{ - Config: `{}`, - Kind: extsvc.KindGitHub, - } + svc := &repos.ExternalService{ + Config: `{"url": "https://github.com", "repositoryQuery": ["none"], "token": "abc"}`, + Kind: extsvc.KindGitHub, + } - if err := store.UpsertExternalServices(ctx, svc); err != nil { - t.Fatal(err) + if err := store.UpsertExternalServices(ctx, svc); err != nil { + t.Fatal(err) + } + + mk := func(name string) *repos.Repo { + return &repos.Repo{ + Name: name, + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: name, + ServiceID: "https://github.com", + ServiceType: svc.Kind, + }, + } + } + + // Our test will have 1 initial repo, and discover a new repo on sourcing. + stored := repos.Repos{mk("initial")}.With(repos.Opt.RepoSources(svc.URN())) + sourced := repos.Repos{mk("initial"), mk("new")} + + syncer := &repos.Syncer{ + Sourcer: repos.NewFakeSourcer(nil, repos.NewFakeSource(svc, nil, sourced...)), + Synced: make(chan repos.Diff), + SubsetSynced: make(chan repos.Diff), + Now: time.Now, + } + + // Initial repos in store + if err := store.InsertRepos(ctx, stored...); err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + go func() { + defer close(done) + err := syncer.Run(ctx, db, store, repos.RunOptions{ + EnqueueInterval: func() time.Duration { return time.Second }, + IsCloud: false, + MinSyncInterval: 1 * time.Millisecond, + DequeueInterval: 1 * time.Millisecond, + }) + if err != nil && err != context.Canceled { + t.Fatal(err) + } + }() + + // Ignore fields store adds + ignore := cmpopts.IgnoreFields(repos.Repo{}, "ID", "CreatedAt", "UpdatedAt", "Sources") + + // The first thing sent down Synced is the list of repos in store. + diff := <-syncer.Synced + if d := cmp.Diff(repos.Diff{Unmodified: stored}, diff, ignore); d != "" { + t.Fatalf("initial Synced mismatch (-want +got):\n%s", d) + } + + // Next up it should find the new repo and send it down SubsetSynced + diff = <-syncer.SubsetSynced + if d := cmp.Diff(repos.Diff{Added: repos.Repos{mk("new")}}, diff, ignore); d != "" { + t.Fatalf("SubsetSynced mismatch (-want +got):\n%s", d) + } + + // Finally we get the final diff, which will have everything listed as + // Unmodified since we added when we did SubsetSynced. + diff = <-syncer.Synced + if d := cmp.Diff(repos.Diff{Unmodified: sourced}, diff, ignore); d != "" { + t.Fatalf("final Synced mismatch (-want +got):\n%s", d) + } + + // We check synced again to test us going around the Run loop 2 times in + // total. + diff = <-syncer.Synced + if d := cmp.Diff(repos.Diff{Unmodified: sourced}, diff, ignore); d != "" { + t.Fatalf("second final Synced mismatch (-want +got):\n%s", d) + } + + // Cancel context and the run loop should stop + cancel() + <-done } + } +} + +func testSyncer(db *sql.DB) func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + services := mkExternalServices(time.Now()) + + githubService := services[0] + gitlabService := services[1] + bitbucketCloudService := services[3] + + services = repos.ExternalServices{ + githubService, + gitlabService, + bitbucketCloudService, + } + + // setup services + if err := store.UpsertExternalServices(ctx, services...); err != nil { + t.Fatal(err) + } - mk := func(name string) *repos.Repo { - return &repos.Repo{ - Name: name, + githubRepo := (&repos.Repo{ + Name: "github.com/org/foo", Metadata: &github.Repository{}, ExternalRepo: api.ExternalRepoSpec{ - ID: name, - ServiceID: "https://github.com", - ServiceType: svc.Kind, + ID: "foo-external-12345", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, }, + }).With( + repos.Opt.RepoSources(githubService.URN()), + ) + + gitlabRepo := (&repos.Repo{ + Name: "gitlab.com/org/foo", + Metadata: &gitlab.Project{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "12345", + ServiceID: "https://gitlab.com/", + ServiceType: extsvc.TypeGitLab, + }, + }).With( + repos.Opt.RepoSources(gitlabService.URN()), + ) + + bitbucketCloudRepo := (&repos.Repo{ + Name: "bitbucket.org/team/foo", + Metadata: &bitbucketcloud.Repo{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "{e164a64c-bd73-4a40-b447-d71b43f328a8}", + ServiceID: "https://bitbucket.org/", + ServiceType: extsvc.TypeBitbucketCloud, + }, + }).With( + repos.Opt.RepoSources(bitbucketCloudService.URN()), + ) + + removeSources := func(r *repos.Repo) { + r.Sources = nil } - } - // Our test will have 1 initial repo, and discover a new repo on sourcing. - stored := repos.Repos{mk("initial")}.With(repos.Opt.RepoSources(svc.URN())) - sourced := repos.Repos{mk("initial"), mk("new")} + baseGithubRepos := mkRepos(10, githubRepo) + githubSourced := baseGithubRepos.Clone().With(removeSources) + baseGitlabRepos := mkRepos(10, gitlabRepo) + gitlabSourced := baseGitlabRepos.Clone().With(removeSources) + baseBitbucketCloudRepos := mkRepos(10, bitbucketCloudRepo) + bitbucketCloudSourced := baseBitbucketCloudRepos.Clone().With(removeSources) + + sourcers := map[int64]repos.Source{ + githubService.ID: repos.NewFakeSource(githubService, nil, githubSourced...), + gitlabService.ID: repos.NewFakeSource(gitlabService, nil, gitlabSourced...), + bitbucketCloudService.ID: repos.NewFakeSource(bitbucketCloudService, nil, bitbucketCloudSourced...), + } - syncer := &repos.Syncer{ - Store: store, - Sourcer: repos.NewFakeSourcer(nil, repos.NewFakeSource(svc, nil, sourced...)), - Synced: make(chan repos.Diff), - SubsetSynced: make(chan repos.Diff), - Now: time.Now, - } + syncer := &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + if len(services) > 1 { + t.Fatalf("Expected 1 service, got %d", len(services)) + } + s, ok := sourcers[services[0].ID] + if !ok { + t.Fatalf("sourcer not found: %d", services[0].ID) + } + return repos.Sources{s}, nil + }, + Synced: make(chan repos.Diff), + Now: time.Now, + } - // Initial repos in store - if err := syncer.Store.InsertRepos(ctx, stored...); err != nil { - t.Fatal(err) + done := make(chan struct{}) + go func() { + defer close(done) + err := syncer.Run(ctx, db, store, repos.RunOptions{ + EnqueueInterval: func() time.Duration { return time.Second }, + IsCloud: false, + MinSyncInterval: 1 * time.Minute, + DequeueInterval: 1 * time.Millisecond, + }) + if err != nil && err != context.Canceled { + t.Fatal(err) + } + }() + + // Ignore fields store adds + ignore := cmpopts.IgnoreFields(repos.Repo{}, "ID", "CreatedAt", "UpdatedAt", "Sources") + + // The first thing sent down Synced is an empty list of repos in store. + diff := <-syncer.Synced + if d := cmp.Diff(repos.Diff{}, diff, ignore); d != "" { + t.Fatalf("initial Synced mismatch (-want +got):\n%s", d) + } + + // it should add a job for all external services + var jobCount int + for i := 0; i < 10; i++ { + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_service_sync_jobs").Scan(&jobCount); err != nil { + t.Fatal(err) + } + if jobCount == len(services) { + break + } + // We need to give the worker package time to create the jobs + time.Sleep(10 * time.Millisecond) + } + if jobCount != len(services) { + t.Fatalf("expected %d sync jobs, got %d", len(services), jobCount) + } + + for i := 0; i < len(services); i++ { + diff = <-syncer.Synced + if len(diff.Added) != 10 { + t.Fatalf("Expected 10 Added repos. got %d", len(diff.Added)) + } + if len(diff.Deleted) != 0 { + t.Fatalf("Expected 0 Deleted repos. got %d", len(diff.Added)) + } + if len(diff.Modified) != 0 { + t.Fatalf("Expected 0 Modified repos. got %d", len(diff.Added)) + } + if len(diff.Unmodified) != 0 { + t.Fatalf("Expected 0 Unmodified repos. got %d", len(diff.Added)) + } + } + + var jobsCompleted int + for i := 0; i < 10; i++ { + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_service_sync_jobs where state = 'completed'").Scan(&jobsCompleted); err != nil { + t.Fatal(err) + } + if jobsCompleted == len(services) { + break + } + // We need to give the worker package time to create the jobs + time.Sleep(10 * time.Millisecond) + } + + // Cancel context and the run loop should stop + cancel() + <-done } + } +} + +func testOrphanedRepo(db *sql.DB) func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + now := time.Now() + + svc1 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test1", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + svc2 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test2", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + + // setup services + if err := store.UpsertExternalServices(ctx, svc1, svc2); err != nil { + t.Fatal(err) + } + + githubRepo := &repos.Repo{ + Name: "github.com/org/foo", + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "foo-external-12345", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, + }, + } + + // Add two services, both pointing at the same repo - done := make(chan struct{}) - go func() { - defer close(done) - syncer.Run(ctx, func() time.Duration { return 0 }) - }() + // Sync first service + syncer := &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc1, nil, githubRepo) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc1.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Sync second service + syncer = &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc2, nil, githubRepo) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc2.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Confirm that there are two relationships + var rowCount int + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_service_repos").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 2 { + t.Fatalf("Expected 2 rows, got %d", rowCount) + } + + // We should have no deleted repos + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM repo where deleted_at is not null").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 0 { + t.Fatalf("Expected 0 rows, got %d", rowCount) + } + + // Remove the repo from one service and sync again + syncer = &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc1, nil) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc1.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Confirm that the repository hasn't been deleted + rs, err := store.ListRepos(ctx, repos.StoreListReposArgs{}) + if err != nil { + t.Fatal(err) + } + if len(rs) != 1 { + t.Fatalf("Expected 1 repo, got %d", len(rs)) + } + + // Confirm that there is one relationship + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_service_repos").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 1 { + t.Fatalf("Expected 1 rows, got %d", rowCount) + } + + // We should have no deleted repos + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM repo where deleted_at is not null").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 0 { + t.Fatalf("Expected 0 rows, got %d", rowCount) + } + + // Remove the repo from the second service and sync again + syncer = &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc2, nil) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc2.ID, 10*time.Second); err != nil { + t.Fatal(err) + } - // Ignore fields store adds - ignore := cmpopts.IgnoreFields(repos.Repo{}, "ID", "CreatedAt", "UpdatedAt", "Sources") + // Confirm that there no relationships + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_service_repos").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 0 { + t.Fatalf("Expected 0 rows, got %d", rowCount) + } - // The first thing sent down Synced is the list of repos in store. - diff := <-syncer.Synced - if d := cmp.Diff(repos.Diff{Unmodified: stored}, diff, ignore); d != "" { - t.Fatalf("initial Synced mismatch (-want +got):\n%s", d) + // We should have one deleted repo + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM repo where deleted_at is not null").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 1 { + t.Fatalf("Expected 1 rows, got %d", rowCount) + } } + } +} - // Next up it should find the new repo and send it down SubsetSynced - diff = <-syncer.SubsetSynced - if d := cmp.Diff(repos.Diff{Added: repos.Repos{mk("new")}}, diff, ignore); d != "" { - t.Fatalf("SubsetSynced mismatch (-want +got):\n%s", d) +func testNameOnConflictDiscardOld(db *sql.DB) func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T) { + // Test the case where more than one external service returns the same name for different repos. The names + // are the same, but the external id are different. + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + now := time.Now() + + svc1 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test1", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + svc2 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test2", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + + // setup services + if err := store.UpsertExternalServices(ctx, svc1, svc2); err != nil { + t.Fatal(err) + } + + githubRepo1 := &repos.Repo{ + Name: "github.com/org/foo", + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "foo-external-foo", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, + }, + } + + githubRepo2 := &repos.Repo{ + Name: "github.com/org/foo", + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "foo-external-bar", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, + }, + } + + // Add two services, one with each repo + + // Sync first service + syncer := &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc1, nil, githubRepo1) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc1.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Sync second service + syncer = &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc2, nil, githubRepo2) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc2.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // We expect repo2 to be synced since it sorts before repo1 because the ID is alphabetically first + fromDB, err := store.ListRepos(ctx, repos.StoreListReposArgs{ + Names: []string{"github.com/org/foo"}, + }) + if err != nil { + t.Fatal(err) + } + + if len(fromDB) != 1 { + t.Fatalf("Expected 1 repo, have %d", len(fromDB)) + } + + found := fromDB[0] + expectedID := "foo-external-bar" + if found.ExternalRepo.ID != expectedID { + t.Fatalf("Want %q, got %q", expectedID, found.ExternalRepo.ID) + } } + } +} + +func testNameOnConflictDiscardNew(db *sql.DB) func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T) { + // Test the case where more than one external service returns the same name for different repos. The names + // are the same, but the external id are different. - // Finally we get the final diff, which will have everything listed as - // Unmodified since we added when we did SubsetSynced. - diff = <-syncer.Synced - if d := cmp.Diff(repos.Diff{Unmodified: sourced}, diff, ignore); d != "" { - t.Fatalf("final Synced mismatch (-want +got):\n%s", d) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + now := time.Now() + + svc1 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test1", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + svc2 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test2", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + + // setup services + if err := store.UpsertExternalServices(ctx, svc1, svc2); err != nil { + t.Fatal(err) + } + + githubRepo1 := &repos.Repo{ + Name: "github.com/org/foo", + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "foo-external-bar", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, + }, + } + + githubRepo2 := &repos.Repo{ + Name: "github.com/org/foo", + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "foo-external-foo", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, + }, + } + + // Add two services, one with each repo + + // Sync first service + syncer := &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc1, nil, githubRepo1) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc1.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Sync second service + syncer = &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc2, nil, githubRepo2) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc2.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // We expect repo1 to be synced since it sorts before repo2 because the ID is alphabetically first + fromDB, err := store.ListRepos(ctx, repos.StoreListReposArgs{ + Names: []string{"github.com/org/foo"}, + }) + if err != nil { + t.Fatal(err) + } + + if len(fromDB) != 1 { + t.Fatalf("Expected 1 repo, have %d", len(fromDB)) + } + + found := fromDB[0] + expectedID := "foo-external-bar" + if found.ExternalRepo.ID != expectedID { + t.Fatalf("Want %q, got %q", expectedID, found.ExternalRepo.ID) + } } + } +} + +func testNameOnConflictOnRename(db *sql.DB) func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T) { + // Test the case where more than one external service returns the same name for different repos. The names + // are the same, but the external id are different. + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // We check synced again to test us going around the Run loop 2 times in - // total. - diff = <-syncer.Synced - if d := cmp.Diff(repos.Diff{Unmodified: sourced}, diff, ignore); d != "" { - t.Fatalf("second final Synced mismatch (-want +got):\n%s", d) + now := time.Now() + + svc1 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test1", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + svc2 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test2", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + + // setup services + if err := store.UpsertExternalServices(ctx, svc1, svc2); err != nil { + t.Fatal(err) + } + + githubRepo1 := &repos.Repo{ + Name: "github.com/org/foo", + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "foo-external-foo", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, + }, + } + + githubRepo2 := &repos.Repo{ + Name: "github.com/org/bar", + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "foo-external-bar", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, + }, + } + + // Add two services, one with each repo + + // Sync first service + syncer := &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc1, nil, githubRepo1) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc1.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Sync second service + syncer = &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc2, nil, githubRepo2) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc2.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Rename repo1 with the same name as repo2 + renamedRepo1 := githubRepo1.With(func(r *repos.Repo) { + r.Name = githubRepo2.Name + }) + + // Sync first service + syncer = &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc1, nil, renamedRepo1) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc1.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // We expect repo1 to be synced since it sorts before repo2 because the ID is alphabetically first + fromDB, err := store.ListRepos(ctx, repos.StoreListReposArgs{}) + if err != nil { + t.Fatal(err) + } + + if len(fromDB) != 1 { + t.Fatalf("Expected 1 repo, have %d", len(fromDB)) + } + + found := fromDB[0] + expectedID := "foo-external-bar" + if found.ExternalRepo.ID != expectedID { + t.Fatalf("Want %q, got %q", expectedID, found.ExternalRepo.ID) + } } + } +} +func testDeleteExternalService(db *sql.DB) func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T, store repos.Store) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + now := time.Now() + + svc1 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test1", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + svc2 := &repos.ExternalService{ + Kind: extsvc.KindGitHub, + DisplayName: "Github - Test2", + Config: `{"url": "https://github.com"}`, + CreatedAt: now, + UpdatedAt: now, + } + + // setup services + if err := store.UpsertExternalServices(ctx, svc1, svc2); err != nil { + t.Fatal(err) + } + + githubRepo := &repos.Repo{ + Name: "github.com/org/foo", + Metadata: &github.Repository{}, + ExternalRepo: api.ExternalRepoSpec{ + ID: "foo-external-12345", + ServiceID: "https://github.com/", + ServiceType: extsvc.TypeGitHub, + }, + } + + // Add two services, both pointing at the same repo + + // Sync first service + syncer := &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc1, nil, githubRepo) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc1.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Sync second service + syncer = &repos.Syncer{ + Sourcer: func(services ...*repos.ExternalService) (repos.Sources, error) { + s := repos.NewFakeSource(svc2, nil, githubRepo) + return repos.Sources{s}, nil + }, + Now: time.Now, + } + if err := syncer.SyncExternalService(ctx, store, svc2.ID, 10*time.Second); err != nil { + t.Fatal(err) + } + + // Delete the first service + svc1.DeletedAt = now + if err := store.UpsertExternalServices(ctx, svc1); err != nil { + t.Fatal(err) + } + + // Confirm that there is one relationship + var rowCount int + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_service_repos").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 1 { + t.Fatalf("Expected 1 row, got %d", rowCount) + } + + // We should have no deleted repos + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM repo where deleted_at is not null").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 0 { + t.Fatalf("Expected 0 rows, got %d", rowCount) + } + + // Delete the second service + svc2.DeletedAt = now + if err := store.UpsertExternalServices(ctx, svc2); err != nil { + t.Fatal(err) + } - // Cancel context and the run loop should stop - cancel() - <-done + // Confirm that there no relationships + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_service_repos").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 0 { + t.Fatalf("Expected 0 rows, got %d", rowCount) + } + + // We should have one deleted repo + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM repo where deleted_at is not null").Scan(&rowCount); err != nil { + t.Fatal(err) + } + if rowCount != 1 { + t.Fatalf("Expected 1 rows, got %d", rowCount) + } + } } } diff --git a/cmd/repo-updater/repos/types.go b/cmd/repo-updater/repos/types.go index e3a227d7897e..e5e0d218386e 100644 --- a/cmd/repo-updater/repos/types.go +++ b/cmd/repo-updater/repos/types.go @@ -727,7 +727,7 @@ func (r *Repo) With(opts ...func(*Repo)) *Repo { // // Context on using other fields such as timestamps to order/resolve // conflicts: We only want to rely on values that have constraints in our -// database. Tmestamps have the following downsides: +// database. Timestamps have the following downsides: // // - We need to assume the upstream codehost has reasonable values for them // - Not all codehosts set them to relevant values (eg gitolite or other) @@ -773,7 +773,7 @@ func sortedSliceLess(a, b []string) bool { return v < b[i] } } - return true + return len(a) != len(b) } // pick deterministically chooses between a and b a repo to keep and @@ -905,6 +905,15 @@ func (rs Repos) Filter(pred func(*Repo) bool) (fs Repos) { // convenience methods for operating on lists of ExternalServices. type ExternalServices []*ExternalService +// IDs returns the list of ids from all ExternalServices. +func (es ExternalServices) IDs() []int64 { + ids := make([]int64, len(es)) + for i := range es { + ids[i] = es[i].ID + } + return ids +} + // DisplayNames returns the list of display names from all ExternalServices. func (es ExternalServices) DisplayNames() []string { names := make([]string, len(es)) diff --git a/cmd/repo-updater/repoupdater/server.go b/cmd/repo-updater/repoupdater/server.go index fce018aa37dc..d2054f368005 100644 --- a/cmd/repo-updater/repoupdater/server.go +++ b/cmd/repo-updater/repoupdater/server.go @@ -325,7 +325,7 @@ func (s *Server) handleExternalServiceSync(w http.ResponseWriter, r *http.Reques return } - s.Syncer.TriggerSync() + s.Syncer.TriggerEnqueueSyncJobs() err := externalServiceValidate(ctx, &req) if err == github.ErrIncompleteResults { @@ -529,7 +529,7 @@ func (s *Server) remoteRepoSync(ctx context.Context, codehost *extsvc.CodeHost, }, nil } - err = s.Syncer.SyncSubset(ctx, repo) + err = s.Syncer.SyncSubset(ctx, s.Store, repo) if err != nil { return nil, err } @@ -563,7 +563,7 @@ func (s *Server) handleStatusMessages(w http.ResponseWriter, r *http.Request) { }) } - if e := s.Syncer.LastSyncError(); e != nil { + for _, e := range s.Syncer.SyncErrors() { if multiErr, ok := errors.Cause(e).(*multierror.Error); ok { for _, e := range multiErr.Errors { if sourceErr, ok := e.(*repos.SourceError); ok { diff --git a/cmd/repo-updater/repoupdater/server_test.go b/cmd/repo-updater/repoupdater/server_test.go index dc9f60bcc0e6..3de5730d824c 100644 --- a/cmd/repo-updater/repoupdater/server_test.go +++ b/cmd/repo-updater/repoupdater/server_test.go @@ -16,16 +16,16 @@ import ( "testing" "time" - "github.com/google/uuid" - "github.com/sourcegraph/sourcegraph/internal/extsvc/awscodecommit" - "github.com/google/go-cmp/cmp" + "github.com/google/uuid" "github.com/inconshreveable/log15" "github.com/opentracing/opentracing-go" + "github.com/sourcegraph/sourcegraph/cmd/repo-updater/repos" "github.com/sourcegraph/sourcegraph/internal/api" "github.com/sourcegraph/sourcegraph/internal/db/dbtest" "github.com/sourcegraph/sourcegraph/internal/extsvc" + "github.com/sourcegraph/sourcegraph/internal/extsvc/awscodecommit" "github.com/sourcegraph/sourcegraph/internal/extsvc/bitbucketserver" "github.com/sourcegraph/sourcegraph/internal/extsvc/github" "github.com/sourcegraph/sourcegraph/internal/extsvc/gitlab" @@ -808,8 +808,7 @@ func testServerStatusMessages(t *testing.T, store repos.Store) func(t *testing.T clock := repos.NewFakeClock(time.Now(), 0) syncer := &repos.Syncer{ - Store: store, - Now: clock.Now, + Now: clock.Now, } if tc.sourcerErr != nil || tc.listRepoErr != nil { @@ -820,8 +819,7 @@ func testServerStatusMessages(t *testing.T, store repos.Store) func(t *testing.T sourcer := repos.NewFakeSourcer(tc.sourcerErr, repos.NewFakeSource(githubService, nil)) // Run Sync so that possibly `LastSyncErrors` is set syncer.Sourcer = sourcer - syncer.Store = store - _ = syncer.Sync(ctx) + _ = syncer.SyncExternalService(ctx, store, githubService.ID, time.Millisecond) } s := &Server{ @@ -1252,8 +1250,7 @@ func testRepoLookup(db *sql.DB) func(t *testing.T, repoStore repos.Store) func(t clock := clock syncer := &repos.Syncer{ - Store: store, - Now: clock.Now, + Now: clock.Now, } s := &Server{Syncer: syncer, Store: store} if tc.githubDotComSource != nil { diff --git a/cmd/repo-updater/shared/main.go b/cmd/repo-updater/shared/main.go index b1dfa28f2a69..cabcbc0c2922 100644 --- a/cmd/repo-updater/shared/main.go +++ b/cmd/repo-updater/shared/main.go @@ -73,6 +73,7 @@ func Main(enterpriseInit EnterpriseInit) { log.Fatalf("Detected repository DSN change, restarting to take effect: %q", newDSN) } }) + db, err := dbutil.NewDB(dsn, "repo-updater") if err != nil { log.Fatalf("failed to initialize db store: %v", err) @@ -129,7 +130,9 @@ func Main(enterpriseInit EnterpriseInit) { server.SourcegraphDotComMode = true es, err := store.ListExternalServices(ctx, repos.StoreListExternalServicesArgs{ - Kinds: []string{extsvc.KindGitHub, extsvc.KindGitLab}, + // On Cloud we want to fetch our admin owned external service only here + NamespaceUserID: -1, + Kinds: []string{extsvc.KindGitHub, extsvc.KindGitLab}, }) if err != nil { @@ -170,20 +173,21 @@ func Main(enterpriseInit EnterpriseInit) { gps := repos.NewGitolitePhabricatorMetadataSyncer(store) syncer := &repos.Syncer{ - Store: store, Sourcer: src, Logger: log15.Root(), Now: clock, } - if envvar.SourcegraphDotComMode() { - syncer.FailFullSync = true - } else { - syncer.Synced = make(chan repos.Diff) - syncer.SubsetSynced = make(chan repos.Diff) - go watchSyncer(ctx, syncer, scheduler, gps) - go func() { log.Fatal(syncer.Run(ctx, repos.GetUpdateInterval)) }() - } + syncer.Synced = make(chan repos.Diff) + syncer.SubsetSynced = make(chan repos.Diff) + go watchSyncer(ctx, syncer, scheduler, gps) + go func() { + log.Fatal(syncer.Run(ctx, db, store, repos.RunOptions{ + EnqueueInterval: repos.GetUpdateInterval, + IsCloud: envvar.SourcegraphDotComMode(), + PrometheusRegisterer: prometheus.DefaultRegisterer, + })) + }() server.Syncer = syncer go syncCloned(ctx, scheduler, gitserver.DefaultClient, store) diff --git a/enterprise/cmd/repo-updater/authz/perms_syncer_test.go b/enterprise/cmd/repo-updater/authz/perms_syncer_test.go index 0893243a8b0a..9a65029968fb 100644 --- a/enterprise/cmd/repo-updater/authz/perms_syncer_test.go +++ b/enterprise/cmd/repo-updater/authz/perms_syncer_test.go @@ -120,6 +120,10 @@ func (s *mockReposStore) CountNotClonedRepos(ctx context.Context) (uint64, error return 0, nil } +func (s *mockReposStore) EnqueueSyncJobs(ctx context.Context, ignoreSiteAdmin bool) error { + return nil +} + func TestPermsSyncer_syncUserPerms(t *testing.T) { p := &mockProvider{ serviceType: extsvc.TypeGitLab, diff --git a/internal/db/dbutil/dbutil.go b/internal/db/dbutil/dbutil.go index 5d862dddfbe8..4aa118bc8892 100644 --- a/internal/db/dbutil/dbutil.go +++ b/internal/db/dbutil/dbutil.go @@ -331,7 +331,10 @@ func (n *NullJSONRawMessage) Scan(value interface{}) error { switch value := value.(type) { case nil: case []byte: - n.Raw = value + // We make a copy here because the given value could be reused by + // the SQL driver + n.Raw = make([]byte, len(value)) + copy(n.Raw, value) default: return fmt.Errorf("value is not []byte: %T", value) } diff --git a/internal/db/external_services.go b/internal/db/external_services.go index 890edc39c32b..eb4a51d675e5 100644 --- a/internal/db/external_services.go +++ b/internal/db/external_services.go @@ -419,7 +419,7 @@ func (e *ExternalServicesStore) Update(ctx context.Context, ps []schema.AuthProv } } if update.Config != nil { - if err := execUpdate(ctx, tx, sqlf.Sprintf("config=%s", update.Config)); err != nil { + if err := execUpdate(ctx, tx, sqlf.Sprintf("config=%s, next_sync_at=now()", update.Config)); err != nil { return err } }