Improve readStream to support more records
Previously, the WOF importer loaded all records into memory in one
stream, and then processed and indexed the records in Elasticsearch in a
second stream after the first stream was done.

This has several problems:
* It requires that all data fit into memory. While this is not
  _so_ bad for WOF admin data, which a reasonably new machine can handle
  just fine, it's untenable for venue data, where there are already
  tens of millions of records.
* It's slower: because the disk and network I/O happen in separate
  phases, they can't be interleaved to speed things up.
* It doesn't give good feedback that something is happening: the
  importer sits for several minutes loading records before the dbclient
  progress logs start displaying.

This change fixes all those issues by processing all records in a
single stream, starting at the highest hierarchy level and finishing at
the lowest, so that every record already has the admin data it needs
by the time it is processed.
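
To illustrate the ordering guarantee this relies on, here is a minimal
sketch (the record shapes and names below are simplified stand-ins, not
the importer's actual pipeline): because types stream from the highest
hierarchy level down, a record's ancestors are always cached before the
record itself arrives.

    // minimal sketch: simplified stand-ins, not the importer's actual pipeline
    var wofAdminRecords = {};

    function handleRecord(record) {
      // ancestors streamed earlier, so they are already in the cache
      var parent = wofAdminRecords[record.parent_id];
      console.log(record.name + ' -> ' + (parent ? parent.name : '(none)'));

      // venues are never needed as parents, so only admin records are cached
      if (record.place_type !== 'venue') {
        wofAdminRecords[record.id] = record;
      }
    }

    // records arrive ordered from highest hierarchy level to lowest
    handleRecord({ id: 1, name: 'North America', place_type: 'continent' });
    handleRecord({ id: 2, name: 'United States', place_type: 'country', parent_id: 1 });
    handleRecord({ id: 3, name: 'Some Venue', place_type: 'venue', parent_id: 2 });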

Fixes #101
Connects #7
Connects #94
orangejulius committed Aug 4, 2016
1 parent 4b2e1a6 commit c500aaf
Showing 4 changed files with 86 additions and 50 deletions.
31 changes: 13 additions & 18 deletions import.js
@@ -1,9 +1,8 @@
var peliasConfig = require( 'pelias-config' ).generate();
var readStream = require('./src/readStream');
var readStreamModule = require('./src/readStream');
var importStream = require('./src/importStream');
var peliasDbclient = require( 'pelias-dbclient' );
var peliasDocGenerators = require('./src/peliasDocGenerators');
var wofRecordStream = require('./src/wofRecordStream');
var hierarchyFinder = require('./src/hierarchyFinder');
var checker = require('node-version-checker').default;

@@ -43,24 +42,20 @@ var types = [
'neighbourhood'
];

var wofRecords = {};
// a cache of only admin records, to be used to fill the hierarchy
// of other, lower admin records as well as venues
var wofAdminRecords = {};

readStream(directory, types, wofRecords, function() {
console.log(Object.keys(wofRecords).length + ' records loaded');
var readStream = readStreamModule.create(directory, types, wofAdminRecords);

// a stream of WOF records
var recordStream = wofRecordStream.createWofRecordsStream(wofRecords);
// how to convert WOF records to Pelias Documents
var documentGenerator = peliasDocGenerators.create(
hierarchyFinder.hierarchies_walker(wofAdminRecords));

// how to convert WOF records to Pelias Documents
var documentGenerator = peliasDocGenerators.create(
hierarchyFinder.hierarchies_walker(wofRecords));

// the final destination of Pelias Documents
var dbClientStream = peliasDbclient();

// import WOF records into ES
importStream(recordStream, documentGenerator, dbClientStream, function() {
console.log('import finished');
});
// the final destination of Pelias Documents
var dbClientStream = peliasDbclient();

// import WOF records into ES
importStream(readStream, documentGenerator, dbClientStream, function() {
console.log('import finished');
});
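
For context, `importStream` itself is untouched by this commit; the
sketch below shows one plausible shape for it, assuming it simply pipes
the three pieces together (the real module in ./src/importStream may
differ):

    // hedged sketch: assumes importStream just wires the pieces together;
    // see ./src/importStream for the actual implementation
    function importStream(recordStream, documentGenerator, dbClientStream, callback) {
      recordStream
        .pipe(documentGenerator) // WOF record -> Pelias Document
        .pipe(dbClientStream)    // Pelias Document -> Elasticsearch
        .on('finish', callback);
    }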
1 change: 1 addition & 0 deletions package.json
@@ -28,6 +28,7 @@
"dependencies": {
"batchflow": "^0.4.0",
"clone": "^1.0.2",
"combined-stream": "^1.0.5",
"csv-parse": "^1.0.0",
"fs-extra": "^0.30.0",
"iso3166-1": "^0.2.5",
90 changes: 64 additions & 26 deletions src/readStream.js
@@ -1,7 +1,9 @@
var combinedStream = require('combined-stream');
var parse = require('csv-parse');
var fs = require('fs');
var batch = require('batchflow');
var sink = require('through2-sink');
var through2 = require('through2');

var logger = require( 'pelias-logger' ).get( 'whosonfirst' );

var isValidId = require('./components/isValidId');
var fileIsReadable = require('./components/fileIsReadable');
@@ -12,31 +14,67 @@ var extractFields = require('./components/extractFields');
var recordHasName = require('./components/recordHasName');

/*
This function finds all the `latest` files in `meta/`, CSV parses them,
extracts the required fields, and assigns to a big collection
*/
function readData(directory, types, wofRecords, callback) {
batch(types).parallel(2).each(function(idx, type, done) {
fs.createReadStream(directory + 'meta/wof-' + type + '-latest.csv')
.pipe(parse({ delimiter: ',', columns: true }))
.pipe(isValidId.create())
.pipe(fileIsReadable.create(directory + 'data/'))
.pipe(loadJSON.create(directory + 'data/'))
.pipe(recordHasIdAndProperties.create())
.pipe(isActiveRecord.create())
.pipe(extractFields.create())
.pipe(recordHasName.create())
.pipe(sink.obj(function(wofRecord) {
wofRecords[wofRecord.id] = wofRecord;
}))
.on('finish', done);

}).error(function(err) {
console.error(err);
}).end(function() {
callback();
* Convert a base directory and list of types into a list of meta file paths
*/
function getMetaFilePaths(directory, types) {
return types.map(function(type) {
return directory + 'meta/wof-' + type + '-latest.csv';
});
}

/*
* Given the path to a meta CSV file, return a stream of the individual records
* within that CSV file.
*/
function createOneMetaRecordStream(metaFilePath) {
return fs.createReadStream(metaFilePath)
.pipe(parse({ delimiter: ',', columns: true }));
}

/*
* given a list of meta file paths, create a combined stream that reads all the
* records via the csv parser
*/
function createMetaRecordStream(metaFilePaths, types) {
var metaRecordStream = combinedStream.create();

metaFilePaths.forEach(function appendToCombinedStream(metaFilePath, idx) {
var type = types[idx];
metaRecordStream.append( function ( next ){
logger.info( 'Loading ' + type + ' records from ' + metaFilePath );
next(createOneMetaRecordStream(metaFilePath));
});
});

return metaRecordStream;
}

/*
This function creates a stream that finds all the `latest` files in `meta/`,
CSV parses them, extracts the required fields, stores only admin records for
later, and passes all records on for further processing
*/
function createReadStream(directory, types, wofAdminRecords) {
var metaFilePaths = getMetaFilePaths(directory, types);

return createMetaRecordStream(metaFilePaths, types)
.pipe(isValidId.create())
.pipe(fileIsReadable.create(directory + 'data/'))
.pipe(loadJSON.create(directory + 'data/'))
.pipe(recordHasIdAndProperties.create())
.pipe(isActiveRecord.create())
.pipe(extractFields.create())
.pipe(recordHasName.create())
.pipe(through2.obj(function(wofRecord, enc, callback) {
// store admin records in memory to traverse the hierarchy
if (wofRecord.place_type !== 'venue') {
wofAdminRecords[wofRecord.id] = wofRecord;
}

callback(null, wofRecord);
}));
}

module.exports = readData;
module.exports = {
create: createReadStream
};
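
A note on the lazy append pattern above: combined-stream only calls the
appended factory function once the previous stream has ended, so only
one meta CSV file is open at a time. A self-contained sketch of the
pattern (the file names here are hypothetical):

    var combinedStream = require('combined-stream');
    var fs = require('fs');

    var combined = combinedStream.create();

    ['first.csv', 'second.csv'].forEach(function(file) {
      combined.append(function(next) {
        // runs only after the previous stream ends, so files open one at a time
        next(fs.createReadStream(file));
      });
    });

    combined.pipe(process.stdout);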
14 changes: 8 additions & 6 deletions test/readStreamTest.js
@@ -1,5 +1,6 @@
var tape = require('tape');
var fs = require('fs-extra');
var sink = require('through2-sink');

var readStream = require('../src/readStream');

@@ -8,7 +9,7 @@ tape('readStream', function(test) {
this test is not terribly attractive, i'm not happy with it but setup wasn't
all that painful.
*/
test.test('readStream should return from all requested types and populate wofRecords', function(t) {
test.test('readStream should return from all requested types and populate wofAdminRecords', function(t) {
function setupTestEnvironment() {
// remove tmp directory if for some reason it's been hanging around from a previous run
fs.removeSync('tmp');
@@ -75,12 +76,13 @@ tape('readStream', function(test) {

setupTestEnvironment();

var wofRecords = {};
var wofAdminRecords = {};
var stream = readStream.create('./tmp/', ['type1', 'type2'], wofAdminRecords);

readStream('./tmp/', ['type1', 'type2'], wofRecords, function() {
t.equals(Object.keys(wofRecords).length, 2, 'there should be 2 records loaded');
stream.pipe(sink.obj(function() {})).on('finish', function() {
t.equals(Object.keys(wofAdminRecords).length, 2, 'there should be 2 records loaded');

t.deepEqual(wofRecords[1234567], {
t.deepEqual(wofAdminRecords[1234567], {
id: 1234567,
name: 'name 1',
place_type: 'place type 1',
@@ -94,7 +96,7 @@
popularity: 87654
}, 'id 1234567 should have been loaded');

t.deepEqual(wofRecords[12345678], {
t.deepEqual(wofAdminRecords[12345678], {
id: 12345678,
name: 'name 2',
place_type: 'place type 2',
