A notebook for testing sqlite-utils - mainly the upsert function to update a database when the unique constraint is violated

In [1]:
import pandas as pd
import numpy as np
import sqlite3
import sqlalchemy
from sqlite_utils import Database #datasette sqlite utilities

In [2]:
# download air quality monitoring sites
url_aqms = 'https://opendata.bristol.gov.uk/' \
'explore/dataset/air-quality-monitoring-sites/download/' \
'?format=csv&timezone=Europe/London&lang=en&use_labels_for_header=false&csv_separator=%3B'

aqms = pd.read_csv(url_aqms, sep = ";")

In [None]:
# %who

In [None]:
# %lsmagic

In [None]:
# %pwd
# %pinfo aqms

In [3]:
# change dtype for this field
aqms.colocated = aqms.colocated.fillna(0.0).astype('int', errors = 'ignore')

In [4]:
# df for the splitting of geopoint
latlon_df = aqms.geo_point_2d.str.split(",", n = 1, expand = True)

In [5]:
aqms['latitude'] = latlon_df[0].astype('float')
aqms['longitude'] = latlon_df[1].astype('float') #need long names for cluster map plugin

In [6]:
aqms.drop(columns = ['geo_point_2d'], inplace = True)

In [7]:
aqms.rename(columns = {'siteid': 'site_id'}, inplace = True)

In [8]:
# get the diffusion tube data
url_no2 = 'https://opendata.bristol.gov.uk/' \
'explore/dataset/no2-diffusion-tube-data/download/' \
'?format=csv&timezone=Europe/London&lang=en&use_labels_for_header=false&csv_separator=%3B'

usecols = ['siteid', 'count', 'conc_ugm3', 'year']

no2dt = pd.read_csv(url_no2, sep = ";", usecols = usecols)
no2dt.rename(columns = {'siteid': 'site_id'}, inplace = True)

In [9]:
# retrieve a sample of continuous data
# download using ods_api.ipynb
url_aqdc_1 = 'aqdc_df_smaller.csv'


In [10]:
# def read_aqdc_from_url(url):
#     drop_cols = ['location', 'geo_point_2d', 'datestart', 'dateend', 'current', 'instrumenttype']
#     df = pd.read_csv(url, sep = ";", usecols = lambda x: x not in drop_cols)
#     #aqdc['date_time'] = pd.to_datetime(aqdc.date_time)
#     df.rename(columns = {'siteid': 'site_id'}, inplace = True)
#     return df
    

In [11]:
# wrangle continuous data to add useful date time artefacts and clean
aqdc = pd.read_csv(url_aqdc_1, sep = ",")
aqdc.drop(columns = 'Unnamed: 0', inplace = True)
aqdc.rename(columns = {'siteid': 'site_id'}, inplace = True)
aqdc['date'] = aqdc['date_time'].str.slice(0, 10)
aqdc['hour'] = aqdc['date_time'].str.slice(11, 13).astype(int)
aqdc['year'] = aqdc['date_time'].str.slice(0, 4).astype(int)
aqdc['month'] = aqdc['date_time'].str.slice(6, 7).astype(int)
aqdc['day_of_month'] = aqdc['date_time'].str.slice(8, 10).astype(int)
aqdc['date_time'] = pd.to_datetime(aqdc.date_time)


In [12]:
aqdc

