/
eea-direct.js
137 lines (123 loc) · 4.49 KB
/
eea-direct.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import { acceptableParameters } from '../lib/utils';
import { REQUEST_TIMEOUT } from '../lib/constants';
import { default as baseRequest } from 'request';
import { default as moment } from 'moment-timezone';
import tzlookup from 'tz-lookup';
import { MultiStream, DataStream, StringStream } from 'scramjet';
import { default as JSONStream } from 'JSONStream';
import log from '../lib/logger';
// Identifier of this adapter.
export const name = 'eea-direct';
// JSON document with EEA station metadata (station ids, coordinates, city/location names).
const stationsLink = 'http://battuta.s3.amazonaws.com/eea-stations-all.json';
// HTTP client shared by every download in this module; all calls inherit the project-wide timeout.
const request = baseRequest.defaults({timeout: REQUEST_TIMEOUT});
/**
 * Builds the stream of EEA measurements for one source.
 *
 * The stream is returned synchronously. Station metadata and the per-pollutant
 * CSV streams are fetched asynchronously and piped into it once available.
 *
 * @param {Object} source - source configuration, passed through to fetchMetadata/fetchPollutants
 * @returns {DataStream} stream of measurement objects
 */
export function fetchStream (source) {
  const out = new DataStream();
  out.name = 'unused';
  log.debug('Fetch stream called');

  fetchMetadata(source)
    .then((stations) => fetchPollutants(source, stations))
    .then((stream) => stream.pipe(out))
    // Without this handler a failure while preparing the streams is an unhandled
    // rejection and `out` never ends, so consumers (e.g. fetchData) would hang.
    .catch((err) => out.raise(err));

  return out;
}
/**
 * Callback-style wrapper that collects every measurement from fetchStream.
 *
 * @param {Object} source - source configuration forwarded to fetchStream
 * @param {Function} cb - node-style callback, called once with `(err)` or
 *   `(null, {name, measurements})`
 */
export async function fetchData (source, cb) {
  let result;
  try {
    const stream = await fetchStream(source);
    const measurements = await stream.toArray();
    result = {name: stream.name, measurements};
  } catch (e) {
    cb(e);
    return;
  }
  // Invoked outside the try block: if the callback itself throws, that error must
  // not be caught here and reported through a second cb(e) call.
  cb(null, result);
}
// Lazily created stream of station entries, shared by every caller in this module.
let stationCache = null;

/**
 * Returns a rewound view of the station list downloaded from `stationsLink`.
 *
 * The download is started on the first call only. The parsed entries are kept
 * (`keep(Infinity)`) so later calls replay them through `rewind()` rather than
 * requesting the JSON again.
 *
 * @returns {DataStream} stream of the top-level entries of the stations JSON
 */
function getBattutaStream () {
  if (!stationCache) {
    const stationsRequest = request({url: stationsLink});
    const parsedStations = DataStream.pipeline(stationsRequest, JSONStream.parse('*'));
    // On failure: stop the HTTP download, end the failing stream, then rethrow.
    const guardedStations = parsedStations.catch((err) => {
      stationsRequest.abort();
      err.stream.end();
      throw err;
    });
    stationCache = guardedStations.keep(Infinity);
  }
  return stationCache.rewind();
}
/**
 * Builds a lookup table of the stations that belong to the source's country.
 *
 * @param {Object} source - source configuration; `source.country` is matched as a stationId prefix
 * @returns {Promise<Object>} object mapping each matching stationId to its station entry
 */
async function fetchMetadata (source) {
  const belongsToCountry = (station) => station.stationId.startsWith(source.country);
  const addToIndex = (index, station) => {
    index[station.stationId] = station;
  };
  return getBattutaStream()
    .filter(belongsToCountry)
    .accumulate(addToIndex, {});
}
/**
 * Streams recent, valid measurements for every acceptable pollutant of a country.
 *
 * For each pollutant the CSV `${source.url}${source.country}_${CODE}.csv` is
 * downloaded. Its first row is used as the header. Rows are then filtered
 * (inserted within the last two hours, validity 1, non-empty value, known
 * station), joined with the station metadata and converted to measurement
 * objects. All per-pollutant streams are merged with `mux()`.
 *
 * @param {Object} source - source configuration (`url`, `country`, `city`, `sourceURL` are read)
 * @param {Object} stations - station entries keyed by station code, as built by fetchMetadata
 * @returns {DataStream} merged stream of measurement objects
 */
