Skip to content

Commit

Permalink
campaigns: Switch to changeset syncer per code host (#11874)
Browse files Browse the repository at this point in the history
* campaigns: Switch to changeset syncer per code host

Instead of per external service. Syncer scheduling is coupled to rate
limiting which is per code host so this change aligns them.

* No need to fetch the sources column from the repo table anymore

* Clean up some comments

* Update enterprise/internal/campaigns/syncer.go

Co-authored-by: Thorsten Ball <mrnugget@gmail.com>

* Update enterprise/internal/campaigns/syncer.go

Co-authored-by: Thorsten Ball <mrnugget@gmail.com>

* Update enterprise/internal/campaigns/syncer.go

Co-authored-by: Thorsten Ball <mrnugget@gmail.com>

* Move BaseURL from repos.ExternalService

Move it to the extsvc package so that we can also use it with
api.ExternalService types

* Check for supported code host by using new IsSupportedKind function

Co-authored-by: Thorsten Ball <mrnugget@gmail.com>
  • Loading branch information
ryanslade and mrnugget committed Jul 3, 2020
1 parent 5fb828c commit 51a804d
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 214 deletions.
36 changes: 0 additions & 36 deletions cmd/repo-updater/repos/types.go
Expand Up @@ -93,42 +93,6 @@ func (e ExternalService) Configuration() (cfg interface{}, _ error) {
return extsvc.ParseConfig(e.Kind, e.Config)
}

// BaseURL will fetch the normalised base URL from the service if
// supported.
func (e ExternalService) BaseURL() (*url.URL, error) {
config, err := extsvc.ParseConfig(e.Kind, e.Config)
if err != nil {
return nil, errors.Wrap(err, "parsing config")
}

var rawURL string
switch c := config.(type) {
case *schema.AWSCodeCommitConnection:
return nil, errors.New("BaseURL unavailable for AWSCodeCommit")
case *schema.BitbucketServerConnection:
rawURL = c.Url
case *schema.GitHubConnection:
rawURL = c.Url
case *schema.GitLabConnection:
rawURL = c.Url
case *schema.GitoliteConnection:
rawURL = c.Host
case *schema.PhabricatorConnection:
rawURL = c.Url
case *schema.OtherExternalServiceConnection:
rawURL = c.Url
default:
return nil, fmt.Errorf("unknown external service type %T", config)
}

parsed, err := url.Parse(rawURL)
if err != nil {
return nil, errors.Wrap(err, "parsing service URL")
}

return extsvc.NormalizeBaseURL(parsed), nil
}

// Exclude changes the configuration of an external service to exclude the given
// repos from being synced.
func (e *ExternalService) Exclude(rs ...*Repo) error {
Expand Down
25 changes: 3 additions & 22 deletions enterprise/internal/campaigns/store.go
Expand Up @@ -6,14 +6,12 @@ import (
"encoding/json"
"fmt"
"io"
"sort"
"time"

"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/segmentio/fasthash/fnv1"
"github.com/sourcegraph/sourcegraph/cmd/repo-updater/repos"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/campaigns"
"github.com/sourcegraph/sourcegraph/internal/db/dbutil"
Expand Down Expand Up @@ -627,30 +625,13 @@ func (s *Store) ListChangesetSyncData(ctx context.Context, opts ListChangesetSyn
}

func scanChangesetSyncData(h *campaigns.ChangesetSyncData, s scanner) error {
var sources json.RawMessage
err := s.Scan(
return s.Scan(
&h.ChangesetID,
&h.UpdatedAt,
&dbutil.NullTime{Time: &h.LatestEvent},
&dbutil.NullTime{Time: &h.ExternalUpdatedAt},
&sources,
&h.RepoExternalServiceID,
)
if err != nil {
return err
}

infos := make(map[string]*repos.SourceInfo)
if err = json.Unmarshal(sources, &infos); err != nil {
return errors.Wrap(err, "scanChangesetSyncData: failed to unmarshal sources")
}
h.ExternalServiceIDs = make([]int64, 0, len(infos))
for _, v := range infos {
id := v.ExternalServiceID()
h.ExternalServiceIDs = append(h.ExternalServiceIDs, id)
}
sort.Slice(h.ExternalServiceIDs, func(i, j int) bool { return h.ExternalServiceIDs[i] < h.ExternalServiceIDs[j] })

return nil
}

func listChangesetSyncData(opts ListChangesetSyncDataOpts) *sqlf.Query {
Expand All @@ -659,7 +640,7 @@ func listChangesetSyncData(opts ListChangesetSyncDataOpts) *sqlf.Query {
changesets.updated_at,
max(ce.updated_at) AS latest_event,
changesets.external_updated_at,
r.sources
r.external_service_id
FROM changesets
LEFT JOIN changeset_events ce ON changesets.id = ce.changeset_id
JOIN campaigns ON campaigns.changeset_ids ? changesets.id::TEXT
Expand Down
74 changes: 37 additions & 37 deletions enterprise/internal/campaigns/store_test.go
Expand Up @@ -1407,25 +1407,25 @@ func testStoreListChangesetSyncData(t *testing.T, ctx context.Context, s *Store,
}
want := []cmpgn.ChangesetSyncData{
{
ChangesetID: changesets[0].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
ExternalServiceIDs: []int64{extSvcID},
ChangesetID: changesets[0].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
RepoExternalServiceID: "https://example.com/",
},
{
ChangesetID: changesets[1].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
ExternalServiceIDs: []int64{extSvcID},
ChangesetID: changesets[1].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
RepoExternalServiceID: "https://example.com/",
},
{
// No events
ChangesetID: changesets[2].ID,
UpdatedAt: clock.now(),
ExternalUpdatedAt: clock.now(),
ExternalServiceIDs: []int64{extSvcID},
ChangesetID: changesets[2].ID,
UpdatedAt: clock.now(),
ExternalUpdatedAt: clock.now(),
RepoExternalServiceID: "https://example.com/",
},
}
if diff := cmp.Diff(want, hs); diff != "" {
Expand All @@ -1451,18 +1451,18 @@ func testStoreListChangesetSyncData(t *testing.T, ctx context.Context, s *Store,
}
want := []cmpgn.ChangesetSyncData{
{
ChangesetID: changesets[1].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
ExternalServiceIDs: []int64{extSvcID},
ChangesetID: changesets[1].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
RepoExternalServiceID: "https://example.com/",
},
{
// No events
ChangesetID: changesets[2].ID,
UpdatedAt: clock.now(),
ExternalUpdatedAt: clock.now(),
ExternalServiceIDs: []int64{extSvcID},
ChangesetID: changesets[2].ID,
UpdatedAt: clock.now(),
ExternalUpdatedAt: clock.now(),
RepoExternalServiceID: "https://example.com/",
},
}
if diff := cmp.Diff(want, hs); diff != "" {
Expand Down Expand Up @@ -1494,25 +1494,25 @@ func testStoreListChangesetSyncData(t *testing.T, ctx context.Context, s *Store,
}
want = []cmpgn.ChangesetSyncData{
{
ChangesetID: changesets[0].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
ExternalServiceIDs: []int64{extSvcID},
ChangesetID: changesets[0].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
RepoExternalServiceID: "https://example.com/",
},
{
ChangesetID: changesets[1].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
ExternalServiceIDs: []int64{extSvcID},
ChangesetID: changesets[1].ID,
UpdatedAt: clock.now(),
LatestEvent: clock.now(),
ExternalUpdatedAt: clock.now(),
RepoExternalServiceID: "https://example.com/",
},
{
// No events
ChangesetID: changesets[2].ID,
UpdatedAt: clock.now(),
ExternalUpdatedAt: clock.now(),
ExternalServiceIDs: []int64{extSvcID},
ChangesetID: changesets[2].ID,
UpdatedAt: clock.now(),
ExternalUpdatedAt: clock.now(),
RepoExternalServiceID: "https://example.com/",
},
}
if diff := cmp.Diff(want, hs); diff != "" {
Expand Down

0 comments on commit 51a804d

Please sign in to comment.