Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per-host limit to avoid triggering anti-DDoS protection #205

Merged
merged 3 commits into from
Jan 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/cmsis-cffi/src/pack_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub unsafe extern "C" fn update_pdsc_poll(ptr: *mut UpdatePoll) -> bool {
}
}
};
mem::replace(boxed.borrow_mut(), next_state);
let _ = mem::replace(boxed.borrow_mut(), next_state);
ret
})
} else {
Expand Down Expand Up @@ -188,7 +188,7 @@ pub unsafe extern "C" fn update_pdsc_result(ptr: *mut UpdatePoll) -> *mut Update
UpdatePoll::Drained => (None, UpdatePoll::Drained),
UpdatePoll::Running(cont) => (None, UpdatePoll::Running(cont))
};
mem::replace(boxed.borrow_mut(), next_state);
let _ = mem::replace(boxed.borrow_mut(), next_state);
match ret {
Some(Ok(inner)) => Box::into_raw(Box::new(inner)),
Some(Err(inner)) => {
Expand Down
4 changes: 2 additions & 2 deletions rust/cmsis-pack/src/pdsc/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl FromElem for FileRef {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct ComponentBuilder {
pub vendor: Option<String>,
pub class: Option<String>,
Expand Down Expand Up @@ -155,7 +155,7 @@ impl FromElem for ComponentBuilder {
}
}

#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct Bundle {
name: String,
class: String,
Expand Down
224 changes: 140 additions & 84 deletions rust/cmsis-pack/src/update/download.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::borrow::Borrow;
use std::fs::{create_dir_all, rename, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
Expand All @@ -8,18 +7,18 @@ use futures::prelude::*;
use futures::stream::futures_unordered::FuturesUnordered;
use reqwest::{redirect, Url};
use reqwest::{Client, ClientBuilder, Response};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};

use crate::pack_index::{PdscRef, Pidx, Vidx};
use crate::pack_index::{PdscRef, Vidx};
use crate::pdsc::Package;
use crate::utils::parse::FromElem;
use bytes::Bytes;
use futures::StreamExt;
use std::collections::HashMap;

/// Decodes a raw response body as text and parses it into a [`Vidx`].
///
/// Invalid UTF-8 sequences are replaced rather than rejected, since the
/// decoding uses `String::from_utf8_lossy`.
fn parse_vidx(body: Bytes) -> Result<Vidx, Error> {
    let text = String::from_utf8_lossy(body.as_ref());
    Vidx::from_string(&text)
}
/// Upper bound on the number of download tasks in flight at the same time;
/// `download_iterator` only starts new tasks while `started < CONCURRENCY`.
const CONCURRENCY : usize = 32;
/// Upper bound on simultaneous downloads from a single host; entries whose
/// host has reached this count are put back on the queue for a later pass.
const HOST_LIMIT : usize = 6;
/// Number of failed attempts after which `update_vidx` stops re-queueing a URL.
const MAX_RETRIES : usize = 3;

fn pdsc_url(pdsc: &mut PdscRef) -> String {
if pdsc.url.ends_with('/') {
Expand Down Expand Up @@ -104,6 +103,42 @@ impl<'a> IntoDownload for &'a Package {
}
}


/// Streams the body of `response` to disk and moves it into place at `dest`.
///
/// The data is first written to a sibling file with the `part` extension and
/// only renamed to `dest` once the whole body has been received, so `dest`
/// never holds a partially written download.
///
/// Returns the number of bytes received together with `dest`.
///
/// # Errors
///
/// Fails if the temporary file cannot be opened, if a chunk cannot be read
/// from the response or written to disk, or if the final rename fails. In
/// the last three cases the temporary file is removed on a best-effort basis.
async fn save_response(response: Response, dest: PathBuf) -> Result<(usize, PathBuf), Error> {
    let temp = dest.with_extension("part");
    // `truncate(true)` matters: without it an already existing `.part` file
    // that is longer than the new body would keep its old tail, and that
    // stale data would end up in `dest` after the rename.
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(&temp)
        .map_err(|err| anyhow!(err.to_string()))?;

    let mut fsize: usize = 0;
    let mut stream = response.bytes_stream();
    while let Some(chunk) = stream.next().await {
        let written = chunk
            .map_err(|err| anyhow!(err.to_string()))
            .and_then(|bytes| {
                file.write_all(bytes.as_ref())
                    .map(|()| bytes.len())
                    .map_err(|err| anyhow!(err.to_string()))
            });
        match written {
            Ok(len) => fsize += len,
            Err(err) => {
                // Best-effort cleanup; the original failure is what gets reported.
                let _ = std::fs::remove_file(&temp);
                return Err(err);
            }
        }
    }

    // Close the handle before moving the file into place.
    drop(file);
    if let Err(err) = rename(&temp, &dest) {
        let _ = std::fs::remove_file(&temp);
        return Err(anyhow!(err.to_string()));
    }
    Ok((fsize, dest))
}


pub trait DownloadProgress: Send {
fn size(&self, files: usize);
fn progress(&self, bytes: usize);
Expand Down Expand Up @@ -145,130 +180,150 @@ where
})
}

/// Streams the body of `response` to disk and moves it into place at `dest`.
///
/// The data is written to a sibling file with the `part` extension and
/// renamed to `dest` once the whole body has been received. The size of
/// every received chunk is reported through `self.prog.progress`.
///
/// # Errors
///
/// Fails if the temporary file cannot be opened, if a chunk cannot be read
/// from the response or written to disk, or if the final rename fails. In
/// the last three cases the temporary file is removed on a best-effort basis.
async fn save_response(&'a self, response: Response, dest: PathBuf) -> Result<PathBuf, Error> {
    let temp = dest.with_extension("part");
    // `truncate(true)` matters: without it an already existing `.part` file
    // that is longer than the new body would keep its old tail, and that
    // stale data would end up in `dest` after the rename.
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(&temp)
        .map_err(|err| anyhow!(err.to_string()))?;

    let mut stream = response.bytes_stream();
    while let Some(chunk) = stream.next().await {
        let outcome = chunk
            .map_err(|err| anyhow!(err.to_string()))
            .and_then(|bytes| {
                // Progress is reported as soon as the chunk arrives, before
                // the write is attempted.
                self.prog.progress(bytes.len());
                file.write_all(bytes.as_ref())
                    .map_err(|err| anyhow!(err.to_string()))
            });
        if let Err(err) = outcome {
            // Best-effort cleanup; the result is deliberately ignored so the
            // original failure is the one that gets reported.
            let _ = std::fs::remove_file(&temp);
            return Err(err);
        }
    }

    // Close the handle before moving the file into place.
    drop(file);
    if let Err(err) = rename(&temp, &dest) {
        let _ = std::fs::remove_file(&temp);
        return Err(anyhow!(err.to_string()));
    }
    Ok(dest)
}

/// Fetches `source` into `dest` unless `dest` already exists.
///
/// The parent directory of `dest` is created on a best-effort basis before
/// the request is sent. A response whose status code is 400 or above is
/// treated as a failure; any other response is handed to `save_response`.
///
/// Returns the destination path on success.
async fn download_file(&'a self, source: Url, dest: PathBuf) -> Result<PathBuf, Error> {
    // Nothing to do when the target is already on disk.
    if dest.exists() {
        return Ok(dest);
    }

    // Directory creation errors are not checked here; a missing directory
    // shows up later when the file is opened for writing.
    if let Some(dir) = dest.parent() {
        let _ = create_dir_all(dir);
    }

    let response = match self.client.get(source).send().await {
        Ok(response) => response,
        Err(err) => return Err(anyhow!(err.to_string())),
    };

    let rc = response.status().as_u16();
    if rc >= 400 {
        return Err(anyhow!(format!("Response code in invalid range: {}", rc).to_string()));
    }

    self.save_response(response, dest).await
}

pub async fn download_iterator<I>(&'a self, iter: I) -> Vec<PathBuf>
where
I: IntoIterator + 'a,
<I as IntoIterator>::Item: IntoDownload,
{
let to_dl: Vec<(Url, PathBuf)> = iter
let mut to_dl: Vec<(Url, String, PathBuf)> = iter
.into_iter()
.filter_map(|i| {
if let Ok(uri) = i.into_uri() {
Some((uri, i.into_fd(self.config)))
let c = uri.clone();
if let Some(host) = c.host_str() {
Some((uri, host.to_string(), i.into_fd(self.config)))
} else {
None
}
} else {
None
}
})
.collect();
self.prog.size(to_dl.len());

let v = futures::stream::iter(to_dl.into_iter().map(|from| async move {
let r = self.download_file(from.0.clone(), from.1.clone()).await;
self.prog.complete();
match r {
Ok(p) => Some(p),
Err(e) => {
log::error!("download of {:?} failed: {}", from.0.clone(), e);
None
let mut hosts: HashMap<String, usize> = HashMap::new();
let mut results : Vec<PathBuf> = vec![];
let mut started : usize = 0;
let mut handles: Vec<JoinHandle<(String, usize, Option<PathBuf>)>> = vec![];

while !to_dl.is_empty() || !handles.is_empty() {
let mut wait_list: Vec<(Url, String, PathBuf)> = vec![];
let mut next: Vec<JoinHandle<(String, usize, Option<PathBuf>)>> = vec![];

while let Some(handle) = handles.pop() {
if handle.is_finished() {
let r = handle.await.unwrap();
*hosts.entry(r.0).or_insert(1) -= 1;
started -= 1;
self.prog.progress(r.1);
self.prog.complete();
if let Some(path) = r.2 {
results.push(path);
}
} else {
next.push(handle);
}
}
}))
.buffer_unordered(32)
.collect::<Vec<Option<PathBuf>>>()
.await;
v.into_iter().filter_map(|x| x).collect::<Vec<PathBuf>>()

while ! to_dl.is_empty() && started < CONCURRENCY {
let from = to_dl.pop().unwrap();
let host = from.1.clone();
let entry = hosts.entry(host).or_insert(0);
if *entry >= HOST_LIMIT {
wait_list.push(from);
} else {
let source = from.0.clone();
let host = from.1.clone();
let dest = from.2.clone();
if dest.exists() {
results.push(dest);
} else {
let client = self.client.clone();
let handle: JoinHandle<(String, usize, Option<PathBuf>)> = tokio::spawn(async move {
dest.parent().map(create_dir_all);
let res = client.get(source.clone()).send().await;
let res: Result<(usize, PathBuf), Error> = match res {
Ok(r) => {
let rc = r.status().as_u16();
if rc >= 400 {
Err(anyhow!(format!("Response code in invalid range: {}", rc).to_string()))
} else {
save_response(r, dest).await
}
},
Err(err) => {
Err(anyhow!(err.to_string()))
},
};
match res {
Ok(r) => {
(host, r.0, Some(r.1))
},
Err(err) => {
log::error!("download of {:?} failed: {}", source, err);
(host, 0, None)
}
}
});
handles.push(handle);
started += 1;
*entry += 1;
}
}
}

for w in wait_list {
to_dl.push(w);
}

for w in next {
handles.push(w);
}
sleep(Duration::from_millis(100)).await;
}

results
}

pub(crate) async fn update_vidx<I>(&'a self, list: I) -> Result<Vec<PathBuf>, Error>
where
I: IntoIterator + 'a,
<I as IntoIterator>::Item: Into<String>,
{
let mut downloaded: HashMap<String, i8> = HashMap::new();
let mut downloaded: HashMap<String, bool> = HashMap::new();
let mut failures: HashMap<String, usize> = HashMap::new();
let mut urls: Vec<String> = list.into_iter().map(|x| x.into()).collect();
let mut vidxs: Vec<Vidx> = Vec::new();
loop {
// Remove from list all duplicate URLs and those already downloaded
urls.dedup();
urls = urls
.into_iter()
.filter(|u| downloaded.get(u).unwrap_or(&0) < &1)
.filter(|u| !*downloaded.get(u).unwrap_or(&false))
.collect();

// TODO: Make this section asynchronous
let mut next: Vec<String> = Vec::new();
for url in &urls {
for url in urls {
match self.download_vidx(url.clone()).await {
Ok(t) => {
log::info!("Downloaded {}", url);
downloaded.insert(url.to_string(), 1);
downloaded.insert(url, true);
for v in &t.vendor_index {
let u = format!("{}{}.pidx", v.url, v.vendor);
if !downloaded.contains_key(&u) {
downloaded.insert(u.to_string(), 0);
downloaded.insert(u.clone(), false);
next.push(u);
}
}
vidxs.push(t);
}
Err(_err) => {
let r = downloaded.get(&url.to_string()).unwrap_or(&0);
if r > &-3 {
next.push(url.clone());
downloaded.insert(url.to_string(), r - &1);
let tries = failures.entry(url.clone()).or_insert(0);
*tries += 1;
if *tries < MAX_RETRIES {
next.push(url);
}
}
}
Expand Down Expand Up @@ -301,6 +356,7 @@ where
Vidx::from_string(req.text().await?.as_str())
}

#[allow(dead_code)]
pub(crate) fn download_vidx_list<I>(&'a self, list: I) -> impl Stream<Item = Option<Vidx>> + 'a
where
I: IntoIterator + 'a,
Expand Down