Permalink
Browse files

Handle ping, update DB from web app

  • Loading branch information...
1 parent 1ffbc6b commit aba300fa191937d352adb51866a9a7298ffd01d3 @gaborcsardi gaborcsardi committed Jul 23, 2015
Showing with 309 additions and 12 deletions.
  1. +5 −4 db/create.sh
  2. +46 −0 lib/append_csv_to_db.js
  3. +41 −0 lib/clean_pkg_csv.js
  4. +39 −0 lib/clean_r_csv.js
  5. +27 −0 lib/download_csv.js
  6. +29 −0 lib/last_pkg.js
  7. +16 −0 lib/missing_days.js
  8. +15 −0 lib/missing_urls.js
  9. +23 −0 lib/update_pkg_db_day.js
  10. +23 −0 lib/update_r_db_day.js
  11. +10 −6 package.json
  12. +35 −2 routes/ping.js
View
@@ -1,4 +1,6 @@
+APP=cranlogs
+
read -r -d '' CREATE_DB <<'EOF'
CREATE TABLE daily (
day DATE,
@@ -10,7 +12,7 @@ CREATE INDEX idx_daily_day ON daily(day);
CREATE INDEX idx_daily_package ON daily(package);
EOF
-echo "$CREATE_DB" | dokku psql:restore_sql cranlogs
+echo "$CREATE_DB" | dokku psql:restore_sql $APP
read -r -d '' ADD_VIEWS <<'EOF'
CREATE MATERIALIZED VIEW top_day AS
@@ -72,7 +74,7 @@ CREATE TRIGGER trig_refresh_views AFTER TRUNCATE OR INSERT OR UPDATE OR DELETE
EOF
-echo "$ADD_VIEWS" | dokku psql:restore_sql cranlogs
+echo "$ADD_VIEWS" | dokku psql:restore_sql $APP
read -r -d '' CREATE_DB_R <<'EOF'
CREATE TABLE dailyr (
@@ -84,7 +86,6 @@ CREATE TABLE dailyr (
CREATE INDEX idx_dailyr_day ON dailyr(day);
CREATE INDEX idx_dailyr_day_version ON dailyr(day, version);
CREATE INDEX idx_dailyr_day_os ON dailyr(day, os);
-ALTER TABLE dailyr OWNER TO cranlogs;
EOF
-echo "$CREATE_DB_R" | dokku psql:restore_sql cranlogs
+echo "$CREATE_DB_R" | dokku psql:restore_sql $APP
View
@@ -0,0 +1,46 @@
+
+var stream = require('stream');
+var pg = require('pg');
+var copy_from = require('pg-copy-streams').from;
+
+var conString = process.env.DATABASE_URL;
+
+/**
+ * Append records to a database table through one `COPY ... FROM STDIN`.
+ *
+ * @param {Object[]} csv Records to insert, one object per row.
+ * @param {string} table Target table. It is pasted into the SQL text
+ *   as is, so it must never come from untrusted input.
+ * @param {string[]} fields Record keys to write, in the column order
+ *   of `table`.
+ * @param {function} callback Called exactly once: `(err)` on failure,
+ *   `(null, true)` once the copy stream has finished.
+ */
+function append_csv_to_db(csv, table, fields, callback) {
+
+  // Encode one value for the text format of COPY: NULL is written
+  // as \N, and backslash, tab, newline and carriage return are
+  // escaped, so a stray character cannot shift columns or split a row.
+  function copy_value(v) {
+    if (v === null) { return '\\N'; }
+    return String(v)
+      .replace(/\\/g, '\\\\')
+      .replace(/\t/g, '\\t')
+      .replace(/\n/g, '\\n')
+      .replace(/\r/g, '\\r');
+  }
+
+  // Refuse records that lack a field before touching the database,
+  // instead of sending the text "undefined" and hoping COPY rejects it.
+  var lines = [ ];
+  for (var i = 0; i < csv.length; i++) {
+    var cols = [ ];
+    for (var j = 0; j < fields.length; j++) {
+      var value = csv[i][ fields[j] ];
+      if (value === undefined) {
+        callback(new Error('Record ' + i + ' has no field "' +
+                           fields[j] + '"'));
+        return;
+      }
+      cols.push(copy_value(value));
+    }
+    lines.push(cols.join('\t') + '\n');
+  }
+
+  pg.connect(conString, function(err, client, done) {
+    if (err) { done(); callback(err); return; }
+
+    var finished = false;
+    // Release the client and report the outcome, but only once:
+    // more than one of the events below may fire.
+    function finish(err) {
+      if (finished) { return; }
+      finished = true;
+      done();
+      if (err) { callback(err); } else { callback(null, true); }
+    }
+
+    var ps = client.query(copy_from('COPY ' + table + ' FROM STDIN'));
+    var rs = new stream.Readable();
+    // All rows are pushed by hand below, there is nothing to pull
+    rs._read = function() { };
+
+    rs.on('error', finish);
+    rs.pipe(ps)
+      .on('finish', function() { finish(null); })
+      .on('error', finish);
+
+    lines.forEach(function(line) { rs.push(line); });
+    rs.push(null);
+  });
+}
+
+module.exports = append_csv_to_db;
View
@@ -0,0 +1,41 @@
+
+/**
+ * Turn a parsed package download log into per-package daily counts.
+ *
+ * @param {Array[]} csv Parsed CSV: the first row is the header, every
+ *   other row is one download. The header must name the `date` and
+ *   `package` columns. The array is not modified.
+ * @param {function} callback Called with `(null, records)`, where each
+ *   record is `{ day, package, count }`. `day` is the date of the
+ *   first download, all rows are taken to belong to that day. Without
+ *   data rows `records` is empty.
+ */
+function clean_pkg_csv(csv, callback) {
+
+  // Nothing but (at most) the header: there is nothing to count
+  if (!csv || csv.length <= 1) { callback(null, [ ]); return; }
+
+  // First line is the header, the rest are the downloads
+  var header = csv[0];
+  var rows = csv.slice(1);
+
+  // Date, take from first record
+  var date = rows[0][ header.indexOf('date') ];
+
+  // Which field is the package?
+  var pkg_idx = header.indexOf('package');
+
+  // Count the downloads. The counter has no prototype, so that a
+  // package called 'constructor' or 'hasOwnProperty' is just a key.
+  var count = Object.create(null);
+  rows.forEach(function(row) {
+    var pkg = row[pkg_idx];
+    count[pkg] = (count[pkg] || 0) + 1;
+  });
+
+  // Proper records, one per package
+  var recs = Object.keys(count).map(function(pkg) {
+    return { 'day': date, 'package': pkg, 'count': count[pkg] };
+  });
+  callback(null, recs);
+}
+
+module.exports = clean_pkg_csv;
View
@@ -0,0 +1,39 @@
+
+/**
+ * Turn a parsed R download log into daily counts per version and OS.
+ *
+ * @param {Array[]} csv Parsed CSV: the first row is the header, every
+ *   other row is one download. The header must name the `date`,
+ *   `version` and `os` columns. The array is not modified.
+ * @param {function} callback Called with `(null, records)`, where each
+ *   record is `{ day, version, os, count }`. `day` is the date of the
+ *   first download, all rows are taken to belong to that day. Without
+ *   data rows `records` is empty.
+ */
+function clean_r_csv(csv, callback) {
+
+  // Nothing but (at most) the header: there is nothing to count
+  if (!csv || csv.length <= 1) { callback(null, [ ]); return; }
+
+  // First line is the header, the rest are the downloads
+  var header = csv[0];
+  var rows = csv.slice(1);
+
+  // Date, take from first record
+  var date = rows[0][ header.indexOf('date') ];
+
+  // Which fields are version and os?
+  var ver_idx = header.indexOf('version');
+  var os_idx = header.indexOf('os');
+
+  // Count by version and os. The record itself is kept under the
+  // key, so the values never have to be parsed back out of it.
+  var seen = Object.create(null);
+  var recs = [ ];
+  rows.forEach(function(row) {
+    var key = JSON.stringify([ row[os_idx], row[ver_idx] ]);
+    if (!seen[key]) {
+      seen[key] = { 'day': date, 'version': row[ver_idx],
+                    'os': row[os_idx], 'count': 0 };
+      recs.push(seen[key]);
+    }
+    seen[key].count++;
+  });
+  callback(null, recs);
+}
+
+module.exports = clean_r_csv;
View
@@ -0,0 +1,27 @@
+
+var got = require('got');
+var gunzip = require('zlib').gunzip;
+var csv_parse = require('csv-parse');
+
+/**
+ * Fetch a gzipped CSV file and hand back its parsed content.
+ *
+ * @param {string} url Address of the `.csv.gz` file.
+ * @param {function} callback Called with `(err)` if the download, the
+ *   decompression or the parsing fails, with `(null, rows)` otherwise,
+ *   `rows` being whatever `csv_parse` produced.
+ */
+function download_csv(url, callback) {
+
+  var options = {
+    headers: { 'user-agent': 'https://github.com/metacran/cranlogs.app' },
+    // Keep the body as raw bytes, it is still compressed
+    encoding: null
+  };
+
+  // Step 3: report the parsed rows
+  function on_parsed(err, rows) {
+    if (err) { callback(err); return; }
+    callback(null, rows);
+  }
+
+  // Step 2: parse the decompressed text
+  function on_unzipped(err, text) {
+    if (err) { callback(err); return; }
+    csv_parse(text, on_parsed);
+  }
+
+  // Step 1: decompress the downloaded body
+  function on_downloaded(err, body, res) {
+    if (err) { callback(err); return; }
+    gunzip(body, on_unzipped);
+  }
+
+  console.log('Getting ', url);
+  got(url, options, on_downloaded);
+}
+
+module.exports = download_csv;
View
@@ -0,0 +1,29 @@
+var pg = require('pg');
+
+var conString = process.env.DATABASE_URL;
+
+/**
+ * Find the most recent day stored in a table.
+ *
+ * @param {string} table Table with a `day` column. It is pasted into
+ *   the SQL text as is, so it must never come from untrusted input.
+ * @param {function} callback Called with `(err)` if connecting or
+ *   querying fails, otherwise with `(null, day)`, `day` being a Date:
+ *   the maximum of `day`, or 2012-09-30 if there is none.
+ */
+function last_pkg(table, callback) {
+
+  pg.connect(conString, function(err, client, done) {
+
+    if (err) {
+      if (typeof done === 'function') { done(client); }
+      // Report the failure, the caller is waiting for an answer
+      callback(err);
+      return;
+    }
+
+    var q = 'SELECT MAX(day) FROM ' + table;
+    client.query(q, function(err, result) {
+      // Give the client back first, so that it is released even if
+      // the callback throws
+      done();
+      if (err) { callback(err); return; }
+
+      var day = new Date(result['rows'][0]['max'] || '2012-09-30');
+      callback(null, day);
+    });
+  });
+}
+
+module.exports = last_pkg;
View
@@ -0,0 +1,16 @@
+
+/**
+ * List the days that follow the last stored one, up to and including
+ * today, as `YYYY-MM-DD` strings in increasing order.
+ *
+ * Both the stepping and the formatting use local calendar days.
+ * NOTE(review): this assumes that the local calendar date of the Date
+ * coming from `last_pkg` is the stored day -- confirm against how the
+ * database driver parses DATE values.
+ *
+ * @param {function} last_pkg Called as `last_pkg(table, cb)`, must
+ *   answer `cb(err)` or `cb(null, day)` with `day` a Date. The Date
+ *   is not modified.
+ * @param {string} table Passed through to `last_pkg`.
+ * @param {function} callback Called with `(err)` or `(null, days)`.
+ */
+function missing_days(last_pkg, table, callback) {
+
+  // Local midnight, `offset` days after the day of `d`. Going through
+  // the calendar fields keeps the step right across DST changes.
+  function shift(d, offset) {
+    return new Date(d.getFullYear(), d.getMonth(), d.getDate() + offset);
+  }
+
+  // YYYY-MM-DD from the local calendar fields
+  function ymd(d) {
+    function two(n) { return (n < 10 ? '0' : '') + n; }
+    return d.getFullYear() + '-' + two(d.getMonth() + 1) + '-' +
+      two(d.getDate());
+  }
+
+  last_pkg(table, function(err, day) {
+    if (err) { callback(err); return; }
+    var today = shift(new Date(), 0);
+    var days = [];
+    // Start on the day after the last stored one, stop after today
+    for (var cur = shift(day, 1); cur <= today; cur = shift(cur, 1)) {
+      days.push(ymd(cur));
+    }
+    callback(null, days);
+  });
+}
+
+module.exports = missing_days;
View
@@ -0,0 +1,15 @@
+
+var missing_days = require('../lib/missing_days');
+
+/**
+ * List the URLs of the log files that are not in the database yet.
+ *
+ * @param {string} base_url URL template. Every `<date>` in it is
+ *   replaced with the day (`YYYY-MM-DD`), every `<year>` with the
+ *   four digit year of that day.
+ * @param {function} last Passed to `missing_days`, finds the last
+ *   stored day.
+ * @param {string} table Passed to `missing_days`.
+ * @param {function} callback Called with `(err)` or `(null, urls)`,
+ *   one URL per missing day.
+ */
+function missing_urls(base_url, last, table, callback) {
+
+  missing_days(last, table, function(err, days) {
+    if (err) { callback(err); return; }
+    var urls = days.map(function(d) {
+      // split/join replaces all occurrences, and takes the
+      // replacement literally
+      return base_url
+        .split('<year>').join(d.slice(0, 4))
+        .split('<date>').join(d);
+    });
+    callback(null, urls);
+  });
+}
+
+module.exports = missing_urls;
View
@@ -0,0 +1,23 @@
+
+var download_csv = require('../lib/download_csv');
+var clean_pkg_csv = require('../lib/clean_pkg_csv');
+var append_csv_to_db = require('../lib/append_csv_to_db');
+
+/**
+ * Download one day of package logs, count the downloads and append
+ * the counts to the `daily` table.
+ *
+ * @param {string} url Address of the log file of the day.
+ * @param {function} callback Called with `(null, false)` if the file
+ *   does not exist, with `(err)` if any step fails, otherwise with
+ *   `(null, status)`, `status` coming from `append_csv_to_db`.
+ */
+function update_pkg_db_day(url, callback) {
+
+  download_csv(url, function(err, csv) {
+    // If the file does not exist, that is fine
+    if (err && Number(err.code) === 404) { callback(null, false); return; }
+    if (err) { callback(err); return; }
+    clean_pkg_csv(csv, function(err, cleaned) {
+      // Do not try to store the result of a failed clean-up
+      if (err) { callback(err); return; }
+      append_csv_to_db(cleaned, 'daily',
+                       [ 'day', 'package', 'count' ],
+                       function(err, status) {
+                         if (err) { callback(err); return; }
+                         callback(null, status);
+                       });
+    });
+  });
+}
+
+module.exports = update_pkg_db_day;
View
@@ -0,0 +1,23 @@
+
+var download_csv = require('../lib/download_csv');
+var clean_r_csv = require('../lib/clean_r_csv');
+var append_csv_to_db = require('../lib/append_csv_to_db');
+
+/**
+ * Download one day of R logs, count the downloads by version and OS,
+ * and append the counts to the `dailyr` table.
+ *
+ * @param {string} url Address of the log file of the day.
+ * @param {function} callback Called with `(null, false)` if the file
+ *   does not exist, with `(err)` if any step fails, otherwise with
+ *   `(null, status)`, `status` coming from `append_csv_to_db`.
+ */
+function update_r_db_day(url, callback) {
+
+  download_csv(url, function(err, csv) {
+    // If the file does not exist, that is fine
+    if (err && Number(err.code) === 404) { callback(null, false); return; }
+    if (err) { callback(err); return; }
+    clean_r_csv(csv, function(err, cleaned) {
+      // Do not try to store the result of a failed clean-up
+      if (err) { callback(err); return; }
+      append_csv_to_db(cleaned, 'dailyr',
+                       [ 'day', 'version', 'os', 'count' ],
+                       function(err, status) {
+                         if (err) { callback(err); return; }
+                         callback(null, status);
+                       });
+    });
+  });
+}
+
+module.exports = update_r_db_day;
View
@@ -5,18 +5,22 @@
"scripts": {
"start": "node ./bin/www"
},
- "engines": {
- "node": "0.12.x"
+ "engines": {
+ "node": "0.12.x"
},
"dependencies": {
- "express": "~4.9.0",
+ "async": "^1.4.0",
"body-parser": "~1.8.1",
"cookie-parser": "~1.3.3",
- "morgan": "~1.3.0",
- "serve-favicon": "~2.1.3",
+ "csv-parse": "^0.1.3",
"debug": "~2.0.0",
- "pg": "~3.6.2",
+ "express": "~4.9.0",
+ "got": "~3.3.1",
+ "morgan": "~1.3.0",
"multiline": "~1.0.2",
+ "pg": "~3.6.2",
+ "pg-copy-streams": "~0.3.0",
+ "serve-favicon": "~2.1.3",
"whiskers": "~0.3.3"
}
}
View
@@ -1,13 +1,46 @@
var express = require('express');
var router = express.Router();
+var async = require('async');
-router.get("/", function(req, res) {
+var last_pkg = require('../lib/last_pkg');
+var missing_urls = require('../lib/missing_urls');
+var update_pkg_db_day = require('../lib/update_pkg_db_day');
+var update_r_db_day = require('../lib/update_r_db_day');
+
+var conString = process.env.DATABASE_URL;
+
+// URL templates of the daily log files; <date> is filled in with each
+// missing day (YYYY-MM-DD).
+// NOTE(review): the year directory is fixed to 2015, while the days
+// filled in can be of any year -- presumably the server files the logs
+// by year, so days outside 2015 would point to files that do not
+// exist; confirm and derive the directory from the day.
+var base_url = 'http://cran-logs.rstudio.com/2015/<date>.csv.gz';
+var r_base_url = 'http://cran-logs.rstudio.com/2015/<date>-r.csv.gz';
+
+// Answer the ping right away, then start refreshing both download
+// tables; the response neither waits for the update nor reports on it.
+router.get('/', function(req, res) {
- console.log("PING");
res.set('Content-Type', 'application/json')
-.set(200)
+.status(200)
.end('{ "operation": "ping",' +
' "message": "Thanks! Live long and prosper!" }');
+
+ update_pkg_db();
+ update_r_db();
});
+// True while a package update is in flight
+var pkg_update_running = false;
+
+/**
+ * Download and store every missing day of the package download
+ * counts. Fire and forget: the outcome is only logged. A call made
+ * while a previous update is still running does nothing.
+ */
+function update_pkg_db() {
+
+  // Overlapping runs would both start from the same last stored day,
+  // and append the same days twice
+  if (pkg_update_running) { return; }
+  pkg_update_running = true;
+
+  function finish(err) {
+    pkg_update_running = false;
+    if (err) { console.log('Error ', err); return; }
+    console.log('Pkg update successful');
+  }
+
+  missing_urls(base_url, last_pkg, 'daily', function(err, urls) {
+    if (err) { finish(err); return; }
+    // One day at a time, oldest first. The next run resumes after the
+    // latest stored day, so storing a day while an earlier one fails
+    // would leave a gap that is never filled.
+    async.mapLimit(urls, 1, update_pkg_db_day, finish);
+  });
+}
+
+// True while an R update is in flight
+var r_update_running = false;
+
+/**
+ * Download and store every missing day of the R download counts.
+ * Fire and forget: the outcome is only logged. A call made while a
+ * previous update is still running does nothing.
+ */
+function update_r_db() {
+
+  // Overlapping runs would both start from the same last stored day,
+  // and append the same days twice
+  if (r_update_running) { return; }
+  r_update_running = true;
+
+  function finish(err) {
+    r_update_running = false;
+    if (err) { console.log('Error ', err); return; }
+    console.log('R update successful');
+  }
+
+  missing_urls(r_base_url, last_pkg, 'dailyr', function(err, urls) {
+    if (err) { finish(err); return; }
+    // One day at a time, oldest first. The next run resumes after the
+    // latest stored day, so storing a day while an earlier one fails
+    // would leave a gap that is never filled.
+    async.mapLimit(urls, 1, update_r_db_day, finish);
+  });
+}
+
module.exports = router;

0 comments on commit aba300f

Please sign in to comment.