Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ postgres-native-tls = "=0.5.0"
prometheus = { version = "=0.13.4", default-features = false }
quick-xml = "=0.37.0"
rand = "=0.8.5"
reqwest = { version = "=0.12.9", features = ["blocking", "gzip", "json"] }
reqwest = { version = "=0.12.9", features = ["gzip", "json"] }
rss = { version = "=2.0.9", default-features = false, features = ["atom"] }
secrecy = "=0.10.3"
semver = { version = "=1.0.23", features = ["serde"] }
Expand Down
191 changes: 94 additions & 97 deletions src/bin/crates-admin/render_readmes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
schema::{crates, readme_renderings, versions},
};
use std::path::PathBuf;
use std::{io::Read, path::Path, sync::Arc, thread};
use std::{io::Read, path::Path, sync::Arc};

use chrono::{NaiveDateTime, Utc};
use crates_io::storage::Storage;
Expand All @@ -14,12 +14,11 @@
use crates_io_markdown::text_to_html;
use crates_io_tarball::{Manifest, StringOrBool};
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::AsyncPgConnection;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use flate2::read::GzDecoder;
use reqwest::{blocking::Client, header};
use reqwest::{header, Client};
use std::str::FromStr;
use tar::{self, Archive};
use tokio::runtime::Handle;

const USER_AGENT: &str = "crates-admin";

Expand Down Expand Up @@ -50,114 +49,107 @@
.context("Failed to connect to the database")?;

