# SQLite to PostgreSQL via Pandas Dataframe

## Imports

In [1]:
import os
import pathlib
import zipfile

import pandas as pd
import wget
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

## Source URL and Download Directory 

In [2]:
# Download source: zipped SQLite export of the groundwater data.
url = (
    "https://www.opengeodata.nrw.de/produkte/umwelt_klima/wasser/grundwasser/hygrisc/"
    "OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite.zip"
)

# Local target directory. Keep the trailing slash: later cells build file
# paths from it by plain string concatenation.
datapath = "../data/OpenGeodata.NRW/OpenHygrisC/"


## Download zip file if necessary.

In [3]:
# File name of the archive = last path segment of the URL.
zipfilename = url.split(r"/")[-1]

print(f"Downloading {zipfilename} to directory {datapath}")

# Make sure the target directory exists (no error if it already does).
p = pathlib.Path(datapath)
p.mkdir(parents=True,exist_ok=True)

# Join with the path operator instead of string concatenation, so the result
# is correct whether or not datapath ends with a slash.
f = p / zipfilename

if not f.is_file():
    # Pass the full target file name so that the downloaded file is exactly
    # the one tested by is_file() above.
    wget.download(url, out=str(f))
else:
    print(f"Warning: {f} already exists. Skip download.")

Downloading OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite.zip to directory ../data/OpenGeodata.NRW/OpenHygrisC/


## Unzip if necessary.

In [4]:


print(f"unzip {zipfilename}")

# Extraction target: a directory named like the archive, without ".zip".
sqlitepathname = datapath + f.stem
target_dir = pathlib.Path(sqlitepathname)

if target_dir.exists():
    # Anything already present at the target path counts as "done".
    print(f"Warning: directory {sqlitepathname} already exists. Skip unzip.")
else:
    with zipfile.ZipFile(f, mode='r') as archive:
        archive.extractall(sqlitepathname)

unzip OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite.zip


## What is in the unzipped folder?

In [5]:
# Directory the archive was extracted into.
# NOTE: this rebinds p, which pointed at the download directory before.
p = pathlib.Path(sqlitepathname)
# Show every entry directly inside that directory.
list(p.glob("*"))

[PosixPath('../data/OpenGeodata.NRW/OpenHygrisC/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite/katalog_stoff.sqlite'),
 PosixPath('../data/OpenGeodata.NRW/OpenHygrisC/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite/opendata.gw_messstelle.sqlite'),
 PosixPath('../data/OpenGeodata.NRW/OpenHygrisC/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite/katalog_gemeinde.sqlite'),
 PosixPath('../data/OpenGeodata.NRW/OpenHygrisC/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite/opendata.gw_chemischer_messwert.sqlite')]

## Assign file names to variables.

In [6]:
def _find_one(directory: pathlib.Path, pattern: str) -> pathlib.Path:
    """Return the first entry (in sorted order) of `directory` matching `pattern`.

    Sorting makes the choice deterministic when several names match.

    Raises:
        FileNotFoundError: if nothing in `directory` matches `pattern`.
    """
    matches = sorted(directory.glob(pattern))
    if not matches:
        raise FileNotFoundError(f"No file matching {pattern!r} found in {directory}")
    return matches[0]


chem_path       = _find_one(p, "*chem*")  # Chemistry
stoff_path      = _find_one(p, "*stoff*") # Substance, physico-chemical Quantity, Parameter
messstelle_path = _find_one(p, "*messstelle*") # GW Well, Station
gemeinde_path   = _find_one(p, "*gemeinde*") # Municipality

print(f"{messstelle_path.name = }")
print(f"{chem_path.name       = }")
print(f"{stoff_path.name      = }")
print(f"{gemeinde_path.name   = }")


messstelle_path.name = 'opendata.gw_messstelle.sqlite'
chem_path.name       = 'opendata.gw_chemischer_messwert.sqlite'
stoff_path.name      = 'katalog_stoff.sqlite'
gemeinde_path.name   = 'katalog_gemeinde.sqlite'


## Focus on groundwater quality data (= chemistry) only.

In [7]:
# Path of the chemistry database as a plain string.
str(chem_path)

'../data/OpenGeodata.NRW/OpenHygrisC/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite/opendata.gw_chemischer_messwert.sqlite'

