In [None]:
import sqlite3
import orjson  # hurtigere json bibliotek
import pandas as pd
from zipfile import ZipFile
import time

In [None]:
# Open (or create) dmi.db and tune the connection for a one-off bulk load.
# SQLite documents that with journal_mode OFF and synchronous OFF a crash or
# power loss in the middle of a write can corrupt the database, so if a load
# is interrupted the file should be rebuilt from the raw downloads.
con = sqlite3.connect("dmi.db")
con.execute('PRAGMA journal_mode = OFF;')
con.execute('PRAGMA synchronous = OFF;')
# A negative cache_size is interpreted as KiB, so -1000000 is ~1 GB.
# (A positive value counts *pages*: 1000000 pages at the default 4 KiB page
# size would be ~4 GB, not the intended 1 GB.)
con.execute('PRAGMA cache_size = -1000000;')
# Never release file locks while this connection is open: once it has
# written, other connections cannot use dmi.db until `con` is closed.
con.execute('PRAGMA locking_mode = EXCLUSIVE;')
cur = con.cursor()

In [None]:
# Create the three tables the rest of the notebook fills. `IF NOT EXISTS`
# makes this cell safe to re-run against an existing dmi.db.
#   features   - one row per observation, primary key
#                (stationId, parameterId, observed_dt); declared WITHOUT ROWID
#                so rows are stored directly in the primary-key B-tree.
#   stations   - station metadata; its columns are the ones selected from
#                målestationer.csv when the stations are loaded.
#   parameters - parameter catalogue; parameterId is assigned when
#                parameter.csv is loaded.
features_ddl = '''CREATE TABLE IF NOT EXISTS features (
        stationId INT,
        observed_dt TEXT,
        parameterId INT,
        value REAL,
        PRIMARY KEY (stationId, parameterId, observed_dt)
        ) WITHOUT ROWID;
    '''

stations_ddl = '''CREATE TABLE IF NOT EXISTS stations (
        stationId INT,
        name TEXT,
        country TEXT,
        owner TEXT,
        type TEXT,
        status TEXT,
        stationHeight REAL,
        barometerHeight REAL,
        latitude REAL,
        longitude REAL,
        region INT,
        created_dt TEXT,
        operationFrom_dt TEXT,
        operationTo_dt TEXT,
        updated_dt TEXT,
        validFrom_dt TEXT,
        validTo_dt TEXT,
        wmoCountryCode INT,
        wmoStationId TEXT
        )
    '''

parameters_ddl = '''CREATE TABLE IF NOT EXISTS parameters (
        parameterId INT,
        name TEXT,
        unit TEXT,
        description TEXT,
        update_frequency TEXT,
        available_denmark INT,
        available_greenland INT
        )
    '''

# Execute and commit each statement in turn.
for ddl in (features_ddl, stations_ddl, parameters_ddl):
    cur.execute(ddl)
    con.commit()

Metadata for de meteorologiske observationsstationer (gemt som `målestationer.csv`) er hentet fra: https://confluence.govcloud.dk/display/FDAPI/Meteorological+Observation+Data+Stations

In [None]:
# Load the station metadata from målestationer.csv into `stations`.
# `stations` has no primary key, so nothing would stop a re-run of this cell
# (e.g. Restart & Run All against an existing dmi.db) from appending a second
# copy of every row; the table is therefore emptied before inserting.
maalestationer = pd.read_csv('målestationer.csv')
# Column names in table order, so the CSV columns line up with the placeholders.
columns_stations = [r[1] for r in cur.execute('PRAGMA table_info(stations)').fetchall()]
insert_stations_sql = 'insert into stations values ({});'.format(
    ', '.join('?' * len(columns_stations)))
cur.execute('delete from stations;')
# itertuples(name=None) yields plain tuples of Python scalars, which sqlite3
# binds directly; `.values` can yield numpy scalars (e.g. numpy.int64) that
# sqlite3 cannot bind when all selected columns happen to be numeric.
cur.executemany(insert_stations_sql,
                maalestationer[columns_stations].itertuples(index=False, name=None))
con.commit()

Parametrene (gemt som `parameter.csv`) er hentet herfra: https://confluence.govcloud.dk/display/FDAPI/Meteorological+Observation+Data

