Skip to content

Commit 31c98b4

Browse files
authored
admin/render_readmes: Use async reqwest client and diesel-async queries (#9892)
1 parent 6e97f9a commit 31c98b4

File tree

2 files changed

+95
-98
lines changed

2 files changed

+95
-98
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,7 @@ postgres-native-tls = "=0.5.0"
9797
prometheus = { version = "=0.13.4", default-features = false }
9898
quick-xml = "=0.37.0"
9999
rand = "=0.8.5"
100-
reqwest = { version = "=0.12.9", features = ["blocking", "gzip", "json"] }
100+
reqwest = { version = "=0.12.9", features = ["gzip", "json"] }
101101
rss = { version = "=2.0.9", default-features = false, features = ["atom"] }
102102
secrecy = "=0.10.3"
103103
semver = { version = "=1.0.23", features = ["serde"] }

src/bin/crates-admin/render_readmes.rs

Lines changed: 94 additions & 97 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ use crates_io::{
55
schema::{crates, readme_renderings, versions},
66
};
77
use std::path::PathBuf;
8-
use std::{io::Read, path::Path, sync::Arc, thread};
8+
use std::{io::Read, path::Path, sync::Arc};
99

1010
use chrono::{NaiveDateTime, Utc};
1111
use crates_io::storage::Storage;
@@ -14,12 +14,11 @@ use crates_io::util::diesel::prelude::*;
1414
use crates_io_markdown::text_to_html;
1515
use crates_io_tarball::{Manifest, StringOrBool};
1616
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
17-
use diesel_async::AsyncPgConnection;
17+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
1818
use flate2::read::GzDecoder;
19-
use reqwest::{blocking::Client, header};
19+
use reqwest::{header, Client};
2020
use std::str::FromStr;
2121
use tar::{self, Archive};
22-
use tokio::runtime::Handle;
2322

2423
const USER_AGENT: &str = "crates-admin";
2524

@@ -50,114 +49,107 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
5049
.context("Failed to connect to the database")?;
5150

5251
let mut conn = AsyncConnectionWrapper::<AsyncPgConnection>::from(conn);
53-
spawn_blocking(move || {
54-
use diesel::RunQueryDsl;
5552

56-
let storage = Arc::new(Storage::from_environment());
53+
let storage = Arc::new(Storage::from_environment());
5754

58-
let start_time = Utc::now();
55+
let start_time = Utc::now();
5956

60-
let older_than = if let Some(ref time) = opts.older_than {
61-
NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S")
62-
.context("Could not parse --older-than argument as a time")?
63-
} else {
64-
start_time.naive_utc()
65-
};
57+
let older_than = if let Some(ref time) = opts.older_than {
58+
NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S")
59+
.context("Could not parse --older-than argument as a time")?
60+
} else {
61+
start_time.naive_utc()
62+
};
6663

67-
println!("Start time: {start_time}");
68-
println!("Rendering readmes older than: {older_than}");
64+
println!("Start time: {start_time}");
65+
println!("Rendering readmes older than: {older_than}");
66+
67+
let mut query = versions::table
68+
.inner_join(crates::table)
69+
.left_outer_join(readme_renderings::table)
70+
.filter(
71+
readme_renderings::rendered_at
72+
.lt(older_than)
73+
.or(readme_renderings::version_id.is_null()),
74+
)
75+
.select(versions::id)
76+
.into_boxed();
77+
78+
if let Some(crate_name) = opts.crate_name {
79+
println!("Rendering readmes for {crate_name}");
80+
query = query.filter(crates::name.eq(crate_name));
81+
}
6982

70-
let mut query = versions::table
71-
.inner_join(crates::table)
72-
.left_outer_join(readme_renderings::table)
73-
.filter(
74-
readme_renderings::rendered_at
75-
.lt(older_than)
76-
.or(readme_renderings::version_id.is_null()),
77-
)
78-
.select(versions::id)
79-
.into_boxed();
83+
let version_ids: Vec<i32> = query
84+
.load(&mut conn)
85+
.await
86+
.context("error loading version ids")?;
8087

81-
if let Some(crate_name) = opts.crate_name {
82-
println!("Rendering readmes for {crate_name}");
83-
query = query.filter(crates::name.eq(crate_name));
84-
}
88+
let total_versions = version_ids.len();
89+
println!("Rendering {total_versions} versions");
8590

86-
let version_ids: Vec<i32> = query.load(&mut conn).context("error loading version ids")?;
91+
let page_size = opts.page_size;
8792

88-
let total_versions = version_ids.len();
89-
println!("Rendering {total_versions} versions");
93+
let total_pages = total_versions / page_size;
94+
let total_pages = if total_versions % page_size == 0 {
95+
total_pages
96+
} else {
97+
total_pages + 1
98+
};
9099

91-
let page_size = opts.page_size;
100+
let client = Client::new();
92101

93-
let total_pages = total_versions / page_size;
94-
let total_pages = if total_versions % page_size == 0 {
102+
for (page_num, version_ids_chunk) in version_ids.chunks(page_size).enumerate() {
103+
println!(
104+
"= Page {} of {} ==================================",
105+
page_num + 1,
95106
total_pages
96-
} else {
97-
total_pages + 1
98-
};
107+
);
99108

100-
let client = Client::new();
101-
102-
for (page_num, version_ids_chunk) in version_ids.chunks(page_size).enumerate() {
103-
println!(
104-
"= Page {} of {} ==================================",
105-
page_num + 1,
106-
total_pages
107-
);
108-
109-
let versions: Vec<(Version, String)> = versions::table
110-
.inner_join(crates::table)
111-
.filter(versions::id.eq_any(version_ids_chunk))
112-
.select((Version::as_select(), crates::name))
113-
.load(&mut conn)
114-
.context("error loading versions")?;
115-
116-
let mut tasks = Vec::with_capacity(page_size);
117-
for (version, krate_name) in versions {
118-
Handle::current()
119-
.block_on(Version::record_readme_rendering(version.id, &mut conn))
120-
.context("Couldn't record rendering time")?;
121-
122-
let client = client.clone();
123-
let storage = storage.clone();
124-
let handle = thread::spawn::<_, anyhow::Result<()>>(move || {
125-
println!("[{}-{}] Rendering README...", krate_name, version.num);
126-
let readme = get_readme(&storage, &client, &version, &krate_name)?;
127-
if !readme.is_empty() {
128-
let rt = tokio::runtime::Builder::new_current_thread()
129-
.enable_all()
130-
.build()
131-
.context("Failed to initialize tokio runtime")?;
132-
133-
rt.block_on(storage.upload_readme(
134-
&krate_name,
135-
&version.num,
136-
readme.into(),
137-
))
109+
let versions: Vec<(Version, String)> = versions::table
110+
.inner_join(crates::table)
111+
.filter(versions::id.eq_any(version_ids_chunk))
112+
.select((Version::as_select(), crates::name))
113+
.load(&mut conn)
114+
.await
115+
.context("error loading versions")?;
116+
117+
let mut tasks = Vec::with_capacity(page_size);
118+
for (version, krate_name) in versions {
119+
Version::record_readme_rendering(version.id, &mut conn)
120+
.await
121+
.context("Couldn't record rendering time")?;
122+
123+
let client = client.clone();
124+
let storage = storage.clone();
125+
let handle = tokio::spawn(async move {
126+
println!("[{}-{}] Rendering README...", krate_name, version.num);
127+
let readme = get_readme(&storage, &client, &version, &krate_name).await?;
128+
if !readme.is_empty() {
129+
storage
130+
.upload_readme(&krate_name, &version.num, readme.into())
131+
.await
138132
.context("Failed to upload rendered README file to S3")?;
139-
}
140-
141-
Ok(())
142-
});
143-
tasks.push(handle);
144-
}
145-
for handle in tasks {
146-
match handle.join() {
147-
Err(err) => println!("Thread panicked: {err:?}"),
148-
Ok(Err(err)) => println!("Thread failed: {err:?}"),
149-
_ => {}
150133
}
134+
135+
Ok::<_, anyhow::Error>(())
136+
});
137+
tasks.push(handle);
138+
}
139+
for handle in tasks {
140+
match handle.await {
141+
Err(err) => println!("Task panicked: {err:?}"),
142+
Ok(Err(err)) => println!("Task failed: {err:?}"),
143+
_ => {}
151144
}
152145
}
146+
}
153147

154-
Ok(())
155-
})
156-
.await
148+
Ok(())
157149
}
158150

159151
/// Renders the readme of an uploaded crate version.
160-
fn get_readme(
152+
async fn get_readme(
161153
storage: &Storage,
162154
client: &Client,
163155
version: &Version,
@@ -173,18 +165,23 @@ fn get_readme(
173165
header::HeaderValue::from_static(USER_AGENT),
174166
);
175167
let request = client.get(location).headers(extra_headers);
176-
let response = request.send().context("Failed to fetch crate")?;
168+
let response = request.send().await.context("Failed to fetch crate")?;
177169

178170
if !response.status().is_success() {
179171
return Err(anyhow!(
180172
"Failed to get a 200 response: {}",
181-
response.text()?
173+
response.text().await?
182174
));
183175
}
184176

185-
let reader = GzDecoder::new(response);
186-
let archive = Archive::new(reader);
187-
render_pkg_readme(archive, &pkg_name)
177+
let body = response.bytes().await?;
178+
179+
spawn_blocking(move || {
180+
let reader = GzDecoder::new(&*body);
181+
let archive = Archive::new(reader);
182+
render_pkg_readme(archive, &pkg_name)
183+
})
184+
.await
188185
}
189186

190187
fn render_pkg_readme<R: Read>(mut archive: Archive<R>, pkg_name: &str) -> anyhow::Result<String> {

0 commit comments

Comments
 (0)