In [8]:
import sqlalchemy
# SQLAlchemy connection URL: the "sqlite:///" prefix followed by the path of
# the chemistry database file.
sqlite_uri = f"sqlite:///{chem_path}"
sqlite_uri



'sqlite:///../data/OpenGeodata.NRW/OpenHygrisC/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite/opendata.gw_chemischer_messwert.sqlite'

## Use Jupyter SQL Magic for a quick look into the database.

In [9]:
# Load (or reload) the SQL magic extension that provides %sql / %%sql.
%reload_ext sql
# Limit the size of the result sets returned by the SQL magic to 10 rows.
%config SqlMagic.autolimit = 10
# Alternative ways of connecting, left disabled:
#%sql sqlite:///../data/OpenGeodata.NRW/OpenHygrisC/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_Sqlite/opendata.gw_chemischer_messwert.sqlite
#%sql sqlite_uri
from sqlalchemy import create_engine

# Create a SQLAlchemy engine for the chemistry database and pass it to the SQL magic.
engine = create_engine(sqlite_uri)
%sql engine

In [10]:
%%sql
SELECT * FROM sqlite_master
-- sqlite_master is the schema table of SQLite; the sql column holds the CREATE statement of each object.

type,name,tbl_name,rootpage,sql
table,opendata_gw_chemischer_messwert,opendata_gw_chemischer_messwert,2,"CREATE TABLE ""opendata_gw_chemischer_messwert"" (""sl_nr"" integer, ""messstelle_id"" varchar(9), ""pna_id"" varchar(15), ""datum_pn"" text, ""stoff_nr"" integer, ""stoff"" varchar(80), ""probengut"" varchar(30), ""messergebnis_c"" varchar(30), ""messergebnis_hinweis"" varchar(50), ""bestimmungsgrenze"" real(10,5), ""masseinheit_nr"" integer, ""masseinheit"" varchar(9), ""trennverfahren"" varchar(32), ""verfahren"" varchar(255), ""vor_ort"" text, ""herkunft"" varchar(20), ""freigabe"" varchar(4), ""aktual_dat"" text, ""erstell_dat"" text, ""gemeinde_id"" varchar(8))"


In [11]:
# autolimit was already set to 10 when the SQL magic was configured; repeating it is harmless.
%config SqlMagic.autolimit = 10
# Total number of rows in the measurement table.
%sql select count(*) from opendata_gw_chemischer_messwert

count(*)
3671913


## Read the data into a Pandas dataframe.

In [12]:
# Name of the only table in the chemistry database (see the sqlite_master listing).
table_name = "opendata_gw_chemischer_messwert"

In [13]:
%%time
# Read the complete table into memory; the connection is given as a URI string.
df_in = pd.read_sql_table(table_name, sqlite_uri)

CPU times: user 18.3 s, sys: 1.11 s, total: 19.4 s
Wall time: 19.4 s


### Have a quick look.

In [14]:
# Column dtypes of the freshly loaded frame.
df_in.dtypes

sl_nr                     int64
messstelle_id            object
pna_id                   object
datum_pn                 object
stoff_nr                  int64
stoff                    object
probengut                object
messergebnis_c           object
messergebnis_hinweis     object
bestimmungsgrenze       float64
masseinheit_nr            int64
masseinheit              object
trennverfahren           object
verfahren                object
vor_ort                  object
herkunft                 object
freigabe                 object
aktual_dat               object
erstell_dat              object
gemeinde_id              object
dtype: object

In [15]:
# Preview the first three rows.
df_in.head(3)

Unnamed: 0,sl_nr,messstelle_id,pna_id,datum_pn,stoff_nr,stoff,probengut,messergebnis_c,messergebnis_hinweis,bestimmungsgrenze,masseinheit_nr,masseinheit,trennverfahren,verfahren,vor_ort,herkunft,freigabe,aktual_dat,erstell_dat,gemeinde_id
0,2903561,59620687,5/2005/4599,20051018,1164,Zink,Grundwasser,22.0,,,10,µg/l,Gesamtgehalt,DIN 38406-E22 MAERZ 1988,,HYGC_BR-AR,ja,20051205,20051205,5962024
1,2903564,59620687,5/2005/4599,20051018,1061,pH-Wert,Grundwasser,6.8,,,23,-,Gesamtgehalt,DIN 38404-C5 JANUAR 1984,ja,HYGC_BR-AR,ja,20051205,20051205,5962024
2,2903565,59620687,5/2005/4599,20051018,1011,Wassertemperatur,Grundwasser,12.8,,,4,°C,Gesamtgehalt,DIN 38404-C4 DEZEMBER 1976,ja,HYGC_BR-AR,ja,20051205,20051205,5962024