Unnamed: 0,date_time,site_id,nox,no2,no,pm10,pm25,temp,rh,date,hour,year,month,day_of_month
0,2019-04-29 23:00:00+00:00,501,122.250000,49.250000,47.750000,30.400,,,,2019-04-29,23,2019,4,29
1,2019-04-30 03:00:00+00:00,501,42.250000,35.250000,4.500000,20.275,,,,2019-04-30,3,2019,4,30
2,2019-05-01 06:00:00+00:00,501,82.000000,48.500000,21.750000,28.125,,,,2019-05-01,6,2019,5,1
3,2019-05-01 18:00:00+00:00,501,357.250000,117.750000,156.500000,27.700,,,,2019-05-01,18,2019,5,1
4,2019-05-02 14:00:00+00:00,501,194.500000,73.250000,79.250000,23.775,,,,2019-05-02,14,2019,5,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
865213,2022-11-14 16:00:00+00:00,463,119.510531,54.631625,42.342925,,,,,2022-11-14,16,2022,1,14
865214,2022-11-14 16:00:00+00:00,672,99.628500,52.838657,30.597570,,,,,2022-11-14,16,2022,1,14
865215,2022-11-14 17:00:00+00:00,463,145.496627,60.319720,55.533760,,,,,2022-11-14,17,2022,1,14
865216,2022-11-14 16:00:00+00:00,203,55.781782,31.465938,15.841056,,,,,2022-11-14,16,2022,1,14


In [13]:
print(aqdc.year.dtype)

int64


In [14]:
no2dt.dtypes

site_id        int64
year           int64
conc_ugm3    float64
count          int64
dtype: object

In [15]:
# data for inserting as dicts
aqdc_payload = aqdc.to_dict(orient = 'records')
no2dt_payload = no2dt.to_dict(orient = 'records')
aqms_payload = aqms.to_dict(orient = 'records')

In [16]:
db = Database('aq_data.db', recreate = True)

In [17]:
db['no2-diffusion-tube-data'].create({
    'site_id':int,
    'year': int,
    'conc_ugm3':float,
    'count':int
},
    pk = ['site_id', 'year']
)

<Table no2-diffusion-tube-data (site_id, year, conc_ugm3, count)>

In [18]:
db['air-quality-monitoring-sites'].create(
{
    'location':str,
   'site_id':int,
   'easting':int,
   'northing':int,
   'current':int,
   'pollutants':str,
   'instrumenttype':str,
   'exposure':int,
   'tube_kerb_distance_m':float,
   'rec_kerb_distance_m':float,
   'comments':str,
   'aqma':int,
   'detailed_location':str,
   'sample_height':float,
   'elevation':float,
   'grid_id':float,
   'locationclass':str,
   'photopath':str,
   'datestart':str,
   'dateend':str,
   'description':float,
   'colocated':int,
   'duplicate_triplicate':str,
   'laqm_locationclass':str,
   'latitude':float,
   'longitude':float
},
    pk = ['site_id']
)

<Table air-quality-monitoring-sites (location, site_id, easting, northing, current, pollutants, instrumenttype, exposure, tube_kerb_distance_m, rec_kerb_distance_m, comments, aqma, detailed_location, sample_height, elevation, grid_id, locationclass, photopath, datestart, dateend, description, colocated, duplicate_triplicate, laqm_locationclass, latitude, longitude)>

In [19]:
db["air-quality-data-continuous"].create({
   "site_id": int,
   "date_time": str,
   "date": str,
   "year":int,
   "month": int,
   "day_of_month":int,
   "hour":int,
   "nox": float,
   "no2": float,
   "no": float,
   "pm10": float,
   "pm25": float,
   "temp": float,
   "rh": float
    },
    pk = ("date_time", "site_id")
)

<Table air-quality-data-continuous (site_id, date_time, date, year, month, day_of_month, hour, nox, no2, no, pm10, pm25, temp, rh)>

In [20]:
# table.add_foreign_key(column, other_table, other_column)

db['air-quality-data-continuous'].add_foreign_key('site_id', 'air-quality-monitoring-sites', 'site_id')
db['air-quality-monitoring-sites'].add_foreign_key('site_id', 'air-quality-data-continuous', 'site_id')
db['no2-diffusion-tube-data'].add_foreign_key('site_id', 'air-quality-monitoring-sites', 'site_id')


<Table no2-diffusion-tube-data (site_id, year, conc_ugm3, count)>

In [21]:
db['no2-diffusion-tube-data'].insert_all(no2dt_payload)

<Table no2-diffusion-tube-data (site_id, year, conc_ugm3, count)>

