In [279]:
import psycopg2 as pg2
import psycopg2.extras
import psycopg2.extensions
import pickle
import pandas as pd
import numpy as np
import pint_pandas
import tqdm
import os

# representation
* load all example data sets into the data structure at once
* measure memory use
* export the data sets to CSV
* reimport from CSV
* export the data sets to some kind of native storage format
* reimport from the native storage format


## read in the data

In [66]:
# Open a connection to the database named "ev". Cursors created from it
# return rows as named tuples (NamedTupleCursor).
conn = pg2.connect("dbname=ev", cursor_factory=pg2.extras.NamedTupleCursor)

In [67]:
# Single cursor reused for every SQL statement and COPY operation below.
cur = conn.cursor()

In [87]:
def open_dequantify(fpath, deq_col):
    """Load a pickled DataFrame and strip the pint quantity from one column.

    The column ``deq_col`` is replaced by its plain magnitudes, and the unit
    it carried is written, as a string, to a 'Unit' column.

    Parameters:
        fpath: path of the pickle file to read.
        deq_col: name of the column holding pint quantities.

    Returns:
        The loaded (and modified) DataFrame.

    NOTE: pickle.load can execute arbitrary code; only use on trusted files.
    """
    with open(fpath, 'rb') as pickle_file:
        frame = pickle.load(pickle_file)

    # Read the unit before the quantity column is overwritten with magnitudes.
    unit_label = str(frame[deq_col].pint.units)
    frame['Unit'] = unit_label
    frame[deq_col] = frame[deq_col].pint.magnitude
    return frame

In [88]:
# Load the frame and turn its 'Emissions' quantities into plain numbers plus
# a 'Unit' column, so the rows can be inserted into PostgreSQL.
fao_emi_total_pd = open_dequantify('mem/fao_emi_total_wo.pck', 'Emissions')

In [59]:
# Table receiving the rows of fao_emi_total_pd; the primary key makes each
# (Area, Date, Element, Item) combination unique.
cur.execute("""create table fao_emi_total (
Area text,
Date date,
Element text,
Item text,
Emissions double precision,
Unit text,
primary key (Area, Date, Element, Item)
)""")

In [60]:
# Bulk-load the frame. execute_values packs many rows into each INSERT
# statement, whereas cursor.executemany sends one statement per row.
# The single %s is the placeholder execute_values expands into the row list.
pg2.extras.execute_values(
    cur,
    """insert into fao_emi_total (Area, Date, Element, Item, Emissions, Unit) values %s""",
    fao_emi_total_pd.itertuples(index=False, name=None),
)

In [89]:
# Load the frame and turn its 'Emissions' quantities into plain numbers plus
# a 'Unit' column.
lak_emi_energy_pd = open_dequantify('mem/lak_emi_energy_wo.pck', 'Emissions')

In [73]:
# Table receiving the rows of lak_emi_energy_pd; the primary key makes each
# (Area, Category, Date) combination unique.
cur.execute("""create table lak_emi_energy (
Area text,
Category text,
Date date,
Emissions double precision,
Unit text,
primary key (Area, Category, Date)
)""")

In [75]:
# Bulk-load the frame. execute_values packs many rows into each INSERT
# statement, whereas cursor.executemany sends one statement per row.
# The single %s is the placeholder execute_values expands into the row list.
pg2.extras.execute_values(
    cur,
    """insert into lak_emi_energy (Area, Category, Date, Emissions, Unit) values %s""",
    lak_emi_energy_pd.itertuples(index=False, name=None),
)

In [76]:
# Commit the open transaction so the tables created and filled above persist.
conn.commit()

In [90]:
# Load the frame and turn its 'CO2 concentration' quantities into plain
# numbers plus a 'Unit' column.
cmip_conc_co2_pd = open_dequantify('mem/cmip_conc_co2_wo.pck', 'CO2 concentration')

In [127]:
def adapt_period_cmip(period):
    """Render a period as a quoted date literal for use in SQL.

    The literal is always the first day of the period's month. Years below 1
    are written with a 'BC' suffix and the year number ``1 - period.year``
    (so year 0 becomes 1 BC); all other years are written unchanged.

    Parameters:
        period: object exposing integer ``year`` and ``month`` attributes.

    Returns:
        A psycopg2 ``AsIs`` wrapper around the literal, so the text is placed
        into the query verbatim.
    """
    before_year_one = period.year < 1
    year = 1 - period.year if before_year_one else period.year
    suffix = 'BC' if before_year_one else ''
    return pg2.extensions.AsIs(f"'{year:04d}-{period.month:02d}-01{suffix}'")