### Problems

#### Problem 1: The measurement column cannot be converted to float.

In [16]:
# Demonstrate the failing conversion without aborting "Restart & Run All":
# the error is expected here, so catch it and show its message.
try:
    df_in["messergebnis_c"].astype(float)
except ValueError as err:
    print(f"ValueError: {err}")

ValueError: could not convert string to float: '<1.00000'

#### Problem 2: The date columns are not of dtype datetime.

In [18]:
# Three consecutive rows starting at position 1234567, restricted to the three date columns.
df_in.iloc[1234567:1234567+3][["datum_pn", "aktual_dat", "erstell_dat"]]

Unnamed: 0,datum_pn,aktual_dat,erstell_dat
1234567,20060626,20060913,20060913
1234568,20060626,20060913,20060913
1234569,20060626,20060913,20060913


In [19]:
# dtypes of the three date columns for the same three-row slice.
df_in.iloc[1234567:1234567+3][["datum_pn", "aktual_dat", "erstell_dat"]].dtypes

datum_pn       object
aktual_dat     object
erstell_dat    object
dtype: object

## DATA ENGINEERING! Create a copy and do the data engineering to clean the data.

In [20]:
%%time
# Work on a copy so that df_in keeps the data exactly as loaded.
df_qual = df_in.copy()

CPU times: user 434 ms, sys: 45.2 ms, total: 479 ms
Wall time: 477 ms


## Proposed translation of column names:

In [21]:
# Mapping: original (German) column name -> English column name.
column_dict = {
    # identifiers
    "sl_nr": "sl_nr",  # unique numeric index
    "messstelle_id": "station_id",  # groundwater measuring station or monitoring well
    "pna_id": "sampling_id",  # unique ID of the water sample
    "datum_pn": "sampling_date",  # date of sampling
    # what was measured
    "stoff_nr": "substance_id",  # numerical ID of the parameter measured, e.g. nitrate or pH
    "stoff": "substance",  # measurand: not just substances but also physical parameters like temp, pH, etc.
    "probengut": "sample_type",  # the type of the sample, e.g. groundwater, soil, etc.
    # result of the measurement
    "messergebnis_c": "measurement_result_c",
    "messergebnis_hinweis": "measurement_note",
    # NOTE(review): "Bestimmungsgrenze" is usually translated as limit of
    # quantification rather than detection limit; confirm the intended meaning.
    "bestimmungsgrenze": "detection_limit",
    "masseinheit_nr": "unit_id",
    "masseinheit": "unit",
    # how it was measured
    "trennverfahren": "separation_method",
    "verfahren": "method",
    "vor_ort": "on_site",
    # bookkeeping
    "herkunft": "origin",
    "freigabe": "release",
    "aktual_dat": "update_date",
    "erstell_dat": "creation_date",
    "gemeinde_id": "municipality_id",
}

# Apply the new names; columns missing from the mapping would be left unchanged.
df_qual = df_qual.rename(columns=column_dict)


In [22]:
# Data types after renaming: only the column names changed, not the dtypes.
df_qual.dtypes

sl_nr                     int64
station_id               object
sampling_id              object
sampling_date            object
substance_id              int64
substance                object
sample_type              object
measurement_result_c     object
measurement_note         object
detection_limit         float64
unit_id                   int64
unit                     object
separation_method        object
method                   object
on_site                  object
origin                   object
release                  object
update_date              object
creation_date            object
municipality_id          object
dtype: object

## Convert date columns from dtype object to datetime64.

In [23]:
%%time
date_cols = ["sampling_date", "update_date", "creation_date"]
print("Start conversion ... ")
for col in date_cols:
    # Skip columns that were converted by an earlier run of this cell.
    already_converted = df_qual[col].dtype == "datetime64[ns]"
    if already_converted:
        continue
    # Cast to str first, then parse with the explicit YYYYMMDD format.
    as_text = df_qual[col].astype(str)
    df_qual[col] = pd.to_datetime(as_text, format='%Y%m%d')
