# Exploratory analysis

In [2]:
!

HELSINKI_100x100m_3mo.nc4 xplore.ipynb


In [113]:
import xarray as xr
import datetime

# Open the NetCDF dataset and gather the names of its dimensions as plain strings.
dataset = "HELSINKI_100x100m_3mo.nc4"
xds = xr.open_dataset(dataset)
dimensions = {str(dim_name) for dim_name in xds.dims}
dimensions

{'height', 'lat', 'lon', 'time'}

In [114]:
# Display the dataset's representation as the cell output.
xds

## Time series database
### InfluxDB with Chronograf
See: `influx.py`, `batcher.py`

**InfluxClient** establishes a connection to the database. 

**Batcher** sends updates to the database in batches to achieve high write throughput (fast writes).

In [6]:
def t4influx(s):
    """Convert a timestamp string to integer nanoseconds since the Unix epoch.

    Parameters
    ----------
    s : str
        Timestamp formatted as ``YYYY-MM-DDTHH:MM:SS`` with an optional
        fractional part of up to nine digits (for example
        ``"2020-07-01T00:00:00.000000000"``).

    Returns
    -------
    int
        Nanoseconds since the epoch. The string carries no timezone, so the
        naive datetime is interpreted in the machine's local timezone by
        ``datetime.timestamp()``.

    Raises
    ------
    ValueError
        If ``s`` does not match the expected format.
    """
    base, _, fraction = s.partition(".")
    # Normalise the fractional part to exactly nine digits (nanoseconds).
    nanos = int(fraction.ljust(9, "0")[:9]) if fraction else 0
    t = datetime.datetime.strptime(base, '%Y-%m-%dT%H:%M:%S')
    # `t` has no sub-second component, so the float timestamp is a whole number.
    seconds = int(t.timestamp())
    # Scale explicitly instead of multiplying by 10 until the value "looks"
    # like nanoseconds: that loop never ends for 0 or negative timestamps and
    # is off by a power of ten outside the 10-digit-seconds range.
    return seconds * 10**9 + nanos

In [7]:
from influx import InfluxClient
# Smoke test: create the client and push one hand-written line-protocol record.
tsd = InfluxClient()
tsd.send(
    "mem,host=host1 used_percent=24.43234543 %s"
    % t4influx("2020-07-01T00:00:00.000000000")
)

1 items sent!


In [81]:
# Look up one variable at a single (lat, lon, time) point to check label-based access.
xds.loc[
    dict(
        lat=60.12706,
        lon=24.831879,
        time="2020-07-01T00:00:00.000000000",
    )
].get("fmi_rel_humid").values[0]

0.778756

In [71]:
# Data variables: every dataset variable whose name is not a dimension name.
variables = [name for name in map(str, xds.variables) if name not in dimensions]
variables

['fmi_no',
 'fmi_no2',
 'fmi_pm10p0',
 'fmi_pm2p5',
 'fmi_rel_humid',
 'fmi_so2',
 'fmi_temp_2m',
 'fmi_windspeed_10m',
 'megasense_aqi',
 'megasense_co',
 'megasense_no2',
 'megasense_o3',
 'megasense_pm10p0',
 'megasense_pm2p5']

In [13]:
from grid import Grid
# Build the grid from the dataset's latitude and longitude coordinate values.
g = Grid(xds.lat.values, xds.lon.values)
# Display the cell dictionary (cell name -> {'name', 'polygon'}) as the cell output.
g.grid

