diff --git a/Procfile b/Procfile index b4fa48bf1c3..c54e3e17e6d 100644 --- a/Procfile +++ b/Procfile @@ -1,2 +1 @@ web: ./target/release/migrate && bin/diesel migration run && bin/start-nginx ./target/release/server -worker: ./target/release/update-downloads daemon 300 diff --git a/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/down.sql b/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/down.sql new file mode 100644 index 00000000000..12cb84d97b9 --- /dev/null +++ b/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/down.sql @@ -0,0 +1,6 @@ +ALTER TABLE version_downloads DROP CONSTRAINT version_downloads_pkey; +ALTER TABLE version_downloads ADD COLUMN id SERIAL PRIMARY KEY; +ALTER TABLE version_downloads ADD COLUMN counted INTEGER NOT NULL DEFAULT 0; +ALTER TABLE version_downloads ADD COLUMN processed BOOLEAN NOT NULL DEFAULT 'f'; +UPDATE version_downloads SET counted = downloads, processed = 't'; +CREATE UNIQUE INDEX version_downloads_unique ON version_downloads (version_id, date); diff --git a/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/up.sql b/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/up.sql new file mode 100644 index 00000000000..e4820d866fe --- /dev/null +++ b/migrations/20170322144629_drop_update_downloads_columns_from_version_downloads/up.sql @@ -0,0 +1,5 @@ +ALTER TABLE version_downloads DROP COLUMN id; +ALTER TABLE version_downloads DROP COLUMN counted; +ALTER TABLE version_downloads DROP COLUMN processed; +DROP INDEX version_downloads_unique; +ALTER TABLE version_downloads ADD PRIMARY KEY (version_id, date); diff --git a/src/bin/update-downloads.rs b/src/bin/update-downloads.rs deleted file mode 100644 index 299d97558aa..00000000000 --- a/src/bin/update-downloads.rs +++ /dev/null @@ -1,304 +0,0 @@ -#![deny(warnings)] - -extern crate cargo_registry; -extern crate chrono; -extern crate openssl; -extern crate postgres; -extern crate semver; -extern crate time; - -use std::collections::HashMap; -use std::env; -use std::time::Duration; - -use cargo_registry::{VersionDownload, Version, Model}; - -static LIMIT: i64 = 1000; - -#[allow(dead_code)] // dead in tests -fn main() { - let daemon = env::args().nth(1).as_ref().map(|s| &s[..]) - == Some("daemon"); - let sleep = env::args().nth(2).map(|s| s.parse().unwrap()); - loop { - let conn = cargo_registry::db::connect_now(); - update(&conn).unwrap(); - drop(conn); - if daemon { - std::thread::sleep(Duration::new(sleep.unwrap(), 0)); - } else { - break - } - } -} - -fn update(conn: &postgres::GenericConnection) -> postgres::Result<()> { - let mut max = 0; - loop { - // FIXME(rust-lang/rust#27401): weird declaration to make sure this - // variable gets dropped. - let tx; tx = conn.transaction()?; - { - let stmt = tx.prepare("SELECT * FROM version_downloads \ - WHERE processed = FALSE AND id > $1 - ORDER BY id ASC - LIMIT $2")?; - let mut rows = stmt.query(&[&max, &LIMIT])?; - match collect(&tx, &mut rows)? { - None => break, - Some(m) => max = m, - } - } - tx.set_commit(); - tx.finish()?; - } - Ok(()) -} - -fn collect(tx: &postgres::transaction::Transaction, - rows: &mut postgres::rows::Rows) -> postgres::Result> { - // Anything older than 24 hours ago will be frozen and will not be queried - // against again. - let now = chrono::UTC::now(); - let cutoff = now.naive_utc().date() - chrono::Duration::days(1); - - let mut map = HashMap::new(); - for row in rows.iter() { - let download: VersionDownload = Model::from_row(&row); - assert!(map.insert(download.id, download).is_none()); - } - println!("updating {} versions (cutoff {})", map.len(), now.to_rfc2822()); - if map.len() == 0 { - return Ok(None) - } - - let mut max = 0; - let mut total = 0; - for (id, download) in map.iter() { - if *id > max { - max = *id; - } - if download.date > cutoff && download.counted == download.downloads { - continue - } - let amt = download.downloads - download.counted; - - // Flag this row as having been processed if we're passed the cutoff, - // and unconditionally increment the number of counted downloads. - tx.execute("UPDATE version_downloads - SET processed = $2, counted = counted + $3 - WHERE id = $1", - &[id, &(download.date < cutoff), &amt])?; - total += amt as i64; - - if amt == 0 { - continue - } - - let crate_id = Version::find(tx, download.version_id).unwrap().crate_id; - - // Update the total number of version downloads - tx.execute("UPDATE versions - SET downloads = downloads + $1 - WHERE id = $2", - &[&amt, &download.version_id])?; - // Update the total number of crate downloads - tx.execute("UPDATE crates SET downloads = downloads + $1 - WHERE id = $2", &[&amt, &crate_id])?; - - // Update the total number of crate downloads for today - let cnt = tx.execute("UPDATE crate_downloads - SET downloads = downloads + $2 - WHERE crate_id = $1 AND date = $3", - &[&crate_id, &amt, &download.date])?; - if cnt == 0 { - tx.execute("INSERT INTO crate_downloads - (crate_id, downloads, date) - VALUES ($1, $2, $3)", - &[&crate_id, &amt, &download.date])?; - } - } - - // After everything else is done, update the global counter of total - // downloads. - tx.execute("UPDATE metadata SET total_downloads = total_downloads + $1", - &[&total])?; - - Ok(Some(max)) -} - -#[cfg(test)] -mod test { - use std::collections::HashMap; - - use time; - use time::Duration; - - use postgres; - use semver; - - use cargo_registry::{Version, Crate, User, Model, env}; - - fn conn() -> postgres::Connection { - postgres::Connection::connect(&env("TEST_DATABASE_URL")[..], - postgres::TlsMode::None).unwrap() - } - - fn user(conn: &postgres::transaction::Transaction) -> User{ - User::find_or_insert(conn, 2, "login", None, None, None, - "access_token").unwrap() - } - - fn crate_downloads(tx: &postgres::transaction::Transaction, id: i32, expected: usize) { - let stmt = tx.prepare("SELECT * FROM crate_downloads - WHERE crate_id = $1").unwrap(); - let dl: i32 = stmt.query(&[&id]).unwrap().iter() - .next().unwrap().get("downloads"); - assert_eq!(dl, expected as i32); - } - - #[test] - fn increment() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, &None, - &None, &None, &None, &None, - &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id) - VALUES ($1)", - &[&version.id]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, date, processed) - VALUES ($1, current_date - interval '1 day', true)", - &[&version.id]).unwrap(); - ::update(&tx).unwrap(); - assert_eq!(Version::find(&tx, version.id).unwrap().downloads, 1); - assert_eq!(Crate::find(&tx, krate.id).unwrap().downloads, 1); - crate_downloads(&tx, krate.id, 1); - ::update(&tx).unwrap(); - assert_eq!(Version::find(&tx, version.id).unwrap().downloads, 1); - } - - #[test] - fn set_processed_true() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, - &None, &None, &None, &None, - &None, &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, downloads, counted, date, processed) - VALUES ($1, 2, 2, current_date - interval '2 days', false)", - &[&version.id]).unwrap(); - ::update(&tx).unwrap(); - let stmt = tx.prepare("SELECT processed FROM version_downloads - WHERE version_id = $1").unwrap(); - let processed: bool = stmt.query(&[&version.id]).unwrap().iter() - .next().unwrap().get("processed"); - assert!(processed); - } - - #[test] - fn dont_process_recent_row() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, - &None, &None, &None, &None, - &None, &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - let time = time::now_utc().to_timespec() - Duration::hours(2); - tx.execute("INSERT INTO version_downloads \ - (version_id, downloads, counted, date, processed) - VALUES ($1, 2, 2, date($2), false)", - &[&version.id, &time]).unwrap(); - ::update(&tx).unwrap(); - let stmt = tx.prepare("SELECT processed FROM version_downloads - WHERE version_id = $1").unwrap(); - let processed: bool = stmt.query(&[&version.id]).unwrap().iter() - .next().unwrap().get("processed"); - assert!(!processed); - } - - #[test] - fn increment_a_little() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, - &None, &None, &None, &None, - &None, &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - tx.execute("UPDATE versions - SET updated_at = current_date - interval '2 hours'", - &[]).unwrap(); - tx.execute("UPDATE crates - SET updated_at = current_date - interval '2 hours'", - &[]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, downloads, counted, date, processed) - VALUES ($1, 2, 1, current_date, false)", - &[&version.id]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, date) - VALUES ($1, current_date - interval '1 day')", - &[&version.id]).unwrap(); - - let version_before = Version::find(&tx, version.id).unwrap(); - let krate_before = Crate::find(&tx, krate.id).unwrap(); - ::update(&tx).unwrap(); - let version2 = Version::find(&tx, version.id).unwrap(); - assert_eq!(version2.downloads, 2); - assert_eq!(version2.updated_at, version_before.updated_at); - let krate2 = Crate::find(&tx, krate.id).unwrap(); - assert_eq!(krate2.downloads, 2); - assert_eq!(krate2.updated_at, krate_before.updated_at); - crate_downloads(&tx, krate.id, 1); - ::update(&tx).unwrap(); - assert_eq!(Version::find(&tx, version.id).unwrap().downloads, 2); - } - - #[test] - fn set_processed_no_set_updated_at() { - let conn = conn(); - let tx = conn.transaction().unwrap(); - let user = user(&tx); - let krate = Crate::find_or_insert(&tx, "foo", user.id, &None, - &None, &None, &None, &None, - &None, &None, None).unwrap(); - let version = Version::insert(&tx, krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &HashMap::new(), &[]).unwrap(); - tx.execute("UPDATE versions - SET updated_at = current_date - interval '2 days'", - &[]).unwrap(); - tx.execute("UPDATE crates - SET updated_at = current_date - interval '2 days'", - &[]).unwrap(); - tx.execute("INSERT INTO version_downloads \ - (version_id, downloads, counted, date, processed) - VALUES ($1, 2, 2, current_date - interval '2 days', false)", - &[&version.id]).unwrap(); - - let version_before = Version::find(&tx, version.id).unwrap(); - let krate_before = Crate::find(&tx, krate.id).unwrap(); - ::update(&tx).unwrap(); - let version2 = Version::find(&tx, version.id).unwrap(); - assert_eq!(version2.updated_at, version_before.updated_at); - let krate2 = Crate::find(&tx, krate.id).unwrap(); - assert_eq!(krate2.updated_at, krate_before.updated_at); - } -} diff --git a/src/download.rs b/src/download.rs index 8c32374fb4f..16584f12c30 100644 --- a/src/download.rs +++ b/src/download.rs @@ -4,16 +4,13 @@ use pg::rows::Row; use Model; pub struct VersionDownload { - pub id: i32, pub version_id: i32, pub downloads: i32, - pub counted: i32, pub date: NaiveDate, } #[derive(RustcEncodable, RustcDecodable)] pub struct EncodableVersionDownload { - pub id: i32, pub version: i32, pub downloads: i32, pub date: String, @@ -22,7 +19,6 @@ pub struct EncodableVersionDownload { impl VersionDownload { pub fn encodable(self) -> EncodableVersionDownload { EncodableVersionDownload { - id: self.id, version: self.version_id, downloads: self.downloads, date: self.date.to_string(), @@ -33,10 +29,8 @@ impl VersionDownload { impl Model for VersionDownload { fn from_row(row: &Row) -> VersionDownload { VersionDownload { - id: row.get("id"), version_id: row.get("version_id"), downloads: row.get("downloads"), - counted: row.get("counted"), date: row.get("date"), } } diff --git a/src/krate.rs b/src/krate.rs index d7dae3f7455..399658793b6 100644 --- a/src/krate.rs +++ b/src/krate.rs @@ -1034,40 +1034,41 @@ pub fn download(req: &mut Request) -> CargoResult { fn increment_download_counts(req: &Request, crate_name: &str, version: &str) -> CargoResult<()> { let tx = req.tx()?; - let stmt = tx.prepare("SELECT versions.id as version_id - FROM crates - INNER JOIN versions ON - crates.id = versions.crate_id - WHERE canon_crate_name(crates.name) = - canon_crate_name($1) - AND versions.num = $2 - LIMIT 1")?; - let rows = stmt.query(&[&crate_name, &version])?; + + let rows = tx.query("UPDATE crates SET downloads = downloads + 1 + WHERE canon_crate_name(name) = canon_crate_name($1) + RETURNING id", &[&crate_name])?; let row = rows.iter().next().chain_error(|| { - human("crate or version not found") + human("crate not found") })?; - let version_id: i32 = row.get("version_id"); - let now = ::now(); - - // Bump download counts. - // - // Note that this is *not* an atomic update, and that's somewhat - // intentional. It doesn't appear that postgres supports an atomic update of - // a counter, so we just do the hopefully "least racy" thing. This is - // largely ok because these download counters are just that, counters. No - // need to have super high-fidelity counter. - // - // Also, we only update the counter for *today*, nothing else. We have lots - // of other counters, but they're all updated later on via the - // update-downloads script. - let amt = tx.execute("UPDATE version_downloads - SET downloads = downloads + 1 - WHERE version_id = $1 AND date($2) = date(date)", - &[&version_id, &now])?; - if amt == 0 { - tx.execute("INSERT INTO version_downloads - (version_id) VALUES ($1)", &[&version_id])?; - } + let crate_id: i32 = row.get("id"); + + tx.execute("INSERT INTO crate_downloads (crate_id, downloads, date) + VALUES ($1, 1, CURRENT_DATE) + ON CONFLICT (crate_id, date) DO UPDATE + SET downloads = crate_downloads.downloads + 1", + &[&crate_id])?; + + let rows = tx.query("UPDATE versions SET downloads = downloads + 1 + WHERE crate_id = $1 AND num = $2 + RETURNING id", &[&crate_id, &version])?; + let row = rows.iter().next().chain_error(|| { + human("version not found") + })?; + let version_id: i32 = row.get("id"); + + tx.execute("INSERT INTO version_downloads + (version_id) VALUES ($1) + ON CONFLICT (version_id, date) DO UPDATE + SET downloads = version_downloads.downloads + 1", + &[&version_id])?; + + // We update this last since doing it sooner would effectively cause an + // exclusive lock on incrementing download counts until the transaction + // commits, while we could potentially be doing the first 4 queries + // concurrently. + tx.execute("UPDATE metadata SET total_downloads = total_downloads + 1", &[])?; + Ok(()) } diff --git a/src/schema.rs b/src/schema.rs index f6670d5b7e0..54061c7a2f9 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -150,13 +150,10 @@ table! { } table! { - version_downloads (id) { - id -> Int4, + version_downloads (version_id, date) { version_id -> Int4, downloads -> Int4, - counted -> Int4, date -> Date, - processed -> Bool, } }