In [None]:
# Load the parameter catalogue from parameter.csv into `parameters`.
# Each parameter gets an integer parameterId equal to its row position in the
# CSV; the ingestion cell maps an observation's `parameterId` property (looked
# up in the `name` column) onto this integer.
# NOTE(review): the ids depend on the row order of parameter.csv - if the file
# is re-downloaded with a different order, ids already stored in `features`
# will no longer line up with `parameters`.
parameter = pd.read_csv('parameter.csv')
parameter['parameterId'] = list(range(parameter.shape[0]))
# Column names in table order, so the CSV columns line up with the placeholders.
columns_parameter = [r[1] for r in cur.execute('PRAGMA table_info(parameters)').fetchall()]
insert_parameters_sql = 'insert into parameters values ({});'.format(
    ', '.join('?' * len(columns_parameter)))
# `parameters` has no primary key; empty it first so re-running this cell
# replaces the rows instead of duplicating them.
cur.execute('delete from parameters;')
# itertuples(name=None) yields plain tuples of Python scalars that sqlite3 can bind.
cur.executemany(insert_parameters_sql,
                parameter[columns_parameter].itertuples(index=False, name=None))
con.commit()

Filen `all.zip`, som indeholder alle observationerne, er hentet herfra: https://confluence.govcloud.dk/display/FDAPI/Download

In [None]:
%%time
# Stream every observation in all.zip into the `features` table (takes about
# 4 hours). Each member file is read line by line, each line is parsed as one
# JSON object, and only objects whose 'type' is 'Feature' are stored.
#
# Rows are buffered in `d_table`, a dict keyed on the table's primary key
# (stationId, observed_dt, parameterId), so a later duplicate overwrites an
# earlier one. The buffer is written to the database after any file that
# leaves it holding more than BATCH_SIZE rows, and once more at the end.
#
# Observations with a missing stationId or observation time, or whose
# parameterId is missing or not found in parameter.csv, are not stored;
# their (partial) keys are collected in `fejl` ("errors") for inspection.
#
# On success the database connection is closed; re-run the connection cell
# before querying dmi.db again.

BATCH_SIZE = 100000


def _flush_features(rows):
    """Insert buffered rows into `features` and commit.

    Args:
        rows: dict mapping (stationId, observed, parameterId) -> value.

    `insert or replace` extends the buffer's "last value wins" rule across
    batches: the dict only de-duplicates keys within one batch, and a plain
    insert would abort the whole load with sqlite3.IntegrityError as soon as
    an already-flushed key turned up again in a later file.
    """
    cur.executemany('insert or replace into features values (?, ?, ?, ?);',
                    ((s, o, p, v) for (s, o, p), v in rows.items()))
    con.commit()


n_filer_behandlet = 0  # number of files processed
pct_last_print = 0
t0 = time.time()
d_table = {}  # dict because the primary key can have duplicates
parameterIdMap = dict(parameter[['name', 'parameterId']].values)

fejl = []

# Pre-set the names used by the error report below so it never raises
# NameError (or prints stale values from an earlier run) when the failure
# happens before the first file, line or feature has been reached.
data_file = line = d_prop = None

try:
    with ZipFile('all.zip', 'r') as all_data:
        data_files = sorted(all_data.namelist())
        for data_file in data_files:
            with all_data.open(data_file) as fh:
                for line in fh:
                    d = orjson.loads(line)
                    if d['type'] != 'Feature':
                        continue
                    d_prop = d['properties']
                    # A missing stationId must end up in `fejl`, not crash int(None).
                    station_id = d_prop.get('stationId')
                    row_key = (None if station_id is None else int(station_id),
                               d_prop.get('observed'),
                               parameterIdMap.get(d_prop.get('parameterId')))
                    if any(part is None for part in row_key):
                        fejl.append(row_key)
                    else:
                        d_table[row_key] = d_prop.get('value')

            n_filer_behandlet += 1
            if len(d_table) > BATCH_SIZE:
                _flush_features(d_table)
                d_table = {}
                t_eps = time.time() - t0
                pct_done = n_filer_behandlet / len(data_files)
                # Only print when progress advanced more than 1 percentage point.
                if pct_done - pct_last_print > 0.01:
                    print(f'{pct_done:.0%} filer behandlet. Tid gået {t_eps/60:.1f} minutter')
                    pct_last_print = pct_done

        # Write whatever is left in the buffer after the last file.
        _flush_features(d_table)
        t_eps = time.time() - t0
        pct_done = n_filer_behandlet / max(len(data_files), 1)  # guard empty archive
        print(f'{pct_done:.0%} filer behandlet. Tid gået {t_eps/60:.1f} minutter')
        con.close()

except Exception:
    # Show where the load stopped, then re-raise the original error unchanged.
    print(f'Fil: {data_file}')
    print(f'line: {line}')
    print('d_prop')
    print(d_prop)
    raise