# Let psycopg2 accept pandas Period objects as query parameters by routing
# them through adapt_period_cmip. pd.Period is the public name of the class,
# so no private pandas module path is needed.
pg2.extensions.register_adapter(pd.Period, adapt_period_cmip)

In [184]:
def cast_date(value, cur):
    """Convert a date string of the form 'Y-M-D' or 'Y-M-D BC' to np.datetime64.

    Parameters:
        value: the date text, or None.
        cur: unused; present because psycopg2 typecasters receive the cursor.

    Returns:
        None when ``value`` is None, otherwise an ``np.datetime64``.
    """
    if value is None:
        return None

    is_bc = value.endswith('BC')
    # For BC dates keep only the part before the first whitespace.
    date_text = value.split()[0] if is_bc else value

    year_text, month_text, day_text = date_text.split('-')
    year = int(year_text)
    if is_bc:
        # np.datetime64 counts everything AD and has a year 0, so 1 BC maps to
        # year 0, 2 BC to year -1, and so on.
        year = 1 - year
    return np.datetime64(f'{year:04d}-{month_text}-{day_text}')

# Build a typecaster named 'date' that sends values of type OID 1082 through
# cast_date, then register it (no connection or cursor scope is passed).
# NOTE(review): 1082 is presumably the OID of PostgreSQL's built-in `date`
# type — confirm against pg_type.
extended_date = pg2.extensions.new_type((1082, ), 'date', cast_date)
pg2.extensions.register_type(extended_date)

In [126]:
# Table receiving the rows of cmip_conc_co2_pd; the primary key makes each
# (Area, Date) combination unique. The value column name contains a space and
# is therefore double-quoted.
cur.execute("""create table cmip_conc_co2 (
Area text,
Date date,
"CO2 concentration" double precision,
Unit text,
primary key (Area, Date)
)""")

In [128]:
# Bulk-load the frame. execute_values packs many rows into each INSERT
# statement, whereas cursor.executemany sends one statement per row.
# The single %s is the placeholder execute_values expands into the row list;
# each value is still adapted individually (e.g. via registered adapters).
pg2.extras.execute_values(
    cur,
    """insert into cmip_conc_co2 (Area, Date, "CO2 concentration", Unit) values %s""",
    cmip_conc_co2_pd.itertuples(index=False, name=None),
)

In [129]:
# Commit the open transaction so the cmip_conc_co2 table and its rows persist.
conn.commit()

In [188]:
# Load the frame and turn its 'Emissions' quantities into plain numbers plus
# a 'Unit' column.
prm_emi_pd = open_dequantify('mem/prm_emi_wo.pck', 'Emissions')

In [189]:
# Display the frame to check its columns; the rendering below is truncated to
# the first and last rows.
prm_emi_pd

Unnamed: 0,Area,Category,Date,Entity,Scenario,Emissions,Unit
0,ABW,IPC1,1850-01-01,CH4,HISTCR,0.000387,gigagram
1,ABW,IPC1,1850-01-01,CH4,HISTTP,0.000387,gigagram
2,ABW,IPC1,1850-01-01,CO2,HISTCR,40.300000,gigagram
3,ABW,IPC1,1850-01-01,CO2,HISTTP,40.300000,gigagram
4,ABW,IPC1,1850-01-01,KYOTOGHG (CO2eq),HISTCR,40.300000,gigagram
...,...,...,...,...,...,...,...
4990919,ZWE,IPCMAGELV,2017-01-01,KYOTOGHG (CO2eq),HISTTP,6630.000000,gigagram
4990920,ZWE,IPCMAGELV,2017-01-01,KYOTOGHGAR4 (CO2eq),HISTCR,6040.000000,gigagram
4990921,ZWE,IPCMAGELV,2017-01-01,KYOTOGHGAR4 (CO2eq),HISTTP,6640.000000,gigagram
4990922,ZWE,IPCMAGELV,2017-01-01,N2O,HISTCR,15.600000,gigagram


