Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 143 additions & 21 deletions src/lake/Lake/Config/Cache.lean
Original file line number Diff line number Diff line change
Expand Up @@ -682,24 +682,142 @@ public def downloadArtifact
let path := cache.artifactDir / descr.relPath
if (← path.pathExists) && !force then
return
logInfo s!"\
{scope}: downloading artifact {descr.hash}\
logInfo s!"{scope}: downloading artifact {descr.hash}\
\n local path: {path}\
\n remote URL: {url}"
downloadArtifactCore descr.hash url path

/--
Uploads the local file `art` to the cache `service` as the artifact with hash `contentHash`.

The destination is the URL produced by `service.s3ArtifactUrl` for `contentHash` and `scope`.
The transfer is announced at the info level before the file is handed to `uploadS3`.
-/
public def uploadArtifact
  (contentHash : Hash) (art : FilePath) (service : CacheService) (scope : CacheServiceScope)
: LoggerIO Unit := do
  let remoteUrl := service.s3ArtifactUrl contentHash scope
  -- Assemble the announcement piecewise; the result is the same single message.
  let notice :=
    s!"{scope}: uploading artifact {contentHash}" ++
    s!"\n local path: {art}" ++
    s!"\n remote URL: {remoteUrl}"
  logInfo notice
  uploadS3 art artifactContentType remoteUrl service.impl.key

/-! ## Multi-Artifact Transfer -/

/-- The direction of a batched artifact transfer. -/
private inductive TransferKind where
  /-- Artifacts are fetched from the remote service (used by `downloadArtifacts`). -/
  | get
  /-- Artifacts are sent to the remote service (used by `uploadArtifacts`). -/
  | put

/-- Describes a single artifact taking part in a batched transfer. -/
private structure TransferInfo where
  /-- The remote URL of the artifact. -/
  url : String
  /-- The local file the artifact is written to (download) or read from (upload). -/
  path : FilePath
  /-- The artifact's descriptor; its `hash` is used in log messages and for verification. -/
  descr : ArtifactDescr

/-- The fixed inputs of a batched transfer that the monitor consults while it runs. -/
private structure TransferConfig where
  /-- Whether the batch downloads (`.get`) or uploads (`.put`). -/
  kind : TransferKind
  /-- The cache scope, used as the prefix of log messages. -/
  scope : CacheServiceScope
  /-- The transfers of the batch, looked up by the `urlnum` that curl reports. -/
  infos : Array TransferInfo

/-- The running tally kept while monitoring a batched transfer. -/
private structure TransferState where
  /-- Set once any transfer (or the parsing of its report) has failed. -/
  didError : Bool := false
  /-- The number of transfers that completed and passed the hash check. -/
  numSuccesses : Nat := 0

/--
Reads transfer reports from `h` one line at a time and folds them into the state `s`.

Each non-blank line is parsed as a JSON object. The callers in this file have curl
write one such object per completed transfer (`-w "%{stderr}%{json}\n"`).
The object's `urlnum` selects the matching entry of `cfg.infos`, and its `http_code`
decides whether the transfer counts as a success (200 or 201) or a failure.

The loop stops, returning the accumulated state, at the first line that is empty after trimming.
Failures are logged and recorded in `didError`; they do not abort the loop.
-/
private partial def monitorTransferLoop
(cfg : TransferConfig) (h : IO.FS.Handle) (s : TransferState)
: LoggerIO TransferState := do
let line ← h.getLine
-- An empty (or whitespace-only) line ends the loop; `getLine` yields `""` at end of stream.
if line.trimAscii.isEmpty then
return s
else
-- Process this one report in a local `StateT` and keep only the updated state.
let s ← (·.2) <$> StateT.run (s := s) do
match Json.parse line >>= fromJson? with
| .ok (res : JsonObject) =>
-- Map the report back to the transfer it belongs to; give up on this line if that fails.
let some {url, path, descr} := getInfo? res
| logError s!"{cfg.scope}: unidentifiable transfer completed: {line.trimAscii}"
modify ({· with didError := true})
return
match res.get "http_code" with
| .ok 200
| .ok 201 =>
let action := match cfg.kind with | .get => "downloaded" | .put => "uploaded"
logInfo s!"{cfg.scope}: {action} artifact {descr.hash}\
\n local path: {path}\
\n remote URL: {url}"
-- Verify the local file against the expected hash before counting the transfer.
-- NOTE(review): this check is not conditioned on `cfg.kind`, so it also runs (and may
-- remove the local file, with a "downloaded" message) for uploads — confirm this is intended.
let actualHash ← computeFileHash path
if actualHash != descr.hash then
logError s!"downloaded artifact does not have the expected hash"
IO.FS.removeFile path
modify ({· with didError := true})
else
modify fun s => {s with numSuccesses := s.numSuccesses + 1}
-- Any other status code, or a missing/unreadable `http_code`, is a failed transfer.
| code? =>
let msg? := res.getAs String "errormsg"
logError (mkFailureMsg descr.hash code? msg?)
if cfg.kind matches .get then
-- `curl --remove-on-error` can already do this, but only from 7.83 onwards
removeFileIfExists path
modify ({· with didError := true})
| .error e =>
logError s!"curl produced invalid JSON: {e}; received: {line.trimAscii}"
modify ({· with didError := true})
-- Continue with the next report.
monitorTransferLoop cfg h s
where
-- Looks up the transfer by the `urlnum` field of the report; `none` if absent or out of range.
getInfo? res :=
match res.getAs Nat "urlnum" with
| .ok i => cfg.infos[i]?
| _ => none
-- Builds the failure message, appending the status code and curl's error text when available.
mkFailureMsg hash code? msg? : String := Id.run do
let action := match cfg.kind with | .get => "download" | .put => "upload"
let mut msg := s!"{cfg.scope}: failed to {action} artifact {hash}"
if let .ok code := code? then
msg := s!"{msg} (status code: {code})"
if let .ok errMsg := msg? then
msg := s!"{msg}: {errMsg}"
return msg

