From d0caa01b246e2659c819c91b41b4cf86a90ca150 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Wed, 22 Mar 2017 10:11:44 -0400 Subject: [PATCH 1/2] Atomically update download counts in `/crates/downloads` The current process of updating the download counters does some fairly unnecessary punting to a background job. It's possible that this was a premature performance optimization, but based on the comment which was there before, I suspect that it was due to a misunderstanding of how locks work in PG and done this way in an attempt to make it less "racy". The comment mentioned that the update was non-atomic and racy. Neither of those statements is in any way true, which is why we use an RDBMS in the first place. Each transaction will lock the rows it updates until the transaction commits or rolls back, and any other transactions will block until then. With this change, all hits to `/download` will block each other on the final query, when they try to update the `metadata` table. However, as this is the final query before the transaction is committed, this shouldn't be a bottleneck until we reach millions of downloads per minute (at which point a periodic background worker which does `SELECT sum(downloads) FROM crates` is probably simpler) --- src/krate.rs | 70 ++++++++++++++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/src/krate.rs b/src/krate.rs index d7dae3f7455..1c51acc4c85 100644 --- a/src/krate.rs +++ b/src/krate.rs @@ -1034,40 +1034,46 @@ pub fn download(req: &mut Request) -> CargoResult { fn increment_download_counts(req: &Request, crate_name: &str, version: &str) -> CargoResult<()> { let tx = req.tx()?; - let stmt = tx.prepare("SELECT versions.id as version_id - FROM crates - INNER JOIN versions ON - crates.id = versions.crate_id - WHERE canon_crate_name(crates.name) = - canon_crate_name($1) - AND versions.num = $2 - LIMIT 1")?; - let rows = stmt.query(&[&crate_name, &version])?; + + let rows = 
tx.query("UPDATE crates SET downloads = downloads + 1 + WHERE canon_crate_name(name) = canon_crate_name($1) + RETURNING id", &[&crate_name])?; let row = rows.iter().next().chain_error(|| { - human("crate or version not found") + human("crate not found") })?; - let version_id: i32 = row.get("version_id"); - let now = ::now(); - - // Bump download counts. - // - // Note that this is *not* an atomic update, and that's somewhat - // intentional. It doesn't appear that postgres supports an atomic update of - // a counter, so we just do the hopefully "least racy" thing. This is - // largely ok because these download counters are just that, counters. No - // need to have super high-fidelity counter. - // - // Also, we only update the counter for *today*, nothing else. We have lots - // of other counters, but they're all updated later on via the - // update-downloads script. - let amt = tx.execute("UPDATE version_downloads - SET downloads = downloads + 1 - WHERE version_id = $1 AND date($2) = date(date)", - &[&version_id, &now])?; - if amt == 0 { - tx.execute("INSERT INTO version_downloads - (version_id) VALUES ($1)", &[&version_id])?; - } + let crate_id: i32 = row.get("id"); + + tx.execute("INSERT INTO crate_downloads (crate_id, downloads, date) + VALUES ($1, 1, CURRENT_DATE) + ON CONFLICT (crate_id, date) DO UPDATE + SET downloads = crate_downloads.downloads + 1", &[&crate_id])?; + + let rows = tx.query("UPDATE versions SET downloads = downloads + 1 + WHERE crate_id = $1 AND num = $2 + RETURNING id", &[&crate_id, &version])?; + let row = rows.iter().next().chain_error(|| { + human("version not found") + })?; + let version_id: i32 = row.get("id"); + + // Update the version_downloads table in a way that lets the + // update-downloads script know not to count this download + // (It will still be running at this point to process downloads + // from before this was deployed) + tx.execute("INSERT INTO version_downloads + (version_id, date, downloads, counted, processed) + 
VALUES ($1, CURRENT_DATE, 1, 1, 't') + ON CONFLICT (version_id, date) DO UPDATE + SET downloads = version_downloads.downloads + 1, + counted = version_downloads.counted + 1", + &[&version_id])?; + + // We update this last since doing it sooner would effectively cause an + // exclusive lock on incrementing download counts until the transaction + // commits, while we could potentially be doing the first 4 queries + // concurrently. + tx.execute("UPDATE metadata SET total_downloads = total_downloads + 1", &[])?; + Ok(()) } From 4d2dc8ec0f10ab7c138fa98ed595459229147c98 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Wed, 22 Mar 2017 10:55:58 -0400 Subject: [PATCH 2/2] Remove the `update-downloads` binary Now that we've let the script catch up and are updating these counters when the download occurs, this binary (and the columns it relied on) are useless --- Procfile | 1 - .../down.sql | 6 + .../up.sql | 5 + src/bin/update-downloads.rs | 304 ------------------ src/download.rs | 6 - src/krate.rs | 13 +- src/schema.rs | 5 +- 7 files changed, 16 insertions(+), 324 deletions(-) create mode 100644 migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/down.sql create mode 100644 migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/up.sql delete mode 100644 src/bin/update-downloads.rs diff --git a/Procfile b/Procfile index b4fa48bf1c3..c54e3e17e6d 100644 --- a/Procfile +++ b/Procfile @@ -1,2 +1 @@ web: ./target/release/migrate && bin/diesel migration run && bin/start-nginx ./target/release/server -worker: ./target/release/update-downloads daemon 300 diff --git a/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/down.sql b/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/down.sql new file mode 100644 index 00000000000..12cb84d97b9 --- /dev/null +++ b/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/down.sql @@ -0,0 +1,6 @@ +ALTER TABLE 
version_downloads DROP CONSTRAINT version_downloads_pkey; +ALTER TABLE version_downloads ADD COLUMN id SERIAL PRIMARY KEY; +ALTER TABLE version_downloads ADD COLUMN counted INTEGER NOT NULL DEFAULT 0; +ALTER TABLE version_downloads ADD COLUMN processed BOOLEAN NOT NULL DEFAULT 'f'; +UPDATE version_downloads SET counted = downloads, processed = 't'; +CREATE UNIQUE INDEX version_downloads_unique ON version_downloads (version_id, date); diff --git a/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/up.sql b/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/up.sql new file mode 100644 index 00000000000..e4820d866fe --- /dev/null +++ b/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/up.sql @@ -0,0 +1,5 @@ +ALTER TABLE version_downloads DROP COLUMN id; +ALTER TABLE version_downloads DROP COLUMN counted; +ALTER TABLE version_downloads DROP COLUMN processed; +DROP INDEX version_downloads_unique; +ALTER TABLE version_downloads ADD PRIMARY KEY (version_id, date); diff --git a/src/bin/update-downloads.rs b/src/bin/update-downloads.rs deleted file mode 100644 index 299d97558aa..00000000000 --- a/src/bin/update-downloads.rs +++ /dev/null @@ -1,304 +0,0 @@ -#![deny(warnings)] - -extern crate cargo_registry; -extern crate chrono; -extern crate openssl; -extern crate postgres; -extern crate semver; -extern crate time; - -use std::collections::HashMap; -use std::env; -use std::time::Duration; - -use cargo_registry::{VersionDownload, Version, Model}; - -static LIMIT: i64 = 1000; - -#[allow(dead_code)] // dead in tests -fn main() { - let daemon = env::args().nth(1).as_ref().map(|s| &s[..]) - == Some("daemon"); - let sleep = env::args().nth(2).map(|s| s.parse().unwrap()); - loop { - let conn = cargo_registry::db::connect_now(); - update(&conn).unwrap(); - drop(conn); - if daemon { - std::thread::sleep(Duration::new(sleep.unwrap(), 0)); - } else { - break - } - } -} - -fn update(conn: 
&postgres::GenericConnection) -> postgres::Result<()> { - let mut max = 0; - loop { - // FIXME(rust-lang/rust#27401): weird declaration to make sure this - // variable gets dropped. - let tx; tx = conn.transaction()?; - { - let stmt = tx.prepare("SELECT * FROM version_downloads \ - WHERE processed = FALSE AND id > $1 - ORDER BY id ASC - LIMIT $2")?; - let mut rows = stmt.query(&[&max, &LIMIT])?; - match collect(&tx, &mut rows)? { - None => break, - Some(m) => max = m, - } - } - tx.set_commit(); - tx.finish()?; - } - Ok(()) -} - -fn collect(tx: &postgres::transaction::Transaction, - rows: &mut postgres::rows::Rows) -> postgres::Result> { - // Anything older than 24 hours ago will be frozen and will not be queried - // against again. - let now = chrono::UTC::now(); - let cutoff = now.naive_utc().date() - chrono::Duration::days(1); - - let mut map = HashMap::new(); - for row in rows.iter() { - let download: VersionDownload = Model::from_row(&row); - assert!(map.insert(download.id, download).is_none()); - } - println!("updating {} versions (cutoff {})", map.len(), now.to_rfc2822()); - if map.len() == 0 { - return Ok(None) - } - - let mut max = 0; - let mut total = 0; - for (id, download) in map.iter() { - if *id > max { - max = *id; - } - if download.date > cutoff && download.counted == download.downloads { - continue - } - let amt = download.downloads - download.counted; - - // Flag this row as having been processed if we're passed the cutoff, - // and unconditionally increment the number of counted downloads. 
- tx.execute("UPDATE version_downloads - SET processed = $2, counted = counted + $3 - WHERE id = $1", - &[id, &(download.date < cutoff), &amt])?; - total += amt as i64; - - if amt == 0 { - continue - } - - let crate_id = Version::find(tx, download.version_id).unwrap().crate_id; - - // Update the total number of version downloads - tx.execute("UPDATE versions - SET downloads = downloads + $1 - WHERE id = $2", - &[&amt, &download.version_id])?; - // Update the total number of crate downloads - tx.execute("UPDATE crates SET downloads = downloads + $1 - WHERE id = $2", &[&amt, &crate_id])?; - - // Update the total number of crate downloads for today - let cnt = tx.execute("UPDATE crate_downloads - SET downloads = downloads + $2 - WHERE crate_id = $1 AND date = $3", - &[&crate_id, &amt, &download.date])?; - if cnt == 0 { - tx.execute("INSERT INTO crate_downloads - (crate_id, downloads, date) - VALUES ($1, $2, $3)", - &[&crate_id, &amt, &download.date])?; - } - } - - // After everything else is done, update the global counter of total - // downloads. 
- tx.execute("UPDATE metadata SET total_downloads = total_downloads + $1", - &[&total])?; - - Ok(Some(max)) -} - -#[cfg(test)] -mod test { - use std::collections::HashMap; - - use time; - use time::Duration; - - use postgres; - use semver; - - use cargo_registry::{Version, Crate, User, Model, env}; - - fn conn() -> postgres::Connection { - postgres::Connection::connect(&env("TEST_DATABASE_URL")[..], - postgres::TlsMode::None).unwrap() - } - - fn user(conn: &postgres::transaction::Transaction) -> User{ - User::find_or_insert(conn, 2, "login", None, None, None, - "access_token").unwrap() - } - - fn crate_downloads(tx: &postgres::transaction::Transaction, id: i32, expected: usize) { - let stmt = tx.prepare("SELECT * FROM crate_downloads - WHERE crate_id = $1").unwrap(); - let dl: i32 = stmt.query(&[&id]).unwrap().iter() - .next().unwrap().get("downloads"); - assert_eq!(dl, expected as i32); - } - - #[test] - fn increment() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, &None, - &None, &None, &None, &None, - &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id) - VALUES ($1)", - &[&version.id]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, date, processed) - VALUES ($1, current_date - interval '1 day', true)", - &[&version.id]).unwrap(); - ::update(&tx).unwrap(); - assert_eq!(Version::find(&tx, version.id).unwrap().downloads, 1); - assert_eq!(Crate::find(&tx, krate.id).unwrap().downloads, 1); - crate_downloads(&tx, krate.id, 1); - ::update(&tx).unwrap(); - assert_eq!(Version::find(&tx, version.id).unwrap().downloads, 1); - } - - #[test] - fn set_processed_true() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, 
"foo", user.id, &None, - &None, &None, &None, &None, - &None, &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, downloads, counted, date, processed) - VALUES ($1, 2, 2, current_date - interval '2 days', false)", - &[&version.id]).unwrap(); - ::update(&tx).unwrap(); - let stmt = tx.prepare("SELECT processed FROM version_downloads - WHERE version_id = $1").unwrap(); - let processed: bool = stmt.query(&[&version.id]).unwrap().iter() - .next().unwrap().get("processed"); - assert!(processed); - } - - #[test] - fn dont_process_recent_row() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, - &None, &None, &None, &None, - &None, &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - let time = time::now_utc().to_timespec() - Duration::hours(2); - tx.execute("INSERT INTO version_downloads \ - (version_id, downloads, counted, date, processed) - VALUES ($1, 2, 2, date($2), false)", - &[&version.id, &time]).unwrap(); - ::update(&tx).unwrap(); - let stmt = tx.prepare("SELECT processed FROM version_downloads - WHERE version_id = $1").unwrap(); - let processed: bool = stmt.query(&[&version.id]).unwrap().iter() - .next().unwrap().get("processed"); - assert!(!processed); - } - - #[test] - fn increment_a_little() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, - &None, &None, &None, &None, - &None, &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - tx.execute("UPDATE versions - SET updated_at = current_date - interval '2 hours'", - 
&[]).unwrap(); - tx.execute("UPDATE crates - SET updated_at = current_date - interval '2 hours'", - &[]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, downloads, counted, date, processed) - VALUES ($1, 2, 1, current_date, false)", - &[&version.id]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, date) - VALUES ($1, current_date - interval '1 day')", - &[&version.id]).unwrap(); - - let version_before = Version::find(&tx, version.id).unwrap(); - let krate_before = Crate::find(&tx, krate.id).unwrap(); - ::update(&tx).unwrap(); - let version2 = Version::find(&tx, version.id).unwrap(); - assert_eq!(version2.downloads, 2); - assert_eq!(version2.updated_at, version_before.updated_at); - let krate2 = Crate::find(&tx, krate.id).unwrap(); - assert_eq!(krate2.downloads, 2); - assert_eq!(krate2.updated_at, krate_before.updated_at); - crate_downloads(&tx, krate.id, 1); - ::update(&tx).unwrap(); - assert_eq!(Version::find(&tx, version.id).unwrap().downloads, 2); - } - - #[test] - fn set_processed_no_set_updated_at() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, - &None, &None, &None, &None, - &None, &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - tx.execute("UPDATE versions - SET updated_at = current_date - interval '2 days'", - &[]).unwrap(); - tx.execute("UPDATE crates - SET updated_at = current_date - interval '2 days'", - &[]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, downloads, counted, date, processed) - VALUES ($1, 2, 2, current_date - interval '2 days', false)", - &[&version.id]).unwrap(); - - let version_before = Version::find(&tx, version.id).unwrap(); - let krate_before = Crate::find(&tx, krate.id).unwrap(); - ::update(&tx).unwrap(); - let version2 = Version::find(&tx, 
version.id).unwrap(); - assert_eq!(version2.updated_at, version_before.updated_at); - let krate2 = Crate::find(&tx, krate.id).unwrap(); - assert_eq!(krate2.updated_at, krate_before.updated_at); - } -} diff --git a/src/download.rs b/src/download.rs index 8c32374fb4f..16584f12c30 100644 --- a/src/download.rs +++ b/src/download.rs @@ -4,16 +4,13 @@ use pg::rows::Row; use Model; pub struct VersionDownload { - pub id: i32, pub version_id: i32, pub downloads: i32, - pub counted: i32, pub date: NaiveDate, } #[derive(RustcEncodable, RustcDecodable)] pub struct EncodableVersionDownload { - pub id: i32, pub version: i32, pub downloads: i32, pub date: String, @@ -22,7 +19,6 @@ pub struct EncodableVersionDownload { impl VersionDownload { pub fn encodable(self) -> EncodableVersionDownload { EncodableVersionDownload { - id: self.id, version: self.version_id, downloads: self.downloads, date: self.date.to_string(), @@ -33,10 +29,8 @@ impl VersionDownload { impl Model for VersionDownload { fn from_row(row: &Row) -> VersionDownload { VersionDownload { - id: row.get("id"), version_id: row.get("version_id"), downloads: row.get("downloads"), - counted: row.get("counted"), date: row.get("date"), } } diff --git a/src/krate.rs b/src/krate.rs index 1c51acc4c85..399658793b6 100644 --- a/src/krate.rs +++ b/src/krate.rs @@ -1046,7 +1046,8 @@ fn increment_download_counts(req: &Request, crate_name: &str, version: &str) -> tx.execute("INSERT INTO crate_downloads (crate_id, downloads, date) VALUES ($1, 1, CURRENT_DATE) ON CONFLICT (crate_id, date) DO UPDATE - SET downloads = crate_downloads.downloads + 1", &[&crate_id])?; + SET downloads = crate_downloads.downloads + 1", + &[&crate_id])?; let rows = tx.query("UPDATE versions SET downloads = downloads + 1 WHERE crate_id = $1 AND num = $2 @@ -1056,16 +1057,10 @@ fn increment_download_counts(req: &Request, crate_name: &str, version: &str) -> })?; let version_id: i32 = row.get("id"); - // Update the version_downloads table in a way that lets the - // 
update-downloads script know not to count this download - // (It will still be running at this point to process downloads - // from before this was deployed) tx.execute("INSERT INTO version_downloads - (version_id, date, downloads, counted, processed) - VALUES ($1, CURRENT_DATE, 1, 1, 't') + (version_id) VALUES ($1) ON CONFLICT (version_id, date) DO UPDATE - SET downloads = version_downloads.downloads + 1, - counted = version_downloads.counted + 1", + SET downloads = version_downloads.downloads + 1", &[&version_id])?; // We update this last since doing it sooner would effectively cause an diff --git a/src/schema.rs b/src/schema.rs index f6670d5b7e0..54061c7a2f9 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -150,13 +150,10 @@ table! { } table! { - version_downloads (id) { - id -> Int4, + version_downloads (version_id, date) { version_id -> Int4, downloads -> Int4, - counted -> Int4, date -> Date, - processed -> Bool, } }