In [190]:
# Table receiving the rows of prm_emi_pd; the primary key makes each
# (Area, Category, Date, Entity, Scenario) combination unique.
cur.execute("""create table prm_emi (
Area text,
Category text,
Date date,
Entity text,
Scenario text,
Emissions double precision,
Unit text,
primary key (Area, Category, Date, Entity, Scenario)
)""")

In [192]:
# Bulk-load the frame (several million rows). execute_values packs many rows
# into each INSERT statement, whereas cursor.executemany sends one statement
# per row. The single %s is the placeholder execute_values expands into the
# row list.
pg2.extras.execute_values(
    cur,
    """insert into prm_emi (Area, Category, Date, Entity, Scenario, Emissions, Unit) values %s""",
    prm_emi_pd.itertuples(index=False, name=None),
)

In [194]:
# Commit the open transaction so the prm_emi table and its rows persist.
conn.commit()

In [220]:
# Load the pickled collection of PRIMAP tables; it is indexed by key and each
# entry unpacks into a (meta, df) pair in the insert loop further down.
# NOTE: pickle.load can execute arbitrary code; only load trusted files.
with open('mem/primap_tables_wo.pck', 'rb') as fd:
    primap_tables_pd = pickle.load(fd)
    

In [221]:
# One row per PRIMAP table: its descriptive metadata plus a generated
# primap_metadata_id (serial, unique) that primap_data rows refer to.
cur.execute("""create table primap_metadata (
primap_metadata_id serial unique,
Category text,
Class text,
Description text,
Note text,
Scenario text,
Source text,
Type text,
Entity text,
Unit text,
primary key (Category, Class, Scenario, Source, Type, Entity)
)""")

In [222]:
# Values of all PRIMAP tables in one long table; primap_metadata_id links
# each row to the primap_metadata row describing it.
cur.execute("""create table primap_data (
primap_metadata_id integer references primap_metadata (primap_metadata_id),
Area text,
Date date,
Value double precision,
primary key (primap_metadata_id, Area, Date)
)""")

In [223]:
# Store every PRIMAP table: one primap_metadata row per table, plus the
# table's values in primap_data tagged with the generated metadata id.
for key in tqdm.tqdm_notebook(primap_tables_pd):
    meta, df = primap_tables_pd[key]

    # The last column holds the pint quantities; its name is the entity.
    entity = df.columns[-1]
    unit = str(df[entity].pint.units)
    # Build a dequantified copy instead of overwriting the column in place:
    # the frames inside primap_tables_pd keep their units, so `.pint.units`
    # can still be read if this loop runs again.
    rows = df.assign(**{entity: df[entity].pint.magnitude})

    cur.execute("""insert into primap_metadata (Category, Class, Description, Note, Scenario, Source, Type, Entity, Unit)
                   values (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                   returning primap_metadata_id""",
                (meta['category'], meta['class'], meta['descr'], meta['note'], meta['scenario'], meta['source'], meta['type'], entity, unit))
    metadata_id = cur.fetchone()[0]

    # Pass the metadata id as a regular query parameter (prepended to each
    # row) rather than formatting it into the SQL text, and let
    # execute_values send many rows per INSERT statement.
    pg2.extras.execute_values(
        cur,
        """insert into primap_data (primap_metadata_id, Area, Date, Value) values %s""",
        ((metadata_id, *row) for row in rows.itertuples(index=False, name=None)),
    )

HBox(children=(IntProgress(value=0, max=4118), HTML(value='')))




In [224]:
# Commit the open transaction so the primap_metadata and primap_data rows
# persist.
conn.commit()

## Measure memory use

In [225]:
# Show the status of the PostgreSQL service; the output includes a "Memory:"
# line for the service.
!systemctl status postgresql@12-main.service

