From aba300fa191937d352adb51866a9a7298ffd01d3 Mon Sep 17 00:00:00 2001 From: Gabor Csardi Date: Thu, 23 Jul 2015 17:58:02 -0400 Subject: [PATCH] Handle ping, update DB from web app --- db/create.sh | 9 +++++---- lib/append_csv_to_db.js | 46 ++++++++++++++++++++++++++++++++++++++++++++++ lib/clean_pkg_csv.js | 41 +++++++++++++++++++++++++++++++++++++++++ lib/clean_r_csv.js | 39 +++++++++++++++++++++++++++++++++++++++ lib/download_csv.js | 27 +++++++++++++++++++++++++++ lib/last_pkg.js | 29 +++++++++++++++++++++++++++++ lib/missing_days.js | 16 ++++++++++++++++ lib/missing_urls.js | 15 +++++++++++++++ lib/update_pkg_db_day.js | 23 +++++++++++++++++++++++ lib/update_r_db_day.js | 23 +++++++++++++++++++++++ package.json | 16 ++++++++++------ routes/ping.js | 37 +++++++++++++++++++++++++++++++++++-- 12 files changed, 309 insertions(+), 12 deletions(-) create mode 100644 lib/append_csv_to_db.js create mode 100644 lib/clean_pkg_csv.js create mode 100644 lib/clean_r_csv.js create mode 100644 lib/download_csv.js create mode 100644 lib/last_pkg.js create mode 100644 lib/missing_days.js create mode 100644 lib/missing_urls.js create mode 100644 lib/update_pkg_db_day.js create mode 100644 lib/update_r_db_day.js diff --git a/db/create.sh b/db/create.sh index 65ccb3d..5ddfec6 100644 --- a/db/create.sh +++ b/db/create.sh @@ -1,4 +1,6 @@ +APP=cranlogs + read -r -d '' CREATE_DB <<'EOF' CREATE TABLE daily ( day DATE, @@ -10,7 +12,7 @@ CREATE INDEX idx_daily_day ON daily(day); CREATE INDEX idx_daily_package ON daily(package); EOF -echo "$CREATE_DB" | dokku psql:restore_sql cranlogs +echo "$CREATE_DB" | dokku psql:restore_sql $APP read -r -d '' ADD_VIEWS <<'EOF' CREATE MATERIALIZED VIEW top_day AS @@ -72,7 +74,7 @@ CREATE TRIGGER trig_refresh_views AFTER TRUNCATE OR INSERT OR UPDATE OR DELETE EOF -echo "$ADD_VIEWS" | dokku psql:restore_sql cranlogs +echo "$ADD_VIEWS" | dokku psql:restore_sql $APP read -r -d '' CREATE_DB_R <<'EOF' CREATE TABLE dailyr ( @@ -84,7 +86,6 @@ CREATE TABLE 
dailyr ( CREATE INDEX idx_dailyr_day ON dailyr(day); CREATE INDEX idx_dailyr_day_version ON dailyr(day, version); CREATE INDEX idx_dailyr_day_os ON dailyr(day, os); -ALTER TABLE dailyr OWNER TO cranlogs; EOF -echo "$CREATE_DB_R" | dokku psql:restore_sql cranlogs +echo "$CREATE_DB_R" | dokku psql:restore_sql $APP diff --git a/lib/append_csv_to_db.js b/lib/append_csv_to_db.js new file mode 100644 index 0000000..9cd8ebb --- /dev/null +++ b/lib/append_csv_to_db.js @@ -0,0 +1,46 @@ + +var stream = require('stream'); +var pg = require('pg'); +var copy_from = require('pg-copy-streams').from; + +var conString = process.env.DATABASE_URL; + +function append_csv_to_db(csv, table, fields, callback) { + pg.connect(conString, function(err, client, done) { + if (err) { done(); callback(err); return; } + var conString = process.env.DATABASE_URL; + var ps = client.query(copy_from('COPY ' + table + ' FROM STDIN')); + var rs = new stream.Readable; + var we_done = false; + rs.pipe(ps) + .on('finish', + function() { + if (!we_done) { + we_done = true; + done(); + callback(null, true); + } + }) + .on('error', + function(err) { + if (!we_done) { + we_done = true; + done(); + callback(err); + } + }); + rs.resume(); + csv.forEach(function(x) { + var str = ''; + for (var f in fields) { + if (f != 0) { str = str + '\t'; } + str = str + x[ fields[f] ]; + } + str = str + '\n'; + rs.push(str); + }); + rs.push(null); + }) } + +module.exports = append_csv_to_db; diff --git a/lib/clean_pkg_csv.js b/lib/clean_pkg_csv.js new file mode 100644 index 0000000..8a78186 --- /dev/null +++ b/lib/clean_pkg_csv.js @@ -0,0 +1,41 @@ + +function clean_pkg_csv(csv, callback) { + + // Empty? + if (csv.length <= 1) { callback(null, []); return; } + + // First line is the header + var header = csv[0]; + csv = csv.splice(1); + + // Date, take from first record + var date = csv[0][ header.indexOf('date') ]; + + // Which field is the package? 
+ var pkg_idx = header.indexOf('package'); + + // Take package names + var pkg = csv.map(function(x) { return x[pkg_idx]; }); + + // Count their downloads + var count = { }; + pkg.forEach(function(x) { + if (count.hasOwnProperty(x)) { + count[x]++; + } else { + count[x] = 1; + } + }); + + // Unique package names + var upkg = Object.keys(count); + + // Proper records + var recs = upkg + .map(function(x) { + return { 'day': date, 'package': x, 'count': count[x] }; + }); + callback(null, recs); +} + +module.exports = clean_pkg_csv; diff --git a/lib/clean_r_csv.js b/lib/clean_r_csv.js new file mode 100644 index 0000000..9e7d5c4 --- /dev/null +++ b/lib/clean_r_csv.js @@ -0,0 +1,39 @@ + +function clean_r_csv(csv, callback) { + + // Empty? + if (csv.length <= 1) { callback(null, []); return; } + + // First line is the header + var header = csv[0]; + csv = csv.splice(1); + + // Date, take from first record + var date = csv[0][ header.indexOf('date') ]; + + // Which fields are version and os? + var ver_idx = header.indexOf('version'); + var os_idx = header.indexOf('os'); + + // Count by version and os + var count = { }; + csv.forEach(function(x) { + var key = x[os_idx] + '@' + x[ver_idx]; + if (count.hasOwnProperty(key)) { + count[key] ++; + } else { + count[key] = 1; + } + }) + + var recs = [ ]; + for (var k in count) { + var os = k.split('@')[0]; + var ver = k.split('@')[1]; + recs.push({ 'day': date, 'version': ver, 'os': os, + 'count': count[k] }); + } + callback(null, recs); +} + +module.exports = clean_r_csv; diff --git a/lib/download_csv.js b/lib/download_csv.js new file mode 100644 index 0000000..f20231e --- /dev/null +++ b/lib/download_csv.js @@ -0,0 +1,27 @@ + +var got = require('got'); +var gunzip = require('zlib').gunzip; +var csv_parse = require('csv-parse'); + +function download_csv(url, callback) { + + console.log('Getting ', url); + got(url, + { headers: { 'user-agent': + 'https://github.com/metacran/cranlogs.app' + }, + encoding: null + }, + function(err, 
data, res) { + if (err) { callback(err); return; } + gunzip(data, function(err, data) { + if (err) { callback(err); return; } + csv_parse(data, function(err, data) { + if (err) { callback(err); return; } + callback(null, data); + }) + }) + }) +} + +module.exports = download_csv; diff --git a/lib/last_pkg.js b/lib/last_pkg.js new file mode 100644 index 0000000..5c670e1 --- /dev/null +++ b/lib/last_pkg.js @@ -0,0 +1,29 @@ +var pg = require('pg'); + +var conString = process.env.DATABASE_URL; + +function last_pkg(table, callback) { + + pg.connect(conString, function(err, client, done) { + + if (err) { + done(client); callback(err); + return; + } + + var q = 'SELECT MAX(day) FROM ' + table; + client.query(q, function(err, result) { + if (err) { + done(); + callback(err); + return; + } + + var day = new Date(result['rows'][0]['max'] || '2012-09-30'); + callback(null, day); + done(); + }) + }) +} + +module.exports = last_pkg; diff --git a/lib/missing_days.js b/lib/missing_days.js new file mode 100644 index 0000000..3af111a --- /dev/null +++ b/lib/missing_days.js @@ -0,0 +1,16 @@ + +function missing_days(last_pkg, table, callback) { + + last_pkg(table, function(err, day) { + if (err) { callback(err); return; } + var today = new Date(); + var days = []; + while (day < today) { + day.setDate(day.getDate() + 1); + days.push(day.toISOString().slice(0, 10)); + } + callback(null, days); + }) +} + +module.exports = missing_days; diff --git a/lib/missing_urls.js b/lib/missing_urls.js new file mode 100644 index 0000000..d044c9b --- /dev/null +++ b/lib/missing_urls.js @@ -0,0 +1,15 @@ + +var missing_days = require('../lib/missing_days'); + +function missing_urls(base_url, last, table, callback) { + + missing_days(last, table, function(err, days) { + if (err) { callback(err); return; } + var urls = days.map(function(d) { + return base_url.replace(/\/[0-9]{4}\//, '/' + d.slice(0, 4) + '/').replace(/\/(?=[^\/]*$)/, '/' + d); + }) + callback(null, urls); + }) +} + +module.exports = missing_urls; diff --git a/lib/update_pkg_db_day.js b/lib/update_pkg_db_day.js new file 
mode 100644 index 0000000..392444e --- /dev/null +++ b/lib/update_pkg_db_day.js @@ -0,0 +1,23 @@ + +var download_csv = require('../lib/download_csv'); +var clean_pkg_csv = require('../lib/clean_pkg_csv'); +var append_csv_to_db = require('../lib/append_csv_to_db'); + +function update_pkg_db_day(url, callback) { + + download_csv(url, function(err, csv) { + // If the file does not exist, that is fine + if (err && err.code == 404) { callback(null, false); return; } + if (err) { callback(err); return; } + clean_pkg_csv(csv, function(err, cleaned) { + append_csv_to_db(cleaned, 'daily', + [ 'day', 'package', 'count' ], + function(err, status) { + if (err) { callback(err); return; } + callback(null, status); + }) + }) + }) +} + +module.exports = update_pkg_db_day; diff --git a/lib/update_r_db_day.js b/lib/update_r_db_day.js new file mode 100644 index 0000000..bfc488b --- /dev/null +++ b/lib/update_r_db_day.js @@ -0,0 +1,23 @@ + +var download_csv = require('../lib/download_csv'); +var clean_r_csv = require('../lib/clean_r_csv'); +var append_csv_to_db = require('../lib/append_csv_to_db'); + +function update_r_db_day(url, callback) { + + download_csv(url, function(err, csv) { + // If the file does not exist, that is fine + if (err && err.code == 404) { callback(null, false); return; } + if (err) { callback(err); return; } + clean_r_csv(csv, function(err, cleaned) { + append_csv_to_db(cleaned, 'dailyr', + [ 'day', 'version', 'os', 'count' ], + function(err, status) { + if (err) { callback(err); return; } + callback(null, status); + }) + }) + }) +} + +module.exports = update_r_db_day; diff --git a/package.json b/package.json index 7cd1bc4..de87161 100644 --- a/package.json +++ b/package.json @@ -5,18 +5,22 @@ "scripts": { "start": "node ./bin/www" }, - "engines": { - "node": "0.12.x" + "engines": { + "node": "0.12.x" }, "dependencies": { - "express": "~4.9.0", + "async": "^1.4.0", "body-parser": "~1.8.1", "cookie-parser": "~1.3.3", - "morgan": "~1.3.0", - "serve-favicon": 
"~2.1.3", + "csv-parse": "^0.1.3", "debug": "~2.0.0", - "pg": "~3.6.2", + "express": "~4.9.0", + "got": "~3.3.1", + "morgan": "~1.3.0", "multiline": "~1.0.2", + "pg": "~3.6.2", + "pg-copy-streams": "~0.3.0", + "serve-favicon": "~2.1.3", "whiskers": "~0.3.3" } } diff --git a/routes/ping.js b/routes/ping.js index bbac69c..a5e1a79 100644 --- a/routes/ping.js +++ b/routes/ping.js @@ -1,13 +1,46 @@ var express = require('express'); var router = express.Router(); +var async = require('async'); -router.get("/", function(req, res) { +var last_pkg = require('../lib/last_pkg'); +var missing_urls = require('../lib/missing_urls'); +var update_pkg_db_day = require('../lib/update_pkg_db_day'); +var update_r_db_day = require('../lib/update_r_db_day'); + +var conString = process.env.DATABASE_URL; + +var base_url = 'http://cran-logs.rstudio.com/2015/.csv.gz'; +var r_base_url = 'http://cran-logs.rstudio.com/2015/-r.csv.gz'; + +router.get('/', function(req, res) { - console.log("PING"); res.set('Content-Type', 'application/json') - .set(200) + .status(200) .end('{ "operation": "ping",' + ' "message": "Thanks! Live long and prosper!" }'); + + update_pkg_db(); + update_r_db(); }); + +function update_pkg_db() { + missing_urls(base_url, last_pkg, 'daily', function(err, urls) { + if (err) { console.log('Error ', err); return; } + async.mapLimit(urls, 2, update_pkg_db_day, function(err, results) { + if (err) { console.log('Error ', err); return; } + console.log('Pkg update successful'); + }); + }); +} + +function update_r_db() { + missing_urls(r_base_url, last_pkg, 'dailyr', function(err, urls) { + if (err) { console.log('Error ', err); return; } + async.mapLimit(urls, 2, update_r_db_day, function(err, results) { + if (err) { console.log('Error ', err); return; } + console.log('R update successful'); + }); + }); +} + module.exports = router;