270 changes: 128 additions & 142 deletions src/lib/vm/image/upload.ts
@@ -7,44 +7,43 @@ import { clearInterval, setInterval } from "node:timers";
import retry from "async-retry";
import ora, { type Ora } from "ora";
import cliSpinners from "npm:cli-spinners";
import axios from "axios";
import { apiClient } from "../../../apiClient.ts";

async function readChunk(
filePath: string,
start: number,
chunkSize: number,
length: number,
onProgress?: (bytesRead: number) => void,
): Promise<Uint8Array> {
using file = await Deno.open(filePath, { read: true });
await file.seek(start, Deno.SeekMode.Start);

const buffer = new Uint8Array(chunkSize);
let totalBytesRead = 0;
let emptyReadCount = 0;
const maxEmptyReads = 100;

while (totalBytesRead < chunkSize) {
const bytesRead = await file.read(buffer.subarray(totalBytesRead));
if (bytesRead === null) {
// EOF reached
break;
}
if (bytesRead === 0) {
// No bytes read but not EOF, continue looping
emptyReadCount++;
if (emptyReadCount >= maxEmptyReads) {
throw new Error(
`Failed to read chunk: reached ${maxEmptyReads} consecutive empty reads without EOF`,
);
const file = await Deno.open(filePath, { read: true });
try {
await file.seek(start, Deno.SeekMode.Start);

const buffer = new Uint8Array(length);
let offset = 0;

while (offset < length) {
const bytesRead = await file.read(buffer.subarray(offset));
if (bytesRead === null) {
// EOF reached
break;
}
offset += bytesRead;
if (onProgress) {
onProgress(bytesRead);
}
continue;
}
// Non-empty read, reset counter
emptyReadCount = 0;
totalBytesRead += bytesRead;
}

return buffer.subarray(0, totalBytesRead);
if (offset !== length) {
throw new Error(
`Short read: expected ${length} bytes, got ${offset} bytes`,
);
}

return buffer;
} finally {
file.close();
}
}
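
As an aside, a minimal sketch of how the reworked `readChunk` could be driven on its own — the path, offset, and slice size below are made up for illustration and are not taken from this PR:

```ts
// Hypothetical standalone use of the new readChunk signature
// (filePath, start, length, onProgress?). Not part of the PR.
const SLICE_START = 4 * 1024 * 1024; // read starting at a 4 MiB offset
const SLICE_LENGTH = 1024 * 1024;    // read exactly 1 MiB

let readSoFar = 0;
const slice = await readChunk(
  "./image.raw", // hypothetical file path
  SLICE_START,
  SLICE_LENGTH,
  (bytesRead) => {
    readSoFar += bytesRead;
    console.log(`read ${readSoFar}/${SLICE_LENGTH} bytes from disk`);
  },
);
// The new version throws on a short read, so reaching this line
// guarantees slice.length === SLICE_LENGTH.
console.log(`slice length: ${slice.length}`);
```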

const upload = new Command("upload")
@@ -67,6 +66,7 @@ const upload = new Command("upload")
let preparingSpinner: Ora | undefined;
let finalizingSpinner: Ora | undefined;
let spinnerTimer: NodeJS.Timeout | undefined;
let progressBar: cliProgress.SingleBar | undefined;

try {
preparingSpinner = ora(`Preparing upload for ${name}...`).start();
@@ -95,14 +95,16 @@ const upload = new Command("upload")
const fileSize = fileInfo.size;

// Calculate parts for progress tracking
// These magic numbers are not the hard limits, but we don't trust R2 to document them.
const minChunk = 6 * 1024 * 1024; // 6 MiB
const maxParts = 100;
const chunkSize = Math.max(
minChunk,
Math.ceil(fileSize / maxParts),
250 * 1024 * 1024,
); // 250 MiB
const minChunk = 5 * 1024 * 1024; // 5 MiB (minimum)
const defaultChunk = 64 * 1024 * 1024; // 64 MiB
const maxParts = 10000; // object storage supports up to 10k parts

// For files smaller than default chunk, use the whole file as one part
// Otherwise use default chunk size, but ensure we don't exceed maxParts
const chunkSize = fileSize <= defaultChunk
? Math.max(fileSize, minChunk)
: Math.max(minChunk, Math.ceil(fileSize / maxParts), defaultChunk);

const totalParts = Math.ceil(fileSize / chunkSize);
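
For a feel of what the new constants mean in practice, here is a small hypothetical calculation (the file sizes are invented for illustration; this snippet is not part of the PR):

```ts
// Hypothetical worked example of the new part-sizing rule. Not part of the PR.
const MiB = 1024 * 1024;
const minChunk = 5 * MiB;
const defaultChunk = 64 * MiB;
const maxParts = 10000;

const partSize = (fileSize: number) =>
  fileSize <= defaultChunk
    ? Math.max(fileSize, minChunk)
    : Math.max(minChunk, Math.ceil(fileSize / maxParts), defaultChunk);

console.log(partSize(10 * MiB) / MiB);        // 10 -> a small image is a single part
console.log(partSize(40 * 1024 * MiB) / MiB); // 64 -> a 40 GiB image becomes 640 parts
// Only past 64 MiB * 10000 parts (~625 GiB) does Math.ceil(fileSize / maxParts)
// win and the part size grow beyond the 64 MiB default.
```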

// Calculate upload parts metadata
@@ -121,8 +123,6 @@ const upload = new Command("upload")

// Create combined ora + progress bar with per-part progress tracking
const startTime = Date.now();
let lastSpeed = "0 B/s";

// Track progress per part to handle retries correctly
const partProgress = new Map<number, number>(); // part -> bytes uploaded

@@ -139,13 +139,13 @@ const upload = new Command("upload")
const spinner = cliSpinners.dots;
let spinnerIndex = 0;

const progressBar = new cliProgress.SingleBar({
progressBar = new cliProgress.SingleBar({
format:
`{spinner} Uploading [{bar}] {percentage}% | {uploadedMB}/{totalMB} MB | {speed}`,
barCompleteChar: "\u2588",
barIncompleteChar: "\u2591",
hideCursor: true,
forceRedraw: true,
forceRedraw: false,
});

progressBar.start(fileSize, 0, {
@@ -155,26 +155,14 @@ const upload = new Command("upload")
totalMB: (fileSize / (1024 * 1024)).toFixed(1),
});

// Create a timer to animate the spinner at the correct interval
spinnerTimer = setInterval(() => {
spinnerIndex++;
const totalBytesUploaded = getTotalBytesUploaded();
// Force a redraw to animate the spinner
progressBar.update(totalBytesUploaded, {
spinner: spinner.frames[spinnerIndex % spinner.frames.length],
speed: lastSpeed || "0 B/s",
uploadedMB: (totalBytesUploaded / (1024 * 1024)).toFixed(1),
totalMB: (fileSize / (1024 * 1024)).toFixed(1),
});
}, spinner.interval);

const updateProgress = (part: number, bytesUploaded: number) => {
const previousBytes = partProgress.get(part) || 0;
partProgress.set(part, previousBytes + bytesUploaded);
// Throttle UI updates to 200ms
const UI_UPDATE_INTERVAL_MS = 200;
let lastUIUpdate = 0;

const renderProgress = () => {
const totalBytesUploaded = getTotalBytesUploaded();
const elapsedTime = (Date.now() - startTime) / 1000; // seconds
const speed = totalBytesUploaded / elapsedTime; // bytes per second
const elapsedTime = (Date.now() - startTime) / 1000;
const speed = totalBytesUploaded / elapsedTime;

// Format speed
let speedStr: string;
@@ -186,9 +174,6 @@ const upload = new Command("upload")
speedStr = `${speed.toFixed(0)} B/s`;
}

// Store values for spinner animation
lastSpeed = speedStr;

progressBar.update(totalBytesUploaded, {
spinner: spinner.frames[spinnerIndex % spinner.frames.length],
speed: speedStr,
@@ -197,20 +182,23 @@ const upload = new Command("upload")
});
};

const resetPartProgress = (part: number) => {
const previousBytes = partProgress.get(part) || 0;
if (previousBytes > 0) {
partProgress.set(part, 0);

const totalBytesUploaded = getTotalBytesUploaded();
// Update progress bar to reflect the reset
progressBar.update(totalBytesUploaded, {
spinner: spinner.frames[spinnerIndex % spinner.frames.length],
speed: lastSpeed || "0 B/s",
uploadedMB: (totalBytesUploaded / (1024 * 1024)).toFixed(1),
totalMB: (fileSize / (1024 * 1024)).toFixed(1),
});
// Create a timer to animate the spinner and update progress
spinnerTimer = setInterval(() => {
spinnerIndex++;
const now = Date.now();
if (now - lastUIUpdate >= UI_UPDATE_INTERVAL_MS) {
renderProgress();
lastUIUpdate = now;
}
}, spinner.interval);

const updateProgress = (part: number, bytesUploaded: number) => {
const previousBytes = partProgress.get(part) || 0;
partProgress.set(part, previousBytes + bytesUploaded);
};

const resetPartProgress = (part: number) => {
partProgress.set(part, 0);
};
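
A short hypothetical walkthrough of why progress is kept per part rather than as one running total (the byte counts are invented; `getTotalBytesUploaded` is assumed, based on its name and the elided hunk above, to sum the `partProgress` map):

```ts
// Hypothetical sequence for part 3 of an upload. Not part of the PR.
updateProgress(3, 32 * 1024 * 1024); // first attempt streams 32 MiB off disk
// ...the PUT for part 3 fails and async-retry schedules another attempt...
resetPartProgress(3);                // retry wipes only part 3's contribution
updateProgress(3, 64 * 1024 * 1024); // second attempt re-reads the whole part
// A single global counter would have double-counted the first 32 MiB;
// summing the per-part map keeps the progress bar honest across retries.
```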

// Upload parts concurrently with specified concurrency limit
@@ -223,9 +211,15 @@ const upload = new Command("upload")
) => {
const chunkSize = end - start;

// Step 1: Fetch upload URL with retry
const url = await retry(
async () => {
// Upload the chunk with retry, fetching fresh URL each attempt
await retry(
async (bail: (e: Error) => void, attemptNumber: number) => {
// Reset progress for this part on retry (except first attempt)
if (attemptNumber > 1) {
resetPartProgress(part);
}

// Fetch fresh upload URL for this attempt
const response = await client.POST(
"/v1/vms/images/{image_id}/upload",
{
@@ -241,81 +235,66 @@ const upload = new Command("upload")
);

if (!response.response.ok || !response.data) {
const status = response.response.status;
const errorText = response.response.ok
? "No data in response"
: await response.response.text();
: await response.response.text().catch(() => "");

// Bail on non-transient 4xx errors (except 408 Request Timeout and 429 Too Many Requests)
if (
status >= 400 && status < 500 && status !== 408 &&
status !== 429
) {
bail(
new Error(
`Failed to get upload URL for part ${part}: ${status} ${response.response.statusText} - ${errorText}`,
),
);
return;
}

throw new Error(
`Failed to get upload URL for part ${part}: ${response.response.status} ${response.response.statusText} - ${errorText}`,
`Failed to get upload URL for part ${part}: ${status} ${response.response.statusText} - ${errorText}`,
);
}

return response.data.upload_url;
},
{
retries: 3,
factor: 2,
randomize: true,
},
);

// Step 2: Upload the chunk with retry
await retry(
async (_: unknown, _attemptNumber: number) => {
// Reset progress for this part on retry (except first attempt)
if (_attemptNumber > 1) {
resetPartProgress(part);
}

const chunk = await readChunk(filePath, start, chunkSize);

// Track upload progress with axios
let lastUploadedBytes = 0;
const url = response.data.upload_url;

try {
const res = await axios.put(url, chunk, {
headers: {
"Content-Type": "application/octet-stream",
"Content-Length": chunk.length.toString(),
},
onUploadProgress: (progressEvent) => {
const uploadedBytes = progressEvent.loaded || 0;
const deltaBytes = uploadedBytes - lastUploadedBytes;

if (deltaBytes > 0) {
updateProgress(part, deltaBytes);
lastUploadedBytes = uploadedBytes;
}
},
maxRedirects: 0,
});
// Read chunk from disk with progress tracking
const payload = await readChunk(
filePath,
start,
chunkSize,
(bytesRead) => {
updateProgress(part, bytesRead);
},
);

if (res.status < 200 || res.status >= 300) {
throw new Error(
`Part ${part} upload failed: ${res.status} ${res.statusText}`,
);
}
} catch (err) {
// Log Cloudflare/R2 specific errors
if (axios.isAxiosError(err)) {
const cfRay = err.response?.headers?.["cf-ray"];
const cfCacheStatus = err.response?.headers
?.["cf-cache-status"];
console.error(gray(`\nPart ${part} upload error:`));
console.error(
gray(
` Status: ${err.response?.status} ${
err.response?.statusText || ""
}`,
const res = await fetch(url, {
method: "PUT",
headers: {
"Content-Type": "application/octet-stream",
},
body: payload,
});

if (!res.ok) {
// Bail on non-transient 4xx errors (except 408 and 429)
if (
res.status >= 400 && res.status < 500 && res.status !== 408 &&
res.status !== 429
) {
bail(
new Error(
`Part ${part} upload failed: ${res.status} ${res.statusText}`,
),
);
console.error(gray(` Error code: ${err.code || "unknown"}`));
if (cfRay) console.error(gray(` Cloudflare Ray ID: ${cfRay}`));
if (cfCacheStatus) {
console.error(gray(` CF Cache Status: ${cfCacheStatus}`));
}
console.error(gray(` Message: ${err.message}`));
return;
}
throw err;

throw new Error(
`Part ${part} upload failed: ${res.status} ${res.statusText}`,
);
}
},
{
@@ -404,6 +383,13 @@ const upload = new Command("upload")
spinnerTimer = undefined;
}

// Stop progress bar
try {
progressBar?.stop();
} catch {
// Ignore if progress bar not started
}

// Stop any running spinners on error
if (preparingSpinner?.isSpinning) {
preparingSpinner.fail(
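
Both retry sites in this diff apply the same rule: retry on 5xx, 408, and 429, bail on any other 4xx. A hypothetical helper capturing that rule, purely for illustration (the PR itself inlines the check at both call sites):

```ts
// Hypothetical helper, not part of the PR: which HTTP statuses are worth retrying.
function isTransientStatus(status: number): boolean {
  // 5xx responses plus 408 Request Timeout and 429 Too Many Requests.
  return status >= 500 || status === 408 || status === 429;
}

// The checks inside the retry callback would then read roughly:
//   if (!res.ok && !isTransientStatus(res.status)) {
//     bail(new Error(`Part ${part} upload failed: ${res.status} ${res.statusText}`));
//     return;
//   }
//   if (!res.ok) throw new Error(...); // transient -> let async-retry try again
```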