let mut conn = AsyncConnectionWrapper::<AsyncPgConnection>::from(conn);
spawn_blocking(move || {
use diesel::RunQueryDsl;

let storage = Arc::new(Storage::from_environment());
let storage = Arc::new(Storage::from_environment());

Check warning on line 53 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L53

Added line #L53 was not covered by tests

let start_time = Utc::now();
let start_time = Utc::now();

Check warning on line 55 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L55

Added line #L55 was not covered by tests

let older_than = if let Some(ref time) = opts.older_than {
NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S")
.context("Could not parse --older-than argument as a time")?
} else {
start_time.naive_utc()
};
let older_than = if let Some(ref time) = opts.older_than {
NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S")
.context("Could not parse --older-than argument as a time")?

Check warning on line 59 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L57-L59

Added lines #L57 - L59 were not covered by tests
} else {
start_time.naive_utc()

Check warning on line 61 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L61

Added line #L61 was not covered by tests
};

println!("Start time: {start_time}");
println!("Rendering readmes older than: {older_than}");
println!("Start time: {start_time}");
println!("Rendering readmes older than: {older_than}");

let mut query = versions::table
.inner_join(crates::table)
.left_outer_join(readme_renderings::table)
.filter(
readme_renderings::rendered_at
.lt(older_than)
.or(readme_renderings::version_id.is_null()),
)
.select(versions::id)
.into_boxed();

Check warning on line 76 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L64-L76

Added lines #L64 - L76 were not covered by tests

if let Some(crate_name) = opts.crate_name {
println!("Rendering readmes for {crate_name}");
query = query.filter(crates::name.eq(crate_name));
}

Check warning on line 81 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L78-L81

Added lines #L78 - L81 were not covered by tests

let mut query = versions::table
.inner_join(crates::table)
.left_outer_join(readme_renderings::table)
.filter(
readme_renderings::rendered_at
.lt(older_than)
.or(readme_renderings::version_id.is_null()),
)
.select(versions::id)
.into_boxed();
let version_ids: Vec<i32> = query
.load(&mut conn)
.await
.context("error loading version ids")?;

Check warning on line 86 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L83-L86

Added lines #L83 - L86 were not covered by tests

if let Some(crate_name) = opts.crate_name {
println!("Rendering readmes for {crate_name}");
query = query.filter(crates::name.eq(crate_name));
}
let total_versions = version_ids.len();
println!("Rendering {total_versions} versions");

Check warning on line 89 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L88-L89

Added lines #L88 - L89 were not covered by tests

let version_ids: Vec<i32> = query.load(&mut conn).context("error loading version ids")?;
let page_size = opts.page_size;

Check warning on line 91 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L91

Added line #L91 was not covered by tests

let total_versions = version_ids.len();
println!("Rendering {total_versions} versions");
let total_pages = total_versions / page_size;
let total_pages = if total_versions % page_size == 0 {
total_pages

Check warning on line 95 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L93-L95

Added lines #L93 - L95 were not covered by tests
} else {
total_pages + 1

Check warning on line 97 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L97

Added line #L97 was not covered by tests
};

let page_size = opts.page_size;
let client = Client::new();

Check warning on line 100 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L100

Added line #L100 was not covered by tests

let total_pages = total_versions / page_size;
let total_pages = if total_versions % page_size == 0 {
for (page_num, version_ids_chunk) in version_ids.chunks(page_size).enumerate() {
println!(
"= Page {} of {} ==================================",
page_num + 1,

Check warning on line 105 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L102-L105

Added lines #L102 - L105 were not covered by tests
total_pages
} else {
total_pages + 1
};
);

Check warning on line 107 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L107

Added line #L107 was not covered by tests

let client = Client::new();

for (page_num, version_ids_chunk) in version_ids.chunks(page_size).enumerate() {
println!(
"= Page {} of {} ==================================",
page_num + 1,
total_pages
);

let versions: Vec<(Version, String)> = versions::table
.inner_join(crates::table)
.filter(versions::id.eq_any(version_ids_chunk))
.select((Version::as_select(), crates::name))
.load(&mut conn)
.context("error loading versions")?;

let mut tasks = Vec::with_capacity(page_size);
for (version, krate_name) in versions {
Handle::current()
.block_on(Version::record_readme_rendering(version.id, &mut conn))
.context("Couldn't record rendering time")?;

let client = client.clone();
let storage = storage.clone();
let handle = thread::spawn::<_, anyhow::Result<()>>(move || {
println!("[{}-{}] Rendering README...", krate_name, version.num);
let readme = get_readme(&storage, &client, &version, &krate_name)?;
if !readme.is_empty() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("Failed to initialize tokio runtime")?;

rt.block_on(storage.upload_readme(
&krate_name,
&version.num,
readme.into(),
))
let versions: Vec<(Version, String)> = versions::table
.inner_join(crates::table)
.filter(versions::id.eq_any(version_ids_chunk))
.select((Version::as_select(), crates::name))
.load(&mut conn)
.await
.context("error loading versions")?;

Check warning on line 115 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L109-L115

Added lines #L109 - L115 were not covered by tests

let mut tasks = Vec::with_capacity(page_size);
for (version, krate_name) in versions {
Version::record_readme_rendering(version.id, &mut conn)
.await
.context("Couldn't record rendering time")?;

Check warning on line 121 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L117-L121

Added lines #L117 - L121 were not covered by tests

let client = client.clone();
let storage = storage.clone();
let handle = tokio::spawn(async move {
println!("[{}-{}] Rendering README...", krate_name, version.num);
let readme = get_readme(&storage, &client, &version, &krate_name).await?;
if !readme.is_empty() {
storage
.upload_readme(&krate_name, &version.num, readme.into())
.await

Check warning on line 131 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L123-L131

Added lines #L123 - L131 were not covered by tests
.context("Failed to upload rendered README file to S3")?;
}

Ok(())
});
tasks.push(handle);
}
for handle in tasks {
match handle.join() {
Err(err) => println!("Thread panicked: {err:?}"),
Ok(Err(err)) => println!("Thread failed: {err:?}"),
_ => {}
}

Ok::<_, anyhow::Error>(())
});
tasks.push(handle);
}
for handle in tasks {
match handle.await {
Err(err) => println!("Task panicked: {err:?}"),
Ok(Err(err)) => println!("Task failed: {err:?}"),
_ => {}

Check warning on line 143 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L135-L143

Added lines #L135 - L143 were not covered by tests
}
}
}

Ok(())
})
.await
Ok(())

Check warning on line 148 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L148

Added line #L148 was not covered by tests
}

/// Renders the readme of an uploaded crate version.
fn get_readme(
async fn get_readme(

Check warning on line 152 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L152

Added line #L152 was not covered by tests
storage: &Storage,
client: &Client,
version: &Version,
Expand All @@ -173,18 +165,23 @@
header::HeaderValue::from_static(USER_AGENT),
);
let request = client.get(location).headers(extra_headers);
let response = request.send().context("Failed to fetch crate")?;
let response = request.send().await.context("Failed to fetch crate")?;

Check warning on line 168 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L168

Added line #L168 was not covered by tests

if !response.status().is_success() {
return Err(anyhow!(
"Failed to get a 200 response: {}",
response.text()?
response.text().await?

Check warning on line 173 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L173

Added line #L173 was not covered by tests
));
}

let reader = GzDecoder::new(response);
let archive = Archive::new(reader);
render_pkg_readme(archive, &pkg_name)
let body = response.bytes().await?;

Check warning on line 177 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L177

Added line #L177 was not covered by tests

spawn_blocking(move || {
let reader = GzDecoder::new(&*body);
let archive = Archive::new(reader);
render_pkg_readme(archive, &pkg_name)
})
.await

Check warning on line 184 in src/bin/crates-admin/render_readmes.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/crates-admin/render_readmes.rs#L179-L184

Added lines #L179 - L184 were not covered by tests
}

fn render_pkg_readme<R: Read>(mut archive: Archive<R>, pkg_name: &str) -> anyhow::Result<String> {
Expand Down