Skip to content

Commit

Permalink
Sync single external service (#13483)
Browse files Browse the repository at this point in the history
* repo-updater: Sync single external service at a time using workerutil

Tests are still failing

* repoupdater: Fix server_test

* testSyncerSyncWithErrors uses SyncExternalService

Also removes a test case that tests a sync of muktiple external services
as we no longer support that.

* Add EnqueueJobs tests

* Removed old references to syncer.Sync

Code now compiles but a lot of tests still fail.

Added a cleanup function that needs to be called when we stop
a worker to unregister prometheus metrics so that it doesn't panic when we
start another worker in a subsequent test.

* Fix repo-updater tests

* Delete orphaned repos

* Avoid constraints by changing upsert order

* Fix minSyncInterval

* WIP test sync

* Add a test that syncs multiple external services in sequence

* WIP test for orphaned repos

* Only delete orphaned repos

* repo-updater: Remove stale jobs on startup

If a job is processing when repo-updater dies that job would never
complete. We would then not requeue the associated external service for
syncing. To be safe we delete non locked processing rows on startup.

* Typo

* db: Mark next_synced_at on service save

This will cause a sync to be triggered ASAP for the saved external
service.

On save, we trigger a call to repo-updater which causes us to
enqueue any pending sync jobs. As we've just upated next_sync_at the job
for the newly saved repo will be queued.

* Rename trigger

We no longer trigger a full sync but instead trigger enqueueing pending
sync jobs.

* Remove orphaned repo code

We now have a trigger that does the same thing

* Improve orphaned repo test

* Test external service deletion

* Remove orphaned repo code from syncSubset

* Remove leftover debug line

* Fix deleting of repos when called via syncSubset

* Improve EnqueueSyncJobs docstring

Co-authored-by: ᴜɴᴋɴᴡᴏɴ <joe@sourcegraph.com>

* Improve error message

Co-authored-by: ᴜɴᴋɴᴡᴏɴ <joe@sourcegraph.com>

* Check rows.Err() after iteration

* Change log line to debug

Co-authored-by: ᴜɴᴋɴᴡᴏɴ <joe@sourcegraph.com>

* Remove leftover debug line

* Add stalled jobs resetter

* Pass prometheus register to the SyncWorker

* Create ResetterMetrics inline

* Remove unused field

* Fix docstring

* Typo

* Missing period

* Typo

* Add failing test

* Resolve name conflicts deterministically

* Cleanup name conflicts resolution

* Resolve name conflicts during renames

* Don't wrap nil error

* Don't queue jobs for Phabricator services

* Handle large name lists in StoreListReposArgs.Names

* Trigger rebuild

* Remove stale job cleanup code

We use the workerutil resetter now instead

* Add failing test

* Fix predicate on sourcegraphcom mode

* Don't sync deleted external services

* When in cloud mode DON'T sync admin added external services

Co-authored-by: Asdine El Hrychy <asdine.elhrychy@gmail.com>
Co-authored-by: ᴜɴᴋɴᴡᴏɴ <joe@sourcegraph.com>
  • Loading branch information
3 people committed Sep 9, 2020
1 parent 9b6e31a commit d00157a
Show file tree
Hide file tree
Showing 15 changed files with 1,582 additions and 303 deletions.
12 changes: 11 additions & 1 deletion cmd/repo-updater/repos/integration_test.go
Expand Up @@ -58,6 +58,7 @@ func TestIntegration(t *testing.T) {
{"DBStore/DeleteRepos", testStoreDeleteRepos},
{"DBStore/UpsertRepos", testStoreUpsertRepos},
{"DBStore/UpsertSources", testStoreUpsertSources},
{"DBStore/EnqueueSyncJobs", testStoreEnqueueSyncJobs(db, dbstore)},
{"DBStore/ListRepos", testStoreListRepos},
{"DBStore/ListRepos/Pagination", testStoreListReposPagination},
{"DBStore/ListExternalRepoSpecs", testStoreListExternalRepoSpecs(db)},
Expand All @@ -67,10 +68,19 @@ func TestIntegration(t *testing.T) {
{"DBStore/Syncer/SyncWithErrors", testSyncerSyncWithErrors},
{"DBStore/Syncer/SyncSubset", testSyncSubset},
{"DBStore/Syncer/SyncWorker", testSyncWorkerPlumbing(db)},
// {"DBStore/Syncer/Run", testSyncRun},
{"DBStore/Syncer/Run", testSyncRun(db)},
{"DBStore/Syncer/MultipleServices", testSyncer(db)},
{"DBStore/Syncer/OrphanedRepos", testOrphanedRepo(db)},
{"DBStore/Syncer/DeleteExternalService", testDeleteExternalService(db)},
{"DBStore/Syncer/NameConflictDiscardOld", testNameOnConflictDiscardOld(db)},
{"DBStore/Syncer/NameConflictDiscardNew", testNameOnConflictDiscardNew(db)},
{"DBStore/Syncer/NameConflictOnRename", testNameOnConflictOnRename(db)},
} {
t.Run(tc.name, func(t *testing.T) {
t.Cleanup(func() {
if t.Failed() {
return
}
if _, err := db.Exec(`
DELETE FROM external_service_sync_jobs;
DELETE FROM external_service_repos;
Expand Down
26 changes: 26 additions & 0 deletions cmd/repo-updater/repos/observability.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/db/dbutil"
"github.com/sourcegraph/sourcegraph/internal/logging"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/trace"
Expand Down Expand Up @@ -139,6 +140,7 @@ type StoreMetrics struct {
ListExternalServices *metrics.OperationMetrics
SetClonedRepos *metrics.OperationMetrics
CountNotClonedRepos *metrics.OperationMetrics
EnqueueSyncJobs *metrics.OperationMetrics
}

// MustRegister registers all metrics in StoreMetrics in the given
Expand Down Expand Up @@ -369,6 +371,17 @@ func (o *ObservedStore) Transact(ctx context.Context) (s TxStore, err error) {
}, nil
}

// With calls the With method of the underlying store if exists,
// otherwise it returns the store unchanged.
// It implements the WithStore interface.
func (o *ObservedStore) With(db dbutil.DB) Store {
if ws, ok := o.store.(WithStore); ok {
return ws.With(db)
}

return o.store
}

// Done calls into the inner Store Done method.
func (o *ObservedStore) Done(errs ...*error) {
tr := o.txtrace
Expand Down Expand Up @@ -613,6 +626,19 @@ func (o *ObservedStore) CountNotClonedRepos(ctx context.Context) (count uint64,
return o.store.CountNotClonedRepos(ctx)
}

func (o *ObservedStore) EnqueueSyncJobs(ctx context.Context, ignoreSiteAdmin bool) (err error) {
tr, ctx := o.trace(ctx, "Store.EnqueueSyncJobs")

defer func(began time.Time) {
secs := time.Since(began).Seconds()
o.metrics.EnqueueSyncJobs.Observe(secs, 0, &err)
tr.SetError(err)
tr.Finish()
}(time.Now())

return o.store.EnqueueSyncJobs(ctx, ignoreSiteAdmin)
}

func (o *ObservedStore) trace(ctx context.Context, family string) (*trace.Trace, context.Context) {
txctx := o.txctx
if txctx == nil {
Expand Down
116 changes: 105 additions & 11 deletions cmd/repo-updater/repos/store.go
Expand Up @@ -27,14 +27,21 @@ type Store interface {
ListExternalServices(context.Context, StoreListExternalServicesArgs) ([]*ExternalService, error)
UpsertExternalServices(ctx context.Context, svcs ...*ExternalService) error

InsertRepos(context.Context, ...*Repo) error
ListRepos(context.Context, StoreListReposArgs) ([]*Repo, error)
ListExternalRepoSpecs(context.Context) (map[api.ExternalRepoSpec]struct{}, error)
DeleteRepos(ctx context.Context, ids ...api.RepoID) error
UpsertRepos(ctx context.Context, repos ...*Repo) error
ListExternalRepoSpecs(context.Context) (map[api.ExternalRepoSpec]struct{}, error)
UpsertSources(ctx context.Context, inserts, updates, deletes map[api.RepoID][]SourceInfo) error
SetClonedRepos(ctx context.Context, repoNames ...string) error
CountNotClonedRepos(ctx context.Context) (uint64, error)

// EnqueueSyncJobs enqueues sync jobs per external service where their next_sync_at is due.
// If ignoreSiteAdmin is true then we only sync user added external services.
EnqueueSyncJobs(ctx context.Context, ignoreSiteAdmin bool) error

// TODO: These two methods should not be used in production, move them to
// an extension interface that's explicitly for testing.
InsertRepos(context.Context, ...*Repo) error
DeleteRepos(ctx context.Context, ids ...api.RepoID) error
}

// StoreListReposArgs is a query arguments type used by
Expand Down Expand Up @@ -205,6 +212,20 @@ func (s *DBStore) Transact(ctx context.Context) (TxStore, error) {
}, nil
}

// WithStore is a store that can take a db handle and return
// a new Store implementation that uses it.
type WithStore interface {
With(dbutil.DB) Store
}

// With returns a new store using the given db handle.
// It implements the WithStore interface.
func (s *DBStore) With(db dbutil.DB) Store {
return &DBStore{
db: db,
}
}

// Done terminates the underlying Tx in a DBStore either by committing or rolling
// back based on the value pointed to by the first given error pointer.
// It's a no-op if the `DBStore` is not operating within a transaction,
Expand Down Expand Up @@ -551,7 +572,7 @@ func (s DBStore) DeleteRepos(ctx context.Context, ids ...api.RepoID) error {
return nil
}

// the number of deleted repos can potentially be higher
// The number of deleted repos can potentially be higher
// than the maximum number of arguments we can pass to postgres.
// We pass them as a json array instead to overcome this limitation.
encodedIds, err := json.Marshal(ids)
Expand Down Expand Up @@ -584,7 +605,11 @@ AND repo.id = repo_ids.id::int

// ListRepos lists all stored repos that match the given arguments.
func (s DBStore) ListRepos(ctx context.Context, args StoreListReposArgs) (repos []*Repo, _ error) {
return repos, s.paginate(ctx, args.Limit, args.PerPage, 0, listReposQuery(args),
listQuery, err := listReposQuery(args)
if err != nil {
return nil, errors.Wrap(err, "creating list repos query function")
}
return repos, s.paginate(ctx, args.Limit, args.PerPage, 0, listQuery,
func(sc scanner) (last, count int64, err error) {
var r Repo
if err := scanRepo(&r, sc); err != nil {
Expand Down Expand Up @@ -642,15 +667,15 @@ AND %s
ORDER BY id ASC LIMIT %s
`

func listReposQuery(args StoreListReposArgs) paginatedQuery {
func listReposQuery(args StoreListReposArgs) (paginatedQuery, error) {
var preds []*sqlf.Query

if len(args.Names) > 0 {
ns := make([]*sqlf.Query, 0, len(args.Names))
for _, name := range args.Names {
ns = append(ns, sqlf.Sprintf("%s", name))
encodedNames, err := json.Marshal(args.Names)
if err != nil {
return nil, errors.Wrap(err, "marshalling name args")
}
preds = append(preds, sqlf.Sprintf("name IN (%s)", sqlf.Join(ns, ",")))
preds = append(preds, sqlf.Sprintf("name IN (SELECT jsonb_array_elements_text(%s) AS id)", encodedNames))
}

if len(args.IDs) > 0 {
Expand Down Expand Up @@ -715,7 +740,7 @@ func listReposQuery(args StoreListReposArgs) paginatedQuery {
joinFilter,
limit,
)
}
}, nil
}

func (s DBStore) ListExternalRepoSpecs(ctx context.Context) (map[api.ExternalRepoSpec]struct{}, error) {
Expand Down Expand Up @@ -1061,6 +1086,75 @@ func (s *DBStore) UpsertRepos(ctx context.Context, repos ...*Repo) (err error) {
return nil
}

func (s *DBStore) EnqueueSyncJobs(ctx context.Context, ignoreSiteAdmin bool) error {
filter := "TRUE"
if ignoreSiteAdmin {
filter = "namespace_user_id IS NOT NULL"
}
q := sqlf.Sprintf(enqueueSyncJobsQueryFmtstr, sqlf.Sprintf(filter))
_, err := s.db.ExecContext(ctx, q.Query(sqlf.PostgresBindVar), q.Args()...)
return err
}

// We ignore Phabricator repos here as they are currently synced using
// RunPhabricatorRepositorySyncWorker
const enqueueSyncJobsQueryFmtstr = `
WITH due AS (
SELECT id
FROM external_services
WHERE (next_sync_at <= clock_timestamp() OR next_sync_at IS NULL)
AND deleted_at IS NULL
AND LOWER(kind) != 'phabricator'
AND %s
),
busy AS (
SELECT DISTINCT external_service_id id FROM external_service_sync_jobs
WHERE state = 'queued'
OR state = 'processing'
)
INSERT INTO external_service_sync_jobs (external_service_id)
SELECT id from due EXCEPT SELECT id from busy
`

// ListSyncJobs returns all sync jobs.
func (s *DBStore) ListSyncJobs(ctx context.Context) ([]SyncJob, error) {
rows, err := s.db.QueryContext(ctx, "SELECT * FROM external_service_sync_jobs_with_next_sync_at")
if err != nil {
return nil, err
}
defer rows.Close()
return scanJobs(rows)
}

func scanJobs(rows *sql.Rows) ([]SyncJob, error) {
var jobs []SyncJob

for rows.Next() {
var job SyncJob
if err := rows.Scan(
&job.ID,
&job.State,
&job.FailureMessage,
&job.StartedAt,
&job.FinishedAt,
&job.ProcessAfter,
&job.NumResets,
&job.NumFailures,
&job.ExternalServiceID,
&job.NextSyncAt,
); err != nil {
return nil, err
}

jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, err
}

return jobs, nil
}

func batchReposQuery(fmtstr string, repos []*Repo) (_ *sqlf.Query, err error) {
records := make([]*repoRecord, 0, len(repos))
for _, r := range repos {
Expand Down

0 comments on commit d00157a

Please sign in to comment.