Skip to content

Commit

Permalink
initial incorporation of pip service
Browse files Browse the repository at this point in the history
  • Loading branch information
trescube committed Feb 3, 2017
1 parent 0ac617e commit fb86541
Show file tree
Hide file tree
Showing 24 changed files with 1,404 additions and 410 deletions.
4 changes: 2 additions & 2 deletions .jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"latedef": false,
"newcap": true,
"noarg": true,
"noempty": true,
"noempty": false,
"nonbsp": true,
"nonew": true,
"plusplus": false,
Expand All @@ -19,4 +19,4 @@
"maxparams": 4,
"maxdepth": 4,
"maxlen": 140
}
}
14 changes: 8 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ const peliasConfig = require('pelias-config').generate();
require('./src/configValidation').validate(peliasConfig);

const _ = require('lodash');

const maxConcurrentReqs = _.get(peliasConfig, 'imports.adminLookup.maxConcurrentReqs', 1);
const datapath = peliasConfig.imports.whosonfirst.datapath;
const os = require('os');

module.exports = {
createLookupStream: require('./src/lookupStream')(maxConcurrentReqs),
createWofPipResolver: require('./src/httpPipResolver')(maxConcurrentReqs),
createLocalWofPipResolver: require('./src/localPipResolver')(datapath)
createLookupStream: function () {
const datapath = peliasConfig.imports.whosonfirst.datapath;
const resolver = require('./src/localPipResolver')(datapath);
const maxConcurrentReqs = _.get(peliasConfig, 'imports.adminLookup.maxConcurrentReqs', 1);

return require('./src/lookupStream')(resolver, maxConcurrentReqs);
}
};
15 changes: 11 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
"who's on first"
],
"dependencies": {
"async": "^2.1.4",
"csv-parse": "^1.2.0",
"iso3166-1": "^0.2.8",
"joi": "^10.1.0",
"lodash": "^4.6.0",
"pelias-config": "2.4.0",
"pelias-logger": "0.1.0",
"pelias-parallel-stream": "0.0.2",
"pelias-wof-pip-service": "1.15.0",
"request": "^2.67.0",
"through2": "^2.0.0"
"simplify-js": "^1.2.1",
"suspend": "^0.7.0",
"through2": "^2.0.0",
"through2-filter": "^2.0.0",
"through2-map": "^3.0.0",
"through2-sink": "^1.0.0"
},
"devDependencies": {
"event-stream": "^3.3.2",
Expand All @@ -39,7 +45,8 @@
"proxyquire": "^1.7.10",
"semantic-release": "^6.3.2",
"tap-dot": "^1.0.1",
"tape": "^4.2.2"
"tape": "^4.2.2",
"temp": "^0.8.3"
},
"pre-commit": [
"lint",
Expand Down
79 changes: 0 additions & 79 deletions src/httpPipResolver.js

This file was deleted.

23 changes: 8 additions & 15 deletions src/localPipResolver.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
'use strict';

var logger = require('pelias-logger').get('wof-admin-lookup');
var createPIPService = require('pelias-wof-pip-service').create;
var createPIPService = require('./pip/index').create;

/**
* LocalPIPService class
*
* @param {object} [lookupService] optional, primarily used for testing
* @constructor
*/
function LocalPIPService(lookupService, datapath) {

this.lookupService = lookupService || null;
function LocalPIPService(datapath) {
var self = this;
createPIPService(datapath, function (err, service) {
self.lookupService = service;
});

if (!this.lookupService) {
var self = this;
createPIPService(datapath, function (err, service) {
self.lookupService = service;
});
}
}

/**
Expand All @@ -28,7 +24,6 @@ function LocalPIPService(lookupService, datapath) {
* @param callback
*/
LocalPIPService.prototype.lookup = function lookup(centroid, callback, search_layers) {

var self = this;

// in the case that the lookup service hasn't loaded yet, sleep and come back in 5 seconds
Expand Down Expand Up @@ -69,7 +64,7 @@ LocalPIPService.prototype.lookup = function lookup(centroid, callback, search_la
*/
LocalPIPService.prototype.end = function end() {
if (this.lookupService) {
logger.debug('Shutting down admin lookup service');
logger.info('Shutting down admin lookup service');
this.lookupService.end();
}
};
Expand All @@ -82,7 +77,5 @@ LocalPIPService.prototype.end = function end() {
* @returns {LocalPIPService}
*/
module.exports = function(datapath) {
return function(service) {
return new LocalPIPService(service, datapath);
};
return new LocalPIPService(datapath);
};
32 changes: 15 additions & 17 deletions src/lookupStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,9 @@ function hasAnyMultiples(result) {
});
}

function createLookupStream(resolver, maxConcurrentReqs) {
if (!resolver) {
throw new Error('createLookupStream requires a valid resolver to be passed in as the first parameter');
}

var stream = parallelStream(maxConcurrentReqs, function (doc, enc, callback) {
function createResolverStream(resolver) {
return function (doc, enc, callback) {
// console.error(`doc queued`);
// don't do anything if there's no centroid
if (_.isEmpty(doc.getCentroid())) {
return callback(null, doc);
Expand Down Expand Up @@ -127,14 +124,7 @@ function createLookupStream(resolver, maxConcurrentReqs) {

callback(null, doc);
}, getAdminLayers(doc.getLayer()));
},
function end() {
if (typeof resolver.end === 'function') {
resolver.end();
}
});

return stream;
};
}

function getCountryCode(result) {
Expand All @@ -144,8 +134,16 @@ function getCountryCode(result) {
return undefined;
}

module.exports = function(maxConcurrentReqs) {
return function(resolver) {
return createLookupStream(resolver, maxConcurrentReqs || 1);
module.exports = function(resolver, maxConcurrentReqs) {
if (!resolver) {
throw new Error('createLookupStream requires a valid resolver to be passed in as the first parameter');
}

const resolverStream = createResolverStream(resolver);
const end = (resolver) => {
return () => { resolver.end(); };
};

return parallelStream(maxConcurrentReqs, resolverStream, end(resolver));

};
Loading

0 comments on commit fb86541

Please sign in to comment.