Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lake/Lake/Build/Common.lean
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ public def resolveArtifact
match (← getMTime path |>.toBaseIO) with
| .ok mtime =>
return {descr, path, mtime}
| .error (.noFileOrDirectory ..) =>
| .error (.noFileOrDirectory ..) => withLogErrorPos do
-- we redownload artifacts on any error
if let some service := service? then
updateAction .fetch
Expand Down
178 changes: 106 additions & 72 deletions src/lake/Lake/Config/Cache.lean
Original file line number Diff line number Diff line change
Expand Up @@ -558,9 +558,10 @@ public def downloadArtifactCore (hash : Hash) (url : String) (path : FilePath) :
download url path
let actualHash ← computeFileHash path
if actualHash != hash then
logError s!"{path}: downloaded artifact does not have the expected hash"
let errPos ← getLogPos
logError s!"{path}: downloaded artifact hash mismatch, got {actualHash}"
IO.FS.removeFile path
failure
throw errPos

/-- Uploads a file to an online bucket using the Amazon S3 protocol. -/
def uploadS3
Expand Down Expand Up @@ -700,8 +701,9 @@ public def uploadArtifact
/-! ## Multi-Artifact Transfer -/

private inductive TransferKind
| get
| put
| get
| put
deriving DecidableEq

private structure TransferInfo where
url : String
Expand All @@ -712,72 +714,117 @@ private structure TransferConfig where
kind : TransferKind
scope : CacheServiceScope
infos : Array TransferInfo
key : String := ""

private structure TransferState where
didError : Bool := false
numSuccesses : Nat := 0

private partial def monitorTransferLoop
(cfg : TransferConfig) (h : IO.FS.Handle) (s : TransferState)
private partial def monitorTransfer
(cfg : TransferConfig) (h hOut : IO.FS.Handle) (s : TransferState)
: LoggerIO TransferState := do
let line ← h.getLine
if line.trimAscii.isEmpty then
return s
else
let s ← (·.2) <$> StateT.run (s := s) do
match Json.parse line >>= fromJson? with
| .ok (res : JsonObject) =>
let some {url, path, descr} := getInfo? res
| .ok (out : JsonObject) =>
let some info@{url, path, descr} := getInfo? out
| logError s!"{cfg.scope}: unidentifiable transfer completed: {line.trimAscii}"
modify ({· with didError := true})
return
match res.get "http_code" with
match out.get "http_code" with
| .ok 200
| .ok 201 =>
let action := match cfg.kind with | .get => "downloaded" | .put => "uploaded"
logInfo s!"{cfg.scope}: {action} artifact {descr.hash}\
\n local path: {path}\
\n remote URL: {url}"
let actualHash ← computeFileHash path
if actualHash != descr.hash then
logError s!"downloaded artifact does not have the expected hash"
IO.FS.removeFile path
modify ({· with didError := true})
else
match cfg.kind with
| .get =>
logInfo s!"{cfg.scope}: downloaded artifact {descr.hash}\
\n local path: {path}\
\n remote URL: {url}"
let actualHash ← computeFileHash path
if actualHash != descr.hash then
logError s!"{path}: downloaded artifact hash mismatch, got {actualHash}"
IO.FS.removeFile path
modify ({· with didError := true})
else
modify fun s => {s with numSuccesses := s.numSuccesses + 1}
| .put =>
logInfo s!"{cfg.scope}: uploaded artifact {descr.hash}\
\n local path: {path}\
\n remote URL: {url}"
modify fun s => {s with numSuccesses := s.numSuccesses + 1}
| code? =>
let msg? := res.getAs String "errormsg"
logError (mkFailureMsg descr.hash code? msg?)
if cfg.kind matches .get then
-- `curl --remove-on-error` can already do this, but only from 7.83 onwards
removeFileIfExists path
handleFailure info code? out line
modify ({· with didError := true})
| .error e =>
logError s!"curl produced invalid JSON: {e}; received: {line.trimAscii}"
modify ({· with didError := true})
monitorTransferLoop cfg h s
monitorTransfer cfg h hOut s
where
getInfo? res :=
match res.getAs Nat "urlnum" with
getInfo? out :=
match out.getAs Nat "urlnum" with
| .ok i => cfg.infos[i]?
| _ => none
mkFailureMsg hash code? msg? : String := Id.run do
handleFailure info code? out line : LoggerIO Unit := do
let action := match cfg.kind with | .get => "download" | .put => "upload"
let mut msg := s!"{cfg.scope}: failed to {action} artifact {hash}"
let mut msg := s!"{cfg.scope}: failed to {action} artifact {info.descr.hash}"
if let .ok code := code? then
msg := s!"{msg} (status code: {code})"
if let .ok errMsg := msg? then
msg := s!"{msg}: {errMsg}"
return msg

