diff --git a/src/lake/Lake/Build/Common.lean b/src/lake/Lake/Build/Common.lean index d1f0205173ba..329470921ed0 100644 --- a/src/lake/Lake/Build/Common.lean +++ b/src/lake/Lake/Build/Common.lean @@ -574,7 +574,7 @@ public def resolveArtifact match (← getMTime path |>.toBaseIO) with | .ok mtime => return {descr, path, mtime} - | .error (.noFileOrDirectory ..) => + | .error (.noFileOrDirectory ..) => withLogErrorPos do -- we redownload artifacts on any error if let some service := service? then updateAction .fetch diff --git a/src/lake/Lake/Config/Cache.lean b/src/lake/Lake/Config/Cache.lean index a847407eb9a7..28a61b65c456 100644 --- a/src/lake/Lake/Config/Cache.lean +++ b/src/lake/Lake/Config/Cache.lean @@ -558,9 +558,10 @@ public def downloadArtifactCore (hash : Hash) (url : String) (path : FilePath) : download url path let actualHash ← computeFileHash path if actualHash != hash then - logError s!"{path}: downloaded artifact does not have the expected hash" + let errPos ← getLogPos + logError s!"{path}: downloaded artifact hash mismatch, got {actualHash}" IO.FS.removeFile path - failure + throw errPos /-- Uploads a file to an online bucket using the Amazon S3 protocol. -/ def uploadS3 @@ -700,8 +701,9 @@ public def uploadArtifact /-! ## Multi-Artifact Transfer -/ private inductive TransferKind -| get -| put + | get + | put + deriving DecidableEq private structure TransferInfo where url : String @@ -712,13 +714,14 @@ private structure TransferConfig where kind : TransferKind scope : CacheServiceScope infos : Array TransferInfo + key : String := "" private structure TransferState where didError : Bool := false numSuccesses : Nat := 0 -private partial def monitorTransferLoop - (cfg : TransferConfig) (h : IO.FS.Handle) (s : TransferState) +private partial def monitorTransfer + (cfg : TransferConfig) (h hOut : IO.FS.Handle) (s : TransferState) : LoggerIO TransferState := do let line ← h.getLine if line.trimAscii.isEmpty then @@ -726,58 +729,102 @@ private partial def monitorTransferLoop else let s ← (·.2) <$> StateT.run (s := s) do match Json.parse line >>= fromJson? with - | .ok (res : JsonObject) => - let some {url, path, descr} := getInfo? res + | .ok (out : JsonObject) => + let some info@{url, path, descr} := getInfo? out | logError s!"{cfg.scope}: unidentifiable transfer completed: {line.trimAscii}" modify ({· with didError := true}) return - match res.get "http_code" with + match out.get "http_code" with | .ok 200 | .ok 201 => - let action := match cfg.kind with | .get => "downloaded" | .put => "uploaded" - logInfo s!"{cfg.scope}: {action} artifact {descr.hash}\ - \n local path: {path}\ - \n remote URL: {url}" - let actualHash ← computeFileHash path - if actualHash != descr.hash then - logError s!"downloaded artifact does not have the expected hash" - IO.FS.removeFile path - modify ({· with didError := true}) - else + match cfg.kind with + | .get => + logInfo s!"{cfg.scope}: downloaded artifact {descr.hash}\ + \n local path: {path}\ + \n remote URL: {url}" + let actualHash ← computeFileHash path + if actualHash != descr.hash then + logError s!"{path}: downloaded artifact hash mismatch, got {actualHash}" + IO.FS.removeFile path + modify ({· with didError := true}) + else + modify fun s => {s with numSuccesses := s.numSuccesses + 1} + | .put => + logInfo s!"{cfg.scope}: uploaded artifact {descr.hash}\ + \n local path: {path}\ + \n remote URL: {url}" modify fun s => {s with numSuccesses := s.numSuccesses + 1} | code? => - let msg? := res.getAs String "errormsg" - logError (mkFailureMsg descr.hash code? msg?) - if cfg.kind matches .get then - -- `curl --remove-on-error` can already do this, but only from 7.83 onwards - removeFileIfExists path + handleFailure info code? out line modify ({· with didError := true}) | .error e => logError s!"curl produced invalid JSON: {e}; received: {line.trimAscii}" modify ({· with didError := true}) - monitorTransferLoop cfg h s + monitorTransfer cfg h hOut s where - getInfo? res := - match res.getAs Nat "urlnum" with + getInfo? out := + match out.getAs Nat "urlnum" with | .ok i => cfg.infos[i]? | _ => none - mkFailureMsg hash code? msg? : String := Id.run do + handleFailure info code? out line : LoggerIO Unit := do let action := match cfg.kind with | .get => "download" | .put => "upload" - let mut msg := s!"{cfg.scope}: failed to {action} artifact {hash}" + let mut msg := s!"{cfg.scope}: failed to {action} artifact {info.descr.hash}" if let .ok code := code? then msg := s!"{msg} (status code: {code})" - if let .ok errMsg := msg? then - msg := s!"{msg}: {errMsg}" - return msg - -private def monitorTransfer - (cfg : TransferConfig) (args : Array String) -: LoggerIO Unit := do + if let .ok errMsg := out.getAs String "errormsg" then + msg := s!"{msg}\n curl error: {errMsg}" + msg := s!"{msg}\ + \n local path: {info.path}\ + \n remote URL: {info.url}" + match cfg.kind with + | .get => + if let .ok size := out.getAs Nat "size_download" then + if size > 0 then + if let .ok contentType := out.getAs String "content_type" then + if contentType != artifactContentType then + if let .ok resp ← IO.FS.readFile info.path |>.toBaseIO then + msg := s!"{msg}\nunexpected response:\n{resp}" + removeFileIfExists info.path + | .put => + if let .ok size := out.getAs Nat "size_download" then + if size > 0 then + if let some resp := String.fromUTF8? (← hOut.read size.toUSize) then + msg := s!"{msg}\nunexpected response:\n{resp}" + logError msg + logVerbose s!"curl JSON: {line.trimAsciiEnd}" + +private def transferArtifacts + (cfg : TransferConfig) +: LoggerIO Unit := IO.FS.withTempFile fun h path => do + let args ← id do + match cfg.kind with + | .get => + cfg.infos.forM fun info => do + h.putStrLn s!"url = {info.url}" + h.putStrLn s!"-o {info.path.toString.quote}" + h.flush + return #[ + "-Z", "-X", "GET", "-L", + "--retry", "3", -- intermittent network errors can occur + "-s", "-w", "%{stderr}%{json}\n", "--config", path.toString + ] + | .put => + cfg.infos.forM fun info => do + h.putStrLn s!"-T {info.path.toString.quote}" + h.putStrLn s!"url = {info.url}" + h.flush + return #[ + "-Z", "-X", "PUT", "-L", + "-H", s!"Content-Type: {artifactContentType}", + "--retry", "3", -- intermittent network errors can occur + "--aws-sigv4", "aws:amz:auto:s3", "--user", cfg.key, + "-s", "-w", "%{stderr}%{json}\n", "--config", path.toString + ] let child ← IO.Process.spawn { cmd := "curl", args stdout := .piped, stderr := .piped } - let s ← monitorTransferLoop cfg child.stderr {} + let s ← monitorTransfer cfg child.stderr child.stdout {} let rc ← child.wait let stdout ← child.stdout.readToEnd let mut didError := s.didError @@ -797,27 +844,21 @@ public def downloadArtifacts (descrs : Array ArtifactDescr) (cache : Cache) (service : CacheService) (scope : CacheServiceScope) (force := false) : LoggerIO Unit := do - IO.FS.withTempFile fun h path => do - IO.FS.createDirAll cache.artifactDir - let infos ← descrs.foldlM (init := #[]) fun s descr => do - let path := cache.artifactDir / descr.relPath - if force then - removeFileIfExists path - else if (← path.pathExists) then - return s - let url := service.artifactUrl descr.hash scope - h.putStrLn s!"url = {url}" - h.putStrLn s!"-o {path.toString.quote}" - return s.push {url, path, descr} - if infos.isEmpty then - return - h.flush - IO.FS.createDirAll cache.artifactDir - monitorTransfer {scope, infos, kind := .get} #[ - "-Z", "-X", "GET", "-L", - "--retry", "3", -- intermittent network errors can occur - "-s", "-w", "%{stderr}%{json}\n", "--config", path.toString - ] + if descrs.isEmpty then + logWarning "no artifacts to download" + return + let infos ← descrs.foldlM (init := #[]) fun s descr => do + let path := cache.artifactDir / descr.relPath + if force then + removeFileIfExists path + else if (← path.pathExists) then + return s + let url := service.artifactUrl descr.hash scope + return s.push {url, path, descr} + if infos.isEmpty then + return + IO.FS.createDirAll cache.artifactDir + transferArtifacts {scope, infos, kind := .get} @[deprecated "Deprecated without replacement." (since := "2026-02-27")] public def downloadOutputArtifacts @@ -832,20 +873,13 @@ public def uploadArtifacts (descrs : Vector ArtifactDescr n) (paths : Vector FilePath n) (service : CacheService) (scope : CacheServiceScope) : LoggerIO Unit := do - IO.FS.withTempFile fun h path => do - let infos ← n.foldM (init := #[]) fun i _ s => do - let url := service.artifactUrl descrs[i].hash scope - h.putStrLn s!"-T {paths[i].toString.quote}" - h.putStrLn s!"url = {url}" - return s.push {url, path := paths[i], descr := descrs[i]} - h.flush - monitorTransfer {scope, infos, kind := .put} #[ - "-Z", "-X", "PUT", "-L", - "-H", s!"Content-Type: {artifactContentType}", - "--retry", "3", -- intermittent network errors can occur - "--aws-sigv4", "aws:amz:auto:s3", "--user", service.impl.key, - "-s", "-w", "%{stderr}%{json}\n", "--config", path.toString - ] + if n = 0 then + logWarning "no artifacts to upload" + return + let infos ← n.foldM (init := #[]) fun i _ s => do + let url := service.artifactUrl descrs[i].hash scope + return s.push {url, path := paths[i], descr := descrs[i]} + transferArtifacts {scope, infos, kind := .put, key := service.impl.key} /-! ### Output Transfer -/