Skip to content

Commit 9566c79

Browse files
authored
fix: lake: deduplicate artifact transfers (#14060)
This PR has Lake deduplicate artifacts by their hash when uploading or downloading to the cache (e.g., in `lake cache put` or `lake cache get`). This fixes possible errors when `curl` was asked to transfer to the same file and/or URL multiple times. 🤖 Claude Code assisted with code review and writing tests.
1 parent ab122a1 commit 9566c79

2 files changed

Lines changed: 90 additions & 14 deletions

File tree

src/lake/Lake/Config/Cache.lean

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -703,8 +703,39 @@ inductive TransferKind
703703

704704
/-- A single artifact transfer, identified by the content hash of the artifact. -/
structure TransferInfo where
  /-- The remote URL of the artifact. -/
  url : String
  /-- The content hash of the artifact. -/
  hash : Hash
  /-- The primary local path of the artifact. -/
  path : FilePath
  /-- Further local paths at which a downloaded artifact should also be placed. -/
  extraPaths : Array FilePath := #[]
710+
711+
/--
Registers `path` as another local destination of this transfer.

* With `extra := true` (the default), `path` is appended to `extraPaths`.
* With `extra := false`, `path` becomes the primary `path` and the previous
  primary path is moved into `extraPaths`.

A path that is already registered (as the primary path or as an extra path)
is never registered a second time. Without this check, the same path occurring
twice would end up in `extraPaths` alongside itself as the primary path, and
copying the primary file to its extra paths would rewrite the file onto itself.
-/
@[inline] def TransferInfo.addPath (self : TransferInfo) (path : FilePath) (extra := true) : TransferInfo :=
  if extra then
    -- Already known, either as the primary path or as an extra: nothing to add.
    if path == self.path || self.extraPaths.contains path then
      self
    else
      {self with extraPaths := self.extraPaths.push path}
  else if path == self.path then
    -- `path` is already the primary path.
    self
  else
    -- Promote `path` to primary. If it was previously recorded as an extra,
    -- drop it from the extras before demoting the old primary path.
    {self with path, extraPaths := (self.extraPaths.filter (· != path)).push self.path}
716+
717+
/--
A collection of transfers indexed by artifact hash.
`indices` maps a hash to the position of its transfer within `infos`.
-/
structure TransferDict where
  /-- The transfers, in the order in which they were appended. -/
  infos : Array TransferInfo
  /-- The position within `infos` of the transfer recorded for each hash. -/
  indices : Std.HashMap Hash Nat
720+
721+
/-- The transfer dictionary containing no transfers. -/
@[inline] def TransferDict.empty : TransferDict :=
  {infos := #[], indices := ∅}
723+
724+
/--
Appends a new transfer for `hash` and records its position in the index.

This does not check whether `hash` is already present; if it is, the index
entry for `hash` is overwritten to point at the newly appended transfer.
-/
@[inline] def TransferDict.push
    (self : TransferDict) (url : String) (hash : Hash) (path : FilePath)
    : TransferDict :=
  -- The new entry lands at the current end of the array.
  let idx := self.infos.size
  let info : TransferInfo := {url, hash, path}
  {infos := self.infos.push info, indices := self.indices.insert hash idx}
727+
728+
/--
Adds a transfer for `hash` unless one is already recorded,
in which case the dictionary is returned unchanged.
-/
@[inline] def TransferDict.addIfNew
    (self : TransferDict) (url : String) (hash : Hash) (path : FilePath)
    : TransferDict :=
  match self.indices.get? hash with
  | some _ => self
  | none => self.push url hash path
731+
732+
/--
Adds `path` as a destination for the artifact `hash`.

If no transfer for `hash` is recorded yet, a new one is appended with `path`
as its primary path. Otherwise, `path` is attached to the existing transfer
via `TransferInfo.addPath`, with `extra` selecting whether it is added as an
extra path or replaces the primary path.
-/
@[inline] def TransferDict.add
    (self : TransferDict) (url : String) (hash : Hash) (path : FilePath) (extra := true)
    : TransferDict :=
  match self.indices.get? hash with
  | none => self.push url hash path
  | some idx =>
    let infos := self.infos.modify idx fun info => info.addPath path extra
    {self with infos}
708739

709740
structure TransferConfig where
710741
kind : TransferKind
@@ -716,6 +747,12 @@ structure TransferState where
716747
didError : Bool := false
717748
numSuccesses : Nat := 0
718749

750+
/--
Writes the contents of the file at `path` to each path in `extraPaths`.

The source file is read before anything is written,
so a missing source fails even when `extraPaths` is empty.
-/
def createExtraPaths (path : FilePath) (extraPaths : Array FilePath) : IO Unit := do
  -- Copy rather than hard link: intra-cache hard links break permissions/pruning.
  let bytes ← IO.FS.readBinFile path
  extraPaths.forM fun dest => IO.FS.writeBinFile dest bytes
755+
719756
partial def monitorTransfer
720757
(cfg : TransferConfig) (h hOut : IO.FS.Handle) (s : TransferState)
721758
: LoggerIO TransferState := do
@@ -726,7 +763,7 @@ partial def monitorTransfer
726763
let s ← (·.2) <$> StateT.run (s := s) do
727764
match Json.parse line >>= fromJson? with
728765
| .ok (out : JsonObject) =>
729-
let some info@{url, path, descr} := getInfo? out
766+
let some info@{url, hash, path, extraPaths} := getInfo? out
730767
| logError s!"{cfg.scope}: unidentifiable transfer completed: {line.trimAscii}"
731768
modify ({· with didError := true})
732769
return
@@ -735,18 +772,20 @@ partial def monitorTransfer
735772
| .ok 201 =>
736773
match cfg.kind with
737774
| .get =>
738-
logInfo s!"{cfg.scope}: downloaded artifact {descr.hash}\
775+
logInfo s!"{cfg.scope}: downloaded artifact {hash}\
739776
\n local path: {path}\
740777
\n remote URL: {url}"
741778
let actualHash ← computeFileHash path
742-
if actualHash != descr.hash then
779+
if actualHash != hash then
743780
logError s!"{path}: downloaded artifact hash mismatch, got {actualHash}"
744781
IO.FS.removeFile path
745782
modify ({· with didError := true})
746783
else
784+
unless extraPaths.isEmpty do
785+
createExtraPaths path extraPaths
747786
modify fun s => {s with numSuccesses := s.numSuccesses + 1}
748787
| .put =>
749-
logInfo s!"{cfg.scope}: uploaded artifact {descr.hash}\
788+
logInfo s!"{cfg.scope}: uploaded artifact {hash}\
750789
\n local path: {path}\
751790
\n remote URL: {url}"
752791
modify fun s => {s with numSuccesses := s.numSuccesses + 1}
@@ -764,7 +803,7 @@ where
764803
| _ => none
765804
handleFailure info code? out line : LoggerIO Unit := do
766805
let action := match cfg.kind with | .get => "download" | .put => "upload"
767-
let mut msg := s!"{cfg.scope}: failed to {action} artifact {info.descr.hash}"
806+
let mut msg := s!"{cfg.scope}: failed to {action} artifact {info.hash}"
768807
if let .ok code := code? then
769808
msg := s!"{msg} (status code: {code})"
770809
if let .ok errMsg := out.getAs String "errormsg" then
@@ -851,14 +890,25 @@ public def downloadArtifacts
851890
if descrs.isEmpty then
852891
logWarning "no artifacts to download"
853892
return
854-
let infos ← descrs.foldlM (init := #[]) fun s descr => do
893+
let {infos, ..} ← descrs.foldlM (init := TransferDict.empty) fun s {descr} => do
894+
let hash := descr.hash
895+
let url := service.artifactUrl hash scope
855896
let path := cache.artifactDir / descr.relPath
856897
if force then
857898
removeFileIfExists path
899+
return s.add url hash path
858900
else if (← path.pathExists) then
859-
return s
860-
let url := service.artifactUrl descr.hash scope
861-
return s.push {url, path, descr}
901+
return s.add url hash path (extra := false)
902+
else
903+
return s.add url hash path
904+
let infos ← infos.filterM fun info => do
905+
if info.extraPaths.isEmpty then
906+
not <$> info.path.pathExists
907+
else
908+
match (← createExtraPaths info.path info.extraPaths |>.toBaseIO) with
909+
| .ok _ => return false
910+
| .error (.noFileOrDirectory ..) => return true
911+
| .error e => error s!"failed to copy artifact: {e}"
862912
if infos.isEmpty then
863913
return
864914
let infos ← id do
@@ -871,7 +921,7 @@ public def downloadArtifacts
871921
transferArtifacts {scope, infos, kind := .get}
872922
where
873923
fetchUrls url infos := IO.FS.withTempFile fun h path => do
874-
let body := Json.arr <| infos.map (toJson ·.descr.hash)
924+
let body := Json.arr <| infos.map (toJson ·.hash)
875925
h.putStr body.compress
876926
h.flush
877927
let args := #[
@@ -940,9 +990,10 @@ public def uploadArtifacts
940990
if n = 0 then
941991
logWarning "no artifacts to upload"
942992
return
943-
let infos ← n.foldM (init := #[]) fun i _ s => do
944-
let url := service.artifactUrl descrs[i].hash scope
945-
return s.push {url, path := paths[i], descr := descrs[i]}
993+
let {infos, ..} ← n.foldM (init := TransferDict.empty) fun i _ s => do
994+
let hash := descrs[i].hash
995+
let url := service.artifactUrl hash scope
996+
return s.addIfNew url hash paths[i]
946997
transferArtifacts {scope, infos, kind := .put, key := service.impl.key}
947998

948999
/-! ### Output Transfer -/

tests/lake/tests/cache/test.sh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,31 @@ test_exp -d "$CACHE_DIR"
271271
test_run cache clean
272272
test_exp ! -d "$CACHE_DIR"
273273

274+
# Deduplication check for `lake cache get`: an artifact already present in the
# cache is not re-downloaded, and an artifact that shares its content hash with
# a cached one (under a different extension) is restored by a local copy.
rm -rf "$CACHE_DIR"
dedup_dir="$CACHE_DIR/artifacts"
mkdir -p "$dedup_dir"
hsh=0123456789abcdef
schema_ver=2026-03-17 # cache map schema version
dedup_obj="$dedup_dir/$hsh.o"
dedup_dup="$dedup_dir/$hsh.dup"
dedup_map=.lake/dummy-outputs.jsonl
echo "arbitrary artifact contents" > "$dedup_obj"

# Case 1: the only requested artifact is already cached, so nothing is fetched
cat > "$dedup_map" <<EOF
"$schema_ver"
["aaaaaaaaaaaaaaaa","$hsh.o"]
EOF
test_run cache get "$dedup_map" --scope=test

# Case 2: a second extension with the same content hash is copied locally
test_exp ! -f "$dedup_dup"
cat > "$dedup_map" <<EOF
"$schema_ver"
["aaaaaaaaaaaaaaaa","$hsh.o"]
["bbbbbbbbbbbbbbbb","$hsh.dup"]
EOF
test_run cache get "$dedup_map" --scope=test
test_exp -f "$dedup_dup"
test_cmd cmp -s "$dedup_obj" "$dedup_dup"
298+
274299
# Verify all artifacts restore from the cache and
275300
# use the build directory with `restoreAllArtifacts`
276301
test_cmd rm -rf "$CACHE_DIR" .lake/build

0 commit comments

Comments
 (0)