diff --git a/src/lake/Lake/Config/Cache.lean b/src/lake/Lake/Config/Cache.lean index b961a5021cbd..a847407eb9a7 100644 --- a/src/lake/Lake/Config/Cache.lean +++ b/src/lake/Lake/Config/Cache.lean @@ -682,24 +682,142 @@ public def downloadArtifact let path := cache.artifactDir / descr.relPath if (← path.pathExists) && !force then return - logInfo s!"\ - {scope}: downloading artifact {descr.hash}\ + logInfo s!"{scope}: downloading artifact {descr.hash}\ \n local path: {path}\ \n remote URL: {url}" downloadArtifactCore descr.hash url path +public def uploadArtifact + (contentHash : Hash) (art : FilePath) (service : CacheService) (scope : CacheServiceScope) +: LoggerIO Unit := do + let url := service.s3ArtifactUrl contentHash scope + logInfo s!"\ + {scope}: uploading artifact {contentHash}\ + \n local path: {art}\ + \n remote URL: {url}" + uploadS3 art artifactContentType url service.impl.key + +/-! ## Multi-Artifact Transfer -/ + +private inductive TransferKind +| get +| put + +private structure TransferInfo where + url : String + path : FilePath + descr : ArtifactDescr + +private structure TransferConfig where + kind : TransferKind + scope : CacheServiceScope + infos : Array TransferInfo + +private structure TransferState where + didError : Bool := false + numSuccesses : Nat := 0 + +private partial def monitorTransferLoop + (cfg : TransferConfig) (h : IO.FS.Handle) (s : TransferState) +: LoggerIO TransferState := do + let line ← h.getLine + if line.trimAscii.isEmpty then + return s + else + let s ← (·.2) <$> StateT.run (s := s) do + match Json.parse line >>= fromJson? with + | .ok (res : JsonObject) => + let some {url, path, descr} := getInfo? res + | logError s!"{cfg.scope}: unidentifiable transfer completed: {line.trimAscii}" + modify ({· with didError := true}) + return + match res.get "http_code" with + | .ok 200 + | .ok 201 => + let action := match cfg.kind with | .get => "downloaded" | .put => "uploaded" + logInfo s!"{cfg.scope}: {action} artifact {descr.hash}\ + \n local path: {path}\ + \n remote URL: {url}" + let actualHash ← computeFileHash path + if actualHash != descr.hash then + logError s!"downloaded artifact does not have the expected hash" + IO.FS.removeFile path + modify ({· with didError := true}) + else + modify fun s => {s with numSuccesses := s.numSuccesses + 1} + | code? => + let msg? := res.getAs String "errormsg" + logError (mkFailureMsg descr.hash code? msg?) + if cfg.kind matches .get then + -- `curl --remove-on-error` can already do this, but only from 7.83 onwards + removeFileIfExists path + modify ({· with didError := true}) + | .error e => + logError s!"curl produced invalid JSON: {e}; received: {line.trimAscii}" + modify ({· with didError := true}) + monitorTransferLoop cfg h s +where + getInfo? res := + match res.getAs Nat "urlnum" with + | .ok i => cfg.infos[i]? + | _ => none + mkFailureMsg hash code? msg? : String := Id.run do + let action := match cfg.kind with | .get => "download" | .put => "upload" + let mut msg := s!"{cfg.scope}: failed to {action} artifact {hash}" + if let .ok code := code? then + msg := s!"{msg} (status code: {code})" + if let .ok errMsg := msg? then + msg := s!"{msg}: {errMsg}" + return msg + +private def monitorTransfer + (cfg : TransferConfig) (args : Array String) +: LoggerIO Unit := do + let child ← IO.Process.spawn { + cmd := "curl", args + stdout := .piped, stderr := .piped + } + let s ← monitorTransferLoop cfg child.stderr {} + let rc ← child.wait + let stdout ← child.stdout.readToEnd + let mut didError := s.didError + if s.numSuccesses < cfg.infos.size then + let action := match cfg.kind with | .get => "download" | .put => "upload" + logError s!"{cfg.scope}: failed to {action} some artifacts" + didError := true + unless stdout.isEmpty do + logWarning s!"{cfg.scope}: curl produced unexpected output:\n{stdout.trimAsciiEnd}" + if rc != 0 then + logError s!"{cfg.scope}: curl exited with code {rc}" + didError := true + if s.didError then + failure + public def downloadArtifacts (descrs : Array ArtifactDescr) (cache : Cache) (service : CacheService) (scope : CacheServiceScope) (force := false) : LoggerIO Unit := do - let ok ← descrs.foldlM (init := true) fun ok descr => - try - service.downloadArtifact descr cache scope force - return ok - catch _ => - return false - unless ok do - error s!"{scope}: failed to download some artifacts" + IO.FS.withTempFile fun h path => do + IO.FS.createDirAll cache.artifactDir + let infos ← descrs.foldlM (init := #[]) fun s descr => do + let path := cache.artifactDir / descr.relPath + if force then + removeFileIfExists path + else if (← path.pathExists) then + return s + let url := service.artifactUrl descr.hash scope + h.putStrLn s!"url = {url}" + h.putStrLn s!"-o {path.toString.quote}" + return s.push {url, path, descr} + if infos.isEmpty then + return + h.flush + IO.FS.createDirAll cache.artifactDir + monitorTransfer {scope, infos, kind := .get} #[ + "-Z", "-X", "GET", "-L", + "--retry", "3", -- intermittent network errors can occur + "-s", "-w", "%{stderr}%{json}\n", "--config", path.toString + ] @[deprecated "Deprecated without replacement." (since := "2026-02-27")] public def downloadOutputArtifacts @@ -710,20 +828,24 @@ public def downloadOutputArtifacts let descrs ← map.collectOutputDescrs service.downloadArtifacts descrs cache remoteScope force -public def uploadArtifact - (contentHash : Hash) (art : FilePath) (service : CacheService) (scope : CacheServiceScope) -: LoggerIO Unit := do - let url := service.s3ArtifactUrl contentHash scope - logInfo s!"\ - {scope}: uploading artifact {contentHash}\ - \n local path: {art}\ - \n remote URL: {url}" - uploadS3 art artifactContentType url service.impl.key - public def uploadArtifacts (descrs : Vector ArtifactDescr n) (paths : Vector FilePath n) (service : CacheService) (scope : CacheServiceScope) -: LoggerIO Unit := n.forM fun n h => service.uploadArtifact descrs[n].hash paths[n] scope +: LoggerIO Unit := do + IO.FS.withTempFile fun h path => do + let infos ← n.foldM (init := #[]) fun i _ s => do + let url := service.artifactUrl descrs[i].hash scope + h.putStrLn s!"-T {paths[i].toString.quote}" + h.putStrLn s!"url = {url}" + return s.push {url, path := paths[i], descr := descrs[i]} + h.flush + monitorTransfer {scope, infos, kind := .put} #[ + "-Z", "-X", "PUT", "-L", + "-H", s!"Content-Type: {artifactContentType}", + "--retry", "3", -- intermittent network errors can occur + "--aws-sigv4", "aws:amz:auto:s3", "--user", service.impl.key, + "-s", "-w", "%{stderr}%{json}\n", "--config", path.toString + ] /-! ### Output Transfer -/ diff --git a/src/lake/Lake/Util/JsonObject.lean b/src/lake/Lake/Util/JsonObject.lean index 565a77cfc576..691a010df017 100644 --- a/src/lake/Lake/Util/JsonObject.lean +++ b/src/lake/Lake/Util/JsonObject.lean @@ -63,10 +63,16 @@ public def getJson? (obj : JsonObject) (prop : String) : Option Json := | none => throw s!"property not found: {prop}" | some val => fromJson? val |>.mapError (s!"{prop}: {·}") +@[inline] public def getAs (α) [FromJson α] (obj : JsonObject) (prop : String) : Except String α := + obj.get prop + @[inline] public def get? [FromJson α] (obj : JsonObject) (prop : String) : Except String (Option α) := match obj.getJson? prop with | none => pure none | some val => fromJson? val |>.mapError (s!"{prop}: {·}") +@[inline] public def getAs? (α) [FromJson α] (obj : JsonObject) (prop : String) : Except String (Option α) := + obj.get? prop + @[macro_inline, expose] public def getD [FromJson α] (obj : JsonObject) (prop : String) (default : α) : Except String α := do return (← obj.get? prop).getD default