In [22]:
db['air-quality-monitoring-sites'].insert_all(aqms_payload)

<Table air-quality-monitoring-sites (location, site_id, easting, northing, current, pollutants, instrumenttype, exposure, tube_kerb_distance_m, rec_kerb_distance_m, comments, aqma, detailed_location, sample_height, elevation, grid_id, locationclass, photopath, datestart, dateend, description, colocated, duplicate_triplicate, laqm_locationclass, latitude, longitude)>

In [23]:
db['air-quality-data-continuous'].insert_all(aqdc_payload)

<Table air-quality-data-continuous (site_id, date_time, date, year, month, day_of_month, hour, nox, no2, no, pm10, pm25, temp, rh)>

In [24]:
print(db.schema)

CREATE TABLE [no2-diffusion-tube-data] (
   [site_id] INTEGER,
   [year] INTEGER,
   [conc_ugm3] FLOAT,
   [count] INTEGER,
   PRIMARY KEY ([site_id], [year]),
   FOREIGN KEY([site_id]) REFERENCES [air-quality-monitoring-sites]([site_id])
);
CREATE TABLE [air-quality-monitoring-sites] (
   [location] TEXT,
   [site_id] INTEGER PRIMARY KEY,
   [easting] INTEGER,
   [northing] INTEGER,
   [current] INTEGER,
   [pollutants] TEXT,
   [instrumenttype] TEXT,
   [exposure] INTEGER,
   [tube_kerb_distance_m] FLOAT,
   [rec_kerb_distance_m] FLOAT,
   [comments] TEXT,
   [aqma] INTEGER,
   [detailed_location] TEXT,
   [sample_height] FLOAT,
   [elevation] FLOAT,
   [grid_id] FLOAT,
   [locationclass] TEXT,
   [photopath] TEXT,
   [datestart] TEXT,
   [dateend] TEXT,
   [description] FLOAT,
   [colocated] INTEGER,
   [duplicate_triplicate] TEXT,
   [laqm_locationclass] TEXT,
   [latitude] FLOAT,
   [longitude] FLOAT,
   FOREIGN KEY([site_id]) REFERENCES [air-quality-data-continuous]([site_id])
);

This section sets up spatial index to enable spatial operations on the data

In [25]:
conn = sqlite3.connect("aq_data.db")
# Lead the spatialite extension:
conn.enable_load_extension(True)

In [26]:
conn.load_extension("/usr/lib/x86_64-linux-gnu/mod_spatialite")

In [27]:
# Initialize spatial metadata for this database:
conn.execute("select InitSpatialMetadata(1)")
# Add a geometry column called point_geom to our museums table:
conn.execute(
    "SELECT AddGeometryColumn('air-quality-monitoring-sites', 'point_geom', 4326, 'POINT', 2);"
)

<sqlite3.Cursor at 0x7f5cb2138260>

In [28]:
# Now update that geometry column with the lat/lon points
conn.execute(
    """
    UPDATE 'air-quality-monitoring-sites' SET
    point_geom = GeomFromText('POINT('||"longitude"||' '||"latitude"||')',4326);
"""
)

<sqlite3.Cursor at 0x7f5cb21383b0>

In [29]:
# Now add a spatial index to that column
conn.execute(
    'select CreateSpatialIndex("air-quality-monitoring-sites", "point_geom");'
)

<sqlite3.Cursor at 0x7f5cb2138490>

In [30]:
# If you don't commit your changes will not be persisted:
conn.commit()
conn.close()

Now set up some Views for easy access to commonly desired snapshots of the data

In [31]:
if db['no2_diffusion_tube_locations_vw'].exists():
    db['no2_diffusion_tube_locations_vw'].drop()

