-
Notifications
You must be signed in to change notification settings - Fork 39
/
caaqm.js
134 lines (123 loc) · 4.32 KB
/
caaqm.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
'use strict';
import { REQUEST_TIMEOUT } from '../lib/constants';
import { default as baseRequest } from 'request';
import { default as moment } from 'moment-timezone';
import { acceptableParameters } from '../lib/utils';
import log from '../lib/logger';
import JSONStream from 'JSONStream';
import { DataStream } from 'scramjet';
import https from 'https';
import { FetchError, DATA_URL_ERROR } from '../lib/errors';
// From: https://github.com/node-fetch/node-fetch/issues/568#issuecomment-932200523
const agent = new https.Agent({
rejectUnauthorized: false
});
const request = baseRequest.defaults({timeout: REQUEST_TIMEOUT});
export const name = 'caaqm';
export async function fetchStream (source) {
const requestOptions = {
method: 'POST',
headers: {
'accept-language': 'en-US,en',
'content-type': 'application/x-www-form-urlencoded',
accept: 'application/json'
},
form: false
};
const options = Object.assign(requestOptions, {
url: source.url,
body: Buffer.from('{"region":"landing_dashboard"}').toString('base64'),
agent
});
const requestObject = request(options);
return DataStream
.pipeline(
requestObject,
JSONStream.parse('map.station_list.*')
)
.catch(e => {
requestObject.abort();
e.stream.end();
throw e;
})
.setOptions({ maxParallel: 20 })
.into((siteStream, site) => {
return siteStream.whenWrote({
station_id: site['station_id'],
station_name: site['station_name'],
coords: {
latitude: Number(site['latitude']),
longitude: Number(site['longitude'])
},
status: site['status']
});
}, new DataStream())
.filter((station) => {
return station.status === 'Live';// && station.station_id == 'site_293';
})
.into(
async (measurements, {coords, station_id: stationId}) => {
const options = Object.assign(requestOptions, {
url: 'https://app.cpcbccr.com/caaqms/caaqms_viewdata_v2',
body: Buffer.from(`{"site_id":"${stationId}"}`).toString('base64'),
resolveWithFullResponse: true,
timeout: 20000
});
try {
const body = await getInfo(options, stationId);
const {siteInfo, tableData: {bodyContent}} = JSON.parse(body);
await (
DataStream
.from(bodyContent)
.each(async p => {
let parameter = p.parameters.toLowerCase().replace('.', '');
parameter = (parameter === 'ozone') ? 'o3' : parameter;
// Make sure we want the pollutant
if (!acceptableParameters.includes(parameter)) {
return;
}
let m = {
averagingPeriod: {unit: 'hours', value: 0.25},
city: siteInfo.city,
location: siteInfo.siteName,
coordinates: coords,
attribution: [{
name: 'Central Pollution Control Board',
url: 'https://app.cpcbccr.com/ccr/#/caaqm-dashboard-all/caaqm-landing'
}],
parameter: parameter,
unit: p.unit,
value: Number(p.concentration)
};
// Date
// const date = moment.tz(`${p.date} ${p.time}`, 'DD MMM YYYY HH:mm', 'Asia/Kolkata');
const date = moment.tz(`${p.toDate}`, 'DD MMM YYYY HH:mm', 'Asia/Kolkata');
m.date = {utc: date.toDate(), local: date.format()};
await measurements.whenWrote(m);
})
.run()
);
} catch (e) {
const message = (e.statusCode)
? `Status code ${e.statusCode} received on http request for station`
: `${e.message}`;
throw new FetchError(DATA_URL_ERROR, source, e, message);
}
},
new DataStream()
)
;
}
async function getInfo (options, stationId) {
return new Promise((resolve, reject) => {
request.post(options, (err, res, body) => {
log.debug(`stationId: ${stationId}, statusCode: ${res ? res.statusCode : 'unknown'}`);
if (err) {
log.error(err ? `${err.message} for adapter: ${name} - stationId: ${stationId}` : 'error');
reject(err);
} else {
resolve(body);
}
});
});
}