In [1]:
import gzip
import io
import itertools
import json
import os
import pickle
from numbers import Number
from pathlib import Path

import numpy as np
import pandas as pd

In [2]:
# Directory that holds the raw log files; the pickled result is written here too.
# Set the SK_DATA_LOG_DIR environment variable to run the notebook on another
# machine; when it is unset the original location is used unchanged.
log_dir = Path(os.environ.get(
    'SK_DATA_LOG_DIR',
    '/Users/mairas/BTSync/Shared/hurma_data/sk-data-log'))

In [3]:
# Path.glob() returns a one-shot generator in arbitrary filesystem order.
# Materialise it as a sorted list so the files are read in a reproducible
# order and the cells that consume it can be re-run without re-running this one.
log_filenames = sorted(log_dir.glob('*.log*'))

In [4]:
def smart_open(fn):
    """Open a log file for reading, transparently handling gzip compression.

    Parameters
    ----------
    fn : str or os.PathLike
        Path of the file to open.  A ``.gz`` suffix selects ``gzip.open``.

    Returns
    -------
    file object
        For ``.gz`` files a binary reader (``gzip.open`` defaults to ``'rb'``),
        so iterating yields ``bytes``.  For every other file a text reader
        that yields ``str``.  The caller is responsible for closing it.
    """
    # Coerce to Path so plain string paths work as well as Path objects.
    path = Path(fn)
    if path.suffix == '.gz':
        return gzip.open(path)
    # Pin the encoding: the gzip branch is decoded as UTF-8 by line_reader,
    # so plain files must not depend on the platform's default encoding.
    return open(path, encoding='utf-8')

In [5]:
def line_reader(f):
    """Yield every line of ``f`` as ``str``.

    Parameters
    ----------
    f : iterable of str or bytes-like
        Typically an open file; text-mode files yield ``str`` and binary
        (e.g. gzip) files yield ``bytes``.

    Yields
    ------
    str
        ``str`` items are passed through untouched; anything else is decoded
        as UTF-8.

    Raises
    ------
    UnicodeDecodeError
        If a bytes line is not valid UTF-8.
    """
    for line in f:
        # isinstance (rather than an exact type comparison) also passes
        # str subclasses through instead of trying to .decode() them.
        yield line if isinstance(line, str) else line.decode('utf-8')

In [6]:
def _iter_log_lines(filenames):
    """Lazily yield every line of every log file, closing each file when done."""
    for lfn in filenames:
        # The with-block releases the file handle as soon as the file is
        # exhausted instead of leaving it open until garbage collection.
        with smart_open(lfn) as log_file:
            yield from line_reader(log_file)


# Still lazy: nothing is read until the iterator is consumed.
all_lines = _iter_log_lines(log_filenames)

In [7]:
%%time
# Read all files now: turn the lazy line iterator into a list so that later
# cells can take its length and index into it.
all_lines = list(all_lines)

CPU times: user 24.5 s, sys: 2.86 s, total: 27.3 s
Wall time: 31.8 s


In [8]:
# Total number of log lines that were read.
len(all_lines)

7064114

In [9]:
# Keep the first line as a sample record.
l0 = all_lines[0]

In [10]:
# Show the raw sample line to see what one record looks like.
print(l0)

{"updates":[{"source":{"label":"Arduino NMEA2000 Gateway","type":"NMEA2000","pgn":129025,"src":"1"},"timestamp":"2018-07-22T14:59:59.699Z","values":[{"path":"navigation.position","value":{"longitude":22.9369913,"latitude":59.808698}}]}],"context":"vessels.urn:mrn:imo:mmsi:230093890"}



In [11]:
def load_all_lines(all_lines):
    """Lazily parse each item of ``all_lines`` as JSON.

    Parameters
    ----------
    all_lines : iterable
        Lines of text, one JSON document per line.

    Yields
    ------
    object
        The decoded JSON value of every line that parses.  Lines for which
        ``json.loads`` raises any ``Exception`` are skipped without notice.
    """
    for raw_line in all_lines:
        try:
            parsed = json.loads(raw_line)
        except Exception:
            # Best effort: a line that cannot be parsed is simply dropped.
            continue
        else:
            yield parsed

In [12]:
# load_all_lines is a generator, so nothing is parsed yet and all_json can be
# consumed only once; re-run this cell before consuming it again.
all_json = load_all_lines(all_lines)

In [13]:
# Value paths to keep.  collect_data compares each value's 'path' field against
# this set and ignores every path that is not listed here.
enabled_paths = {
    'environment.depth.belowSurface',
    'environment.water.temperature',
    'environment.wind.angleApparent',
    'environment.wind.speedApparent',
    'navigation.courseOverGroundTrue',
    'navigation.headingMagnetic',
    'navigation.headingTrue',
    'navigation.magneticVariation',
    'navigation.position',
    'navigation.rateOfTurn',
    'navigation.speedOverGround',
    'navigation.speedThroughWater',
    'propulsion.port.coolantTemperature',
    'propulsion.port.exhaustTemperature',
    'propulsion.port.temperature',
}

In [14]:
def _parse_timestamp(timestamp):
    """Convert an ISO-8601 timestamp string to a timezone-naive ``np.datetime64``.

    A trailing ``'Z'`` (UTC designator) is removed before parsing: NumPy has
    deprecated parsing timezone-aware strings and emits a DeprecationWarning
    for each one.  The resulting value is the same UTC instant.
    """
    if isinstance(timestamp, str) and timestamp.endswith('Z'):
        timestamp = timestamp[:-1]
    return np.datetime64(timestamp)