/--
Runs `curl` with `args` and monitors the batch of transfers described by `cfg`.

curl's standard error is consumed by `monitorTransferLoop`, which logs the outcome of
each transfer. Once curl has exited, the action additionally reports

* transfers that never reported success (`numSuccesses < cfg.infos.size`),
* unexpected output on curl's standard output (as a warning), and
* a non-zero exit code.

The action fails if any transfer failed, if some transfers are unaccounted for,
or if curl exited with a non-zero code. The cause has been logged by then.
-/
private def monitorTransfer
  (cfg : TransferConfig) (args : Array String)
: LoggerIO Unit := do
  let child ← IO.Process.spawn {
    cmd := "curl", args
    stdout := .piped, stderr := .piped
  }
  let s ← monitorTransferLoop cfg child.stderr {}
  let rc ← child.wait
  let stdout ← child.stdout.readToEnd
  let mut didError := s.didError
  if s.numSuccesses < cfg.infos.size then
    let action := match cfg.kind with | .get => "download" | .put => "upload"
    logError s!"{cfg.scope}: failed to {action} some artifacts"
    didError := true
  unless stdout.isEmpty do
    logWarning s!"{cfg.scope}: curl produced unexpected output:\n{stdout.trimAsciiEnd}"
  if rc != 0 then
    logError s!"{cfg.scope}: curl exited with code {rc}"
    didError := true
  -- Check the accumulated flag, not just the loop's: the conditions detected above
  -- (missing transfers, non-zero exit code) must fail the action as well.
  if didError then
    failure

/--
Downloads the artifacts described by `descrs` from `service` into the artifact
directory of `cache`, using a single parallel `curl` invocation.

An artifact whose local file already exists is skipped, unless `force` is set,
in which case the existing file is removed and the artifact is downloaded again.
If nothing remains to be downloaded, `curl` is not invoked at all.

The requests are passed to `curl` through a temporary configuration file
(one `url`/`-o` pair per artifact). Progress and failures are logged by `monitorTransfer`,
which also determines whether the action fails.
-/
public def downloadArtifacts
  (descrs : Array ArtifactDescr) (cache : Cache)
  (service : CacheService) (scope : CacheServiceScope) (force := false)
: LoggerIO Unit := do
  IO.FS.withTempFile fun h path => do
    -- Ensure the artifact directory exists before curl writes into it.
    IO.FS.createDirAll cache.artifactDir
    let infos ← descrs.foldlM (init := #[]) fun s descr => do
      let path := cache.artifactDir / descr.relPath
      if force then
        removeFileIfExists path
      else if (← path.pathExists) then
        return s
      let url := service.artifactUrl descr.hash scope
      h.putStrLn s!"url = {url}"
      h.putStrLn s!"-o {path.toString.quote}"
      return s.push {url, path, descr}
    if infos.isEmpty then
      return
    -- Make sure the configuration is on disk before curl reads it.
    h.flush
    monitorTransfer {scope, infos, kind := .get} #[
      "-Z", "-X", "GET", "-L",
      "--retry", "3", -- intermittent network errors can occur
      "-s", "-w", "%{stderr}%{json}\n", "--config", path.toString
    ]

