diff --git a/import.js b/import.js
index e9189003..578eda03 100644
--- a/import.js
+++ b/import.js
@@ -1,9 +1,8 @@
 var peliasConfig = require( 'pelias-config' ).generate();
-var readStream = require('./src/readStream');
+var readStreamModule = require('./src/readStream');
 var importStream = require('./src/importStream');
 var peliasDbclient = require( 'pelias-dbclient' );
 var peliasDocGenerators = require('./src/peliasDocGenerators');
-var wofRecordStream = require('./src/wofRecordStream');
 var hierarchyFinder = require('./src/hierarchyFinder');
 var checker = require('node-version-checker').default;
 
@@ -43,24 +42,20 @@ var types = [
   'neighbourhood'
 ];
 
-var wofRecords = {};
+// a cache of only admin records, to be used to fill the hierarchy
+// of other, lower admin records as well as venues
+var wofAdminRecords = {};
 
-readStream(directory, types, wofRecords, function() {
-  console.log(Object.keys(wofRecords).length + ' records loaded');
+var readStream = readStreamModule.create(directory, types, wofAdminRecords);
 
-  // a stream of WOF records
-  var recordStream = wofRecordStream.createWofRecordsStream(wofRecords);
+// how to convert WOF records to Pelias Documents
+var documentGenerator = peliasDocGenerators.create(
+  hierarchyFinder.hierarchies_walker(wofAdminRecords));
 
-  // how to convert WOF records to Pelias Documents
-  var documentGenerator = peliasDocGenerators.create(
-    hierarchyFinder.hierarchies_walker(wofRecords));
-
-  // the final destination of Pelias Documents
-  var dbClientStream = peliasDbclient();
-
-  // import WOF records into ES
-  importStream(recordStream, documentGenerator, dbClientStream, function() {
-    console.log('import finished');
-  });
+// the final destination of Pelias Documents
+var dbClientStream = peliasDbclient();
 
+// import WOF records into ES
+importStream(readStream, documentGenerator, dbClientStream, function() {
+  console.log('import finished');
 });
diff --git a/package.json b/package.json
index b3fa5cad..0e721acd 100644
--- a/package.json
+++ b/package.json
@@ -28,6 +28,7 @@
   "dependencies": {
     "batchflow": "^0.4.0",
     "clone": "^1.0.2",
+    "combined-stream": "^1.0.5",
     "csv-parse": "^1.0.0",
     "fs-extra": "^0.30.0",
     "iso3166-1": "^0.2.5",
diff --git a/src/readStream.js b/src/readStream.js
index ab70a4de..28749837 100644
--- a/src/readStream.js
+++ b/src/readStream.js
@@ -1,7 +1,9 @@
+var combinedStream = require('combined-stream');
 var parse = require('csv-parse');
 var fs = require('fs');
-var batch = require('batchflow');
-var sink = require('through2-sink');
+var through2 = require('through2');
+
+var logger = require( 'pelias-logger' ).get( 'whosonfirst' );
 
 var isValidId = require('./components/isValidId');
 var fileIsReadable = require('./components/fileIsReadable');
@@ -12,31 +14,67 @@ var extractFields = require('./components/extractFields');
 var recordHasName = require('./components/recordHasName');
 
 /*
-  This function finds all the `latest` files in `meta/`, CSV parses them,
-  extracts the required fields, and assigns to a big collection
-*/
-function readData(directory, types, wofRecords, callback) {
-  batch(types).parallel(2).each(function(idx, type, done) {
-    fs.createReadStream(directory + 'meta/wof-' + type + '-latest.csv')
-    .pipe(parse({ delimiter: ',', columns: true }))
-    .pipe(isValidId.create())
-    .pipe(fileIsReadable.create(directory + 'data/'))
-    .pipe(loadJSON.create(directory + 'data/'))
-    .pipe(recordHasIdAndProperties.create())
-    .pipe(isActiveRecord.create())
-    .pipe(extractFields.create())
-    .pipe(recordHasName.create())
-    .pipe(sink.obj(function(wofRecord) {
-      wofRecords[wofRecord.id] = wofRecord;
-    }))
-    .on('finish', done);
-
-  }).error(function(err) {
-    console.error(err);
-  }).end(function() {
-    callback();
+ * Convert a base directory and list of types into a list of meta file paths
+ */
+function getMetaFilePaths(directory, types) {
+  return types.map(function(type) {
+    return directory + 'meta/wof-' + type + '-latest.csv';
+  });
+}
+
+/*
+ * Given the path to a meta CSV file, return a stream of the individual records
+ * within that CSV file.
+ */
+function createOneMetaRecordStream(metaFilePath) {
+  return fs.createReadStream(metaFilePath)
+    .pipe(parse({ delimiter: ',', columns: true }));
+}
+
+/*
+ * Given a list of meta file paths, create a combined stream that reads all the
+ * records via the CSV parser
+ */
+function createMetaRecordStream(metaFilePaths, types) {
+  var metaRecordStream = combinedStream.create();
+
+  metaFilePaths.forEach(function appendToCombinedStream(metaFilePath, idx) {
+    var type = types[idx];
+    metaRecordStream.append( function ( next ){
+      logger.info( 'Loading ' + type + ' records from ' + metaFilePath );
+      next(createOneMetaRecordStream(metaFilePath));
+    });
   });
+
+  return metaRecordStream;
+}
+
+/*
+  This function creates a stream that finds all the `latest` files in `meta/`,
+  CSV parses them, extracts the required fields, stores only admin records for
+  later, and passes all records on for further processing
+*/
+function createReadStream(directory, types, wofAdminRecords) {
+  var metaFilePaths = getMetaFilePaths(directory, types);
+
+  return createMetaRecordStream(metaFilePaths, types)
+    .pipe(isValidId.create())
+    .pipe(fileIsReadable.create(directory + 'data/'))
+    .pipe(loadJSON.create(directory + 'data/'))
+    .pipe(recordHasIdAndProperties.create())
+    .pipe(isActiveRecord.create())
+    .pipe(extractFields.create())
+    .pipe(recordHasName.create())
+    .pipe(through2.obj(function(wofRecord, enc, callback) {
+      // store admin records in memory to traverse the hierarchy
+      if (wofRecord.place_type !== 'venue') {
+        wofAdminRecords[wofRecord.id] = wofRecord;
+      }
+
+      callback(null, wofRecord);
+    }));
 }
 
-module.exports = readData;
+module.exports = {
+  create: createReadStream
+};
diff --git a/test/readStreamTest.js b/test/readStreamTest.js
index cc9d9bb2..9d76854a 100644
--- a/test/readStreamTest.js
+++ b/test/readStreamTest.js
@@ -1,5 +1,6 @@
 var tape = require('tape');
 var fs = require('fs-extra');
+var sink = require('through2-sink');
 
 var readStream = require('../src/readStream');
 
@@ -8,7 +9,7 @@ tape('readStream', function(test) {
   /* this test is not terribly attractive, i'm not happy with it but setup
      wasn't all that painful.
   */
-  test.test('readStream should return from all requested types and populate wofRecords', function(t) {
+  test.test('readStream should return from all requested types and populate wofAdminRecords', function(t) {
     function setupTestEnvironment() {
       // remove tmp directory if for some reason it's been hanging around from a previous run
       fs.removeSync('tmp');
@@ -75,12 +76,13 @@ tape('readStream', function(test) {
 
     setupTestEnvironment();
 
-    var wofRecords = {};
+    var wofAdminRecords = {};
+    var stream = readStream.create('./tmp/', ['type1', 'type2'], wofAdminRecords);
 
-    readStream('./tmp/', ['type1', 'type2'], wofRecords, function() {
-      t.equals(Object.keys(wofRecords).length, 2, 'there should be 2 records loaded');
+    stream.pipe(sink.obj(function() {})).on('finish', function() {
+      t.equals(Object.keys(wofAdminRecords).length, 2, 'there should be 2 records loaded');
 
-      t.deepEqual(wofRecords[1234567], {
+      t.deepEqual(wofAdminRecords[1234567], {
         id: 1234567,
         name: 'name 1',
         place_type: 'place type 1',
@@ -94,7 +96,7 @@
         popularity: 87654
       }, 'id 1234567 should have been loaded');
 
-      t.deepEqual(wofRecords[12345678], {
+      t.deepEqual(wofAdminRecords[12345678], {
         id: 12345678,
         name: 'name 2',
         place_type: 'name 2',
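
A note on the combined-stream usage in src/readStream.js above: passing a function to append() defers creating each CSV read stream until the previous one has fully ended, so the meta files are read strictly in sequence and only one file handle is open at a time. Here is a minimal sketch of that pattern in isolation; the file paths are placeholders, not the real meta files:

    var combinedStream = require('combined-stream');
    var fs = require('fs');

    var combined = combinedStream.create();

    ['region.csv', 'locality.csv'].forEach(function(path) {
      // the stream is not opened here; combined-stream calls this function
      // only after the previously appended stream has ended
      combined.append(function(next) {
        next(fs.createReadStream(path));
      });
    });

    combined.pipe(process.stdout);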
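
The other moving part is the through2 pass-through at the end of createReadStream: every record continues downstream unchanged, but admin records are also cached by side effect so hierarchyFinder can consult them later. Distilled into a standalone sketch (adminCacheStream and the sample records are illustrative, not names from the diff):

    var through2 = require('through2');
    var sink = require('through2-sink');

    // pass records through unchanged, caching everything except venues
    function adminCacheStream(cache) {
      return through2.obj(function(record, enc, callback) {
        if (record.place_type !== 'venue') {
          cache[record.id] = record;
        }
        callback(null, record);
      });
    }

    var cache = {};
    var stream = adminCacheStream(cache);

    stream.pipe(sink.obj(function(record) {
      // both records arrive here unchanged
    }));

    stream.write({ id: 1, place_type: 'venue' });
    stream.write({ id: 2, place_type: 'locality' });
    stream.end();
    // after 'finish', only the locality (id 2) is in the cache

This is also why the test above pipes into an empty sink before asserting: the cache is populated as a side effect of the stream flowing, so wofAdminRecords can only be inspected once 'finish' fires.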