function fetchPollutants (source, stations) {
  const pollutants = acceptableParameters.map(toEeaPollutantCode);
  // TODO: it would be good to provide the actual last fetch time so that we can filter already inserted items in a better way
  const timeLastInsert = moment().utc().subtract(2, 'hours');

  return new MultiStream(
    pollutants.map((pollutant) => {
      const url = `${source.url}${source.country}_${pollutant}.csv`;
      let header;
      return new StringStream()
        .use((stream) => pipeCsvInto(url, stream))
        .CSVParse({header: false, delimiter: ',', skipEmptyLines: true})
        .shift(1, (columns) => (header = columns[0]))
        .filter((o) => o.length === header.length)
        .map((o) => header.reduce((a, c, i) => { a[c] = o[i]; return a; }, {}))
        .filter((o) => moment(o.value_datetime_inserted).utc().isAfter(timeLastInsert))
        // eslint-disable-next-line eqeqeq
        .filter((o) => o.value_validity == 1) // Purposefully using '==' in case the 1 is a string or number
        .filter((o) => o.value_numeric.trim() !== '') // Catch empty string
        // Own-property check: `in` would also accept inherited keys such as "constructor".
        .filter((o) => Object.prototype.hasOwnProperty.call(stations, o.station_code))
        .map((record) => toMeasurement(record, stations[record.station_code], source));
    })
  ).mux();
}

// Maps an internal parameter name to the EEA file code ('pm25' -> 'PM2.5', otherwise upper-cased).
function toEeaPollutantCode (pollutant) {
  return pollutant === 'pm25' ? 'PM2.5' : pollutant.toUpperCase();
}

// Starts the download of one pollutant CSV and feeds its body into `stream`.
function pipeCsvInto (url, stream) {
  const resp = request.get({url});
  resp
    .on('response', ({statusCode}) => {
      if (+statusCode !== 200) {
        // No usable file: stop the download and contribute no rows.
        resp.abort();
        stream.end();
      } else {
        resp.pipe(stream);
      }
    })
    // Connection errors and timeouts are emitted as 'error'; with no listener the
    // EventEmitter would throw and take down the whole process.
    .on('error', (err) => stream.raise(err));
  return stream;
}

// Converts one filtered CSV row and its station entry into a measurement object.
function toMeasurement (record, station, source) {
  const timeZone = tzlookup(station.latitude, station.longitude);
  return {
    location: record.station_code,
    city: station.city || station.location || source.city,
    coordinates: {
      latitude: Number(station.latitude),
      longitude: Number(station.longitude)
    },
    parameter: record.pollutant.toLowerCase().replace('.', ''),
    date: makeDate(record.value_datetime_end, timeZone),
    value: Number(record.value_numeric),
    unit: record.value_unit,
    attribution: [{
      name: 'EEA',
      url: source.sourceURL
    }],
    averagingPeriod: {
      unit: 'hours',
      value: 1
    }
  };
}
/**
 * Converts a CSV timestamp into the `{utc, local}` date pair used by measurements.
 *
 * @param {string} date - timestamp from the CSV; an offset in the string is honoured
 * @param {string} timeZone - station time zone name (as returned by tz-lookup)
 * @returns {{utc: Date, local: string}} the instant as a Date, and the station-local
 *   time formatted as `YYYY-MM-DDTHH:mm:ssZ`
 */
const makeDate = (date, timeZone) => {
  // Parse while keeping the offset written in the input rather than the machine's zone.
  const parsed = moment.parseZone(date);
  // Re-read that same instant in the station's zone to get its wall-clock time.
  const stationLocal = moment.tz(parsed.format(), timeZone);
  const utc = parsed.toDate();
  // Explicit pattern so the offset stays numeric (works around +00:00 showing up as Z).
  const local = stationLocal.format('YYYY-MM-DDTHH:mm:ssZ');
  return {utc, local};
};