[0;1;32m●[0m postgresql@12-main.service - PostgreSQL Cluster 12-main
     Loaded: loaded (]8;;file://lt874/lib/systemd/system/postgresql@.service/lib/systemd/system/postgresql@.service]8;;; enabled-runtime; vendor preset: enabled)
     Active: [0;1;32mactive (running)[0m since Mon 2020-09-21 15:34:07 CEST; 20h ago
   Main PID: 23721 (postgres)
      Tasks: 16 (limit: 18915)
     Memory: 2.3G
     CGroup: /system.slice/system-postgresql.slice/postgresql@12-main.service
             ├─ 23721 /usr/lib/postgresql/12/bin/postgres -D /var/lib/postgresq…
             ├─ 23723 postgres: 12/main: checkpointer
             ├─ 23724 postgres: 12/main: background writer
             ├─ 23725 postgres: 12/main: walwriter
             ├─ 23726 postgres: 12/main: autovacuum launcher
             ├─ 23727 postgres: 12/main: stats collector
             ├─ 23728 postgres: 12/main: logical replication launcher
             ├─ 27206 postgres: 12/main: pflueger ev [local] idle
             ├─ 3453

In [233]:
# On-disk size, in KiB, of every relation (tables and their indexes) in the
# public schema, largest first.
# * pg_relation_size() reports the actual size; pg_class.relpages is only a
#   planner estimate refreshed by VACUUM/ANALYZE, and multiplying it by 8
#   assumes the default 8 KiB page size.
# * 'public'::regnamespace replaces the hard-coded namespace OID 2200.
# * The URL uses the "postgresql" scheme; the "postgres" alias is rejected by
#   SQLAlchemy 1.4 and later.
sizes = pd.read_sql_query("""SELECT relname, pg_relation_size(oid) / 1024 AS relsize_kib
FROM pg_class
WHERE relnamespace = 'public'::regnamespace
ORDER BY relsize_kib DESC;""",
                 con="postgresql:///ev")
sizes

Unnamed: 0,relname,relsize_kib
0,primap_data,681688
1,prm_emi,438696
2,primap_data_pkey,412864
3,prm_emi_pkey,271448
4,fao_emi_total,50456
5,fao_emi_total_pkey,39912
6,cmip_conc_co2,4464
7,cmip_conc_co2_pkey,1816
8,primap_metadata,1432
9,primap_metadata_pkey,384


In [235]:
# Total size of all listed relations, converted from KiB to MiB.
sizes['relsize_kib'].sum() / 1024  # MiB

1859.0625

In [236]:
# List the pickle files in mem/ with human-readable sizes, for comparison
# with the database sizes above.
!ls -lah mem/

total 674M
drwxrwxr-x  2 pflueger pflueger 4,0K Aug 27 10:55 .
drwxrwxr-x 10 pflueger pflueger 4,0K Sep 22 12:14 ..
-rw-rw-r--  1 pflueger pflueger 1,2M Aug 27 18:49 cmip_conc_co2.pck
-rw-rw-r--  1 pflueger pflueger 1,2M Aug 27 18:53 cmip_conc_co2_wo.pck
-rw-rw-r--  1 pflueger pflueger 8,8M Aug 27 18:49 fao_emi_total.pck
-rw-rw-r--  1 pflueger pflueger 9,7M Aug 27 18:53 fao_emi_total_wo.pck
-rw-rw-r--  1 pflueger pflueger  40K Aug 27 18:49 lak_emi_energy.pck
-rw-rw-r--  1 pflueger pflueger  50K Aug 27 18:53 lak_emi_energy_wo.pck
-rw-rw-r--  1 pflueger pflueger 205M Aug 27 18:49 primap_tables.pck
-rw-rw-r--  1 pflueger pflueger 244M Aug 27 18:53 primap_tables_wo.pck
-rw-rw-r--  1 pflueger pflueger 105M Aug 27 18:49 prm_emi.pck
-rw-rw-r--  1 pflueger pflueger 100M Aug 27 18:53 prm_emi_wo.pck


## I/O
* export the data sets to CSV
* reimport from CSV
* export the data sets to some kind of native storage format
* reimport from the native storage format

In [247]:
# Export each of the four flat tables to psql/<table>.csv (CSV with a header
# row) by streaming the server's COPY output straight into the file.
for table in ('prm_emi', 'fao_emi_total', 'cmip_conc_co2', 'lak_emi_energy'):
    export_sql = f"""copy {table}
        to stdout
        ( format csv, header true )
        """
    with open(f'psql/{table}.csv', 'wb') as csv_out:
        cur.copy_expert(export_sql, csv_out)

In [248]:
# List the exported CSV files with their sizes in bytes.
!ls -la psql/

total 300504
drwxrwxr-x  2 pflueger pflueger      4096 Sep 22 18:00 .
drwxrwxr-x 11 pflueger pflueger      4096 Sep 22 18:00 ..
-rw-rw-r--  1 pflueger pflueger   3697625 Sep 22 18:00 cmip_conc_co2.csv
-rw-rw-r--  1 pflueger pflueger  34561578 Sep 22 18:00 fao_emi_total.csv
-rw-rw-r--  1 pflueger pflueger    188021 Sep 22 18:00 lak_emi_energy.csv
-rw-rw-r--  1 pflueger pflueger 269255848 Sep 22 18:00 prm_emi.csv


In [251]:
# Export primap_data as one CSV file per metadata id (psql/primap/<id>.csv),
# then export the metadata table itself.
cur.execute("""select primap_metadata_id from primap_metadata""")
# fetchall() materialises the ids first, so the same cursor can be reused for
# the COPY statements inside the loop.
metadata_ids = cur.fetchall()
for pmid, in tqdm.tqdm_notebook(metadata_ids):
    export_sql = f"""copy (select * from primap_data where primap_metadata_id = {pmid})
        to stdout
        ( format csv, header true )
        """
    with open(f'psql/primap/{pmid}.csv', 'wb') as csv_out:
        cur.copy_expert(export_sql, csv_out)

with open('psql/primap_metadata.csv', 'wb') as metadata_out:
    cur.copy_expert("""copy primap_metadata to stdout ( format csv, header true)""", metadata_out)

HBox(children=(IntProgress(value=0, max=4118), HTML(value='')))




In [275]:
# reimport
# COPY FROM neither creates the target table nor infers column types, so a
# CREATE TABLE statement is derived from each CSV header first.
# Columns not listed in data_types are created as text.
data_types = {'date': 'date', 'emissions': 'double precision', 'CO2 concentration': 'double precision'}
# Columns that carry values rather than identify a row; every other column
# becomes part of the primary key.
value_columns = ('emissions', 'CO2 concentration', 'unit')

for table in ('prm_emi', 'fao_emi_total', 'cmip_conc_co2', 'lak_emi_energy'):
    with open(f'psql/{table}.csv', 'rb') as fd:
        # Consume the header line here; what remains in the stream is pure
        # data, which is why COPY below is told "header false".
        # The simple split assumes no column name contains a comma or quote.
        headers = fd.readline().decode().strip().split(',')
        header_spec = ', '.join(f'"{header}" {data_types.get(header, "text")}' for header in headers)
        # Quote the key columns exactly like the column definitions, so both
        # refer to the same identifiers even for names that need quoting.
        primary_key = ', '.join(f'"{header}"' for header in headers if header not in value_columns)

        sql = f"""create table {table}_r ({header_spec}, primary key ({primary_key}))"""
        cur.execute(sql)

        cur.copy_expert(f"""copy {table}_r from stdout ( format csv, header false )""", fd)

In [277]:
# Re-create the metadata table under the "_r" name with the same columns and
# keys as primap_metadata, then fill it from the exported CSV file.
primap_metadata_r_ddl = """create table primap_metadata_r (
primap_metadata_id serial unique,
Category text,
Class text,
Description text,
Note text,
Scenario text,
Source text,
Type text,
Entity text,
Unit text,
primary key (Category, Class, Scenario, Source, Type, Entity)
)"""
cur.execute(primap_metadata_r_ddl)

# The file starts with a header row, hence "header true".
with open('psql/primap_metadata.csv', 'rb') as metadata_csv:
    cur.copy_expert("""copy primap_metadata_r from stdout ( format csv, header true)""", metadata_csv)
    

In [281]:
# Re-create the data table under the "_r" name. Its foreign key points at the
# re-imported metadata table (primap_metadata_r), so the "_r" tables form a
# self-contained pair that does not depend on the original tables.
cur.execute("""create table primap_data_r (
primap_metadata_id integer references primap_metadata_r (primap_metadata_id),
Area text,
Date date,
Value double precision,
primary key (primap_metadata_id, Area, Date)
)""")

# Load every exported per-metadata-id file back; each file starts with a
# header row, hence "header true".
for fpath in tqdm.tqdm_notebook(os.listdir('psql/primap/')):
    with open(f'psql/primap/{fpath}', 'rb') as fd:
        cur.copy_expert("""copy primap_data_r
        from stdout
        ( format csv, header true )
        """, fd)
        

HBox(children=(IntProgress(value=0, max=4118), HTML(value='')))




In [282]:
# Commit the open transaction so the re-imported "_r" tables persist.
conn.commit()

In [None]:
# Native storage format: export with pg_dump (not carried out in this notebook).