In [32]:
db.create_view("no2_diffusion_tube_locations_vw", """
SELECT
    'no2-diffusion-tube-data'.site_id,
    location,
    year,
    round(conc_ugm3, 1) conc_ugm3,
    easting,
    northing,
    round(latitude, 6) latitude,
    round(longitude, 6) longitude,
    count
FROM 'air-quality-monitoring-sites'
INNER JOIN 'no2-diffusion-tube-data'
ON 'air-quality-monitoring-sites'.site_id = 'no2-diffusion-tube-data'.site_id
ORDER BY 'no2-diffusion-tube-data'.site_id, year
""")

<Database <sqlite3.Connection object at 0x7f5cd0359e40>>

In [33]:
if db["annual_mean_continuous_vw"].exists():
    db["annual_mean_continuous_vw"].drop()

In [34]:
db.create_view("annual_mean_continuous_vw", """
SELECT 
    'air-quality-data-continuous'.site_id,
    year,
    ROUND(AVG(no2), 1) mean_no2,
    ROUND(AVG(pm10), 1) mean_pm10,
    ROUND(AVG(pm25), 1) as mean_pm25
FROM 'air-quality-data-continuous'
GROUP BY 'air-quality-data-continuous'.site_id, year
ORDER BY year, 'air-quality-data-continuous'.site_id
""")

<Database <sqlite3.Connection object at 0x7f5cd0359e40>>

In [47]:
if db['annual_mean_locations_vw'].exists():
    db['annual_mean_locations_vw'].drop()

In [48]:
#not working
db.create_view("annual_mean_locations_vw", """
SELECT 
    'annual_mean_continuous_vw'.site_id,
    'air-quality-monitoring-sites'.location,
    year,
    mean_no2,
    mean_pm10,
    mean_pm25,
    ROUND(latitude, 6) latitude,
    ROUND(longitude, 6) longitude
FROM 'annual_mean_continuous_vw'
INNER JOIN 'air-quality-monitoring-sites'
ON 'air-quality-monitoring-sites'.site_id = 'annual_mean_continuous_vw'.site_id
""")




<Database <sqlite3.Connection object at 0x7f5cd0359e40>>

In [38]:
# print(db.schema)

In [None]:
!ls -l

In [None]:
# for row in db['aqdc.db'].rows:
#     print(row)

In [None]:
# for row in db.query("select * from aqdc where siteid = 203"):
#     print(row)

In [22]:
db.table_names()

['no2dt', 'aqms', 'aqdc']

In [None]:
# print(db.schema)

In [None]:
con = sqlite3.connect('aq_data.db')
cur = con.cursor()

In [None]:
con.close()

In [None]:
query = """
SELECT *
FROM aqdc
WHERE site_id = 203
"""
df_aqdc = pd.read_sql_query(query, con, parse_dates = {'date_time':"%Y-%m-%dT%H:%M:%S%z"}) # parse the date col in read operation

In [None]:
df_aqdc['date_time']

In [None]:
query = """
SELECT *
FROM no2dt
WHERE site_id = 4 AND year = 2021
"""
df_no2dt = pd.read_sql_query(query, con)

In [None]:
df_no2dt

In [None]:
query = """
SELECT *
FROM aqms
WHERE instrumenttype = "Continuous (Reference)" AND current
"""
df_aqms = pd.read_sql_query(query, con)

In [None]:
df_aqms

In [None]:
# con.close()

In [None]:
query = "SELECT* FROM aqdc"
df_aqdc = pd.read_sql_query(query,  con)

In [None]:
df_aqdc

In [None]:
!ls -l

In [None]:
url_aqdc_upsert = 'nov2-5.csv'

In [None]:
aqdc_upsert = read_aqdc_from_url(url_aqdc_upsert)

In [None]:
aqdc_upsert

In [None]:
aqdc_upsert_payload = aqdc_upsert.to_dict(orient = 'records')

In [None]:
db['aqdc'].upsert_all(aqdc_upsert_payload, pk = ['date_time','site_id'])

In [None]:
query = """
SELECT *
FROM aqdc
"""
df_aqdc = pd.read_sql_query(query, con, parse_dates = {'date_time':"%Y-%m-%dT%H:%M:%S%z"}) # parse the date col in read operation

In [None]:
df_aqdc