print("... done")

Start conversion ... 
... done
CPU times: user 2.42 s, sys: 65.2 ms, total: 2.49 s
Wall time: 2.48 s


## The remaining object columns contain strings.

In [24]:
# Re-inspect the dtypes after the date conversion.
df_qual.dtypes

sl_nr                            int64
station_id                      object
sampling_id                     object
sampling_date           datetime64[ns]
substance_id                     int64
substance                       object
sample_type                     object
measurement_result_c            object
measurement_note                object
detection_limit                float64
unit_id                          int64
unit                            object
separation_method               object
method                          object
on_site                         object
origin                          object
release                         object
update_date             datetime64[ns]
creation_date           datetime64[ns]
municipality_id                 object
dtype: object

## The tricky part: Engineering of column measurement_result_c

In [25]:
%%time
raw_result = df_qual["measurement_result_c"]

# na=False keeps the masks strictly boolean: without it a missing value yields
# NaN in the mask, and the "~" below fails on the resulting object column.
# A missing value then counts as neither "<" nor ">".
idx_is_less_than = raw_result.str.startswith("<", na=False)
idx_is_greater_than = raw_result.str.startswith(">", na=False)

# Everything that carries no "<" / ">" prefix is treated as a plain number.
idx_is_float = ~(idx_is_less_than | idx_is_greater_than)

CPU times: user 1.92 s, sys: 41.4 ms, total: 1.96 s
Wall time: 1.96 s


In [26]:
%%time
# First three rows whose raw result string starts with "<".
df_qual[idx_is_less_than].head(3)

CPU times: user 311 ms, sys: 18.5 ms, total: 330 ms
Wall time: 329 ms


Unnamed: 0,sl_nr,station_id,sampling_id,sampling_date,substance_id,substance,sample_type,measurement_result_c,measurement_note,detection_limit,unit_id,unit,separation_method,method,on_site,origin,release,update_date,creation_date,municipality_id
161523,17716627,91163705,0/2016/1,2016-05-11,1521,"Organischer Kohlenstoff, gelöst",Grundwasser,<1.00000,Konzentration zu gering zur Bestimmung,1.0,7,mg/l,Membranfilter,DIN EN 1484 - DE - H03 - 1,,LIMS_LANUV,ja,2021-09-11,2021-09-11,5974016
161681,17716638,91163705,0/2016/1,2016-05-11,1262,Gesamtphosphat-Phosphor,Grundwasser,<0.01000,Konzentration zu gering zur Bestimmung,0.01,7,mg/l,Gesamtgehalt,"Analog DIN EN ISO 6878, Abschnitt 7 - AD - D11...",,LIMS_LANUV,ja,2021-09-11,2021-09-11,5974016
161682,17716639,91163705,0/2016/1,2016-05-11,1261,Gesamt-Phosphat,Grundwasser,<0.03000,Konzentration zu gering zur Bestimmung,0.03,7,mg/l,Gesamtgehalt,"Analog DIN EN ISO 6878, Abschnitt 7 - AD - D11...",,LIMS_LANUV,ja,2021-09-11,2021-09-11,5974016


In [27]:
%%time
# First three rows whose raw result string starts with ">".
df_qual[idx_is_greater_than].head(3)

CPU times: user 1.02 ms, sys: 409 µs, total: 1.43 ms
Wall time: 908 µs


Unnamed: 0,sl_nr,station_id,sampling_id,sampling_date,substance_id,substance,sample_type,measurement_result_c,measurement_note,detection_limit,unit_id,unit,separation_method,method,on_site,origin,release,update_date,creation_date,municipality_id
209713,17552890,60080164,0/2017/90347,2017-05-10,1029,"Trübung, Messg. d. gestreuten Strahlung",Grundwasser,>1.00000,,,57,FNU,Gesamtgehalt,"DIN EN ISO 7027, Abschnitt 6 - DO - C02 - 3",ja,LIMS_LANUV,ja,2021-06-19,2021-06-19,5513000
308091,1263499,10420484,1/2004/90387,2004-10-13,1249,Ammonium-Stickstoff,Grundwasser,>0.03875,,,7,mg/l,Gesamtgehalt,,,HYGC_BR-K,ja,2005-01-27,2005-01-27,5358036
395121,2016179,24170070,2/2003/90776,2003-11-13,1695,Coliforme Keime bei (36+-2) Grad C,Grundwasser,>1.00000,,,31,1/100ml,Nach Laborjournal,,,HYGC_BR-DET,ja,2004-02-20,2004-02-20,5774040


