Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge pull request #2 from scottlabs/master
Add tests, support for new format, support for local files
  • Loading branch information
tmrudick committed Mar 25, 2015
2 parents 5541eaf + 7f36988 commit 8936960
Show file tree
Hide file tree
Showing 12 changed files with 424,548 additions and 23 deletions.
13 changes: 13 additions & 0 deletions README.md
Expand Up @@ -47,6 +47,19 @@ You can also use this module with a callback if you aren't into streams.
console.log(data);
});

You can also specify a local file.

turnstiles('/path/to/file/turnstile_131214.txt', function(data) {
console.log(data);
});

You can specify an optional list of options like so:

turnstiles('/path/to/file/turnstile_131214.txt', function(data) {
console.log(data);
}, { header: false });

The only option at this point is 'header', which if set to false will output data without header.
Issues
------

Expand Down
2 changes: 1 addition & 1 deletion bin/mta-turnstiles
Expand Up @@ -19,7 +19,7 @@ if (program.args.length !== 1) {
var outputStream = program.file ? fs.createWriteStream(program.file) : process.stdout,
dataUrl = program.args[0];

if (url.parse(dataUrl).protocol != 'http:') {
if ( !fs.existsSync(dataUrl) && url.parse(dataUrl).protocol != 'http:') {
throw new Error('Invalid file URL');
}

Expand Down
66 changes: 53 additions & 13 deletions lib/streams/turnstile.js
Expand Up @@ -2,25 +2,60 @@ var util = require('util');
var Transform = require('stream').Transform;
util.inherits(TurnstileStream, Transform);

var HEADER = 'C/A,UNIT,SCP,STATION,LINENAME,DIVISION,DATE,TIME,DESC,ENTRIES,EXITS';
function parseLine(line, callback) {
var line = line.split(',');

var format = getFormat(line);

var ca = line.shift(),
remote = line.shift(),
scp = line.shift();

while (line.length > 0) {
callback({
remote: remote,
date: line.shift(),
time: line.shift(),
description: line.shift(),
entries: line.shift(),
exits: line.shift().trim()
});
remote = line.shift(),
scp = line.shift();

if ( format === 'v1' ) {
while (line.length > 0) {
callback({
remote: remote,
station: '',
linename: '',
division: '',
date: line.shift(),
time: line.shift(),
description: line.shift(),
entries: line.shift(),
exits: line.shift().trim()
});
}
} else if ( format === 'v2' ) {
// check that this is not a header
if ( line.join(',').indexOf('STATION,LINENAME,DIVISION') === -1 ) {
while (line.length > 0) {

callback({
remote: remote,
station: line.shift(),
linename: line.shift(),
division: line.shift(),
date: line.shift(),
time: line.shift(),
description: line.shift(),
entries: line.shift(),
exits: line.shift().trim()
});
}
}

}
}

function getFormat(row) {
if ( row.length === 43 ) {
return 'v1';
} else if ( row.length === 11 ) {
return 'v2';
}
};

function TurnstileStream() {
if (!(this instanceof TurnstileStream)) {
return new TurnstileStream();
Expand All @@ -33,7 +68,7 @@ module.exports = TurnstileStream;

TurnstileStream.prototype._transform = function(chunk, encoding, done) {
var self = this,
data = chunk.toString();
data = chunk.toString();

// Add any half completed bytes
if (this._buffer) {
Expand All @@ -49,6 +84,7 @@ TurnstileStream.prototype._transform = function(chunk, encoding, done) {
parseLine(turnstile, self.push.bind(self));
});


done();
};

Expand All @@ -60,3 +96,7 @@ TurnstileStream.prototype._flush = function(done) {
this._buffer = null;
done();
};

// for testing
TurnstileStream.prototype.parseLine = parseLine;
TurnstileStream.prototype.getFormat = getFormat;
5 changes: 5 additions & 0 deletions lib/stripHeader.js
@@ -0,0 +1,5 @@
module.exports = function(content) {
content = content.split('\n');
content.shift();
return content.join('\n');
};
32 changes: 26 additions & 6 deletions lib/turnstiles.js
@@ -1,19 +1,39 @@
var TurnstileStream = require('./streams/turnstile'),
StationStream = require('./streams/station'),
CsvStream = require('./streams/csv'),
request = require('request');
stripHeader = require('./stripHeader'),
request = require('request'),
fs = require('fs');

var fs = require('fs');
module.exports = function(url, callback) {
var stream = request(url)
module.exports = function(path, callback, options) {
if ( ! options ) { options = {}; }
var stream;
if ( fs.existsSync(path) ) {
stream = fs.createReadStream(path)
.pipe(TurnstileStream())
.pipe(StationStream())
.pipe(CsvStream());
} else {
stream = request.get(path)
.pipe(TurnstileStream())
.pipe(StationStream())
.pipe(CsvStream());
}

if (callback) {
var content = '';
stream.on('data', function(data) { content += data; console.log('content'); });
stream.on('end', function() { console.log('end'); callback(content); });
stream.on('data', function(data) {
content += data;
});
stream.on('end', function() {
if ( options.header === false ) {
content = stripHeader(content);
}
callback(content);
});
stream.on('error', function(err) {
console.error('error', err);
});
}

return stream;
Expand Down
10 changes: 7 additions & 3 deletions package.json
@@ -1,10 +1,13 @@
{
"name": "mta-turnstiles",
"version": "1.0.1",
"version": "1.0.3",
"description": "Downloads and parses the New York City MTA subway turnstile data",
"main": "lib/turnstiles.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "mocha test/test.js"
},
"bin": {
"mta-turnstiles": "bin/mta-turnstiles"
},
"bin": {
"mta-turnstiles": "bin/mta-turnstiles"
Expand All @@ -27,6 +30,7 @@
"request": "~2.27.0"
},
"devDependencies": {
"mocha": "~1.14.0"
"chai": "^1.10.0",
"mocha": "^2.1.0"
}
}

0 comments on commit 8936960

Please sign in to comment.