# NRW Groundwater Data - OpenHygrisC Data Engineering

Data from <br>
**[LANUV](https://www.lanuv.nrw.de/): Landesamt für Natur, Umwelt und Verbraucherschutz Nordrhein-Westfalen** <br>
(State Office for Nature, Environment and Consumer Protection NRW)

* LANUV groundwater web pages: https://www.lanuv.nrw.de/umwelt/wasser/grundwasser

Groundwater data: https://www.lanuv.nrw.de/umwelt/wasser/grundwasser/grundwasserstand/grundwasserdaten-online

ELWAS-WEB NRW - Infos zu den Grundwasserkörpern (YouTube): https://www.youtube.com/watch?v=4wFKIu622rk

In the HygrisC database, LANUV provides groundwater quality and quantity data for most groundwater wells in NRW. The wells are owned and operated partly by NRW and partly by other parties.
Measurements are usually taken annually. Some groundwater wells are sampled more frequently.

WRRL: EU Wasserrahmenrichtlinie, EU Water Framework Directive

The quality data is based on chemical analyses of groundwater samples. The quantity data is based on groundwater level measurements.


OpenHygrisC Data: https://www.opengeodata.nrw.de/produkte/umwelt_klima/wasser/grundwasser/hygrisc/

**Download the NRW groundwater data zip file**:
<br>
https://www.opengeodata.nrw.de/produkte/umwelt_klima/wasser/grundwasser/hygrisc/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_CSV.zip

The zip archive contains gw station info, a catalog of possible physico-chemical analysis parameters, and the measured data. 

## Coordinate Obfuscation 

Some coordinate data in the gw station info are difficult to handle. The coordinate reference system (CRS) is the projected, metre-based
EPSG:25832 (ETRS89 / UTM zone 32N).
The dataframe coordinate columns `e32` (easting) and `n32` (northing) are of data type object (not numeric).

The resolution is 1 m, but many coordinates are obscured to a precision of 100 m for privacy reasons. A few coordinates are missing, i.e. either empty (NaN) or filled with `xx`.


The coordinate strings in `e32` and `n32` fall into four groups, shown as cases (1) to (5) in the table below:

* Most strings are in a regular number format and can be converted to float right away (cases (1) and (2) in the table).
* Other coordinate strings are obfuscated by replacing the two least significant digits with the characters "xx". This usually happens when a groundwater well is on private property: the coordinates are made less precise to respect privacy. The remaining coordinate information is still usable, with a precision of 100 m and an uncertainty of ±50 m (case (3) in the table).
* In some cases no coordinate information is given at all. The coordinate strings are then just "xx" (case (4) in the table).
* In a very few cases the coordinate columns are empty, i.e. NaN (Null) (case (5) in the table).

The following table shows representative cases.


| case |   messstelle_id | e32    | n32     | grundstueck   |
|-----:|----------------:|-------:|--------:|:--------------|
|  (1) |        10000094 | 292868 | 5632572 | oeffentlich   |
|  (2) |        10000045 | 299399 | 5650595 | privat        |
|  (3) |        10000033 | 3070xx | 56583xx | privat        |
|  (4) |        47247101 | xx     | xx      |               |
|  (5) |        79921802 | nan    | nan     |               |

Cases (1) and (2) have coordinate strings that can be converted directly to integer or float with 1 m precision. Case (3) shows coordinates obfuscated to a precision of 100 m: the digits for tens and ones are anonymized. Cases (4) and (5) carry no usable coordinate information.
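The five cases can be handled by one small pure-Python helper. This is a minimal sketch: the function name `parse_coord` and the return convention (value, precision in metres) are assumptions for illustration, and missing coordinates are returned as `None` instead of a sentinel value.

```python
import math
from typing import Optional, Tuple


def parse_coord(raw) -> Tuple[Optional[float], Optional[int]]:
    """Convert an e32/n32 entry to (value, precision in metres).

    Hypothetical helper: returns (None, None) for missing coordinates.
    """
    # Case (5): empty cell, which pandas reads as NaN (a float)
    if raw is None or (isinstance(raw, float) and math.isnan(raw)):
        return None, None
    text = str(raw).strip()
    # Cases (1) and (2): plain digits, 1 m precision
    if text.isdigit():
        return float(text), 1
    # Case (3): last two digits replaced by "xx"; use the centre of the 100 m cell
    if len(text) > 2 and text[-2:].lower() == "xx" and text[:-2].isdigit():
        return float(text[:-2] + "50"), 100
    # Case (4): only "xx" (or any other unparseable string)
    return None, None


print(parse_coord("299399"))      # (299399.0, 1)
print(parse_coord("3070xx"))      # (307050.0, 100)
print(parse_coord("xx"))          # (None, None)
print(parse_coord(float("nan")))  # (None, None)
```

With pandas, such a function could be applied per column, e.g. via `df["e32"].map(parse_coord)`, at the cost of being slower than vectorized string operations.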

How to deal with non-anonymized data:

"299399" (string, prec. 1) => 299399.0 (float) 

How to deal with anonymization:

307000 <= 3070xx <= 307099

"3070xx" (string, prec. 100) => 307050 (float, +- 50m) 



In [1]:
#!conda env list

## Correct wrong `PROJ_LIB` environment variable value 

This problem seems to occur on Windows when another installation, e.g. the OSGeo4W installer or (as in the output below) PostgreSQL/PostGIS, has set `PROJ_LIB` system-wide. The environment variable must instead point to the `proj` directory of the activated conda environment in the user's directory, e.g. `PROJ_LIB=C:\Users\<username>\Anaconda3\envs\geo\Library\share\proj`

In [1]:
import os
os.environ['PROJ_LIB']

'C:\\Program Files\\PostgreSQL\\13\\share\\contrib\\postgis-3.4\\proj'

In [2]:
# Correct wrong environment variable value occurring when using OSGeo4W installer
conda_prefix = os.environ['CONDA_PREFIX']  # upper-case names: lower case only works on Windows
print(f"CONDA_PREFIX: {conda_prefix:s}")
os.environ['PROJ_LIB'] = conda_prefix + r"\Library\share\proj"
proj_lib = os.environ['PROJ_LIB']
print(f"New env var value: \nPROJ_LIB={proj_lib:s}")

CONDA_PREFIX: C:\Users\shrey\anaconda3\envs\geo
New env var value: 
PROJ_LIB=C:\Users\shrey\anaconda3\envs\geo\Library\share\proj
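
The hard-coded `\Library\share\proj` suffix only fits Windows. A minimal sketch of a platform-aware variant follows; the helper name `proj_dir_for_env` is hypothetical, and the directory layout (`Library\share\proj` on Windows, `share/proj` elsewhere) is an assumption based on the usual conda packaging.

```python
import os
import sys
from pathlib import PurePosixPath, PureWindowsPath


def proj_dir_for_env(prefix: str, windows: bool) -> str:
    """Return the assumed PROJ data directory for a conda environment prefix."""
    if windows:
        return str(PureWindowsPath(prefix) / "Library" / "share" / "proj")
    return str(PurePosixPath(prefix) / "share" / "proj")


# Only override PROJ_LIB when a conda environment is active
prefix = os.environ.get("CONDA_PREFIX")
if prefix:
    os.environ["PROJ_LIB"] = proj_dir_for_env(prefix, windows=sys.platform.startswith("win"))
    print(f"PROJ_LIB={os.environ['PROJ_LIB']}")
```

Using `os.environ.get()` avoids a `KeyError` when the notebook is started outside a conda environment.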


## Imports

In [3]:
# CORRECT THE WRONG PYPROJ PATH FIRST! OTHERWISE GEOPANDAS DOES NOT LOAD!
import pandas as pd
import geopandas as gpd

## Data Directories and Files

In [4]:
#data_in_dir = r"C:/users/shrey/Desktop/Geodata_Management/EE_8136_Geodata_WS2023_1_EXAM-Group-C/gdms0000_Final_Assignment/Task_3/OpenHyPE-main/OpenHyPE-main/data/OpenGeodata.NRW/OpenHygrisC/OpenHygrisC_gw-messstelle_EPSG25832_CSV/"
data_in_dir = r"C:\Users\shrey\Desktop\Task_3\OpenHyPE-main\OpenHyPE-main\data\OpenGeodata.NRW\OpenHygrisC\OpenHygrisC_gw-messstelle_EPSG25832_CSV\OpenHygrisC_gw-chemischer-messwert_2020-2029_EPSG25832_CSV"
for elt in os.listdir(data_in_dir): print(elt)

OpenHygrisC_gw-chemischer-messwert_2020-2029.csv


## GW Station Data


In [5]:
gw_station_fname = r"\OpenHygrisC_gw-chemischer-messwert_2020-2029.csv"
gw_station_pfname = data_in_dir + gw_station_fname
print(f"Stationsdaten:  {gw_station_pfname:s}")

Stationsdaten:  C:\Users\shrey\Desktop\Task_3\OpenHyPE-main\OpenHyPE-main\data\OpenGeodata.NRW\OpenHygrisC\OpenHygrisC_gw-messstelle_EPSG25832_CSV\OpenHygrisC_gw-chemischer-messwert_2020-2029_EPSG25832_CSV\OpenHygrisC_gw-chemischer-messwert_2020-2029.csv


In [6]:
df = pd.read_csv(gw_station_pfname, sep = ";", encoding="cp1252", index_col=["messstelle_id"])

In [7]:
df.sort_index(ascending=True, inplace=True)

In [8]:
num_total = df.shape[0]
df.shape

(547538, 12)

In [9]:
print(f"{pd.get_option('display.max_columns') = }")
pd.set_option("display.max_columns", None)
print(f"{pd.get_option('display.max_columns') = }")

pd.get_option('display.max_columns') = 20
pd.get_option('display.max_columns') = None


In [10]:
df.head()

| messstelle_id | sl_nr | messstelle_sl_nr | datum_pn | stoff_nr | probengut | messergebnis_c | messergebnis_hinweis | bestimmungsgrenze | masseinheit | trennverfahren | verfahren | vor_ort |
|--------------:|------:|-----------------:|:---------|---------:|:----------|:---------------|:---------------------|:------------------|:------------|:---------------|:----------|:--------|
| 10099839 | 21197816 | 73160 | 2023-04-20 | 1264 | Grundwasser | <0,02000 | Konzentration zu gering zur Bestimmung ... | 0,02000 | mg/l | Gesamtgehalt | DIN EN ISO 6878 (2004) | |
| 10099839 | 21197779 | 73160 | 2023-04-20 | 1247 | Grundwasser | <0,00000 | Konzentration zu gering zur Bestimmung ... | 0,00000 | mg/l | Gesamtgehalt | DIN ISO 15923-1 (2014) | |
| 10099839 | 21197778 | 73160 | 2023-04-20 | 1246 | Grundwasser | <0,01000 | Konzentration zu gering zur Bestimmung ... | 0,01000 | mg/l | Gesamtgehalt | DIN ISO 15923-1 (2014) | |
| 10099839 | 21197777 | 73160 | 2023-04-20 | 1245 | Grundwasser | 11,52000 | ... | NaN | mg/l | Gesamtgehalt | DIN EN ISO 10304-1 (2009) | |
| 10099839 | 21197776 | 73160 | 2023-04-20 | 1244 | Grundwasser | 51,00000 | ... | NaN | mg/l | Gesamtgehalt | DIN EN ISO 10304-1 (2009) | |


In [11]:
df[df["grundstueck"]=="oeffentlich"].head()

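KeyError: 'grundstueck'

The `KeyError` occurs because the file loaded above is the chemical measurement file (`OpenHygrisC_gw-chemischer-messwert_2020-2029.csv`), which has no `grundstueck` column. The cells in the following section assume that `df` holds the gw station info with the columns `e32`, `n32`, and `grundstueck`.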

## Challenge: Coordinates obfuscation

The coordinate columns `e32` and `n32` are of data type string. Four cases must be distinguished:

(1) Most strings are in a regular number format and can be converted to float right away.

(2) Other coordinate strings are obfuscated by replacing the two least significant digits with the characters "xx". This usually happens when a groundwater well is on private property: the coordinates are made less precise to respect privacy. The remaining coordinate information is still usable, with a precision of 100 m and an uncertainty of ±50 m.

(3) In some cases no coordinate information is given at all. The coordinate strings are then just "xx".

(4) In a very few cases the coordinate columns are empty, i.e. NaN (Null).

In [None]:
# These five groundwater wells summarize the coordinate problems.
df_coord_problem=df.loc[[10000094, 10000045, 10000033, 47247101, 79921802],["e32","n32", "grundstueck"]]
df_coord_problem

In [None]:
# format table as markdown
#from tabulate import tabulate
#print(tabulate(df_coord_problem, tablefmt="pipe", headers="keys"))

|   messstelle_id | e32    | n32     | grundstueck   |
|----------------:|:-------|:--------|:--------------|
|        10000094 | 292868 | 5632572 | oeffentlich   |
|        10000045 | 299399 | 5650595 | privat        |
|        10000033 | 3070xx | 56583xx | privat        |
|        47247101 | xx     | xx      |               |
|        79921802 | nan    | nan     |               |

**Boolean indexes are used to filter the data according to the cases (1) to (4).**

In [None]:
# Add column for precision
df["genau"] = 0

# (1) If the coord data is numeric then the precision is 1m
idx_coords_1m_prec = (df["e32"].str.isnumeric() == True)

# (3,4) Some stations don't have coordinates
# e32 and n32 strings are either NaN (Null) or "xx"
idx_coords_missing = (df["e32"].str.len() < 6) | (df["e32"].isnull() == True)

# (2) If coord data is available but not numeric, then the two least significant digits have been obscured with "xx".
idx_coords_100m_prec = ~idx_coords_missing &  ~(df["e32"].str.isnumeric() == True)


In [None]:
df[idx_coords_missing]

**Convert the strings to floats where possible. No data values are represented as negative numbers.**

In [None]:
df.loc[idx_coords_1m_prec,"e32num"] = df.loc[idx_coords_1m_prec,"e32"].astype(float)
df.loc[idx_coords_1m_prec,"n32num"] = df.loc[idx_coords_1m_prec,"n32"].astype(float)
df.loc[idx_coords_1m_prec, "genau"] = 1

In [None]:
df.loc[idx_coords_100m_prec,"e32num"] = (df.loc[idx_coords_100m_prec,"e32"].str[:-2]+"50").astype(float)
df.loc[idx_coords_100m_prec,"n32num"] = (df.loc[idx_coords_100m_prec,"n32"].str[:-2]+"50").astype(float)
df.loc[idx_coords_100m_prec, "genau"] = 100

In [None]:
df.loc[idx_coords_missing,"e32num"] = -999.9
df.loc[idx_coords_missing,"n32num"] = -999.9
df.loc[idx_coords_missing, "genau"] = -999

In [None]:
# check if all records have been matched
num_of_1m_prec = df[df["genau"] == 1].shape[0]
num_of_100m_prec = df[df["genau"] == 100].shape[0]
num_of_no_prec = df[df["genau"] == -999].shape[0]

num_check = num_of_1m_prec + num_of_100m_prec + num_of_no_prec

print(f"total num of recs:                        {num_total:6d}")
print(f"number of recs with 1m coord precision:   {num_of_1m_prec:6d}")
print(f"number of recs with 100m coord precision: {num_of_100m_prec:6d}")
print(f"number of recs with no coords:            {num_of_no_prec:6d}")
print(f"check sum:                                {num_check:6d}")

assert num_check == num_total, "ERROR. Mismatch in numbers of stations"
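
The three-way classification and the checksum can also be sketched without pandas, which makes the logic easy to try on a handful of strings. The helper name `coord_precision` and the sample list are illustrative assumptions; the rules and the sentinel `-999` mirror the Boolean indexes and the `genau` column above.

```python
from collections import Counter


def coord_precision(raw) -> int:
    """Return 1, 100 or -999, mirroring the `genau` column."""
    # NaN/None or strings shorter than 6 characters (e.g. "xx"): no coordinate
    if not isinstance(raw, str) or len(raw) < 6:
        return -999
    # Fully numeric string: 1 m precision
    if raw.isnumeric():
        return 1
    # Remaining strings are treated as obfuscated: 100 m precision
    return 100


# Sample values taken from the table of representative cases
sample = ["292868", "299399", "3070xx", "xx", float("nan")]
counts = Counter(coord_precision(v) for v in sample)
print(counts)

# Every record must fall into exactly one class
assert sum(counts.values()) == len(sample)
```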


**Save the original string as well as the derived numeric columns to a CSV file for checking externally.**

In [None]:
df[["e32","e32num","n32","n32num","genau"]].to_csv("check.csv")
df[["e32","e32num","n32","n32num","genau"]]

## Geopandas

In [None]:
import geopandas as gpd
from shapely.geometry import Point

In [None]:
# remove records without coords
df2 = df[df["genau"] > 0]

In [None]:
df2.shape

In [None]:
%%time
gdf = gpd.GeoDataFrame(df2, geometry=gpd.points_from_xy(df2.e32num, df2.n32num), crs="EPSG:25832")

In [None]:
gdf.info()

In [None]:
gdf.head(3)

In [None]:
%%time

# This takes 90 secs on my computer!

#gdf.to_file("GW_Stations.gpkg", layer='GW Stations', driver="GPKG")

## PostGIS, Inline SQL Magic: `create schema gw`

To store the data in PostGIS/PostgreSQL it is recommended to create a dedicated database "schema" (a kind of name space) to separate relations (tables, views), stored procedures, etc. from the rest of the database. Schemata help to organize the tables and access privileges clearly. 


In [None]:
!conda install -c conda-forge ipython-sql

Collecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... unsuccessful initial attempt using frozen solve. Retrying with flexible solve.
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... unsuccessful initial attempt using frozen solve. Retrying with flexible solve.



PackagesNotFoundError: The following packages are not available from current channels:

  - geoalchemy

Current channels:

  - https://conda.anaconda.org/conda-forge/win-64
  - https://conda.anaconda.org/conda-forge/noarch
  - https://repo.anaconda.com/pkgs/main/win-64
  - https://repo.anaconda.com/pkgs/main/noarch
  - https://repo.anaconda.com/pkgs/r/win-64
  - https://repo.anaconda.com/pkgs/r/noarch
  - https://repo.anaconda.com/pkgs/msys2/win-64
  - https://repo.anaconda.com/pkgs/msys2/noarch

To search for alternate channels that may provide the conda package you're
looking for, navigate to

    https://anaconda.org

and use the search bar at the top of the page.




In [None]:
%load_ext sql

In [None]:
print("Connect")
%sql postgresql://env_master:M123xyz@localhost/env_db

In [None]:
%%sql
SELECT * FROM information_schema.schemata

In [None]:
%%sql
CREATE SCHEMA IF NOT EXISTS gw AUTHORIZATION env_master

In [None]:
%%sql
SELECT * FROM information_schema.schemata;

## PostGIS: Upload GeoDataFrame with `gdf.to_postgis()`

Dependencies:
* psycopg2
* geoalchemy2

In [95]:
!conda install -y -c conda-forge geoalchemy2 psycopg2

^C


In [96]:
import sqlalchemy
engine = sqlalchemy.create_engine("postgresql://env_master:M123xyz@localhost/env_db")
# fast_executemany=True
# use_batch_mode=True

ModuleNotFoundError: No module named 'psycopg2'

In [None]:
%%time
gdf.to_postgis(con=engine, name="gw_stations", schema="gw", index=True, chunksize=100, if_exists="replace")

Create primary key!

In [None]:
%%sql
alter table gw.gw_stations add constraint pk_gw_stations primary key (messstelle_id)

# Groundwater "Quality Data": Chemistry!

## Data Directories and Files

In [12]:
gw_quality_fname = r"\OpenHygrisC_gw-chemischer-messwert_2020-2029.csv"
gw_quality_pfname = data_in_dir + gw_quality_fname
print(f"Qualitätsdaten: {gw_quality_pfname:s}")

Qualitätsdaten: C:\Users\shrey\Desktop\Task_3\OpenHyPE-main\OpenHyPE-main\data\OpenGeodata.NRW\OpenHygrisC\OpenHygrisC_gw-messstelle_EPSG25832_CSV\OpenHygrisC_gw-chemischer-messwert_2020-2029_EPSG25832_CSV\OpenHygrisC_gw-chemischer-messwert_2020-2029.csv


In [13]:
# Read only the header line; the context manager closes the file again
with open(gw_quality_pfname, "r", encoding="cp1252", newline="") as fh:
    s = fh.readline()
s = s.replace('"', '').strip()
header_de = s.split(';')
header_de

['sl_nr',
 'messstelle_id',
 'messstelle_sl_nr',
 'datum_pn',
 'stoff_nr',
 'probengut',
 'messergebnis_c',
 'messergebnis_hinweis',
 'bestimmungsgrenze',
 'masseinheit',
 'trennverfahren',
 'verfahren',
 'vor_ort']

In [14]:
#df['messstell'] = df['Value'].str.replace(',', '.', regex=True)

In [15]:
%time df_qual = pd.read_csv(gw_quality_pfname, sep = ";", encoding="cp1252", dtype = {"messergebnis_c":str ,"messergebnis_hinweis":str }, nrows = 5)

CPU times: total: 0 ns
Wall time: 9.04 ms


In [16]:
df_qual["messergebnis_c"] = df_qual["messergebnis_c"].str.replace(',', '.', regex=True)
df_qual["bestimmungsgrenze"] = df_qual["bestimmungsgrenze"].str.replace(',', '.', regex=True)
df_qual

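|    | sl_nr | messstelle_id | messstelle_sl_nr | datum_pn | stoff_nr | probengut | messergebnis_c | messergebnis_hinweis | bestimmungsgrenze | masseinheit | trennverfahren | verfahren | vor_ort |
|---:|------:|--------------:|-----------------:|:---------|---------:|:----------|:---------------|:---------------------|:------------------|:------------|:---------------|:----------|:--------|
| 0 | 19766742 | 106505920 | 64756 | 2021-11-22 | 4075 | Grundwasser | <0.01000 | Konzentration zu gering zur Bestimmung ... | 0.01 | µg/l | Gesamtgehalt | DIN 38407-36 (2014) | |
| 1 | 20157895 | 86613236 | 44873 | 2022-03-15 | 1131 | Grundwasser | 0.01700 | ... | NaN | mg/l | Gesamtgehalt | nach Laborjournal | |
| 2 | 20157896 | 86613236 | 44873 | 2022-03-15 | 1211 | Grundwasser | 0.04000 | ... | NaN | mg/l | Gesamtgehalt | nach Laborjournal | |
| 3 | 20157897 | 86613236 | 44873 | 2022-03-15 | 1124 | Grundwasser | 0.08000 | ... | NaN | mg/l | Gesamtgehalt | nach Laborjournal | |
| 4 | 20157898 | 86613236 | 44873 | 2022-03-15 | 1122 | Grundwasser | 67.10000 | ... | NaN | mg/l | Gesamtgehalt | nach Laborjournal | |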


In [17]:
df_qual.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 13 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   sl_nr                 5 non-null      int64 
 1   messstelle_id         5 non-null      int64 
 2   messstelle_sl_nr      5 non-null      int64 
 3   datum_pn              5 non-null      object
 4   stoff_nr              5 non-null      int64 
 5   probengut             5 non-null      object
 6   messergebnis_c        5 non-null      object
 7   messergebnis_hinweis  5 non-null      object
 8   bestimmungsgrenze     1 non-null      object
 9   masseinheit           5 non-null      object
 10  trennverfahren        5 non-null      object
 11  verfahren             5 non-null      object
 12  vor_ort               5 non-null      object
dtypes: int64(4), object(9)
memory usage: 652.0+ bytes


**The complete data set of chemical analyses comprises more than 3.6 million measured values! The 2020-2029 file read here contains 547538 rows.**

In [18]:
# Wall time: 13 s
%time df_qual = pd.read_csv(gw_quality_pfname, sep = ";", encoding="cp1252", index_col=["sl_nr"], \
                            dtype = {"messergebnis_c":str ,"messergebnis_hinweis":str }, \
                            parse_dates = ["datum_pn"])

CPU times: total: 1.8 s
Wall time: 1.88 s


In [19]:
df_qual.shape

(547538, 12)

In [20]:
df_qual.info()

<class 'pandas.core.frame.DataFrame'>
Index: 547538 entries, 19766742 to 20640545
Data columns (total 12 columns):
 #   Column                Non-Null Count   Dtype         
---  ------                --------------   -----         
 0   messstelle_id         547538 non-null  int64         
 1   messstelle_sl_nr      547538 non-null  int64         
 2   datum_pn              547538 non-null  datetime64[ns]
 3   stoff_nr              547538 non-null  int64         
 4   probengut             547538 non-null  object        
 5   messergebnis_c        547538 non-null  object        
 6   messergebnis_hinweis  547538 non-null  object        
 7   bestimmungsgrenze     337684 non-null  object        
 8   masseinheit           547538 non-null  object        
 9   trennverfahren        547538 non-null  object        
 10  verfahren             546255 non-null  object        
 11  vor_ort               547538 non-null  object        
dtypes: datetime64[ns](1), int64(3), object(8)
memory usage

In [21]:
# duplicate sl_nr values? Can it be a unique index?
# Result should be empty
print(df_qual[df_qual.index.duplicated()])

Empty DataFrame
Columns: [messstelle_id, messstelle_sl_nr, datum_pn, stoff_nr, probengut, messergebnis_c, messergebnis_hinweis, bestimmungsgrenze, masseinheit, trennverfahren, verfahren, vor_ort]
Index: []


## Time Series Example

In [22]:
# time series example
# stoff_nr=1244 -> "Nitrat"; this example uses stoff_nr=4075 at station 106505920
idx = (df_qual["messstelle_id"] == 106505920) & (df_qual["stoff_nr"] == 4075)
df_qual.loc[idx,["datum_pn", "messergebnis_c"]].sort_values("datum_pn")

| sl_nr | datum_pn | messergebnis_c |
|------:|:---------|:---------------|
| 19766742 | 2021-11-22 | <0,01000 |


In [23]:
df_qual

| sl_nr | messstelle_id | messstelle_sl_nr | datum_pn | stoff_nr | probengut | messergebnis_c | messergebnis_hinweis | bestimmungsgrenze | masseinheit | trennverfahren | verfahren | vor_ort |
|------:|--------------:|-----------------:|:---------|---------:|:----------|:---------------|:---------------------|:------------------|:------------|:---------------|:----------|:--------|
| 19766742 | 106505920 | 64756 | 2021-11-22 | 4075 | Grundwasser | <0,01000 | Konzentration zu gering zur Bestimmung ... | 0,01000 | µg/l | Gesamtgehalt | DIN 38407-36 (2014) | |
| 20157895 | 86613236 | 44873 | 2022-03-15 | 1131 | Grundwasser | 0,01700 | ... | NaN | mg/l | Gesamtgehalt | nach Laborjournal | |
| 20157896 | 86613236 | 44873 | 2022-03-15 | 1211 | Grundwasser | 0,04000 | ... | NaN | mg/l | Gesamtgehalt | nach Laborjournal | |
| 20157897 | 86613236 | 44873 | 2022-03-15 | 1124 | Grundwasser | 0,08000 | ... | NaN | mg/l | Gesamtgehalt | nach Laborjournal | |
| 20157898 | 86613236 | 44873 | 2022-03-15 | 1122 | Grundwasser | 67,10000 | ... | NaN | mg/l | Gesamtgehalt | nach Laborjournal | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 20639785 | 60220909 | 13063 | 2022-06-30 | 4393 | Grundwasser | <0,05000 | Konzentration zu gering zur Bestimmung ... | 0,05000 | µg/l | Gesamtgehalt | Direkt, LC-MS-MS - VS - FY0 - a | |
| 20640175 | 60220909 | 13063 | 2022-06-30 | 4099 | Grundwasser | <0,05000 | Konzentration zu gering zur Bestimmung ... | 0,05000 | µg/l | Gesamtgehalt | Direkt, LC-MS-MS - VS - FY0 - a | |
| 20640193 | 60220909 | 13063 | 2022-06-30 | 4210 | Grundwasser | <0,02500 | Konzentration zu gering zur Bestimmung ... | 0,02500 | µg/l | Gesamtgehalt | DIN EN ISO 21676 - DO - F47 -1 | |
| 20640541 | 60220909 | 13063 | 2022-06-30 | 1061 | Grundwasser | 6,50000 | ... | NaN | - | Gesamtgehalt | DIN EN ISO 10523 - DO - C05 - 1 | ja |


### Tests for different measurement value string cases

```
(1)   "1.00" (is_float)
(2)  "<1.00" (is_less)
(3)  ">1.00" (is_greater)
```


In [59]:
# check if string can be converted to float
def is_float(element: str) -> bool:
    try:
        float(element)
        return True
    except ValueError:
        return False

In [60]:
# check if string starts with '<'
def is_less(element: str) -> bool:
    return element.startswith("<")  # also safe for empty strings

In [61]:
# check if string starts with '>'
def is_greater(element: str) -> bool:
    return element.startswith(">")  # also safe for empty strings

In [62]:
print("is_float()")
print(is_float("<1.234"))
print(is_float(">1.234"))
print(is_float("-1.234"))

is_float()
False
False
True


In [63]:
# Some test applications
print("is_less()")
print(is_less("<1.234"))
print(is_less(">1.234"))
print(is_less("1.234"))
print("is_greater()")
print(is_greater("<1.234"))
print(is_greater(">1.234"))
print(is_greater("1.234"))
print("is_float()")
print(is_float("<1.234"))
print(is_float(">1.234"))
print(is_float("1.234"))

is_less()
True
False
False
is_greater()
False
True
False
is_float()
False
False
True


In [64]:
# Apply the tests and create Boolean indexes
%time idx_mess_is_float   = df_qual["messergebnis_c"].apply(is_float)
%time idx_mess_is_less    = df_qual["messergebnis_c"].apply(is_less)
%time idx_mess_is_greater = df_qual["messergebnis_c"].apply(is_greater)

CPU times: total: 1.12 s
Wall time: 1.19 s
CPU times: total: 125 ms
Wall time: 148 ms
CPU times: total: 93.8 ms
Wall time: 152 ms


In [74]:
print(idx_mess_is_greater)

sl_nr
19766742    False
20157895    False
20157896    False
20157897    False
20157898    False
            ...  
20639785    False
20640175    False
20640193    False
20640541    False
20640545    False
Name: messergebnis_c, Length: 547538, dtype: bool


In [None]:
#df_qual.loc[
#df.loc[idx_coords_1m_prec,"e32num"] = df.loc[idx_coords_1m_prec,"e32"].astype(float)    

In [77]:
# Print records which are neither less nor greater nor float -> should be empty data frame
assert df_qual[~idx_mess_is_less & ~idx_mess_is_greater & ~idx_mess_is_float].shape[0] == 0

# Dataframe should be empty
print(df_qual[~idx_mess_is_less & ~idx_mess_is_greater & ~idx_mess_is_float])

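AssertionError: 

The assertion fails because `messergebnis_c` uses a decimal comma (e.g. `0,01000`), which `float()` cannot parse. Plain values such as `67,10000` therefore match none of the three tests.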

In [78]:
# res = (~idx_mess_is_less & ~idx_mess_is_greater & ~idx_mess_is_float).value_counts()
res = (idx_mess_is_less | idx_mess_is_greater | idx_mess_is_float).value_counts()
res

messergebnis_c
True     337467
False    210071
Name: count, dtype: int64

## Convert measurement results to float. Fill the limit column.

In [79]:
%time df_qual.loc[idx_mess_is_float,"messergebnis_num"] = df_qual.loc[idx_mess_is_float,"messergebnis_c"].astype(float)
%time df_qual.loc[idx_mess_is_float,"grenze"] = "="

%time df_qual.loc[idx_mess_is_less,"messergebnis_num"] = df_qual.loc[idx_mess_is_less,"messergebnis_c"].str[1:].astype(float)
%time df_qual.loc[idx_mess_is_less,"grenze"] = "<"

%time df_qual.loc[idx_mess_is_greater,"messergebnis_num"] = df_qual.loc[idx_mess_is_greater,"messergebnis_c"].str[1:].astype(float)
%time df_qual.loc[idx_mess_is_greater,"grenze"] = ">"



CPU times: total: 0 ns
Wall time: 3 ms
CPU times: total: 0 ns
Wall time: 992 µs


ValueError: could not convert string to float: '0,01000'

CPU times: total: 0 ns
Wall time: 8.04 ms
CPU times: total: 0 ns
Wall time: 2 ms
CPU times: total: 0 ns
Wall time: 999 µs
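
The `ValueError` above comes from the decimal comma in `messergebnis_c` (e.g. `<0,01000`), which `float()` does not accept. A minimal sketch of a parser that handles the three string cases as well as the decimal comma; the function name `parse_measurement` and the returned `(grenze, value)` tuple are assumptions for illustration, not the notebook's own code.

```python
from typing import Optional, Tuple


def parse_measurement(raw: str) -> Tuple[Optional[str], Optional[float]]:
    """Split a measurement string into (grenze, numeric value).

    grenze is "<", ">" or "="; (None, None) marks an unparseable string.
    """
    text = raw.strip()
    grenze = "="
    if text.startswith(("<", ">")):
        grenze, text = text[0], text[1:]
    try:
        # German number format: decimal comma -> decimal point
        return grenze, float(text.replace(",", "."))
    except ValueError:
        return None, None


print(parse_measurement("<0,01000"))  # ('<', 0.01)
print(parse_measurement("67,10000"))  # ('=', 67.1)
print(parse_measurement(">1.00"))     # ('>', 1.0)
print(parse_measurement("n.b."))      # (None, None)
```

Applied to the data frame, e.g. with `df_qual["messergebnis_c"].map(parse_measurement)`, this would yield both the `grenze` marker and the numeric value in one pass.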


In [80]:
print("Different values for column 'grenze'")
print(df_qual["grenze"].value_counts())

Different values for column 'grenze'
grenze
<    337467
Name: count, dtype: int64


In [82]:
df_qual[idx_mess_is_greater][["messergebnis_c", "messergebnis_num", "grenze"]].head()

| sl_nr | messergebnis_c | messergebnis_num | grenze |
|------:|:---------------|-----------------:|:-------|


In [83]:
df_qual[idx_mess_is_less][["messergebnis_c", "messergebnis_num", "grenze"]].head()

| sl_nr | messergebnis_c | messergebnis_num | grenze |
|------:|:---------------|-----------------:|:-------|
| 19766742 | <0,01000 | NaN | < |
| 20157910 | <1,00000 | NaN | < |
| 20157911 | <0,23000 | NaN | < |
| 20157914 | <0,20000 | NaN | < |
| 20157915 | <0,03300 | NaN | < |


In [84]:
df_qual[idx_mess_is_float][["messergebnis_c", "messergebnis_num", "grenze"]].head()

| sl_nr | messergebnis_c | messergebnis_num | grenze |
|------:|:---------------|-----------------:|:-------|


In [None]:
# Reason for not being float? XOR: A ^ B
#idx = (~idx_mess_is_float ^ idx_mess_is_less) # These are non-floats which are be less at the same time => greater
#df_qual[idx]

In [None]:
# Reason for not being float? XOR
#idx = (~idx_mess_is_float ^ idx_mess_is_greater)
#df_qual[idx]

In [85]:
df_qual[df_qual["messergebnis_num"]<0]

| sl_nr | messstelle_id | messstelle_sl_nr | datum_pn | stoff_nr | probengut | messergebnis_c | messergebnis_hinweis | bestimmungsgrenze | masseinheit | trennverfahren | verfahren | vor_ort | messergebnis_num | grenze |
|------:|--------------:|-----------------:|:---------|---------:|:----------|:---------------|:---------------------|:------------------|:------------|:---------------|:----------|:--------|-----------------:|:-------|


## Upload the data to the database with `df.to_sql()`

In [86]:
import sqlalchemy
engine = sqlalchemy.create_engine("postgresql+psycopg://env_master:M123xyz@localhost/env_db")

In [87]:
# the default to_sql() / sqlalchemy method using the psycopg driver (see the engine URL above) ...
# on my laptop:
# Approx. Wall time: 4min 32s 

%time df_qual.to_sql(con=engine, name="gw_meas", schema="gw", if_exists="fail")
#%time df_qual.to_sql(con=engine, name="gw_meas", schema="gw", if_exists="replace")

ProgrammingError: (psycopg.errors.InvalidSchemaName) schema "gw" does not exist
LINE 2: CREATE TABLE gw.gw_meas (
                     ^
[SQL: 
CREATE TABLE gw.gw_meas (
	sl_nr BIGINT, 
	messstelle_id BIGINT, 
	messstelle_sl_nr BIGINT, 
	datum_pn TIMESTAMP WITHOUT TIME ZONE, 
	stoff_nr BIGINT, 
	probengut TEXT, 
	messergebnis_c TEXT, 
	messergebnis_hinweis TEXT, 
	bestimmungsgrenze TEXT, 
	masseinheit TEXT, 
	trennverfahren TEXT, 
	verfahren TEXT, 
	vor_ort TEXT, 
	messergebnis_num FLOAT(53), 
	grenze TEXT
)

]
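(Background on this error at: https://sqlalche.me/e/20/f405)

The upload fails because the schema `gw` does not exist yet. Run the `CREATE SCHEMA IF NOT EXISTS gw` cell from the schema section first.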

## Search for duplicates! The primary key is not straightforward!

In [None]:
%load_ext sql

In [None]:
print("Connect")
%sql postgresql://env_master:M123xyz@localhost/env_db

In [None]:
%%sql
alter table gw.gw_meas add constraint pk_gw_meas primary key (messstelle_id, datum_pn, stoff_nr)

In [None]:
%%sql
select * from gw.gw_meas where (messstelle_id, datum_pn, stoff_nr) = (73537317, '1990-08-17 00:00:00', 1061)

Is `sl_nr` unique?

In [None]:
%%sql
select sl_nr,count(sl_nr) as count from gw.gw_meas group by sl_nr having count(sl_nr) > 1; 

**`sl_nr` is unique and can serve as a surrogate ("non-smart") primary key ...**

In [None]:
%%sql
alter table gw.gw_meas add constraint pk_gw_meas primary key (sl_nr)

## Create some indexes to improve database performance

In [None]:
%%sql
create index idx_gw_meas_messstelle_id_datum_pn on gw.gw_meas (messstelle_id, datum_pn)

In [None]:
%%sql
create index idx_gw_meas_datum_pn_meas_messstelle on gw.gw_meas (datum_pn, messstelle_id)

In [None]:
%%sql
create index idx_gw_meas_stoff_nr on gw.gw_meas (stoff_nr)

In [None]:
%%sql
create index idx_gw_meas_datum_pn_stoff_nr on gw.gw_meas (datum_pn, stoff_nr);
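
Whether the planner actually uses an index can be checked with an execution plan (`EXPLAIN` in PostgreSQL). The following sketch shows the idea with an in-memory SQLite database as a stand-in, which is an assumption made only so that the example runs without a server; the plan output format differs from PostgreSQL.

```python
import sqlite3

# In-memory SQLite database as a stand-in for PostgreSQL (assumption for this sketch)
con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE gw_meas (sl_nr INTEGER PRIMARY KEY, messstelle_id INTEGER, "
    "datum_pn TEXT, stoff_nr INTEGER)"
)
con.execute("CREATE INDEX idx_gw_meas_stoff_nr ON gw_meas (stoff_nr)")

# Ask the planner how it would evaluate a filter on stoff_nr
plan = con.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM gw_meas WHERE stoff_nr = 1244"
).fetchall()
for row in plan:
    print(row[-1])  # human-readable description of the plan step
con.close()
```

If the index is used, its name appears in the plan description.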

In [None]:
%%time
%sql select count(*) from gw.gw_meas

In [None]:
%%time
%sql select messstelle_id, datum_pn, count(*) as count from gw.gw_meas group by (messstelle_id, datum_pn) limit 20

**ATTENTION! 140515 analyses were performed with more than one method!**

In [None]:
#%%sql
#SELECT messstelle_id, datum_pn, stoff_nr, COUNT(*) AS Count
#FROM gw.gw_meas
#GROUP BY messstelle_id, datum_pn, stoff_nr
#HAVING COUNT(*) > 1;
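
The duplicate search can be tried out on a few rows without the full database. This is a minimal sketch using an in-memory SQLite database as a stand-in for PostgreSQL; the rows and method names are made up for illustration.

```python
import sqlite3

con = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL database
con.execute(
    "CREATE TABLE gw_meas (sl_nr INTEGER, messstelle_id INTEGER, "
    "datum_pn TEXT, stoff_nr INTEGER, verfahren TEXT)"
)
# Made-up rows: the first two share station, date and substance but differ in method
rows = [
    (1, 100, "2022-03-15", 1244, "method A"),
    (2, 100, "2022-03-15", 1244, "method B"),
    (3, 100, "2022-03-15", 1061, "method A"),
]
con.executemany("INSERT INTO gw_meas VALUES (?, ?, ?, ?, ?)", rows)

# Same pattern as the query above: group by the candidate key, keep groups with > 1 row
dupes = con.execute(
    """
    SELECT messstelle_id, datum_pn, stoff_nr, COUNT(*) AS n
    FROM gw_meas
    GROUP BY messstelle_id, datum_pn, stoff_nr
    HAVING COUNT(*) > 1
    """
).fetchall()
print(dupes)  # [(100, '2022-03-15', 1244, 2)]
con.close()
```

Any row returned by this query means that `(messstelle_id, datum_pn, stoff_nr)` cannot serve as a primary key.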

In [None]:
# %%sql
# SELECT t1.* from gw.gw_meas t1, gw.gw_meas t2 
# where 
# t1.messstelle_id = t2.messstelle_id
# and
# t1.datum_pn = t2.datum_pn
# and
# t1.stoff_nr = t2.stoff_nr
# and
# t1.verfahren <> t2.verfahren
# and

# t1.sl_nr = (select max(sl_nr) from gw.gw_meas t3 
# where
# t1.messstelle_id = t3.messstelle_id
# and
# t1.datum_pn = t3.datum_pn
# and
# t1.stoff_nr = t3.stoff_nr
# )

# limit 1000

# Import `katalog_stoff`

It is in another notebook!

In [None]:
%%sql
select * from gw.katalog_stoff where name like 'N%'

In [None]:
%%sql
alter table gw.katalog_stoff add constraint pk_kat_stoff primary key (stoff_nr)

In [None]:
%%sql
create index idx_kat_stoff_name on gw.katalog_stoff (name) 

In [None]:
%%sql
select * from gw.gw_stations limit 3

# Create Views!

In [None]:
%%sql
drop view if exists gw.v_gw_stations_wrrl_chemie

In [None]:
%%sql
create view gw.v_gw_stations_wrrl_chemie as
select * from gw.gw_stations 
where im_wrrl_messnetz_chemie = 'ja'
and freigabe_chemie = 'ja'

In [None]:
%%sql
select count(*) from gw.v_gw_stations_wrrl_chemie

In [None]:
%%sql
select geometry, messstelle_id, name, genau, im_wrrl_messnetz_chemie, im_wrrl_messnetz_wasserstand from gw.gw_stations
limit 10

In [None]:
%%sql
select *
from gw.gw_meas
limit 1

In [None]:
%%sql
select sl_nr, messstelle_id, stoff_nr, datum_pn, grenze, messergebnis_num, masseinheit 
from gw.gw_meas
limit 3

In [None]:
%%sql
select 
st.geometry, st.messstelle_id, st.name, st.genau, st.im_wrrl_messnetz_chemie, st.im_wrrl_messnetz_wasserstand,
m.sl_nr, m.stoff_nr,  p.name as stoffname, m.datum_pn, m.grenze, m.messergebnis_num, m.masseinheit
from gw.gw_stations st, gw.gw_meas m, gw.katalog_stoff p
where st.messstelle_id = m.messstelle_id
and m.stoff_nr = p.stoff_nr
limit 2

In [None]:
%%sql
drop view if exists gw.gw_station_series

In [None]:
%%sql
create or replace view gw.gw_station_series as
select 
m.sl_nr as fid, st.geometry, st.messstelle_id, st.name, st.genau, st.im_wrrl_messnetz_chemie, st.im_wrrl_messnetz_wasserstand,
m.sl_nr, m.stoff_nr,  p.name as stoffname, m.datum_pn, m.grenze, m.messergebnis_num, m.masseinheit
from gw.gw_stations st, gw.gw_meas m, gw.katalog_stoff p
where st.messstelle_id = m.messstelle_id
and m.stoff_nr = p.stoff_nr
order by messstelle_id, stoff_nr, datum_pn

In [None]:
%%sql
select * from gw.katalog_stoff where name = 'Nitrat'

In [None]:
%%sql
drop view if exists gw.gw_station_nitrat_series

In [None]:
%%sql
select distinct (im_wrrl_messnetz_chemie) from gw.gw_stations

In [None]:
%%sql
drop view if exists gw.v_gw_station_nitrat

In [None]:
%%sql
create or replace view gw.v_gw_station_nitrat as
select 
m.sl_nr as fid, st.geometry, st.messstelle_id, st.name, st.genau, st.im_wrrl_messnetz_chemie, st.im_wrrl_messnetz_wasserstand,
m.sl_nr, m.stoff_nr,  p.name as stoffname, m.datum_pn, m.grenze, m.messergebnis_num, m.masseinheit
from gw.gw_stations st, gw.gw_meas m, gw.katalog_stoff p
where st.im_wrrl_messnetz_chemie = 'ja'
and p.name = 'Nitrat'
and m.stoff_nr = p.stoff_nr
and st.messstelle_id = m.messstelle_id
order by messstelle_id, stoff_nr, datum_pn

In [None]:
%%sql
create or replace view gw.v_gw_station_sulfat as
select 
m.sl_nr as fid, st.geometry, st.messstelle_id, st.name, st.genau, st.im_wrrl_messnetz_chemie, st.im_wrrl_messnetz_wasserstand,
m.sl_nr, m.stoff_nr,  p.name as stoffname, m.datum_pn, m.grenze, m.messergebnis_num, m.masseinheit
from gw.gw_stations st, gw.gw_meas m, gw.katalog_stoff p
where st.im_wrrl_messnetz_chemie = 'ja'
and p.name = 'Sulfat'
and m.stoff_nr = p.stoff_nr
and st.messstelle_id = m.messstelle_id
order by messstelle_id, stoff_nr, datum_pn

In [None]:
%sql select count(*) from gw.v_gw_station_sulfat
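
The nitrate and sulfate views differ only in the view name and the substance name. A minimal sketch of a helper that generates the statement for any substance; the function name `substance_view_sql` and the example substance `Chlorid` are assumptions (whether that name exists in `katalog_stoff` has not been checked here).

```python
def substance_view_sql(view_name: str, substance: str) -> str:
    """Build the CREATE VIEW statement for one substance (hypothetical helper)."""
    # The values are pasted into SQL text, so only simple alphanumeric names
    # are accepted; names with other characters would need extra handling.
    if not view_name.replace("_", "").isalnum() or not substance.replace(" ", "").isalnum():
        raise ValueError("unexpected characters in view name or substance")
    return f"""create or replace view gw.{view_name} as
select
m.sl_nr as fid, st.geometry, st.messstelle_id, st.name, st.genau,
st.im_wrrl_messnetz_chemie, st.im_wrrl_messnetz_wasserstand,
m.sl_nr, m.stoff_nr, p.name as stoffname, m.datum_pn, m.grenze,
m.messergebnis_num, m.masseinheit
from gw.gw_stations st, gw.gw_meas m, gw.katalog_stoff p
where st.im_wrrl_messnetz_chemie = 'ja'
and p.name = '{substance}'
and m.stoff_nr = p.stoff_nr
and st.messstelle_id = m.messstelle_id
order by messstelle_id, stoff_nr, datum_pn"""


print(substance_view_sql("v_gw_station_chlorid", "Chlorid"))
```

The printed statement can be pasted into a `%%sql` cell.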

## Exercises

1) Add the PostGIS table `gw.gw_stations` as vector layer to QGIS.

2) Use df.to_sql() to upload the table with the catalog (file `katalog_stoff.csv` in the data directory) of the analyzed quantities (substances, physico-chemical parameters, e.g. NO3- concentration (nitrate), pH, air temperature (can be negative), etc.)

3) Add the catalog with municipalities (file `katalog_gemeinde.csv`)

4) SQL: Create a view joining the gw station table with gw meas table and gw parameter table. (A bit difficult. We have not discussed it yet.)

5) Create a reduced view for nitrate only joining the gw station table with gw meas table and gw parameter table.

6) Try to get the station-nitrate table into QGIS using the PostGIS interface.

SQL: Before you create the views, create primary keys for the tables, i.e. `(messstelle_id)` for `gw_stations` and
`(sl_nr)` for `gw_meas` (the combination `(messstelle_id, stoff_nr, datum_pn)` is not unique, see the duplicate search above).