private def monitorTransfer
(cfg : TransferConfig) (args : Array String)
: LoggerIO Unit := do
if let .ok errMsg := out.getAs String "errormsg" then
msg := s!"{msg}\n curl error: {errMsg}"
msg := s!"{msg}\
\n local path: {info.path}\
\n remote URL: {info.url}"
match cfg.kind with
| .get =>
if let .ok size := out.getAs Nat "size_download" then
if size > 0 then
if let .ok contentType := out.getAs String "content_type" then
if contentType != artifactContentType then
if let .ok resp ← IO.FS.readFile info.path |>.toBaseIO then
msg := s!"{msg}\nunexpected response:\n{resp}"
removeFileIfExists info.path
| .put =>
if let .ok size := out.getAs Nat "size_download" then
if size > 0 then
if let some resp := String.fromUTF8? (← hOut.read size.toUSize) then
msg := s!"{msg}\nunexpected response:\n{resp}"
logError msg
logVerbose s!"curl JSON: {line.trimAsciiEnd}"

private def transferArtifacts
(cfg : TransferConfig)
: LoggerIO Unit := IO.FS.withTempFile fun h path => do
let args ← id do
match cfg.kind with
| .get =>
cfg.infos.forM fun info => do
h.putStrLn s!"url = {info.url}"
h.putStrLn s!"-o {info.path.toString.quote}"
h.flush
return #[
"-Z", "-X", "GET", "-L",
"--retry", "3", -- intermittent network errors can occur
"-s", "-w", "%{stderr}%{json}\n", "--config", path.toString
]
| .put =>
cfg.infos.forM fun info => do
h.putStrLn s!"-T {info.path.toString.quote}"
h.putStrLn s!"url = {info.url}"
h.flush
return #[
"-Z", "-X", "PUT", "-L",
"-H", s!"Content-Type: {artifactContentType}",
"--retry", "3", -- intermittent network errors can occur
"--aws-sigv4", "aws:amz:auto:s3", "--user", cfg.key,
"-s", "-w", "%{stderr}%{json}\n", "--config", path.toString
]
let child ← IO.Process.spawn {
cmd := "curl", args
stdout := .piped, stderr := .piped
}
let s ← monitorTransferLoop cfg child.stderr {}
let s ← monitorTransfer cfg child.stderr child.stdout {}
let rc ← child.wait
let stdout ← child.stdout.readToEnd
let mut didError := s.didError
Expand All @@ -797,27 +844,21 @@ public def downloadArtifacts
(descrs : Array ArtifactDescr) (cache : Cache)
(service : CacheService) (scope : CacheServiceScope) (force := false)
: LoggerIO Unit := do
IO.FS.withTempFile fun h path => do
IO.FS.createDirAll cache.artifactDir
let infos ← descrs.foldlM (init := #[]) fun s descr => do
let path := cache.artifactDir / descr.relPath
if force then
removeFileIfExists path
else if (← path.pathExists) then
return s
let url := service.artifactUrl descr.hash scope
h.putStrLn s!"url = {url}"
h.putStrLn s!"-o {path.toString.quote}"
return s.push {url, path, descr}
if infos.isEmpty then
return
h.flush
IO.FS.createDirAll cache.artifactDir
monitorTransfer {scope, infos, kind := .get} #[
"-Z", "-X", "GET", "-L",
"--retry", "3", -- intermittent network errors can occur
"-s", "-w", "%{stderr}%{json}\n", "--config", path.toString
]
if descrs.isEmpty then
logWarning "no artifacts to download"
return
let infos ← descrs.foldlM (init := #[]) fun s descr => do
let path := cache.artifactDir / descr.relPath
if force then
removeFileIfExists path
else if (← path.pathExists) then
return s
let url := service.artifactUrl descr.hash scope
return s.push {url, path, descr}
if infos.isEmpty then
return
IO.FS.createDirAll cache.artifactDir
transferArtifacts {scope, infos, kind := .get}

@[deprecated "Deprecated without replacement." (since := "2026-02-27")]
public def downloadOutputArtifacts
Expand All @@ -832,20 +873,13 @@ public def uploadArtifacts
(descrs : Vector ArtifactDescr n) (paths : Vector FilePath n)
(service : CacheService) (scope : CacheServiceScope)
: LoggerIO Unit := do
IO.FS.withTempFile fun h path => do
let infos ← n.foldM (init := #[]) fun i _ s => do
let url := service.artifactUrl descrs[i].hash scope
h.putStrLn s!"-T {paths[i].toString.quote}"
h.putStrLn s!"url = {url}"
return s.push {url, path := paths[i], descr := descrs[i]}
h.flush
monitorTransfer {scope, infos, kind := .put} #[
"-Z", "-X", "PUT", "-L",
"-H", s!"Content-Type: {artifactContentType}",
"--retry", "3", -- intermittent network errors can occur
"--aws-sigv4", "aws:amz:auto:s3", "--user", service.impl.key,
"-s", "-w", "%{stderr}%{json}\n", "--config", path.toString
]
if n = 0 then
logWarning "no artifacts to upload"
return
let infos ← n.foldM (init := #[]) fun i _ s => do
let url := service.artifactUrl descrs[i].hash scope
return s.push {url, path := paths[i], descr := descrs[i]}
transferArtifacts {scope, infos, kind := .put, key := service.impl.key}

/-! ### Output Transfer -/

Expand Down
Loading