Skip to content
Permalink
Browse files

Handle ping, update DB from web app

  • Loading branch information...
gaborcsardi committed Jul 23, 2015
1 parent 1ffbc6b commit aba300fa191937d352adb51866a9a7298ffd01d3
Showing with 309 additions and 12 deletions.
  1. +5 −4 db/create.sh
  2. +46 −0 lib/append_csv_to_db.js
  3. +41 −0 lib/clean_pkg_csv.js
  4. +39 −0 lib/clean_r_csv.js
  5. +27 −0 lib/download_csv.js
  6. +29 −0 lib/last_pkg.js
  7. +16 −0 lib/missing_days.js
  8. +15 −0 lib/missing_urls.js
  9. +23 −0 lib/update_pkg_db_day.js
  10. +23 −0 lib/update_r_db_day.js
  11. +10 −6 package.json
  12. +35 −2 routes/ping.js
@@ -1,4 +1,6 @@

APP=cranlogs

read -r -d '' CREATE_DB <<'EOF'
CREATE TABLE daily (
day DATE,
@@ -10,7 +12,7 @@ CREATE INDEX idx_daily_day ON daily(day);
CREATE INDEX idx_daily_package ON daily(package);
EOF

echo "$CREATE_DB" | dokku psql:restore_sql cranlogs
echo "$CREATE_DB" | dokku psql:restore_sql $APP

read -r -d '' ADD_VIEWS <<'EOF'
CREATE MATERIALIZED VIEW top_day AS
@@ -72,7 +74,7 @@ CREATE TRIGGER trig_refresh_views AFTER TRUNCATE OR INSERT OR UPDATE OR DELETE
EOF

echo "$ADD_VIEWS" | dokku psql:restore_sql cranlogs
echo "$ADD_VIEWS" | dokku psql:restore_sql $APP

read -r -d '' CREATE_DB_R <<'EOF'
CREATE TABLE dailyr (
@@ -84,7 +86,6 @@ CREATE TABLE dailyr (
CREATE INDEX idx_dailyr_day ON dailyr(day);
CREATE INDEX idx_dailyr_day_version ON dailyr(day, version);
CREATE INDEX idx_dailyr_day_os ON dailyr(day, os);
ALTER TABLE dailyr OWNER TO cranlogs;
EOF

echo "$CREATE_DB_R" | dokku psql:restore_sql cranlogs
echo "$CREATE_DB_R" | dokku psql:restore_sql $APP
@@ -0,0 +1,46 @@

var stream = require('stream');
var pg = require('pg');
var copy_from = require('pg-copy-streams').from;

var conString = process.env.DATABASE_URL;

// Bulk-insert cleaned CSV records into a Postgres table via COPY FROM STDIN.
//
// csv:      array of record objects (as produced by the clean_*_csv helpers)
// table:    target table name — internal, trusted ('daily'/'dailyr'); it is
//           concatenated into the SQL, so never pass untrusted input here
// fields:   ordered list of record keys emitted as tab-separated columns
// callback: callback(err) on failure, callback(null, true) on success
function append_csv_to_db(csv, table, fields, callback) {
  pg.connect(conString, function(err, client, done) {
    if (err) { done(); callback(err); return; }
    var ps = client.query(copy_from('COPY ' + table + ' FROM STDIN'));
    var rs = new stream.Readable();
    var we_done = false;  // guard so the callback fires exactly once
    rs.pipe(ps)
      .on('finish', function() {
        if (!we_done) {
          we_done = true;
          done();
          callback(null, true);
        }
      })
      .on('error', function(err) {
        if (!we_done) {
          we_done = true;
          done();
          callback(err);
        }
      });
    rs.resume();
    // Render each record as one TAB-separated line, in `fields` order.
    // (The original used `for (f in fields)`, leaking `f` as an implicit
    // global and iterating keys-as-strings; use a plain indexed loop.)
    csv.forEach(function(x) {
      var str = '';
      for (var i = 0; i < fields.length; i++) {
        if (i !== 0) { str += '\t'; }
        str += x[fields[i]];
      }
      rs.push(str + '\n');
    });
    rs.push(null);  // signal end-of-stream to COPY
  });
}

module.exports = append_csv_to_db;
@@ -0,0 +1,41 @@

// Aggregate a parsed download-log CSV into per-package daily counts.
//
// csv: array of rows; the first row is the header and must contain
//      'date' and 'package' columns.
// Calls callback(null, recs) with records shaped
//   { day: 'YYYY-MM-DD', package: name, count: n }.
// A header-only (or empty) input is passed through unchanged.
function clean_pkg_csv(csv, callback) {

  // Empty? (header only, or nothing at all)
  if (csv.length <= 1) { callback(null, csv); return; }

  // First line is the header
  var header = csv[0];
  csv = csv.slice(1);  // non-mutating; the original splice() emptied the caller's array

  // Date, taken from the FIRST record. The original read csv[1] (the
  // second record), which used the wrong row and crashed on files with
  // exactly one data row.
  var date = csv[0][header.indexOf('date')];

  // Which field is the package?
  var pkg_idx = header.indexOf('package');

  // Count downloads per package name
  var count = {};
  csv.forEach(function(x) {
    var pkg = x[pkg_idx];
    if (count.hasOwnProperty(pkg)) {
      count[pkg]++;
    } else {
      count[pkg] = 1;
    }
  });

  // One record per unique package
  var recs = Object.keys(count).map(function(x) {
    return { 'day': date, 'package': x, 'count': count[x] };
  });
  callback(null, recs);
}

module.exports = clean_pkg_csv;
@@ -0,0 +1,39 @@

// Aggregate a parsed R download-log CSV into daily counts per
// (R version, operating system) pair.
//
// csv: array of rows; the first row is the header and must contain
//      'date', 'version' and 'os' columns.
// Calls callback(null, recs) with records shaped
//   { day: 'YYYY-MM-DD', version: v, os: o, count: n }.
// A header-only (or empty) input is passed through unchanged.
function clean_r_csv(csv, callback) {

  // Empty? (header only, or nothing at all)
  if (csv.length <= 1) { callback(null, csv); return; }

  // First line is the header
  var header = csv[0];
  csv = csv.slice(1);  // non-mutating; the original splice() emptied the caller's array

  // Date, taken from the FIRST record. The original read csv[1] (the
  // second record), which used the wrong row and crashed on files with
  // exactly one data row.
  var date = csv[0][header.indexOf('date')];

  // Which fields are version and os?
  var ver_idx = header.indexOf('version');
  var os_idx = header.indexOf('os');

  // Count by (os, version) pair, keyed 'os@version'.
  // NOTE(review): assumes '@' never occurs in os/version values, as the
  // original code did — confirm against the log format.
  var count = {};
  csv.forEach(function(x) {
    var key = x[os_idx] + '@' + x[ver_idx];
    if (count.hasOwnProperty(key)) {
      count[key]++;
    } else {
      count[key] = 1;
    }
  });

  // One record per unique (os, version) pair. (The original used
  // `for (k in count)`, leaking `k` as an implicit global.)
  var recs = Object.keys(count).map(function(key) {
    var parts = key.split('@');
    return { 'day': date, 'version': parts[1], 'os': parts[0],
             'count': count[key] };
  });
  callback(null, recs);
}

module.exports = clean_r_csv;
@@ -0,0 +1,27 @@

var got = require('got');
var gunzip = require('zlib').gunzip;
var csv_parse = require('csv-parse');

// Fetch a gzipped CSV log file over HTTP, decompress it, and parse it
// into an array of rows. Any failure (network, gunzip, parse) is passed
// to callback(err); on success calls callback(null, rows).
function download_csv(url, callback) {

  console.log('Getting ', url);
  var opts = {
    headers: { 'user-agent':
      'https://github.com/metacran/cranlogs.app'
    },
    encoding: null  // keep the response body as a raw Buffer for gunzip
  };
  got(url, opts, function(err, body, res) {
    if (err) { callback(err); return; }
    gunzip(body, function(err, unzipped) {
      if (err) { callback(err); return; }
      csv_parse(unzipped, function(err, rows) {
        if (err) { callback(err); return; }
        callback(null, rows);
      });
    });
  });
}

module.exports = download_csv;
@@ -0,0 +1,29 @@
var pg = require('pg');

var conString = process.env.DATABASE_URL;

// Look up the most recent day present in `table`.
//
// table:    internal, trusted table name ('daily' or 'dailyr'); it is
//           concatenated into SQL, so never pass untrusted input here.
// callback: callback(null, day) with a Date — falls back to 2012-09-30
//           (before the first CRAN log) when the table is empty;
//           callback(err) on connection or query failure.
function last_pkg(table, callback) {

  pg.connect(conString, function(err, client, done) {

    if (err) {
      done(client);
      // BUG FIX: the original returned without invoking the callback,
      // so callers waited forever on a failed connection.
      callback(err);
      return;
    }

    var q = 'SELECT MAX(day) FROM ' + table;
    client.query(q, function(err, result) {
      if (err) {
        done();
        callback(err);
        return;
      }

      // MAX(day) is NULL on an empty table; default to the seed date.
      var day = new Date(result['rows'][0]['max'] || '2012-09-30');
      done();  // release the client before handing off the result
      callback(null, day);
    });
  });
}

module.exports = last_pkg;
@@ -0,0 +1,16 @@

// Compute the list of days (as 'YYYY-MM-DD' strings) that come after the
// last day recorded in `table`, up to the present day.
//
// last_pkg: injected lookup function, last_pkg(table, cb) -> cb(err, Date)
// callback: callback(null, days) or callback(err) if the lookup fails.
function missing_days(last_pkg, table, callback) {

  last_pkg(table, function(err, cursor) {
    if (err) { callback(err); return; }
    var now = new Date();
    var result = [];
    // Advance one day at a time (mutating `cursor` in place, as before)
    // and collect each ISO date until we reach the current moment.
    while (cursor < now) {
      cursor.setDate(cursor.getDate() + 1);
      result.push(cursor.toISOString().slice(0, 10));
    }
    callback(null, result);
  });
}

module.exports = missing_days;
@@ -0,0 +1,15 @@

var missing_days = require('../lib/missing_days');

// Build the download URL for every day missing from `table` by
// substituting each date into `base_url`'s '<date>' placeholder.
//
// last:     lookup function forwarded to missing_days
// callback: callback(null, urls) or callback(err).
function missing_urls(base_url, last, table, callback) {

  missing_days(last, table, function(err, days) {
    if (err) { callback(err); return; }
    callback(null, days.map(function(day) {
      return base_url.replace('<date>', day);
    }));
  });
}

module.exports = missing_urls;
@@ -0,0 +1,23 @@

var download_csv = require('../lib/download_csv');
var clean_pkg_csv = require('../lib/clean_pkg_csv');
var append_csv_to_db = require('../lib/append_csv_to_db');

// Download, clean and store one day's worth of package download logs.
//
// Calls callback(null, false) when the log file does not exist yet (404 —
// logs for recent days appear with a delay), callback(null, true) on
// success, callback(err) on any other failure.
function update_pkg_db_day(url, callback) {

  download_csv(url, function(err, csv) {
    // If the file does not exist, that is fine
    if (err && err.code == 404) { callback(null, false); return; }
    if (err) { callback(err); return; }
    clean_pkg_csv(csv, function(err, cleaned) {
      // BUG FIX: this error was ignored, so a cleaning failure passed
      // `undefined` into append_csv_to_db.
      if (err) { callback(err); return; }
      append_csv_to_db(cleaned, 'daily',
        ['day', 'package', 'count'],
        function(err, status) {
          if (err) { callback(err); return; }
          callback(null, status);
        });
    });
  });
}

module.exports = update_pkg_db_day;
@@ -0,0 +1,23 @@

var download_csv = require('../lib/download_csv');
var clean_r_csv = require('../lib/clean_r_csv');
var append_csv_to_db = require('../lib/append_csv_to_db');

// Download, clean and store one day's worth of R download logs.
//
// Calls callback(null, false) when the log file does not exist yet (404 —
// logs for recent days appear with a delay), callback(null, true) on
// success, callback(err) on any other failure.
function update_r_db_day(url, callback) {

  download_csv(url, function(err, csv) {
    // If the file does not exist, that is fine
    if (err && err.code == 404) { callback(null, false); return; }
    if (err) { callback(err); return; }
    clean_r_csv(csv, function(err, cleaned) {
      // BUG FIX: this error was ignored, so a cleaning failure passed
      // `undefined` into append_csv_to_db.
      if (err) { callback(err); return; }
      append_csv_to_db(cleaned, 'dailyr',
        ['day', 'version', 'os', 'count'],
        function(err, status) {
          if (err) { callback(err); return; }
          callback(null, status);
        });
    });
  });
}

module.exports = update_r_db_day;
@@ -5,18 +5,22 @@
"scripts": {
"start": "node ./bin/www"
},
"engines": {
"node": "0.12.x"
"engines": {
"node": "0.12.x"
},
"dependencies": {
"express": "~4.9.0",
"async": "^1.4.0",
"body-parser": "~1.8.1",
"cookie-parser": "~1.3.3",
"morgan": "~1.3.0",
"serve-favicon": "~2.1.3",
"csv-parse": "^0.1.3",
"debug": "~2.0.0",
"pg": "~3.6.2",
"express": "~4.9.0",
"got": "~3.3.1",
"morgan": "~1.3.0",
"multiline": "~1.0.2",
"pg": "~3.6.2",
"pg-copy-streams": "~0.3.0",
"serve-favicon": "~2.1.3",
"whiskers": "~0.3.3"
}
}
@@ -1,13 +1,46 @@
var express = require('express');
var router = express.Router();
var async = require('async');

router.get("/", function(req, res) {
var last_pkg = require('../lib/last_pkg');
var missing_urls = require('../lib/missing_urls');
var update_pkg_db_day = require('../lib/update_pkg_db_day');
var update_r_db_day = require('../lib/update_r_db_day');

var conString = process.env.DATABASE_URL;

var base_url = 'http://cran-logs.rstudio.com/2015/<date>.csv.gz';
var r_base_url = 'http://cran-logs.rstudio.com/2015/<date>-r.csv.gz';

// Health-check / trigger endpoint: replies immediately, then kicks off
// the database updates in the background.
router.get('/', function(req, res) {

  console.log("PING");
  res.set('Content-Type', 'application/json')
    // BUG FIX: was .set(200) — res.set() expects a header name;
    // res.status() sets the HTTP status code.
    .status(200)
    .end('{ "operation": "ping",' +
      ' "message": "Thanks! Live long and prosper!" }');

  // Fire-and-forget: these log their own errors and do not affect the
  // response already sent above.
  update_pkg_db();
  update_r_db();
});

// Fetch and ingest every missing daily package log, at most two
// downloads in flight at a time. Runs fire-and-forget from /ping, so
// errors are logged rather than propagated.
function update_pkg_db() {
  var report = function(err) { console.log('Error ', err); };
  missing_urls(base_url, last_pkg, 'daily', function(err, urls) {
    if (err) { return report(err); }
    async.mapLimit(urls, 2, update_pkg_db_day, function(err, results) {
      if (err) { return report(err); }
      console.log('Pkg update successful');
    });
  });
}

// Fetch and ingest every missing daily R log, at most two downloads in
// flight at a time. Runs fire-and-forget from /ping, so errors are
// logged rather than propagated.
function update_r_db() {
  var report = function(err) { console.log('Error ', err); };
  missing_urls(r_base_url, last_pkg, 'dailyr', function(err, urls) {
    if (err) { return report(err); }
    async.mapLimit(urls, 2, update_r_db_day, function(err, results) {
      if (err) { return report(err); }
      console.log('R update successful');
    });
  });
}

module.exports = router;

0 comments on commit aba300f

Please sign in to comment.
You can’t perform that action at this time.