Skip to content

Commit

Permalink
Update to scramjet version supporting pipeline method, add error hand…
Browse files Browse the repository at this point in the history
…ling in pipeline in adapters so that parsing and fetch errors are handled gracefully.
  • Loading branch information
MichalCz committed Dec 10, 2018
1 parent 3c471a1 commit bff0d5e
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 31 deletions.
21 changes: 10 additions & 11 deletions adapters/caaqm.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,18 @@ export async function fetchStream (source) {
body: Buffer.from('{"region":"landing_dashboard"}').toString('base64')
});

return DataStream
.from(
() => {
const response = request(options);
const parser = JSONStream.parse('map.station_list.*');
const output = new DataStream();

parser.on('error', (e) => output.raise(e));
response.on('error', (e) => output.raise(e));
const requestObject = request(options);

return response.pipe(parser).pipe(output);
}
return DataStream
.pipeline(
requestObject,
JSONStream.parse('map.station_list.*')
)
.catch(e => {
requestObject.abort();
e.stream.end();
throw e;
})
.setOptions({maxParallel: 5})
.into(
(siteStream, site) => {
Expand Down
15 changes: 11 additions & 4 deletions adapters/chinaaqidata.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ exports.fetchStream = function (source) {
Key: 'airnow.json'
};

return s3.getObject(params)
.createReadStream()
.pipe(JSONStream.parse('features.*'))
.pipe(new DataStream())
const s3stream = s3.getObject(params).createReadStream();
return DataStream
.pipeline(
s3stream,
JSONStream.parse('features.*')
)
.catch(e => {
s3stream.abort();
e.stream.end();
throw e;
})
.use(stream => {
stream.name = 'unused';
return stream;
Expand Down
14 changes: 11 additions & 3 deletions adapters/eea-direct.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,17 @@ export async function fetchData (source, cb) {
let _battuta = null;
function getBattutaStream () {
if (!_battuta) {
_battuta = request({url: stationsLink})
.pipe(JSONStream.parse('*'))
.pipe(new DataStream())
const requestObject = request({url: stationsLink});
_battuta = DataStream
.pipeline(
requestObject,
JSONStream.parse('*')
)
.catch(e => {
requestObject.abort();
e.stream.end();
throw e;
})
.keep(Infinity);
}

Expand Down
11 changes: 10 additions & 1 deletion adapters/gios-poland.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@ export function fetchStream (source) {
} = source;

const stationUrl = `${url}/station/findAll`;
const requestObject = request.get(stationUrl);
return DataStream
.from(() => request.get(stationUrl).pipe(JSONStream('*')))
.pipeline(
requestObject,
JSONStream('*')
)
.catch(e => {
requestObject.abort();
e.stream.end();
throw e;
})
.map(
({
id,
Expand Down
22 changes: 11 additions & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"request-promise-native": "^1.0.5",
"require-dir": "^1.0.0",
"s3-upload-stream": "^1.0.7",
"scramjet": "^4.18.17",
"scramjet": "^4.19.0",
"ssl-root-cas": "^1.2.5",
"transliteration": "^1.6.6",
"tz-lookup": "^6.1.8",
Expand Down

0 comments on commit bff0d5e

Please sign in to comment.