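# ETL Demo

Extract recent rainfall readings from the JSON dump files, flatten them into a table, and load the table into a SQLite database.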

In [138]:
import glob
import json
import logging
import os
import time

import pandas as pd
import sqlalchemy

In [147]:
# set parameters
# Directory holding the raw JSON dumps (the rainfall dumps live in its 'rainfall' subfolder).
input_datadir = '../../data/dump/'
# SQLite output file; can be overridden with the SG_WEATHER_DB environment variable
# instead of editing this machine-specific default path.
db_path = os.environ.get('SG_WEATHER_DB',
                         '/home/tankp/basic_data_platform/data/sg_weather.db')
# Maximum age, in seconds, of a dump file to be processed: 36 hours.
dt_cut = 36 * 3600

In [26]:
# Reference "now" as a Unix epoch in milliseconds, the same unit as the file-name prefixes.
curtime = 1000 * time.time()

In [136]:
# get all files in the rainfall dump folder; sorted so the processing order (and thus
# which record drop_duplicates(keep='last') retains later) is reproducible across runs
files = sorted(glob.glob(os.path.join(input_datadir, 'rainfall', '*')))

In [137]:
# select relevant files: dump names start with an epoch-ms timestamp
# (e.g. '1541136275491-rainfall.json'); keep those younger than dt_cut seconds.
def _file_timestamp_ms(path):
    """Return the integer epoch-ms prefix of a dump file name, or None if it has none."""
    prefix = os.path.basename(path).split('-')[0]
    try:
        return int(prefix)
    except ValueError:
        return None
    # end try
# end def

stamped = []
for filename in files:
    ts = _file_timestamp_ms(filename)
    if ts is None:
        # skip stray files without a numeric prefix instead of crashing on int()
        continue
    # end if
    age_s = (curtime - ts) / 1000.
    if age_s < dt_cut:
        stamped.append((ts, filename))
    # end if
# end for

# oldest first, so steps that keep the 'last' record see the newest file last
sel_files = [fname for _, fname in sorted(stamped)]

In [62]:
# `logger` is not defined in any visible cell; create it here so this cell does not
# depend on hidden kernel state. basicConfig makes INFO messages visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('etl_demo')
logger.info('%d files selected', len(sel_files))
sel_files

['../../data/dump//rainfall/1541136275491-rainfall.json',
 '../../data/dump//rainfall/1541137170736-rainfall.json']

In [141]:
def extract_data(filename):
    """Parse one rainfall JSON dump into a list of flat record dicts.

    Parameters
    ----------
    filename : str
        Path to a JSON file containing ``metadata.stations`` (entries with
        ``device_id`` and ``name``) and ``items`` (entries with a ``timestamp``
        string and a ``readings`` list of ``station_id`` / ``value`` pairs).

    Returns
    -------
    list of dict
        One dict per reading, across all items, with keys ``'Station'``
        (station name, or the raw station id if it is missing from the
        metadata), ``'Datetime'`` (the timestamp text before ``'T'``) and
        ``'Daily Rainfall Total (mm)'`` (the reading's value, unchanged).
        An empty ``items`` list yields an empty list.

    Notes
    -----
    NOTE(review): the raw ``value`` is stored under the daily-total column as is;
    if the source reports per-interval readings rather than daily totals, this
    label is misleading. Confirm against the data provider.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding
    with open(filename, encoding='utf-8') as fin:
        data = json.load(fin)
    # end with

    # map device id -> human-readable station name
    station_names = {st['device_id']: st['name'] for st in data['metadata']['stations']}

    recs = []
    for item in data['items']:
        # keep only the date part of an ISO-8601-style timestamp
        date = item['timestamp'].split('T')[0]
        for reading in item['readings']:
            sid = reading['station_id']
            recs.append({
                'Station': station_names.get(sid, sid),
                'Datetime': date,
                'Daily Rainfall Total (mm)': reading['value'],
            })
        # end for
    # end for

    return recs
# end def

In [144]:
# Flatten the records of every selected file into one list, preserving file order.
Recs = [rec for fname in sel_files for rec in extract_data(fname)]

In [146]:
# make dataframe
# Target schema of the raw_data table. extract_data only fills Station, Datetime and
# Daily Rainfall Total (mm); the remaining columns are created as all-NaN placeholders.
keys = ['Station', 'Datetime', 'Daily Rainfall Total (mm)',
        'Highest 30 Min Rainfall (mm)', 'Highest 60 Min Rainfall (mm)',
        'Highest 120 Min Rainfall (mm)', 'Mean Temperature (C)',
        'Maximum Temperature (C)', 'Minimum Temperature (C)',
        'Mean Wind Speed (km/h)', 'Max Wind Speed (km/h)']

df_content = pd.DataFrame(Recs)

# reindex adds the missing schema columns and fixes column order. Merging into an empty
# template frame fails when Recs is empty (no common columns to merge on), and an outer
# merge sorts rows by the join keys, which would change which duplicate keep='last' keeps.
df = (
    df_content
    .reindex(columns=keys)
    # drop duplicates: per (Station, Datetime) keep the record extracted last,
    # i.e. the one from the last file in sel_files
    .drop_duplicates(subset=['Station', 'Datetime'], keep='last')
    # keys are unique now; sort for a deterministic row order in the table
    .sort_values(['Station', 'Datetime'])
    .reset_index(drop=True)
)

In [149]:
# update to database
# Load the cleaned frame into the `raw_data` table of the SQLite file at db_path.
# NOTE(review): if_exists='replace' drops and recreates the table on every run, so it
# only ever holds rows from the current selection window. Confirm that overwriting
# (rather than appending / upserting) is intended.
engine = sqlalchemy.create_engine('sqlite:///' + db_path)
try:
    # the DataFrame index is not written as a column
    df.to_sql('raw_data', con=engine, if_exists='replace', index=False)
finally:
    # release pooled connections so the kernel does not keep the SQLite file open
    engine.dispose()
# end try