{'cell_0_0': {'name': 'cell_0_0',
  'polygon': [(60.12706, 24.831879), (60.14149, 24.853558)]},
 'cell_0_1': {'name': 'cell_0_1',
  'polygon': [(60.12706, 24.855364), (60.14149, 24.877043)]},
 'cell_0_2': {'name': 'cell_0_2',
  'polygon': [(60.12706, 24.878849), (60.14149, 24.900528)]},
 'cell_0_3': {'name': 'cell_0_3',
  'polygon': [(60.12706, 24.902336), (60.14149, 24.924013)]},
 'cell_0_4': {'name': 'cell_0_4',
  'polygon': [(60.12706, 24.925821), (60.14149, 24.947498)]},
 'cell_0_5': {'name': 'cell_0_5',
  'polygon': [(60.12706, 24.949306), (60.14149, 24.970984)]},
 'cell_0_6': {'name': 'cell_0_6',
  'polygon': [(60.12706, 24.972792), (60.14149, 24.994469)]},
 'cell_0_7': {'name': 'cell_0_7',
  'polygon': [(60.12706, 24.996277), (60.14149, 25.017956)]},
 'cell_0_8': {'name': 'cell_0_8',
  'polygon': [(60.12706, 25.019762), (60.14149, 25.041441)]},
 'cell_0_9': {'name': 'cell_0_9',
  'polygon': [(60.12706, 25.043247), (60.14149, 25.059505)]},
 'cell_1_0': {'name': 'cell_1_0',
  'pol

In [112]:
import statistics
import math
import numpy as np

def gridLatLongs(lats, longs):
    """Return a single representative (lat, lon) pair for a cell.

    Only the first latitude is paired with the first longitude; the full
    combination is deliberately not enumerated (one sample is used instead
    of a mean over every point).

    Returns a one-element list ``[(lat, lon)]``, or an empty list when either
    input yields no values.
    """
    missing = object()

    first_lat = next(iter(lats), missing)
    if first_lat is missing:
        return []

    first_lon = next(iter(longs), missing)
    if first_lon is missing:
        return []

    return [(first_lat, first_lon)]

def processCell(t, llats=None, llongs=None):
    """Send one record per grid cell to the time series database for time ``t``.

    Parameters
    ----------
    t : str
        Timestamp string, passed to ``getFieldSet`` and ``t4influx``.
    llats, llongs : sequence, optional
        Per-cell latitude and longitude groups. They default to ``g.llats``
        and ``g.llongs``, so both ``processCell(t)`` and
        ``processCell(t, g.llats, g.llongs)`` are valid calls.

    Each record uses measurement ``vwa`` with tags ``grid`` (the cell name)
    and ``polygon`` (the cell's polygon corners flattened to
    ``lat_lon__lat_lon``), followed by the field set and the timestamp.
    """
    if llats is None:
        llats = g.llats
    if llongs is None:
        llongs = g.llongs

    # The timestamp is the same for every cell, so convert it once.
    timestamp = t4influx(t)

    for i, cell_lats in enumerate(llats):
        for j, cell_longs in enumerate(llongs):
            lls = gridLatLongs(cell_lats, cell_longs)
            cell = "cell_%d_%d"%(i,j)
            fieldSet = getFieldSet(t, lls, statistics.mean)
            # Flatten "[(a,b),(c,d)]" into "a_b__c_d" so the tag value has no
            # commas, spaces or brackets.
            polygon = str(g.grid[cell]['polygon']).replace(" ", "")
            polygon = polygon.strip("[]()")
            polygon = polygon.replace("),(", "__")
            polygon = polygon.replace(",", "_")
            r = "vwa,grid=%s,polygon=%s %s %d"%(cell, polygon, fieldSet, timestamp)
            tsd.send(r)

# NOTE(review): t4influx is also defined in an earlier cell of this notebook;
# keep a single definition so the two cannot drift apart.
def t4influx(s):
    """Convert a timestamp string to integer nanoseconds since the Unix epoch.

    Parameters
    ----------
    s : str
        Timestamp formatted as ``YYYY-MM-DDTHH:MM:SS`` with an optional
        fractional part of up to nine digits (for example
        ``"2020-07-01T00:00:00.000000000"``).

    Returns
    -------
    int
        Nanoseconds since the epoch. The string carries no timezone, so the
        naive datetime is interpreted in the machine's local timezone by
        ``datetime.timestamp()``.

    Raises
    ------
    ValueError
        If ``s`` does not match the expected format.
    """
    base, _, fraction = s.partition(".")
    # Normalise the fractional part to exactly nine digits (nanoseconds).
    nanos = int(fraction.ljust(9, "0")[:9]) if fraction else 0
    t = datetime.datetime.strptime(base, '%Y-%m-%dT%H:%M:%S')
    # `t` has no sub-second component, so the float timestamp is a whole number.
    seconds = int(t.timestamp())
    # Scale explicitly instead of multiplying by 10 until the value "looks"
    # like nanoseconds: that loop never ends for 0 or negative timestamps and
    # is off by a power of ten outside the 10-digit-seconds range.
    return seconds * 10**9 + nanos

def getFieldSet(t, lls, reducer=statistics.mean):
    """Build the comma-separated ``name=value`` field string for one cell.

    Parameters
    ----------
    t : str
        Time label used to select from ``xds``.
    lls : iterable of (lat, lon)
        Points whose values are collected for every name in ``variables``.
    reducer : callable, optional
        Collapses the list of collected values of one variable into a single
        number (default ``statistics.mean``).

    Returns
    -------
    str
        ``"var1=<value>,var2=<value>,..."`` in the order of ``variables``,
        each value formatted with ``%f``.

    NaN samples are replaced by 0.0 before reduction.
    """
    samples = {v: [] for v in variables}

    for lat, lon in lls:
        # Select the point once and reuse it for every variable; the
        # selection does not depend on the variable name.
        point = xds.loc[dict(lat=lat, lon=lon, time=t)]
        for v in variables:
            valueObj = point.get(v).values
            # Array values are unwrapped to a Python scalar; anything else is used as is.
            val = valueObj.item() if isinstance(valueObj, np.ndarray) else valueObj
            if math.isnan(val):
                val = 0.0
            samples[v].append(val)

    return ",".join(
        # Spaces are replaced defensively so the field set stays one token.
        ("%s=%f" % (v, reducer(samples[v]))).replace(" ", "_")
        for v in variables
    )

In [111]:
import statistics
from multiprocessing import Pool

def f(x):
    """Return ``x`` multiplied by itself; used as the worker function for the Pool demo."""
    return x*x

if __name__ == '__main__':
    # The context manager shuts the eight worker processes down once the
    # (blocking) map call has returned, so they are not left running in the kernel.
    with Pool(8) as p:
        print(p.map(f, [1, 2, 3]))

    
# Push every third time step to the database and report progress every 500 steps.
time_values = xds.time.values
total = len(time_values)
for i,t in enumerate(time_values):
    if i%3 == 0:
        # processCell reads the cell groups from `g` itself; passing them as
        # extra positional arguments does not match its one-argument definition.
        processCell(str(t))
    if i%500==0:
        pr = (total-i)*100//total
        print("Pushed till:%s, Remaining: %d percent"%(t, pr))
    
    

15 items sent!
78 items sent!
Pushed till:2020-06-01T00:00:00.000000000, Remaining: 100 percent
77 items sent!
76 items sent!
77 items sent!
78 items sent!
77 items sent!
78 items sent!
77 items sent!
77 items sent!
77 items sent!
77 items sent!
77 items sent!
76 items sent!
77 items sent!
77 items sent!
78 items sent!
77 items sent!
78 items sent!
73 items sent!
77 items sent!
77 items sent!
77 items sent!
77 items sent!
76 items sent!
77 items sent!
72 items sent!
71 items sent!
76 items sent!
77 items sent!
77 items sent!
78 items sent!
77 items sent!
77 items sent!
78 items sent!
67 items sent!
66 items sent!
72 items sent!
78 items sent!
70 items sent!
75 items sent!
77 items sent!
77 items sent!
78 items sent!
78 items sent!
77 items sent!
78 items sent!
78 items sent!
76 items sent!
78 items sent!
76 items sent!
77 items sent!
77 items sent!
77 items sent!
77 items sent!
77 items sent!
78 items sent!
78 items sent!
77 items sent!
73 items sent!
78 items sent!
77 items sent!
77 i

KeyboardInterrupt: 