def collect_data(all_json, paths=None):
    """Gather timestamped numeric samples per value path from parsed records.

    Parameters
    ----------
    all_json : iterable
        Parsed JSON records.  A usable record is a dict with an ``'updates'``
        list; each update carries a ``'timestamp'`` string and a ``'values'``
        list of ``{'path': ..., 'value': ...}`` dicts.  Records or updates
        that lack these parts are skipped instead of aborting the run.
    paths : container of str, optional
        Paths to keep.  Defaults to the notebook-level ``enabled_paths``.

    Returns
    -------
    dict
        Maps a path to a list of ``(np.datetime64, number)`` tuples in input
        order.  When a value is a dict, every numeric member is stored under
        ``'<path>.<member>'``.  Non-numeric values are ignored.
    """
    wanted = enabled_paths if paths is None else paths
    data = {}
    for j in all_json:
        # Valid JSON that is not a delta object (a bare number, a list, ...).
        if not isinstance(j, dict):
            continue
        for upd in j.get('updates') or ():
            values = upd.get('values')
            timestamp = upd.get('timestamp')
            if not values or timestamp is None:
                continue
            dt = _parse_timestamp(timestamp)
            for val in values:
                path = val.get('path')
                if path not in wanted:
                    continue
                value = val.get('value')
                if isinstance(value, dict):
                    # Compound value: store each numeric member separately.
                    for val_k, val_v in value.items():
                        if isinstance(val_v, Number):
                            data.setdefault(path + '.' + val_k, []).append((dt, val_v))
                elif isinstance(value, Number):
                    data.setdefault(path, []).append((dt, value))
    return data

In [15]:
%%time
# Consume the parsed records and gather (timestamp, value) samples per path.
data = collect_data(all_json)

  """


CPU times: user 2min 15s, sys: 3.99 s, total: 2min 19s
Wall time: 2min 34s


In [16]:
# Paths for which at least one sample was collected.
data.keys()

dict_keys(['navigation.position.longitude', 'navigation.position.latitude', 'environment.water.temperature', 'navigation.headingMagnetic', 'navigation.speedOverGround', 'navigation.courseOverGroundTrue', 'environment.wind.speedApparent', 'environment.wind.angleApparent', 'propulsion.port.exhaustTemperature', 'propulsion.port.temperature', 'propulsion.port.coolantTemperature', 'environment.depth.belowSurface', 'navigation.speedThroughWater', 'navigation.magneticVariation', 'navigation.rateOfTurn', 'navigation.headingTrue'])

In [17]:
def convert_to_pandas(data):
    """Build one pandas Series per key of ``data``.

    Parameters
    ----------
    data : dict
        Maps a name to a sequence of items whose element 0 is the index label
        and whose element 1 is the value.

    Returns
    -------
    dict
        Maps each name to a ``pd.Series`` named after it, indexed by the
        element-0 labels.  Where an index label occurs more than once only
        its first occurrence is kept.
    """
    def to_series(path, samples):
        stamps = np.array([sample[0] for sample in samples])
        readings = np.array([sample[1] for sample in samples])
        series = pd.Series(readings, index=stamps, name=path)
        # Flag every repeat of an index label that has already been seen.
        is_repeat = series.index.duplicated(keep='first')
        return series.loc[~is_repeat]

    return {path: to_series(path, samples) for path, samples in data.items()}

In [18]:
# Turn each per-path sample list into a pandas Series.
pddata = convert_to_pandas(data)

In [19]:
# Inspect one of the resulting series.
pddata['environment.wind.angleApparent']

2018-07-22 14:59:59.764   -3.106685
2018-07-22 14:59:59.884   -3.106685
2018-07-22 14:59:59.949   -3.106685
2018-07-22 15:00:00.074   -3.106685
2018-07-22 15:00:00.147   -3.106685
2018-07-22 15:00:00.253   -3.106685
2018-07-22 15:00:00.353   -3.106685
2018-07-22 15:00:00.454   -3.106685
2018-07-22 15:00:00.551   -3.106685
2018-07-22 15:00:00.669   -3.106685
2018-07-22 15:00:00.754   -3.106685
2018-07-22 15:00:00.868   -3.106685
2018-07-22 15:00:00.950   -3.106685
2018-07-22 15:00:01.074   -3.106685
2018-07-22 15:00:01.155   -3.106685
2018-07-22 15:00:01.254   -3.106685
2018-07-22 15:00:01.354   -3.054285
2018-07-22 15:00:01.470   -3.054285
2018-07-22 15:00:01.556   -3.054285
2018-07-22 15:00:01.653   -3.054285
2018-07-22 15:00:01.752   -3.054285
2018-07-22 15:00:01.857   -3.054285
2018-07-22 15:00:01.955   -3.054285
2018-07-22 15:00:02.059   -3.054285
2018-07-22 15:00:02.154   -3.054285
2018-07-22 15:00:02.276   -3.054285
2018-07-22 15:00:02.359   -3.054285
2018-07-22 15:00:02.452   -3

In [20]:
# Persist the dict of Series next to the raw logs so it can be reloaded
# without parsing the log files again.
with open(log_dir / "pddata.pickle", 'wb') as f:
    pickle.dump(pddata, f)