Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,9 @@ jobs:
streamkit-${{ github.ref_name }}-linux-x64.tar.gz
streamkit-${{ github.ref_name }}-linux-x64.tar.gz.sha256

marketplace:
uses: ./.github/workflows/marketplace-build.yml
permissions:
contents: write
pull-requests: write
secrets: inherit

create-release:
name: Create GitHub Release
needs: [build-linux-x64, marketplace]
needs: [build-linux-x64]
runs-on: ubuntu-22.04
permissions:
contents: write
Expand Down Expand Up @@ -130,7 +123,6 @@ jobs:
files: |
artifacts/**/streamkit-*.tar.gz
artifacts/**/streamkit-*.tar.gz.sha256
artifacts/**/marketplace-bundles/*.tar.zst
body_path: changelog.md
draft: false
prerelease: ${{ contains(github.ref_name, '-rc') || contains(github.ref_name, '-beta') || contains(github.ref_name, '-alpha') }}
Expand Down
157 changes: 127 additions & 30 deletions apps/skit/src/marketplace_installer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const REGISTRY_TIMEOUT_SECS: u64 = 20;
const REGISTRY_INDEX_TTL_SECS: u64 = 60;
const REGISTRY_MANIFEST_TTL_SECS: u64 = 60;
const MAX_JOB_HISTORY: usize = 200;
const DOWNLOAD_CONNECT_TIMEOUT_SECS: u64 = 30;
const DOWNLOAD_READ_TIMEOUT_SECS: u64 = 60;

#[derive(Debug, Clone, Deserialize)]
pub struct InstallPluginRequest {
Expand Down Expand Up @@ -300,6 +302,7 @@ impl InstallJobQueue {
tracker.mark_cancelled("Cancelled").await;
},
Err(InstallError::Other(err)) => {
tracing::error!(job_id = %context.job_id, error = %err, "Install job failed");
tracker.set_status(JobStatus::Failed, format!("Install failed: {err}")).await;
},
}
Expand Down Expand Up @@ -456,6 +459,7 @@ impl JobTracker {
}

async fn fail_step(&self, step_name: &str, error: String) {
tracing::error!(job_id = %self.job_id, step = %step_name, error = %error, "Install step failed");
self.queue
.update_job(&self.job_id, |info| {
if let Some(step) = info.steps.iter_mut().find(|step| step.name == step_name) {
Expand Down Expand Up @@ -533,7 +537,8 @@ impl PluginInstaller {
settings: PluginInstallerSettings,
) -> Result<Self> {
let download_client = Client::builder()
.timeout(Duration::from_secs(REGISTRY_TIMEOUT_SECS))
.connect_timeout(Duration::from_secs(DOWNLOAD_CONNECT_TIMEOUT_SECS))
.read_timeout(Duration::from_secs(DOWNLOAD_READ_TIMEOUT_SECS))
.redirect(reqwest::redirect::Policy::none())
.build()
.context("Failed to build bundle HTTP client")?;
Expand Down Expand Up @@ -935,33 +940,73 @@ impl PluginInstaller {
plugin_paths::validate_path_component("bundle file name", file_name)?;

let bundle_path = cache_dir.join(file_name);
let temp_path = cache_dir.join(format!(".download-{}", Uuid::new_v4()));
let temp_path = cache_dir.join(format!(".download-{file_name}"));

let mut hash_mismatch = false;
let download_result: Result<(), InstallError> = async {
// Check for existing partial download to enable resume.
let existing_len = tokio::fs::metadata(&temp_path).await.ok().map(|m| m.len());
let resume_from = existing_len.filter(|&len| len > 0);

let (response, final_url) = validated_get_response(
&self.download_client,
&self.marketplace_policy,
"bundle url",
&bundle_url,
Some(registry_origin),
None,
resume_from,
)
.await?;
let response = response
.error_for_status()
.with_context(|| format!("Bundle download failed for {final_url}"))?;
let total_bytes = response.content_length();

let is_partial = response.status() == reqwest::StatusCode::PARTIAL_CONTENT;
let total_bytes = if is_partial {
// For 206 responses, content_length is the remaining bytes.
response.content_length().map(|cl| cl + resume_from.unwrap_or(0))
} else {
response.content_length()
};
let mut stream = response.bytes_stream();

let mut file = tokio::fs::File::create(&temp_path).await.with_context(|| {
format!("Failed to create bundle file {temp_path}", temp_path = temp_path.display())
})?;
let mut hasher = Sha256::new();
let mut bytes_done = 0u64;
let (mut file, mut hasher, mut bytes_done) = if is_partial {
let offset = resume_from.unwrap_or(0);
// Re-hash existing bytes for SHA256 continuity.
let existing_data = tokio::fs::read(&temp_path).await.with_context(|| {
format!(
"Failed to read existing temp file {temp_path}",
temp_path = temp_path.display()
)
})?;
let mut hasher = Sha256::new();
hasher.update(&existing_data);
let file = tokio::fs::OpenOptions::new()
.append(true)
.open(&temp_path)
.await
.with_context(|| {
format!(
"Failed to open bundle file for append {temp_path}",
temp_path = temp_path.display()
)
})?;
(file, hasher, offset)
} else {
let file = tokio::fs::File::create(&temp_path).await.with_context(|| {
format!(
"Failed to create bundle file {temp_path}",
temp_path = temp_path.display()
)
})?;
(file, Sha256::new(), 0u64)
};

while let Some(chunk) = stream.next().await {
let chunk = chunk.with_context(|| "Failed to read bundle download stream")?;
if cancel.is_cancelled() {
let _ = file.flush().await;
return Err(InstallError::Cancelled);
}
file.write_all(&chunk).await.with_context(|| {
Expand Down Expand Up @@ -989,6 +1034,7 @@ impl PluginInstaller {
if !actual_hash.eq_ignore_ascii_case(&manifest.bundle.sha256) {
let expected = manifest.bundle.sha256.as_str();
let actual = actual_hash.as_str();
hash_mismatch = true;
return Err(
anyhow!("Bundle hash mismatch: expected {expected}, got {actual}").into()
);
Expand All @@ -999,7 +1045,11 @@ impl PluginInstaller {
.await;

if let Err(err) = download_result {
let _ = tokio::fs::remove_file(&temp_path).await;
// Only delete temp file on hash mismatch; keep it for network errors
// and cancellations so the next attempt can resume from the partial file.
if hash_mismatch {
let _ = tokio::fs::remove_file(&temp_path).await;
}
return Err(err);
}

Expand Down Expand Up @@ -1375,33 +1425,77 @@ impl PluginInstaller {
.marketplace_policy
.validate_url("model url", url, registry_origin.as_ref())
.await?;
let (response, final_url) = validated_get_response(
&self.download_client,
&self.marketplace_policy,
"model url",
&parsed,
registry_origin.as_ref(),
bearer_token,
)
.await?;
let response = response
.error_for_status()
.with_context(|| format!("Model download failed for {final_url}"))?;
let total_bytes = response.content_length().or(expected_size);
let mut stream = response.bytes_stream();

let temp_path = target_path.with_extension(format!("download-{}", Uuid::new_v4()));
let temp_path = target_path.with_extension("download-part");
let mut hash_mismatch = false;
let download_result: Result<(), InstallError> = async {
let mut file = tokio::fs::File::create(&temp_path).await.with_context(|| {
format!("Failed to create model file {temp_path}", temp_path = temp_path.display())
})?;
// Check for existing partial download to enable resume.
let existing_len = tokio::fs::metadata(&temp_path).await.ok().map(|m| m.len());
let resume_from = existing_len.filter(|&len| len > 0);

let mut hasher = expected_sha256.map(|_| Sha256::new());
let mut bytes_done = 0u64;
let (response, final_url) = validated_get_response(
&self.download_client,
&self.marketplace_policy,
"model url",
&parsed,
registry_origin.as_ref(),
bearer_token,
resume_from,
)
.await?;
let response = response
.error_for_status()
.with_context(|| format!("Model download failed for {final_url}"))?;

let is_partial = response.status() == reqwest::StatusCode::PARTIAL_CONTENT;
let total_bytes = if is_partial {
response.content_length().map(|cl| cl + resume_from.unwrap_or(0))
} else {
response.content_length()
}
.or(expected_size);
let mut stream = response.bytes_stream();

let (mut file, mut hasher, mut bytes_done) = if is_partial {
let offset = resume_from.unwrap_or(0);
// Re-hash existing bytes for SHA256 continuity.
let existing_data = tokio::fs::read(&temp_path).await.with_context(|| {
format!(
"Failed to read existing temp file {temp_path}",
temp_path = temp_path.display()
)
})?;
let h = expected_sha256.map(|_| {
let mut hasher = Sha256::new();
hasher.update(&existing_data);
hasher
});
let file = tokio::fs::OpenOptions::new()
.append(true)
.open(&temp_path)
.await
.with_context(|| {
format!(
"Failed to open model file for append {temp_path}",
temp_path = temp_path.display()
)
})?;
(file, h, offset)
} else {
let file = tokio::fs::File::create(&temp_path).await.with_context(|| {
format!(
"Failed to create model file {temp_path}",
temp_path = temp_path.display()
)
})?;
let h = expected_sha256.map(|_| Sha256::new());
(file, h, 0u64)
};

while let Some(chunk) = stream.next().await {
let chunk = chunk.with_context(|| "Failed to read model download stream")?;
if cancel.is_cancelled() {
let _ = file.flush().await;
return Err(InstallError::Cancelled);
}
file.write_all(&chunk).await.with_context(|| {
Expand Down Expand Up @@ -1440,6 +1534,7 @@ impl PluginInstaller {
if let (Some(expected_hash), Some(hasher)) = (expected_sha256, hasher) {
let actual_hash = to_hex(&hasher.finalize());
if !actual_hash.eq_ignore_ascii_case(expected_hash) {
hash_mismatch = true;
return Err(anyhow!(
"Model hash mismatch: expected {expected_hash}, got {actual_hash}"
)
Expand All @@ -1452,7 +1547,9 @@ impl PluginInstaller {
.await;

if let Err(err) = download_result {
let _ = tokio::fs::remove_file(&temp_path).await;
if hash_mismatch {
let _ = tokio::fs::remove_file(&temp_path).await;
}
return Err(err);
}

Expand Down
7 changes: 6 additions & 1 deletion apps/skit/src/marketplace_security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub async fn validated_get_response(
start: &Url,
registry_origin: Option<&OriginKey>,
bearer_token: Option<&str>,
resume_from_byte: Option<u64>,
) -> Result<(Response, Url)> {
let mut current = start.clone();
let token_origin = if bearer_token.is_some() { Some(origin_key(start)?) } else { None };
Expand All @@ -113,6 +114,9 @@ pub async fn validated_get_response(
request = request.bearer_auth(token);
}
}
if let Some(offset) = resume_from_byte {
request = request.header(reqwest::header::RANGE, format!("bytes={offset}-"));
}
let response =
request.send().await.with_context(|| format!("Failed to fetch {label} {current}"))?;
if response.status().is_redirection() {
Expand Down Expand Up @@ -158,7 +162,8 @@ pub async fn validated_get_bytes(
bearer_token: Option<&str>,
) -> Result<Bytes> {
let (response, final_url) =
validated_get_response(client, policy, label, start, registry_origin, bearer_token).await?;
validated_get_response(client, policy, label, start, registry_origin, bearer_token, None)
.await?;
let response = response
.error_for_status()
.with_context(|| format!("{label} request failed for {final_url}"))?;
Expand Down
2 changes: 1 addition & 1 deletion dist/registry/plugins/supertonic/0.1.0/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"expected_size_bytes": 244451376,
"license": "MIT",
"license_url": "https://github.com/supertone-inc/supertonic/blob/main/LICENSE",
"sha256": "29e18bfdcbfbdd8bef25204b19be21d13fda36d4e66fe31c74e2a01dad457cec"
"sha256": "3c3ba6326cd6c8ee48d4c7322322d1f1f4ebf188bf4a7d80fc218babca186f41"
}
]
}
Loading
Loading