Navigation Menu

Skip to content

Commit

Permalink
codeintel: Collapse worker handler and processor (#12795)
Browse files Browse the repository at this point in the history
  • Loading branch information
efritz committed Aug 10, 2020
1 parent 489110b commit 910fab1
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 263 deletions.

This file was deleted.

Expand Up @@ -5,9 +5,11 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync/atomic"
"time"

"github.com/inconshreveable/log15"
"github.com/keegancsmith/sqlf"
"github.com/pkg/errors"
"github.com/sourcegraph/sourcegraph/cmd/frontend/backend"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-worker/internal/correlation"
Expand All @@ -20,30 +22,72 @@ import (
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/vcs"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)

// CloneInProgressDelay is the delay between processing attempts when a repo is currently being cloned.
const CloneInProgressDelay = time.Minute

// Processor converts raw uploads into dumps.
type Processor interface {
Process(ctx context.Context, tx store.Store, upload store.Upload) (bool, error)
}

type processor struct {
type handler struct {
store store.Store
bundleManagerClient bundles.BundleManagerClient
gitserverClient gitserver.Client
metrics metrics.WorkerMetrics
enableBudget bool
budgetRemaining int64
}

var _ dbworker.Handler = &handler{}
var _ workerutil.WithPreDequeue = &handler{}
var _ workerutil.WithHooks = &handler{}

func (h *handler) Handle(ctx context.Context, tx dbworkerstore.Store, record workerutil.Record) error {
upload := record.(store.Upload)
store := h.store.With(tx)

_, err := h.handle(ctx, store, upload)
return err
}

// process converts a raw upload into a dump within the given transaction context. Returns true if the
func (h *handler) PreDequeue(ctx context.Context) (bool, interface{}, error) {
if !h.enableBudget {
return true, nil, nil
}

budgetRemaining := atomic.LoadInt64(&h.budgetRemaining)
if budgetRemaining <= 0 {
return false, nil, nil
}

return true, []*sqlf.Query{sqlf.Sprintf("(upload_size IS NULL OR upload_size <= %s)", budgetRemaining)}, nil
}

func (h *handler) PreHandle(ctx context.Context, record workerutil.Record) {
atomic.AddInt64(&h.budgetRemaining, -h.getSize(record))
}

func (h *handler) PostHandle(ctx context.Context, record workerutil.Record) {
atomic.AddInt64(&h.budgetRemaining, +h.getSize(record))
}

func (h *handler) getSize(record workerutil.Record) int64 {
if size := record.(store.Upload).UploadSize; size != nil {
return *size
}

return 0
}

// CloneInProgressDelay is the delay between processing attempts when a repo is currently being cloned.
const CloneInProgressDelay = time.Minute

// handle converts a raw upload into a dump within the given transaction context. Returns true if the
// upload record was requeued and false otherwise.
func (p *processor) Process(ctx context.Context, store store.Store, upload store.Upload) (_ bool, err error) {
func (h *handler) handle(ctx context.Context, store store.Store, upload store.Upload) (_ bool, err error) {
// Ensure that the repo and revision are resolvable. If the repo does not exist, or if the repo has finished
// cloning and the revision does not exist, then the upload will fail to process. If the repo is currently
// cloning, then we'll requeue the upload to be tried again later. This will not increase the reset count
// of the record (so this doesn't count against the upload as a legitimate attempt).
if cloneInProgress, err := p.isRepoCurrentlyCloning(ctx, upload.RepositoryID, upload.Commit); err != nil {
if cloneInProgress, err := h.isRepoCurrentlyCloning(ctx, upload.RepositoryID, upload.Commit); err != nil {
return false, err
} else if cloneInProgress {
if err := store.Requeue(ctx, upload.ID, time.Now().UTC().Add(CloneInProgressDelay)); err != nil {
Expand All @@ -65,33 +109,33 @@ func (p *processor) Process(ctx context.Context, store store.Store, upload store
}()

// Pull raw uploaded data from bundle manager
r, err := p.bundleManagerClient.GetUpload(ctx, upload.ID)
r, err := h.bundleManagerClient.GetUpload(ctx, upload.ID)
if err != nil {
return false, errors.Wrap(err, "bundleManager.GetUpload")
}
defer func() {
if err != nil {
// Remove upload file on error instead of waiting for it to expire
if deleteErr := p.bundleManagerClient.DeleteUpload(ctx, upload.ID); deleteErr != nil {
if deleteErr := h.bundleManagerClient.DeleteUpload(ctx, upload.ID); deleteErr != nil {
log15.Warn("Failed to delete upload file", "err", err)
}
}
}()

getChildren := func(ctx context.Context, dirnames []string) (map[string][]string, error) {
directoryChildren, err := p.gitserverClient.DirectoryChildren(ctx, store, upload.RepositoryID, upload.Commit, dirnames)
directoryChildren, err := h.gitserverClient.DirectoryChildren(ctx, store, upload.RepositoryID, upload.Commit, dirnames)
if err != nil {
return nil, errors.Wrap(err, "gitserverClient.DirectoryChildren")
}
return directoryChildren, nil
}

groupedBundleData, err := correlation.Correlate(ctx, r, upload.ID, upload.Root, getChildren, p.metrics)
groupedBundleData, err := correlation.Correlate(ctx, r, upload.ID, upload.Root, getChildren, h.metrics)
if err != nil {
return false, errors.Wrap(err, "correlation.Correlate")
}

if err := p.write(ctx, tempDir, groupedBundleData); err != nil {
if err := h.write(ctx, tempDir, groupedBundleData); err != nil {
return false, err
}

Expand All @@ -112,12 +156,12 @@ func (p *processor) Process(ctx context.Context, store store.Store, upload store
err = tx.Done(err)
}()

if err := p.updateXrepoData(ctx, store, upload, groupedBundleData.Packages, groupedBundleData.PackageReferences); err != nil {
if err := h.updateXrepoData(ctx, store, upload, groupedBundleData.Packages, groupedBundleData.PackageReferences); err != nil {
return false, err
}

// Send converted database file to bundle manager
if err := p.sendDB(ctx, upload.ID, filepath.Join(tempDir, "sqlite.db")); err != nil {
if err := h.sendDB(ctx, upload.ID, filepath.Join(tempDir, "sqlite.db")); err != nil {
return false, err
}

Expand All @@ -126,8 +170,8 @@ func (p *processor) Process(ctx context.Context, store store.Store, upload store

// isRepoCurrentlyCloning determines if the target repository is currently being cloned.
// This function returns an error if the repo or commit cannot be resolved.
func (p *processor) isRepoCurrentlyCloning(ctx context.Context, repoID int, commit string) (_ bool, err error) {
ctx, endOperation := p.metrics.RepoStateOperation.With(ctx, &err, observation.Args{})
func (h *handler) isRepoCurrentlyCloning(ctx context.Context, repoID int, commit string) (_ bool, err error) {
ctx, endOperation := h.metrics.RepoStateOperation.With(ctx, &err, observation.Args{})
defer endOperation(1, observation.Args{})

repo, err := backend.Repos.Get(ctx, api.RepoID(repoID))
Expand All @@ -147,8 +191,8 @@ func (p *processor) isRepoCurrentlyCloning(ctx context.Context, repoID int, comm
}

// write commits the correlated data to disk.
func (p *processor) write(ctx context.Context, dirname string, groupedBundleData *correlation.GroupedBundleData) (err error) {
ctx, endOperation := p.metrics.WriteOperation.With(ctx, &err, observation.Args{})
func (h *handler) write(ctx context.Context, dirname string, groupedBundleData *correlation.GroupedBundleData) (err error) {
ctx, endOperation := h.metrics.WriteOperation.With(ctx, &err, observation.Args{})
defer endOperation(1, observation.Args{})

writer, err := sqlitewriter.NewWriter(ctx, filepath.Join(dirname, "sqlite.db"))
Expand Down Expand Up @@ -179,8 +223,8 @@ func (p *processor) write(ctx context.Context, dirname string, groupedBundleData
}

// TODO(efritz) - refactor/simplify this after last change
func (p *processor) updateXrepoData(ctx context.Context, store store.Store, upload store.Upload, packages []types.Package, packageReferences []types.PackageReference) (err error) {
ctx, endOperation := p.metrics.UpdateXrepoDatabaseOperation.With(ctx, &err, observation.Args{})
func (h *handler) updateXrepoData(ctx context.Context, store store.Store, upload store.Upload, packages []types.Package, packageReferences []types.PackageReference) (err error) {
ctx, endOperation := h.metrics.UpdateXrepoDatabaseOperation.With(ctx, &err, observation.Args{})
defer endOperation(1, observation.Args{})

// Update package and package reference data to support cross-repo queries.
Expand All @@ -198,8 +242,7 @@ func (p *processor) updateXrepoData(ctx context.Context, store store.Store, uplo
return errors.Wrap(err, "store.DeleteOverlappingDumps")
}

// Almost-success: we need to mark this upload as complete at this point as the next step changes
// the visibility of the dumps for this repository. This requires that the new dump be available in
// Almost-success: we need to mark this upload as complete at this point as the next step changes // the visibility of the dumps for this repository. This requires that the new dump be available in
// the lsif_dumps view, which requires a change of state. In the event of a future failure we can
// still roll back to the save point and mark the upload as errored.
if err := store.MarkComplete(ctx, upload.ID); err != nil {
Expand All @@ -218,11 +261,11 @@ func (p *processor) updateXrepoData(ctx context.Context, store store.Store, uplo
return nil
}

func (p *processor) sendDB(ctx context.Context, uploadID int, tempDir string) (err error) {
ctx, endOperation := p.metrics.SendDBOperation.With(ctx, &err, observation.Args{})
func (h *handler) sendDB(ctx context.Context, uploadID int, tempDir string) (err error) {
ctx, endOperation := h.metrics.SendDBOperation.With(ctx, &err, observation.Args{})
defer endOperation(1, observation.Args{})

if err := p.bundleManagerClient.SendDB(ctx, uploadID, tempDir); err != nil {
if err := h.bundleManagerClient.SendDB(ctx, uploadID, tempDir); err != nil {
return errors.Wrap(err, "bundleManager.SendDB")
}

Expand Down
Expand Up @@ -29,7 +29,7 @@ func init() {
sqliteutil.MustRegisterSqlite3WithPcre()
}

func TestProcess(t *testing.T) {
func TestHandle(t *testing.T) {
setupRepoMocks(t)

upload := store.Upload{
Expand All @@ -56,15 +56,15 @@ func TestProcess(t *testing.T) {
"": {"foo.go", "bar.go"},
}, nil)

processor := &processor{
handler := &handler{
bundleManagerClient: bundleManagerClient,
gitserverClient: gitserverClient,
metrics: metrics.NewWorkerMetrics(&observation.TestContext),
}

requeued, err := processor.Process(context.Background(), mockStore, upload)
requeued, err := handler.handle(context.Background(), mockStore, upload)
if err != nil {
t.Fatalf("unexpected error processing upload: %s", err)
t.Fatalf("unexpected error handlling upload: %s", err)
} else if requeued {
t.Errorf("unexpected requeue")
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestProcess(t *testing.T) {
}
}

func TestProcessError(t *testing.T) {
func TestHandleError(t *testing.T) {
setupRepoMocks(t)

upload := store.Upload{
Expand All @@ -150,15 +150,15 @@ func TestProcessError(t *testing.T) {
// Set a different tip commit
mockStore.MarkRepositoryAsDirtyFunc.SetDefaultReturn(fmt.Errorf("uh-oh!"))

processor := &processor{
handler := &handler{
bundleManagerClient: bundleManagerClient,
gitserverClient: gitserverClient,
metrics: metrics.NewWorkerMetrics(&observation.TestContext),
}

requeued, err := processor.Process(context.Background(), mockStore, upload)
requeued, err := handler.handle(context.Background(), mockStore, upload)
if err == nil {
t.Fatalf("unexpected nil error processing upload")
t.Fatalf("unexpected nil error handling upload")
} else if !strings.Contains(err.Error(), "uh-oh!") {
t.Fatalf("unexpected error: %s", err)
} else if requeued {
Expand All @@ -174,7 +174,7 @@ func TestProcessError(t *testing.T) {
}
}

func TestProcessCloneInProgress(t *testing.T) {
func TestHandleCloneInProgress(t *testing.T) {
t.Cleanup(func() {
backend.Mocks.Repos.Get = nil
backend.Mocks.Repos.ResolveRev = nil
Expand Down Expand Up @@ -203,15 +203,15 @@ func TestProcessCloneInProgress(t *testing.T) {
bundleManagerClient := bundlemocks.NewMockBundleManagerClient()
gitserverClient := gitservermocks.NewMockClient()

processor := &processor{
handler := &handler{
bundleManagerClient: bundleManagerClient,
gitserverClient: gitserverClient,
metrics: metrics.NewWorkerMetrics(&observation.TestContext),
}

requeued, err := processor.Process(context.Background(), mockStore, upload)
requeued, err := handler.handle(context.Background(), mockStore, upload)
if err != nil {
t.Fatalf("unexpected error processing upload: %s", err)
t.Fatalf("unexpected error handling upload: %s", err)
} else if !requeued {
t.Errorf("expected upload to be requeued")
}
Expand Down

0 comments on commit 910fab1

Please sign in to comment.