In [28]:
%%time
# First three rows whose raw result string has neither a "<" nor a ">" prefix.
df_qual[idx_is_float].head(3)

CPU times: user 332 ms, sys: 40.2 ms, total: 372 ms
Wall time: 370 ms


Unnamed: 0,sl_nr,station_id,sampling_id,sampling_date,substance_id,substance,sample_type,measurement_result_c,measurement_note,detection_limit,unit_id,unit,separation_method,method,on_site,origin,release,update_date,creation_date,municipality_id
0,2903561,59620687,5/2005/4599,2005-10-18,1164,Zink,Grundwasser,22.0,,,10,µg/l,Gesamtgehalt,DIN 38406-E22 MAERZ 1988,,HYGC_BR-AR,ja,2005-12-05,2005-12-05,5962024
1,2903564,59620687,5/2005/4599,2005-10-18,1061,pH-Wert,Grundwasser,6.8,,,23,-,Gesamtgehalt,DIN 38404-C5 JANUAR 1984,ja,HYGC_BR-AR,ja,2005-12-05,2005-12-05,5962024
2,2903565,59620687,5/2005/4599,2005-10-18,1011,Wassertemperatur,Grundwasser,12.8,,,4,°C,Gesamtgehalt,DIN 38404-C4 DEZEMBER 1976,ja,HYGC_BR-AR,ja,2005-12-05,2005-12-05,5962024


### Check if all cases were covered by the boolean indices.

This test checks whether every row falls into one of the three sets `is_less_than`, `is_greater_than`, `is_float`. The two logical propositions are redundant:

$$ 
\mathrm{idx}_\mathrm{true} = (\mathrm{idx}_<) \vee  (\mathrm{idx}_>) \vee (\mathrm{idx}_=) \\
\mathrm{idx}_\mathrm{false} = (\mathrm{idx}_<) \wedge  (\mathrm{idx}_>) \wedge (\mathrm{idx}_=)
$$

The sets $\mathrm{idx}_<$, $\mathrm{idx}_>$, and $\mathrm{idx}_=$ are the Boolean indices that are true for value strings indicating that the value is below the detection limit, above the measurement range of the analytical instrument/method, or directly convertible to float, respectively.

In [29]:
# Union of the three masks: True for every row that belongs to at least one set.
idx_check_true = idx_is_float | idx_is_less_than | idx_is_greater_than
# Expect a single entry: True, with a count equal to the number of rows.
idx_check_true.value_counts()

measurement_result_c
True    3671913
Name: count, dtype: int64

In [30]:
(num_rows, num_cols) = df_qual.shape
# Count the True entries with sum() rather than value_counts()[True]: the
# lookup raises KeyError when no True is present, which would hide the
# assertion message below.
assert num_rows == int(idx_check_true.sum()), \
"The number of rows does not match the number of True values in idx_check_true."

In [31]:
# A row violates the partition if it belongs to more than one of the three sets.
# Testing the pairwise overlaps is stricter than a single three-way AND, which
# only flags rows that are in all three sets at once.
idx_check_false = (
    (idx_is_float & idx_is_less_than)
    | (idx_is_float & idx_is_greater_than)
    | (idx_is_less_than & idx_is_greater_than)
)
# Expect a single entry: False, with a count equal to the number of rows.
idx_check_false.value_counts()

measurement_result_c
False    3671913
Name: count, dtype: int64

In [32]:
(num_rows, num_cols) = df_qual.shape
# Count the False entries by summing the inverted mask rather than using
# value_counts()[False]: the lookup raises KeyError when no False is present,
# which would hide the assertion message below.
assert num_rows == int((~idx_check_false).sum()), \
"The number of rows does not match the number of False values in idx_check_false."

## Separate the number strings in all cases and convert them to numbers.

Create two new columns `value` and `limit`. The former stores the value in float format, the latter contains one of the three characters `<`, `>`, `=` indicating whether the lower or upper limit was exceeded or not.

In [33]:
# Start from the raw result strings; the "<" / ">" markers are removed in the next cell.
df_qual["value"] = df_qual["measurement_result_c"]

