From a3916fae73c016d6534b06ca67ec3069fa5f7b33 Mon Sep 17 00:00:00 2001 From: Mehul Kar Date: Thu, 30 Mar 2023 21:37:57 -0700 Subject: [PATCH] Add cache state to task summaries on real runs (2nd try) (#4393) This commit threads the itemStatus struct all the way down to execContext.exec() so we can include it in the run summaries. This is a re-implementation of 883d6d4d9f212998869623c25712b5aa3e230871 which had a crucial bug where RestoreOutputs would report a cache hit, but then _return_ a cache miss value, so the caller (execContext.exec()) would continue executing as if there were a cache miss. --- .../basic_monorepo/run_summary/run_summary.t | 28 +++++++++++- cli/internal/cache/async_cache.go | 2 +- cli/internal/cache/cache.go | 21 ++++++--- cli/internal/cache/cache_fs.go | 16 +++---- cli/internal/cache/cache_fs_test.go | 3 +- cli/internal/cache/cache_http.go | 10 ++--- cli/internal/cache/cache_noop.go | 14 +++--- cli/internal/cache/cache_test.go | 22 ++++----- cli/internal/run/real_run.go | 6 ++- cli/internal/runcache/runcache.go | 23 +++++++--- cli/internal/taskhash/taskhash.go | 45 ++++++++++++++----- 11 files changed, 131 insertions(+), 59 deletions(-) diff --git a/cli/integration_tests/basic_monorepo/run_summary/run_summary.t b/cli/integration_tests/basic_monorepo/run_summary/run_summary.t index a84283f6fa511..a0ac8d932e0ae 100644 --- a/cli/integration_tests/basic_monorepo/run_summary/run_summary.t +++ b/cli/integration_tests/basic_monorepo/run_summary/run_summary.t @@ -7,14 +7,27 @@ Setup $ TURBO_RUN_SUMMARY=true ${TURBO} run build -- someargs > /dev/null # first run (should be cache miss) +# HACK: Generated run summaries are named with a ksuid, which is a time-sorted ID. This _generally_ works +# but we're seeing in this test that sometimes a summary file is not sorted (with /bin/ls) in the order we expect +# causing intermittent test failures. 
+# Add a sleep statement so we can be sure that the second run is a later timestamp, +# so we can reliably get it with `|head -n1` and `|tail -n1` later in this test. +# When we start emitting the path to the run summary file that was generated, or a way to specify +# the output file, we can remove this and look for the file directly. +# If you find this sleep statement, try running this test 10 times in a row. If there are no +# failures, it *should* be safe to remove. + $ sleep 1 + $ TURBO_RUN_SUMMARY=true ${TURBO} run build -- someargs > /dev/null # run again (expecting full turbo here) + # no output, just check for 0 status code, which means the directory was created $ test -d .turbo/runs # expect 2 run summaries are created $ ls .turbo/runs/*.json | wc -l - \s*1 (re) + \s*2 (re) # get jq-parsed output of each run summary $ FIRST=$(/bin/ls .turbo/runs/*.json | head -n1) + $ SECOND=$(/bin/ls .turbo/runs/*.json | tail -n1) # some top level run summary validation $ cat $FIRST | jq '.tasks | length' @@ -39,6 +52,7 @@ Setup # Extract some task-specific summaries from each $ source "$TESTDIR/../../run-summary-utils.sh" $ FIRST_APP_BUILD=$(getSummaryTaskId "$FIRST" "my-app#build") + $ SECOND_APP_BUILD=$(getSummaryTaskId "$SECOND" "my-app#build") $ FIRST_UTIL_BUILD=$(getSummaryTaskId "$FIRST" "util#build") $ echo $FIRST_APP_BUILD | jq '.execution' @@ -53,13 +67,23 @@ Setup [ "someargs" ] - $ echo $FIRST_APP_BUILD | jq '.hashOfExternalDependencies' "ccab0b28617f1f56" $ echo $FIRST_APP_BUILD | jq '.expandedOutputs' [ "apps/my-app/.turbo/turbo-build.log" ] +# validate that cache state updates in second run + $ echo $FIRST_APP_BUILD | jq '.cacheState' + { + "local": false, + "remote": false + } + $ echo $SECOND_APP_BUILD | jq '.cacheState' + { + "local": true, + "remote": false + } # Some validation of util#build $ echo $FIRST_UTIL_BUILD | jq '.execution' diff --git a/cli/internal/cache/async_cache.go b/cli/internal/cache/async_cache.go index e91ed77c0abb4..0a8f467cc579c 
100644 --- a/cli/internal/cache/async_cache.go +++ b/cli/internal/cache/async_cache.go @@ -50,7 +50,7 @@ func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, durati return nil } -func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) { +func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { return c.realCache.Fetch(anchor, key, files) } diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go index 20f6783ba9659..c5968329e6c06 100644 --- a/cli/internal/cache/cache.go +++ b/cli/internal/cache/cache.go @@ -20,7 +20,7 @@ import ( type Cache interface { // Fetch returns true if there is a cache it. It is expected to move files // into their correct position as a side effect - Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) + Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) Exists(hash string) ItemStatus // Put caches files for a given hash Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error @@ -232,17 +232,24 @@ func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) { } } -func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) { +func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { // Make a shallow copy of the caches, since storeUntil can call removeCache mplex.mu.RLock() caches := make([]Cache, len(mplex.caches)) copy(caches, mplex.caches) mplex.mu.RUnlock() + // We need to return a composite cache status from multiple caches + 
// Initialize the empty struct so we can assign values to it. This is similar + // to how the Exists() method works. + combinedCacheState := ItemStatus{} + // Retrieve from caches sequentially; if we did them simultaneously we could // easily write the same file from two goroutines at once. for i, cache := range caches { - ok, actualFiles, duration, err := cache.Fetch(anchor, key, files) + itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files) + ok := itemStatus.Local || itemStatus.Remote + if err != nil { cd := &util.CacheDisabledError{} if errors.As(err, &cd) { @@ -261,11 +268,15 @@ func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key st // we have previously successfully stored in a higher-priority cache, and so the overall // result is a success at fetching. Storing in lower-priority caches is an optimization. _ = mplex.storeUntil(anchor, key, duration, actualFiles, i) - return ok, actualFiles, duration, err + + // If another cache had already set this to true, we don't need to set it again from this cache + combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local + combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote + return combinedCacheState, actualFiles, duration, err } } - return false, nil, 0, nil + return ItemStatus{Local: false, Remote: false}, nil, 0, nil } func (mplex *cacheMultiplexer) Exists(target string) ItemStatus { diff --git a/cli/internal/cache/cache_fs.go b/cli/internal/cache/cache_fs.go index 2ec25e2dc36b4..b1dc1acf76442 100644 --- a/cli/internal/cache/cache_fs.go +++ b/cli/internal/cache/cache_fs.go @@ -33,7 +33,7 @@ func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.Absol } // Fetch returns true if items are cached. It moves them into position as a side effect. 
-func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _unusedOutputGlobs []string) (bool, []turbopath.AnchoredSystemPath, int, error) { +func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar") compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst") @@ -45,33 +45,33 @@ func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _unuse } else { // It's not in the cache, bail now f.logFetch(false, hash, 0) - return false, nil, 0, nil + return ItemStatus{Local: false}, nil, 0, nil } cacheItem, openErr := cacheitem.Open(actualCachePath) if openErr != nil { - return false, nil, 0, openErr + return ItemStatus{Local: false}, nil, 0, openErr } restoredFiles, restoreErr := cacheItem.Restore(anchor) if restoreErr != nil { _ = cacheItem.Close() - return false, nil, 0, restoreErr + return ItemStatus{Local: false}, nil, 0, restoreErr } meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json")) if err != nil { _ = cacheItem.Close() - return false, nil, 0, fmt.Errorf("error reading cache metadata: %w", err) + return ItemStatus{Local: false}, nil, 0, fmt.Errorf("error reading cache metadata: %w", err) } f.logFetch(true, hash, meta.Duration) // Wait to see what happens with close. 
closeErr := cacheItem.Close() if closeErr != nil { - return false, restoredFiles, 0, closeErr + return ItemStatus{Local: false}, restoredFiles, 0, closeErr } - return true, restoredFiles, meta.Duration, nil + return ItemStatus{Local: true}, restoredFiles, meta.Duration, nil } func (f *fsCache) Exists(hash string) ItemStatus { @@ -129,7 +129,7 @@ func (f *fsCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration return cacheItem.Close() } -func (f *fsCache) Clean(anchor turbopath.AbsoluteSystemPath) { +func (f *fsCache) Clean(_ turbopath.AbsoluteSystemPath) { fmt.Println("Not implemented yet") } diff --git a/cli/internal/cache/cache_fs_test.go b/cli/internal/cache/cache_fs_test.go index 89f7ca658ca9f..614ad8640b5da 100644 --- a/cli/internal/cache/cache_fs_test.go +++ b/cli/internal/cache/cache_fs_test.go @@ -216,8 +216,9 @@ func TestFetch(t *testing.T) { outputDir := turbopath.AbsoluteSystemPath(t.TempDir()) dstOutputPath := "some-package" - hit, files, _, err := cache.Fetch(outputDir, "the-hash", []string{}) + cacheStatus, files, _, err := cache.Fetch(outputDir, "the-hash", []string{}) assert.NilError(t, err, "Fetch") + hit := cacheStatus.Local || cacheStatus.Remote if !hit { t.Error("Fetch got false, want true") } diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go index e1a79b4df83b4..d943444938ad1 100644 --- a/cli/internal/cache/cache_http.go +++ b/cli/internal/cache/cache_http.go @@ -56,7 +56,7 @@ var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) // nobody is the usual uid / gid of the 'nobody' user. 
const nobody = 65534 -func (cache *httpCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { +func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { // if cache.writable { cache.requestLimiter.acquire() defer cache.requestLimiter.release() @@ -143,16 +143,16 @@ func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.Anc return err } -func (cache *httpCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, _unusedOutputGlobs []string) (bool, []turbopath.AnchoredSystemPath, int, error) { +func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { cache.requestLimiter.acquire() defer cache.requestLimiter.release() hit, files, duration, err := cache.retrieve(key) if err != nil { // TODO: analytics event? - return false, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err) + return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err) } cache.logFetch(hit, key, duration) - return hit, files, duration, err + return ItemStatus{Remote: hit}, files, duration, err } func (cache *httpCache) Exists(key string) ItemStatus { @@ -349,7 +349,7 @@ func restoreSymlink(root turbopath.AbsoluteSystemPath, hdr *tar.Header, allowNon return nil } -func (cache *httpCache) Clean(anchor turbopath.AbsoluteSystemPath) { +func (cache *httpCache) Clean(_ turbopath.AbsoluteSystemPath) { // Not possible; this implementation can only clean for a hash. 
} diff --git a/cli/internal/cache/cache_noop.go b/cli/internal/cache/cache_noop.go index ff36ed63d04e3..80a3c235de226 100644 --- a/cli/internal/cache/cache_noop.go +++ b/cli/internal/cache/cache_noop.go @@ -8,16 +8,16 @@ func newNoopCache() *noopCache { return &noopCache{} } -func (c *noopCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error { +func (c *noopCache) Put(_ turbopath.AbsoluteSystemPath, _ string, _ int, _ []turbopath.AnchoredSystemPath) error { return nil } -func (c *noopCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) { - return false, nil, 0, nil +func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { + return ItemStatus{Local: false, Remote: false}, nil, 0, nil } -func (c *noopCache) Exists(key string) ItemStatus { +func (c *noopCache) Exists(_ string) ItemStatus { return ItemStatus{} } -func (c *noopCache) Clean(anchor turbopath.AbsoluteSystemPath) {} -func (c *noopCache) CleanAll() {} -func (c *noopCache) Shutdown() {} +func (c *noopCache) Clean(_ turbopath.AbsoluteSystemPath) {} +func (c *noopCache) CleanAll() {} +func (c *noopCache) Shutdown() {} diff --git a/cli/internal/cache/cache_test.go b/cli/internal/cache/cache_test.go index 9e86da138b93b..3f1787780d1e3 100644 --- a/cli/internal/cache/cache_test.go +++ b/cli/internal/cache/cache_test.go @@ -17,16 +17,16 @@ type testCache struct { entries map[string][]turbopath.AnchoredSystemPath } -func (tc *testCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) { +func (tc *testCache) Fetch(_ turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { if tc.disabledErr != nil { - return false, nil, 0, tc.disabledErr + return ItemStatus{}, nil, 0, 
tc.disabledErr } foundFiles, ok := tc.entries[hash] if ok { duration := 5 - return true, foundFiles, duration, nil + return ItemStatus{Local: true}, foundFiles, duration, nil } - return false, nil, 0, nil + return ItemStatus{}, nil, 0, nil } func (tc *testCache) Exists(hash string) ItemStatus { @@ -40,7 +40,7 @@ func (tc *testCache) Exists(hash string) ItemStatus { return ItemStatus{} } -func (tc *testCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { +func (tc *testCache) Put(_ turbopath.AbsoluteSystemPath, hash string, _ int, files []turbopath.AnchoredSystemPath) error { if tc.disabledErr != nil { return tc.disabledErr } @@ -48,9 +48,9 @@ func (tc *testCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, durat return nil } -func (tc *testCache) Clean(anchor turbopath.AbsoluteSystemPath) {} -func (tc *testCache) CleanAll() {} -func (tc *testCache) Shutdown() {} +func (tc *testCache) Clean(_ turbopath.AbsoluteSystemPath) {} +func (tc *testCache) CleanAll() {} +func (tc *testCache) Shutdown() {} func newEnabledCache() *testCache { return &testCache{ @@ -106,10 +106,11 @@ func TestPutCachingDisabled(t *testing.T) { mplex.mu.RUnlock() // subsequent Fetch should still work - hit, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"}) + cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"}) if err != nil { t.Errorf("got error fetching files: %v", err) } + hit := cacheStatus.Local || cacheStatus.Remote if !hit { t.Error("failed to find previously stored files") } @@ -185,11 +186,12 @@ func TestFetchCachingDisabled(t *testing.T) { }, } - hit, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"}) + cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"}) if err != nil { // don't leak the cache removal t.Errorf("Fetch got error %v, want ", err) } + hit := 
cacheStatus.Local || cacheStatus.Remote if hit { t.Error("hit on empty cache, expected miss") } diff --git a/cli/internal/run/real_run.go b/cli/internal/run/real_run.go index df972e43b487d..b86dbe4204db3 100644 --- a/cli/internal/run/real_run.go +++ b/cli/internal/run/real_run.go @@ -103,6 +103,7 @@ func RealRun( if taskExecutionSummary != nil { taskSummary.ExpandedOutputs = taskHashTracker.GetExpandedOutputs(taskSummary.TaskID) taskSummary.Execution = taskExecutionSummary + taskSummary.CacheState = taskHashTracker.GetCacheStatus(taskSummary.TaskID) // lock since multiple things to be appending to this array at the same time mu.Lock() @@ -247,7 +248,10 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas ErrorPrefix: prettyPrefix, WarnPrefix: prettyPrefix, } - hit, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger) + cacheStatus, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger) + ec.taskHashTracker.SetCacheStatus(packageTask.TaskID, cacheStatus) + + hit := cacheStatus.Local || cacheStatus.Remote if err != nil { prefixedUI.Error(fmt.Sprintf("error fetching from cache: %s", err)) } else if hit { diff --git a/cli/internal/runcache/runcache.go b/cli/internal/runcache/runcache.go index 79caef5888195..7b7f2048619e0 100644 --- a/cli/internal/runcache/runcache.go +++ b/cli/internal/runcache/runcache.go @@ -109,13 +109,14 @@ type TaskCache struct { // RestoreOutputs attempts to restore output for the corresponding task from the cache. // Returns true if successful. 
-func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.PrefixedUi, progressLogger hclog.Logger) (bool, error) { +func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.PrefixedUi, progressLogger hclog.Logger) (cache.ItemStatus, error) { if tc.cachingDisabled || tc.rc.readsDisabled { if tc.taskOutputMode != util.NoTaskOutput && tc.taskOutputMode != util.ErrorTaskOutput { prefixedUI.Output(fmt.Sprintf("cache bypass, force executing %s", ui.Dim(tc.hash))) } - return false, nil + return cache.ItemStatus{Local: false, Remote: false}, nil } + changedOutputGlobs, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs.Inclusions) if err != nil { progressLogger.Warn(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err)) @@ -124,19 +125,26 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe } hasChangedOutputs := len(changedOutputGlobs) > 0 + var cacheStatus cache.ItemStatus + if hasChangedOutputs { // Note that we currently don't use the output globs when restoring, but we could in the // future to avoid doing unnecessary file I/O. We also need to pass along the exclusion // globs as well. 
- hit, restoredFiles, _, err := tc.rc.cache.Fetch(tc.rc.repoRoot, tc.hash, nil) + itemStatus, restoredFiles, _, err := tc.rc.cache.Fetch(tc.rc.repoRoot, tc.hash, nil) + hit := itemStatus.Local || itemStatus.Remote tc.ExpandedOutputs = restoredFiles + // Assign to the variable declared outside this if block so we can return it at the end of the function + cacheStatus = itemStatus if err != nil { - return false, err + // If there was an error fetching from cache, we'll say there was no cache hit + return cache.ItemStatus{Local: false, Remote: false}, err } else if !hit { if tc.taskOutputMode != util.NoTaskOutput && tc.taskOutputMode != util.ErrorTaskOutput { prefixedUI.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(tc.hash))) } - return false, nil + // A cache miss is likewise reported as no hit in either cache + return cache.ItemStatus{Local: false, Remote: false}, nil } if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs); err != nil { @@ -144,6 +152,8 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe prefixedUI.Warn(ui.Dim(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err))) } } else { + // If no outputs have changed, that means we have a local cache hit.
+ cacheStatus.Local = true prefixedUI.Warn(fmt.Sprintf("Skipping cache check for %v, outputs have not changed since previous run.", tc.pt.TaskID)) } @@ -162,8 +172,7 @@ func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.Prefixe default: // NoLogs, do not output anything } - - return true, nil + return cacheStatus, nil } // ReplayLogFile writes out the stored logfile to the terminal diff --git a/cli/internal/taskhash/taskhash.go b/cli/internal/taskhash/taskhash.go index 46cadf3218f7a..7c4423efa9f85 100644 --- a/cli/internal/taskhash/taskhash.go +++ b/cli/internal/taskhash/taskhash.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/pyr-sh/dag" gitignore "github.com/sabhiram/go-gitignore" + "github.com/vercel/turbo/cli/internal/cache" "github.com/vercel/turbo/cli/internal/doublestar" "github.com/vercel/turbo/cli/internal/env" "github.com/vercel/turbo/cli/internal/fs" @@ -41,23 +42,25 @@ type Tracker struct { // mu is a mutex that we can lock/unlock to read/write from maps // the fields below should be protected by the mutex. - mu sync.RWMutex - packageTaskEnvVars map[string]env.DetailedMap // taskId -> envvar pairs that affect the hash. - packageTaskHashes map[string]string // taskID -> hash - packageTaskFramework map[string]string // taskID -> inferred framework for package - packageTaskOutputs map[string][]turbopath.AnchoredSystemPath + mu sync.RWMutex + packageTaskEnvVars map[string]env.DetailedMap // taskId -> envvar pairs that affect the hash. + packageTaskHashes map[string]string // taskID -> hash + packageTaskFramework map[string]string // taskID -> inferred framework for package + packageTaskOutputs map[string][]turbopath.AnchoredSystemPath + packageTaskCacheStatus map[string]cache.ItemStatus } // NewTracker creates a tracker for package-inputs combinations and package-task combinations. 
func NewTracker(rootNode string, globalHash string, pipeline fs.Pipeline) *Tracker { return &Tracker{ - rootNode: rootNode, - globalHash: globalHash, - pipeline: pipeline, - packageTaskHashes: make(map[string]string), - packageTaskFramework: make(map[string]string), - packageTaskEnvVars: make(map[string]env.DetailedMap), - packageTaskOutputs: make(map[string][]turbopath.AnchoredSystemPath), + rootNode: rootNode, + globalHash: globalHash, + pipeline: pipeline, + packageTaskHashes: make(map[string]string), + packageTaskFramework: make(map[string]string), + packageTaskEnvVars: make(map[string]env.DetailedMap), + packageTaskOutputs: make(map[string][]turbopath.AnchoredSystemPath), + packageTaskCacheStatus: make(map[string]cache.ItemStatus), } } @@ -421,3 +424,21 @@ func (th *Tracker) SetExpandedOutputs(taskID string, outputs []turbopath.Anchore defer th.mu.Unlock() th.packageTaskOutputs[taskID] = outputs } + +// SetCacheStatus records the cache status for the given taskID +func (th *Tracker) SetCacheStatus(taskID string, cacheStatus cache.ItemStatus) { + th.mu.Lock() + defer th.mu.Unlock() + th.packageTaskCacheStatus[taskID] = cacheStatus +} + +// GetCacheStatus returns the cache status recorded for the given taskID +func (th *Tracker) GetCacheStatus(taskID string) cache.ItemStatus { + th.mu.Lock() + defer th.mu.Unlock() + status, ok := th.packageTaskCacheStatus[taskID] + if !ok { + return cache.ItemStatus{Local: false, Remote: false} + } + return status +}