### Load data into PostgreSQL

As well as wrangling the data in pandas and Dask, here we load it into Postgres. Advantage: it is much less memory-intensive than pandas. Disadvantage: SQL queries can become complex.

Any power utility will be more than likely using a relational database to store structured data like this. 

We could alternatively use a NoSQL database (or even Hadoop), but the data structure won't change significantly and I expect Postgres to be more efficient than, say, MongoDB for this particular dataset.

In [1]:
import os
from urllib.parse import quote

from fastai.structured import *
from fastai.column_data import *
import dask
import dask.dataframe as dd

#note we need to run pip install psycopg2 and pip install sqlalchemy into out fastai conda env
import psycopg2
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table, text

  """)


In [2]:
# Never truncate the column list when a wide DataFrame is displayed.
pd.options.display.max_columns = None

In [3]:
# Root of the input data, and the sub-directory holding the merged CSV files.
# Both values keep their trailing slash.
PARENT_PATH = '../input/'
PATH = PARENT_PATH + 'merged_data/'

We use dask here to check our data format - dask is super fast for this

In [4]:
# Columns that must be read as plain Python objects (strings) rather than
# having their dtype inferred.
dtypes = dict.fromkeys(('Bank_holiday', 'LCL_day_uid', 'LCLid'), 'object')

In [5]:
# Lazily open the melted half-hourly CSV with dask; 'day' is parsed as a date.
melted_csv = f'{PATH}/hh_melted_date_all_deltas.csv'
dd_melted = dd.read_csv(
    melted_csv,
    dtype=dtypes,
    parse_dates=['day'],
)

In [6]:
# Preview the first rows of the melted dataset.
dd_melted.head()

Unnamed: 0.2,Unnamed: 0,Unnamed: 0.1,LCLid,day,energy_median,energy_mean,energy_max,energy_count,energy_std,energy_sum,energy_min,LCL_day_uid,Bank_holiday,time,energy(kWh/hh),dayYear,dayMonth,dayWeek,dayDay,dayDayofweek,dayDayofyear,dayIs_month_end,dayIs_month_start,dayIs_quarter_end,dayIs_quarter_start,dayIs_year_end,dayIs_year_start,dayElapsed,temp_day,str_time,day_time,delta_minutes
0,0,0,MAC000002,2012-10-12,0.1385,0.154304,0.886,46.0,0.196034,7.098,0.0,MAC000002_2012-10-12,,1900-01-01 00:00:00,,2012,10,41,12,4,286,False,False,False,False,False,False,1350000000,1900-01-01,2018-10-13 00:00:00,2012-10-12 00:00:00,0
1,1,1,MAC000002,2012-10-12,0.1385,0.154304,0.886,46.0,0.196034,7.098,0.0,MAC000002_2012-10-12,,1900-01-01 00:30:00,0.0,2012,10,41,12,4,286,False,False,False,False,False,False,1350000000,1900-01-01,2018-10-13 00:30:00,2012-10-12 00:30:00,30
2,2,2,MAC000002,2012-10-12,0.1385,0.154304,0.886,46.0,0.196034,7.098,0.0,MAC000002_2012-10-12,,1900-01-01 01:00:00,0.0,2012,10,41,12,4,286,False,False,False,False,False,False,1350000000,1900-01-01,2018-10-13 01:00:00,2012-10-12 01:00:00,60
3,3,3,MAC000002,2012-10-12,0.1385,0.154304,0.886,46.0,0.196034,7.098,0.0,MAC000002_2012-10-12,,1900-01-01 01:30:00,0.0,2012,10,41,12,4,286,False,False,False,False,False,False,1350000000,1900-01-01,2018-10-13 01:30:00,2012-10-12 01:30:00,90
4,4,4,MAC000002,2012-10-12,0.1385,0.154304,0.886,46.0,0.196034,7.098,0.0,MAC000002_2012-10-12,,1900-01-01 02:00:00,0.0,2012,10,41,12,4,286,False,False,False,False,False,False,1350000000,1900-01-01,2018-10-13 02:00:00,2012-10-12 02:00:00,120


In [7]:
# Drop the reference to the dask dataframe; it was only needed for the preview above.
dd_melted = None

### PostgreSQL

Now we read and persist the data to Postgres. We could alternatively use Dask for further joins etc., but Postgres will be faster.

In [4]:
# Read the database password from the environment so it is never stored in the
# notebook. Raises KeyError if KAGGLER_PASSWORD is not set.
db_password = os.environ['KAGGLER_PASSWORD']

In [5]:
# Percent-encode the password so that characters such as '@', ':' or '/' in it
# cannot be mistaken for URL delimiters when the connection string is parsed.
encoded_password = quote(db_password, safe='')
engine = create_engine(f'postgresql+psycopg2://kaggler:{encoded_password}@localhost/london_smartmeter')
#alternatively if dont want to install postgres use sqlite
#engine = create_engine('sqlite:///hh_data.db')

In [35]:
# Create MetaData instance
metadata = MetaData(engine, reflect=True)
print(metadata.tables)

immutabledict({'information_households': Table('information_households', MetaData(bind=Engine(postgresql+psycopg2://kaggler:***@localhost/london_smartmeter)), Column('lclid', VARCHAR(), table=<information_households>, primary_key=True, nullable=False), Column('stdortou', VARCHAR(), table=<information_households>), Column('acorn', VARCHAR(), table=<information_households>), Column('acorn_grouped', VARCHAR(), table=<information_households>), Column('file', VARCHAR(), table=<information_households>), schema=None), 'hh_data': Table('hh_data', MetaData(bind=Engine(postgresql+psycopg2://kaggler:***@localhost/london_smartmeter)), Column('lclid', VARCHAR(), table=<hh_data>), Column('day', DATE(), table=<hh_data>), Column('energy_median', REAL(), table=<hh_data>), Column('energy_mean', REAL(), table=<hh_data>), Column('energy_max', REAL(), table=<hh_data>), Column('energy_count', REAL(), table=<hh_data>), Column('energy_std', REAL(), table=<hh_data>), Column('energy_sum', REAL(), table=<hh_data

  


In [7]:
#Drop the table before we re-load

# .get() returns None when the table was not found during reflection (e.g. on a
# fresh database), so the cell no longer fails with KeyError in that case.
# checkfirst skips the DROP if the table has disappeared since reflection.
hh_data_table = metadata.tables.get('hh_data')
if hh_data_table is not None:
    hh_data_table.drop(engine, checkfirst=True)


In [10]:
# DDL for the half-hourly readings table. Column names avoid the spaces and
# brackets found in the CSV header. The rows below are joined with the same run
# of spaces the statement contained when written with line continuations.
hh_column_rows = (
    'Unnamed_0 VARCHAR, Unnamed_1 VARCHAR, LCLid VARCHAR, day DATE,',
    'energy_median REAL, energy_mean REAL, energy_max REAL, energy_count REAL,',
    'energy_std REAL, energy_sum REAL, energy_min REAL, LCL_day_uid VARCHAR,',
    'Bank_holiday VARCHAR, time VARCHAR, energy_kWh_hh REAL, dayYear INTEGER,',
    'dayMonth INTEGER, dayWeek INTEGER, dayDay INTEGER, dayDayofweek INTEGER,',
    'dayDayofyear INTEGER, dayIs_month_end BOOLEAN, dayIs_month_start BOOLEAN,',
    'dayIs_quarter_end BOOLEAN, dayIs_quarter_start BOOLEAN, dayIs_year_end BOOLEAN,',
    'dayIs_year_start BOOLEAN, dayElapsed BIGINT, temp_day VARCHAR, str_time TIME,',
    'day_time VARCHAR, delta_minutes INTEGER);',
)
hh_sql = 'CREATE TABLE "hh_data" (' + (' ' * 13).join(hh_column_rows)

In [11]:
# Run the CREATE TABLE statement for hh_data.
engine.execute(hh_sql)

<sqlalchemy.engine.result.ResultProxy at 0x7fc9b81aef60>

In [12]:
# The COPY statement built from this value is given an absolute path.
relative_csv = f'{PATH}hh_melted_date_all_deltas.csv'
hh_file_path = os.path.abspath(relative_csv)

In [13]:
# Double any single quote in the path so it cannot end the SQL string literal
# early (e.g. a directory named o'brien).
quoted_path = hh_file_path.replace("'", "''")
copy_sql = f"COPY hh_data FROM '{quoted_path}' WITH CSV HEADER DELIMITER ','"


In [14]:
# engine.begin() checks out a connection, runs the COPY inside a transaction
# (commit on success, rollback on error) and releases the connection afterwards,
# so no connection is left open once the cell has finished.
with engine.begin() as connection:
    connection.execute(copy_sql)

In [15]:
# Marker so it is obvious in the output when the bulk load has finished.
print('Done')

Done


We don't want too small a chunksize; reduce it if you have memory issues.

Note that we manually specify the column names, as the name of the energy column contains brackets.

In [7]:
dtypes = {'Bank_holiday':'object',
          'LCL_day_uid': 'object',
          'LCLid': 'object'}

# Database-friendly column names that replace the header found in the CSV.
column_names = ['Unnamed_0','Unnamed_1','LCLid','day','energy_median',
                'energy_mean','energy_max','energy_count','energy_std',
                'energy_sum','energy_min','LCL_day_uid','Bank_holiday',
                'time','energy_kWh_hh','dayYear','dayMonth','dayWeek',
                'dayDay','dayDayofweek','dayDayofyear','dayIs_month_end',
                'dayIs_month_start','dayIs_quarter_end','dayIs_quarter_start',
                'dayIs_year_end','dayIs_year_start','dayElapsed','temp_day',
                'str_time','day_time','delta_minutes']

# chunksize is the number of CSV rows read (and inserted) per iteration.
# This approach takes too long - use the COPY statement instead.
# NOTE(review): an earlier note said this chunksize uses c. 40GB RAM, which
# cannot refer to 100 rows - presumably it was measured with a far larger value.
chunksize=100
start = time.time()
reader = pd.read_csv(f'{PATH}/hh_melted_date_all_deltas.csv', chunksize=chunksize, dtype=dtypes,
                     parse_dates=['day', 'str_time'],
                     names=column_names,
                     # header=0: the file has its own header row; without this it
                     # would be loaded as the first data row once `names` is given.
                     header=0)
for chunk in reader:
    chunk.to_sql(name="hh_data", con=engine, if_exists="append", index=False)  #"name" is name of table
    time_now = time.time()
    print(f'parsed row: {chunk.iloc[0, 1]}, elapsed: {time_now - start}')


In [10]:
# Remove every row from hh_data (the table definition itself is kept).
# NOTE(review): this discards whatever was loaded by earlier cells - make sure
# a reload follows before anything queries the table.
rs = engine.execute('DELETE FROM hh_data')

In [18]:
#instead I used dbeaver to load the csv using COPY 

In [37]:
# DDL for the household lookup table; CSV columns are
# LCLid, stdorToU, Acorn, Acorn_grouped, file - with LCLid as the primary key.
ih_sql = (
    'CREATE TABLE "information_households" ('
    'LCLid VARCHAR, stdorToU VARCHAR, Acorn VARCHAR, Acorn_grouped VARCHAR, file VARCHAR, '
    'PRIMARY KEY (LCLid));'
)

In [38]:
# Run the CREATE TABLE statement for information_households.
engine.execute(ih_sql)

<sqlalchemy.engine.result.ResultProxy at 0x7fee9b8ff550>

In [18]:
#also used dbeaver to load ih csv

In [16]:
# Fetch all household rows to check that the load worked.
result = engine.execute('SELECT * from information_households')

In [17]:
# Count the rows on the server. rowcount is only dependable for UPDATE/DELETE;
# for a SELECT its value depends on the database driver.
engine.execute('SELECT COUNT(*) FROM information_households').scalar()

5566

In [46]:
# The COPY statement built from this value is given an absolute path.
relative_csv = f'{PATH}hh_melted_date_all_deltas.csv'
hh_file_path = os.path.abspath(relative_csv)

In [47]:
# Double any single quote in the path so it cannot end the SQL string literal
# early (e.g. a directory named o'brien).
quoted_path = hh_file_path.replace("'", "''")
copy_sql = f"COPY hh_data FROM '{quoted_path}' WITH CSV HEADER DELIMITER ','"


In [None]:
# Run the COPY inside an explicit transaction. SQLAlchemy's implicit autocommit
# only recognises statements such as INSERT/UPDATE/DELETE and DDL, so a bare
# engine.execute() of COPY is not guaranteed to be committed.
with engine.begin() as connection:
    connection.execute(copy_sql)

In [17]:
# Only one row is needed, so ask the database for one row; without LIMIT the
# driver may pull the entire table to the client before .first() returns.
hh_one = engine.execute('SELECT * FROM hh_data LIMIT 1').first()

In [18]:
# Display the sampled row to check the loaded column values and types.
hh_one

('143204643', '143204643', 'MAC004805', datetime.date(2013, 9, 25), 0.1245, 0.151208, 0.974, 48.0, 0.154383, 7.258, 0.041, 'MAC004805_2013-09-25', None, '1900-01-01 01:30:00', 0.062, 2013, 9, 39, 25, 2, 268, False, False, False, False, False, False, 1380067200, '1900-01-01', datetime.time(1, 30), '2013-09-25 01:30:00', 501210)

In [29]:
#superfluous columns in hh_data then dropped in database gui tool (dbeaver) - very simple

Create an ordered table - SQL admin experts say this is not required, but it is going to make my life easier.

In [8]:
# Materialise a copy of hh_data sorted by meter id, day and time of day.
# NOTE(review): not re-runnable as written - CREATE TABLE fails if hh_calc
# already exists.
engine.execute('CREATE TABLE hh_calc AS SELECT * FROM hh_data ORDER BY LCLid, day, str_time;')

<sqlalchemy.engine.result.ResultProxy at 0x7f7351c85780>

In [62]:
# Running total of energy ordered by day and time of day.
# NOTE(review): there is no PARTITION BY LCLid, so the sum runs across all
# households rather than per meter - confirm that this is intended.
# LIMIT 10: only the first ten rows are inspected, so do not ship the whole
# table to the client.
hh_sql = 'SELECT LCLid, SUM(energy_kwh_hh) OVER (ORDER BY day, str_time) AS cum_energy FROM hh_calc ORDER BY day, str_time LIMIT 10;'
result = engine.execute(hh_sql)

In [64]:
# Show the first ten rows; list(result) reads every row of the result before slicing.
list(result)[:10]

[('MAC000145', None),
 ('MAC000156', None),
 ('MAC000155', None),
 ('MAC000147', None),
 ('MAC000157', None),
 ('MAC000154', None),
 ('MAC000150', None),
 ('MAC000146', None),
 ('MAC000151', None),
 ('MAC000152', None)]

### Export data

In [67]:
#note the lack of trailing semi-colon in the query string, as per the Postgres documentation
query = "SELECT LCLid, day, energy_kwh_hh, str_time FROM hh_calc"

outputquery = f"COPY ({query}) TO STDOUT WITH CSV HEADER"

# copy_expert is a method of the DBAPI (psycopg2) cursor, hence the raw connection.
connection = engine.raw_connection()
try:
    with open(f'{PATH}/energy_only.csv', 'w') as f:
        cursor = connection.cursor()
        try:
            # stream the query result straight into the open CSV file
            cursor.copy_expert(outputquery, f)
        finally:
            cursor.close()
finally:
    # release the connection even if the export fails part-way through
    connection.close()

### Weather data

In [44]:
# Load the interpolated half-hourly weather data from its feather file.
weather_file = f'{PATH}/hh_weather_interpolated.feather'
hh_weather = pd.read_feather(weather_file)

In [45]:
# Parse the HH:MM:SS values, then keep only the time-of-day component.
parsed_times = pd.to_datetime(hh_weather['time'], format='%H:%M:%S')
hh_weather['time'] = parsed_times.dt.time

In [46]:
# Preview the weather rows after the time column has been converted.
hh_weather.head()

Unnamed: 0,visibility,windBearing,temperature,time,dewPoint,pressure,apparentTemperature,windSpeed,precipType,humidity,summary,day
0,5.97,104.0,10.24,00:00:00,8.86,1016.76,10.24,2.77,rain,0.91,Partly Cloudy,2011-11-11
1,5.425,101.5,10.0,00:30:00,8.845,1016.695,9.24,2.86,rain,0.925,Partly Cloudy,2011-11-11
2,4.88,99.0,9.76,01:00:00,8.83,1016.63,8.24,2.95,rain,0.94,Partly Cloudy,2011-11-11
3,4.29,98.5,9.61,01:30:00,8.81,1016.495,8.0,3.06,rain,0.95,Partly Cloudy,2011-11-11
4,3.7,98.0,9.46,02:00:00,8.79,1016.36,7.76,3.17,rain,0.96,Partly Cloudy,2011-11-11


In [47]:
#hh_weather['day'] = pd.to_datetime(hh_weather['day'], format='%Y-%m-%d')

In [48]:
# Check the column dtypes before writing the frame to the database.
hh_weather.dtypes

visibility             float64
windBearing            float64
temperature            float64
time                    object
dewPoint               float64
pressure               float64
apparentTemperature    float64
windSpeed              float64
precipType              object
humidity               float64
summary                 object
day                     object
dtype: object

In [49]:
# if_exists='replace' lets the cell be re-run: the default ('fail') raises as
# soon as the hh_weather table already exists, forcing a manual drop first.
hh_weather.to_sql('hh_weather', engine, if_exists='replace')

In [43]:
#hh_w = metadata.tables['hh_weather']
#hh_w.drop(engine)

### Household data

In [None]:
# Household metadata; low_memory=False makes pandas read the file in one pass.
households_csv = f'{PATH}informations_households.csv'
informations_households = pd.read_csv(households_csv, low_memory=False)

In [None]:
# Distinct values of the LCLid column.
lclids = informations_households['LCLid'].unique()

We could sort then split alphabetically, but let's split by acorn.

In [None]:
# Sort the ids; note that sorted() returns a list, so lclids changes type here.
lclids = sorted(lclids)

In [None]:
# Number of distinct ids, and the first id in sort order.
len(lclids), lclids[0]

### Acorn data

In [51]:
# Acorn reference table; low_memory=False makes pandas read the file in one pass.
acorn_csv = f'{PATH}acorn_details.csv'
acorn_details = pd.read_csv(acorn_csv, low_memory=False)

In [52]:
# Preview the first rows of the acorn reference table.
acorn_details.head()

Unnamed: 0,MAIN CATEGORIES,CATEGORIES,REFERENCE,ACORN-A,ACORN-B,ACORN-C,ACORN-D,ACORN-E,ACORN-F,ACORN-G,ACORN-H,ACORN-I,ACORN-J,ACORN-K,ACORN-L,ACORN-M,ACORN-N,ACORN-O,ACORN-P,ACORN-Q
0,POPULATION,Age,Age 0-4,77.0,83.0,72.0,100.0,120.0,77.0,97.0,97.0,63.0,119.0,67.0,114.0,113.0,89.0,123.0,138.0,133.0
1,POPULATION,Age,Age 5-17,117.0,109.0,87.0,69.0,94.0,95.0,102.0,106.0,67.0,95.0,64.0,108.0,116.0,86.0,89.0,136.0,106.0
2,POPULATION,Age,Age 18-24,64.0,73.0,67.0,107.0,100.0,71.0,83.0,89.0,62.0,104.0,459.0,97.0,96.0,86.0,117.0,109.0,110.0
3,POPULATION,Age,Age 25-34,52.0,63.0,62.0,197.0,151.0,66.0,90.0,88.0,63.0,132.0,145.0,109.0,96.0,90.0,140.0,120.0,120.0
4,POPULATION,Age,Age 35-49,102.0,105.0,91.0,124.0,118.0,93.0,102.0,103.0,76.0,111.0,67.0,99.0,98.0,90.0,102.0,103.0,100.0


In [53]:
# if_exists='replace' lets the cell be re-run: the default ('fail') raises as
# soon as the acorn_details table already exists.
acorn_details.to_sql('acorn_details', engine, if_exists='replace')