@[deprecated "Deprecated without replacement." (since := "2026-02-27")]
public def downloadOutputArtifacts
Expand All @@ -710,20 +828,24 @@ public def downloadOutputArtifacts
let descrs ← map.collectOutputDescrs
service.downloadArtifacts descrs cache remoteScope force

-- NOTE(review): this definition is textually identical to the `uploadArtifact` that appears
-- earlier in this file (before the "Multi-Artifact Transfer" section); it looks like the
-- pre-move copy of that definition — confirm that only one of the two is meant to remain.
/--
Uploads the local file `art` to the cache `service` as the artifact with hash `contentHash`.

The destination is the URL produced by `service.s3ArtifactUrl` for `contentHash` and `scope`.
The transfer is logged at the info level before the file is handed to `uploadS3`.
-/
public def uploadArtifact
(contentHash : Hash) (art : FilePath) (service : CacheService) (scope : CacheServiceScope)
: LoggerIO Unit := do
let url := service.s3ArtifactUrl contentHash scope
logInfo s!"\
{scope}: uploading artifact {contentHash}\
\n local path: {art}\
\n remote URL: {url}"
uploadS3 art artifactContentType url service.impl.key

/--
Uploads the files `paths` to `service` as the artifacts described by `descrs`
(`paths[i]` is the content of `descrs[i]`), using a single parallel `curl` invocation.

Does nothing when there are no artifacts to upload.

The requests are passed to `curl` through a temporary configuration file
(one `-T`/`url` pair per artifact) and are signed with AWS SigV4 using the service's key.
Progress and failures are logged by `monitorTransfer`, which also determines
whether the action fails.
-/
public def uploadArtifacts
  (descrs : Vector ArtifactDescr n) (paths : Vector FilePath n)
  (service : CacheService) (scope : CacheServiceScope)
: LoggerIO Unit := do
  -- Nothing to send: do not run curl on an empty configuration (it would report an error).
  if n = 0 then
    return
  IO.FS.withTempFile fun h path => do
    let infos ← n.foldM (init := #[]) fun i _ s => do
      -- Uploads target the S3 endpoint, as in the single-artifact `uploadArtifact`.
      let url := service.s3ArtifactUrl descrs[i].hash scope
      h.putStrLn s!"-T {paths[i].toString.quote}"
      h.putStrLn s!"url = {url}"
      return s.push {url, path := paths[i], descr := descrs[i]}
    -- Make sure the configuration is on disk before curl reads it.
    h.flush
    monitorTransfer {scope, infos, kind := .put} #[
      "-Z", "-X", "PUT", "-L",
      "-H", s!"Content-Type: {artifactContentType}",
      "--retry", "3", -- intermittent network errors can occur
      "--aws-sigv4", "aws:amz:auto:s3", "--user", service.impl.key,
      "-s", "-w", "%{stderr}%{json}\n", "--config", path.toString
    ]

/-! ### Output Transfer -/

Expand Down
6 changes: 6 additions & 0 deletions src/lake/Lake/Util/JsonObject.lean
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,16 @@ public def getJson? (obj : JsonObject) (prop : String) : Option Json :=
| none => throw s!"property not found: {prop}"
| some val => fromJson? val |>.mapError (s!"{prop}: {·}")

/--
Variant of `get` that takes the expected type `α` as an explicit argument,
so that a call site can name the type directly (e.g. `obj.getAs String "prop"`).
-/
@[inline] public def getAs
  (α) [FromJson α] (obj : JsonObject) (prop : String)
: Except String α :=
  (obj.get prop : Except String α)

/--
Reads the optional property `prop` of `obj`.

Returns `none` when `getJson?` finds no such property. Otherwise the value is decoded
with `fromJson?` at type `Option α`, and a decoding error is prefixed with the property name.
-/
@[inline] public def get?
  [FromJson α] (obj : JsonObject) (prop : String)
: Except String (Option α) :=
  match obj.getJson? prop with
  | some json => fromJson? json |>.mapError (fun err => s!"{prop}: {err}")
  | none => pure none

/--
Variant of `get?` that takes the expected type `α` as an explicit argument,
so that a call site can name the type directly instead of relying on inference.
-/
@[inline] public def getAs?
  (α) [FromJson α] (obj : JsonObject) (prop : String)
: Except String (Option α) :=
  (obj.get? prop : Except String (Option α))

/--
Reads the optional property `prop` of `obj`, substituting `default` when `get?` yields `none`.

An error from `get?` is propagated unchanged.
-/
@[macro_inline, expose] public def getD
  [FromJson α] (obj : JsonObject) (prop : String) (default : α)
: Except String α := do
  let val? : Option α ← obj.get? prop
  return val?.getD default
Loading