In [34]:
%%time
# Remove every "<" and ">" in one regex pass over the column (instead of one
# pass per character), then convert the remaining number strings to float.
df_qual["value"] = (
    df_qual["value"]
    .replace({r"[<>]": ""}, regex=True)
    .astype(float)
)

CPU times: user 4.07 s, sys: 101 ms, total: 4.17 s
Wall time: 4.17 s


In [35]:
%%time
# One marker per boolean mask; each mask selects the rows that get that marker.
limit_markers = (
    (idx_is_less_than, "<"),
    (idx_is_greater_than, ">"),
    (idx_is_float, "="),
)
for mask, marker in limit_markers:
    df_qual.loc[mask, "limit"] = marker

CPU times: user 76.5 ms, sys: 4.04 ms, total: 80.5 ms
Wall time: 79.2 ms




## Set index of dataframe df_qual.

In [36]:
# The guard makes this cell safe to re-run: once sl_nr is the index it is no
# longer a column, and a second set_index("sl_nr") would raise KeyError.
if df_qual.index.name != "sl_nr":
    df_qual.set_index("sl_nr", inplace = True)

## Final check.

In [37]:
# Final dtypes, including the new value and limit columns.
df_qual.dtypes

station_id                      object
sampling_id                     object
sampling_date           datetime64[ns]
substance_id                     int64
substance                       object
sample_type                     object
measurement_result_c            object
measurement_note                object
detection_limit                float64
unit_id                          int64
unit                            object
separation_method               object
method                          object
on_site                         object
origin                          object
release                         object
update_date             datetime64[ns]
creation_date           datetime64[ns]
municipality_id                 object
value                          float64
limit                           object
dtype: object

In [38]:
# Preview the first three rows of the cleaned frame.
df_qual.head(3)

Unnamed: 0_level_0,station_id,sampling_id,sampling_date,substance_id,substance,sample_type,measurement_result_c,measurement_note,detection_limit,unit_id,...,separation_method,method,on_site,origin,release,update_date,creation_date,municipality_id,value,limit
sl_nr,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2903561,59620687,5/2005/4599,2005-10-18,1164,Zink,Grundwasser,22.0,,,10,...,Gesamtgehalt,DIN 38406-E22 MAERZ 1988,,HYGC_BR-AR,ja,2005-12-05,2005-12-05,5962024,22.0,=
2903564,59620687,5/2005/4599,2005-10-18,1061,pH-Wert,Grundwasser,6.8,,,23,...,Gesamtgehalt,DIN 38404-C5 JANUAR 1984,ja,HYGC_BR-AR,ja,2005-12-05,2005-12-05,5962024,6.8,=
2903565,59620687,5/2005/4599,2005-10-18,1011,Wassertemperatur,Grundwasser,12.8,,,4,...,Gesamtgehalt,DIN 38404-C4 DEZEMBER 1976,ja,HYGC_BR-AR,ja,2005-12-05,2005-12-05,5962024,12.8,=


In [39]:
%%time
# Export the cleaned frame as a semicolon-separated file "measure.csv";
# the relative name resolves against the current working directory.
df_qual.to_csv("measure.csv", sep=";")

CPU times: user 17.6 s, sys: 570 ms, total: 18.2 s
Wall time: 18.2 s


In [None]:


# PostgreSQL connection.
# The credentials are read from the environment so that no password is stored
# in the notebook. URL.create() also takes care of escaping special characters.
pg_url = URL.create(
    "postgresql",
    username=os.environ.get("PGUSER", "geo_master"),
    password=os.environ["PGPASSWORD"],  # KeyError if unset: fail before connecting
    host=os.environ.get("PGHOST", "localhost"),
    database=os.environ.get("PGDATABASE", "geo"),
)
pg_engine = create_engine(pg_url)

# Write the cleaned pandas DataFrame to table gw.messungen, replacing an
# existing table of that name. The index (sl_nr) is written as a column;
# chunksize limits how many rows are sent per batch.
# NOTE(review): this assumes the schema "gw" already exists in the target database.
df_qual.to_sql("messungen", pg_engine, schema="gw", if_exists="replace", chunksize=10_000)

# Close connections: release the pooled connections of both engines.
engine.dispose()
pg_engine.dispose()
