# NRW Groundwater Data --- OpenHygrisC Data Engineering

Data from <br>
**[LANUV](https://www.lanuv.nrw.de/): Landesamt für Natur, Umwelt und Verbraucherschutz Nordrhein-Westfalen** <br>
(State Office for Nature, Environment and Consumer Protection NRW)

* LANUV groundwater web pages: https://www.lanuv.nrw.de/umwelt/wasser/grundwasser

Groundwater data: https://www.lanuv.nrw.de/umwelt/wasser/grundwasser/grundwasserstand/grundwasserdaten-online

ELWAS-WEB NRW - Infos zu den Grundwasserkörpern (YouTube): https://www.youtube.com/watch?v=4wFKIu622rk

In the database HygrisC the LANUV provides groundwater quality and quantity data for most groundwater wells in NRW. The groundwater wells are partly owned and operated by NRW, partly by other parties. 
The measurement intervals are usually annual. Some groundwater well are sampled more frequently. 

WRRL: EU Wasserrahmenrichtlinie, EU Water Framework Directive

The quality data is based on chemical analyses of groundwater samples. The quantity data is based on groundwater level measurement.


OpenHygrisC Data: https://www.opengeodata.nrw.de/produkte/umwelt_klima/wasser/grundwasser/hygrisc/

**Download the NRW groundwater data zip file**:
<br>
https://www.opengeodata.nrw.de/produkte/umwelt_klima/wasser/grundwasser/hygrisc/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_CSV.zip

The zip archive contains gw station info, a catalog of possible physico-chemical analysis parameters, and the measured data. 

## Coordinate Obfuscation 

Some coordinate data in the gw station info reveal difficulties. The coordinate reference system (CRS) used is the projected metric based 
EPSG:25832 ( ETRS89 / UTM zone 32N). 
The dataframe coordinate columns `e32` (easting) and `n32` (northing) are of data type object (not numeric). 

The resolution is 1m but many coordinates are obscurred because of privacy issues to a precision of 100m. A few coordinates are missing, i.e. either empty (nan) or filled with `xx`.


The coordinate columns e32 and n32 are of data type object/string. Four cases must be distinguished:

* Most strings are in a regular number format and can be converted to float right away (case (1) and (2) in the table)
* Other coordinate strings are obfuscated by replacing the two least significant decimal places with the characters "xx". This usually happens when a groundwater well is on private property. The coordinates are made less precise to respect privacy. The remaining coordinate information is still usable. The precision is limited to 100 meters. The uncertainty is +- 50m. (case (3) in the table)
* In some cases no coordinate infomation is given at all. In these cases the coordinate strings are just "xx". (case (4) in the table)
* In a very few cases the coordinate columns are empty, i.e. NaN (Null). (case (5) in the table)

The following table shows representative cases.


| case |   messstelle_id | e32    | n32     | grundstueck   |
|-----:|----------------:|-------:|--------:|:--------------|
|  (1) |        10000094 | 292868 | 5632572 | oeffentlich   |
|  (2) |        10000045 | 299399 | 5650595 | privat        |
|  (3) |        10000033 | 3070xx | 56583xx | privat        |
|  (4) |        47247101 | xx     | xx      |               |
|  (5) |        79921802 | nan    | nan     |               |

Case (1) and (2) have coordinate strings which can be immediately converted to integer or float with 1m precision. Case (3) shows coordinate obfuscation to a precision to 100m. The digits representing tens and ones are anonymized. Case (4) and (5) show useless coordinate information.  

How to deal with non-anonymized data:

"299399" (string, prec. 1) => 299399.0 (float) 

How to deal with anonymization:

307000 <= 3070xx <= 307099

"3070xx" (string, prec. 100) => 307050 (float, +- 50m) 



In [None]:
!conda env list

## Correct wrong `PROJ_LIB` environment variable value 

This problem seems to occur on Windows when using the OSGeo4W installer. The environment variable must point to a user specific directory and according to the activated conda environment, e.g. `PROJ_LIB=C:\Users\<username>\Anaconda3\envs\geo\Library\share\proj` 

In [None]:
# Correct wrong environment variable value occurring when using OSGeo4W installer

import os
#proj_lib = os.environ['proj_lib']
#print(proj_lib)

conda_prefix = os.environ['conda_prefix']
print(f"CONDA_PREFIX: {conda_prefix:s}")
os.environ['proj_lib'] = conda_prefix + r"\Library\share\proj"
proj_lib = os.environ['proj_lib']
print(f"New env var value: \nPROJ_LIB={proj_lib:s}")

## Imports

In [None]:
import pandas as pd
import geopandas as gpd

## Data Directories and Files

In [None]:
data_in_dir = r"../data/original/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_CSV/"

gw_station_fname = r"opendata.gw_messstelle.csv"
gw_quality_fname = r"opendata.gw_chemischer_messwert.csv"

gw_station_pfname = data_in_dir + gw_station_fname
gw_quality_pfname = data_in_dir + gw_quality_fname
print(f"Stationsdaten:  {gw_station_pfname:s}")
print(f"Qualitätsdaten: {gw_quality_pfname:s}")

## GW Station Data


In [None]:
df = pd.read_csv(gw_station_pfname, sep = ";", index_col=["messstelle_id"])

In [None]:
df.sort_index(ascending=True, inplace=True)

In [None]:
num_total = df.shape[0]
df.shape

In [None]:
df.head(5)

In [None]:
df[df["grundstueck"]=="oeffentlich"]

## Challange: Coordinates obfuscation

The coordinate columns e32 and n32 are of data type string. Four cases must be distinguished:

(1) Most strings are in a regular number format and can be converted to float right away.

(2) Other coordinate strings are obfuscated by replacing the two least significant digits with the characters "xx". This usually happens when a groundwater well is on private property. The coordinates are made less precise to respect privacy. The remaining coordinate information is still usable. The precision is limited to 100 meters. The uncertainty is +- 50m. 

(3) In some cases no coordinate infomation is given at all. In these cases the coordinate strings are just "xx".

(4) In a very few cases the coordinate columns are empty, i.e. NaN (Null).

In [None]:
# These four groundwater wells summarize the coordinate problems.
df_coord_problem=df.loc[[10000094, 10000045, 10000033, 47247101, 79921802],["e32","n32", "grundstueck"]]
df_coord_problem

In [None]:
# forma table as markdown
from tabulate import tabulate
print(tabulate(df_coord_problem, tablefmt="pipe", headers="keys"))

|   messstelle_id | e32    | n32     | grundstueck   |
|----------------:|:-------|:--------|:--------------|
|        10000094 | 292868 | 5632572 | oeffentlich   |
|        10000045 | 299399 | 5650595 | privat        |
|        10000033 | 3070xx | 56583xx | privat        |
|        47247101 | xx     | xx      |               |
|        79921802 | nan    | nan     |               |

**Boolean indexes are used to filter the data according to the cases (1) to (4).**

In [None]:
# Add column for precision
df["genau"] = 0

# (1) If the coord data is numeric then the precision is 1m
idx_coords_1m_prec = (df["e32"].str.isnumeric() == True) 

# (3,4) Some stations don't have coordinates
# e32 and n32 strings are either NaN (Null) or "xx"
idx_coords_missing = (df["e32"].str.len() < 6) | (df["e32"].isnull() == True)

# (2) If coord data is avaliable but not numeric, then the numbers have been obscured with "XX" for the two least significant decimals.
idx_coords_100m_prec = ~idx_coords_missing &  ~(df["e32"].str.isnumeric() == True)


In [None]:
df.index[idx_coords_missing]

**Convert the strings to floats where possible. No data values are represented as negative numbers.**

In [None]:
df.loc[idx_coords_1m_prec,"e32num"] = df.loc[idx_coords_1m_prec,"e32"].astype(float)
df.loc[idx_coords_1m_prec,"n32num"] = df.loc[idx_coords_1m_prec,"n32"].astype(float)
df.loc[idx_coords_1m_prec, "genau"] = 1

In [None]:
df.loc[idx_coords_100m_prec,"e32num"] = (df.loc[idx_coords_100m_prec,"e32"].str[:-2]+"50").astype(float)
df.loc[idx_coords_100m_prec,"n32num"] = (df.loc[idx_coords_100m_prec,"n32"].str[:-2]+"50").astype(float)
df.loc[idx_coords_100m_prec, "genau"] = 100

In [None]:
df.loc[idx_coords_missing,"e32num"] = -999.9
df.loc[idx_coords_missing,"n32num"] = -999.9
df.loc[idx_coords_missing, "genau"] = -999

In [None]:
# check if all records have been matched
num_of_1m_prec = df[df["genau"] == 1].shape[0]
num_of_100m_prec = df[df["genau"] == 100].shape[0]
num_of_no_prec = df[df["genau"] == -999].shape[0]

num_check = num_of_1m_prec + num_of_100m_prec + num_of_no_prec

print(f"total num of recs:                        {num_total:6d}")
print(f"number of recs with 1m coord precision:   {num_of_1m_prec:6d}")
print(f"number of recs with 100m coord precision: {num_of_100m_prec:6d}")
print(f"number of recs with no coords:            {num_of_no_prec:6d}")
print(f"check sum:                                {num_check:6d}")

assert num_check == num_total, "ERROR. Mismatch in numbers of stations"


**Save the original string as well as the derived numeric columns to a CSV file for checking externally.**

In [None]:
df[["e32","e32num","n32","n32num","genau"]].to_csv("check.csv")
df[["e32","e32num","n32","n32num","genau"]]

## Geopandas

In [None]:
import geopandas as gpd
from shapely.geometry import Point

In [None]:
# remove records without coords
df2 = df[df["genau"] > 0]

In [None]:
df2.shape

In [None]:
%%time
gdf = gpd.GeoDataFrame(df2, geometry=gpd.points_from_xy(df2.e32num, df2.n32num), crs="EPSG:25832")

In [None]:
gdf.info()

In [None]:
gdf.head(3)

In [None]:
%%time

# This takes 90 secs on my computer!

#gdf.to_file("GW_Stations.gpkg", layer='GW Stations', driver="GPKG")

## PostGIS, Inline SQL: `create schema gw`

To store the data in PostGIS/PostgreSQL it is recommended to create a dedicated database "schema" (a kind of name space) to separate relations (tables, views), stored procedures, etc. from the rest of the database. Schemata help to organize the tables and access privileges clearly. 


In [None]:
#!conda install -c conda-forge ipython-sql

In [None]:
%load_ext sql

In [None]:
print("Connect")
%sql postgresql://env_master:xxxxxx@localhost/env_db

In [None]:
%%sql
SELECT * FROM information_schema.schemata

In [None]:
%%sql
CREATE SCHEMA IF NOT EXISTS gw AUTHORIZATION env_master

In [None]:
%%sql
SELECT * FROM information_schema.schemata;

## PostGIS: Upload GeoDataFrame with `gdf.to_postgis()`

Dependencies:
* psycopg2
* geoalchemy2

In [None]:
import sqlalchemy
engine = sqlalchemy.create_engine("postgresql://env_master:xxxxxx@localhost/env_db")
# fast_executemany=True
# use_batch_mode=True

In [None]:
%%time
gdf.to_postgis(con=engine, name="gw_stations", schema="gw", index=True, chunksize=100, if_exists="replace")

# Groundwater "Quality Data": Chemistry!

## Imports

In [None]:
import pandas as pd

## Data Directories and Files

In [None]:
data_in_dir = r"../data/original/OpenHygrisC_gw-messstellen-messwerte_EPSG25832_CSV/"

gw_station_fname = r"opendata.gw_messstelle.csv"
gw_quality_fname = r"opendata.gw_chemischer_messwert.csv"

gw_station_pfname = data_in_dir + gw_station_fname
gw_quality_pfname = data_in_dir + gw_quality_fname
print(f"Stationsdaten:  {gw_station_pfname:s}")
print(f"Qualitätsdaten: {gw_quality_pfname:s}")

In [None]:
print(gw_quality_pfname)

In [None]:
fh = open(gw_quality_pfname,"r", encoding = "utf-8", newline = '')
s = fh.readline()
s = s.replace('"', '').strip()
header_de = s[1:].split(';')
header_de

In [None]:
%time df_qual = pd.read_csv(gw_quality_pfname, sep = ";", dtype = {"messergebnis_c":str ,"messergebnis_hinweis":str }, nrows = 5)

In [None]:
df_qual.head(3)

In [None]:
df_qual.info()

The full CSV file with the chemical lab measurements comprises more than 3.6 Mio measurments! 

In [None]:
# Wall time: 13 s
%time df_qual = pd.read_csv(gw_quality_pfname, sep = ";", index_col=["sl_nr"], \
                            dtype = {"messergebnis_c":str ,"messergebnis_hinweis":str }, \
                            parse_dates = ["datum_pn", "aktual_dat", "erstell_dat"])

In [None]:
df_qual.shape

In [None]:
df_qual.info()

In [None]:
df_qual.head(3)

In [None]:
# duplicate sl_nr values? Can it be a unique index?
# Result should be empty
print(df_qual[df_qual.index.duplicated()])

## Time Series Example

In [None]:
# time series example
# stoff_nr=1244 ->"Nitrat"
idx = (df_qual["messstelle_id"] == 20002129) & (df_qual["stoff_nr"] == 1244)
df_qual.loc[idx,["datum_pn", "messergebnis_c"]].sort_values("datum_pn")

### Tests for different measurement value string cases

```
(1)   "1.00" (is_float)
(2)  "<1.00" (is_less)
(3)  ">1.00" (is_greater)
```


In [None]:
# check if string can be converted to float
def is_float(element: str) -> bool:
    try:
        float(element)
        return True
    except ValueError:
        return False

In [None]:
# check if string starts with '<'
def is_less(element: str) -> bool:
    return element[0] == "<" 

In [None]:
# check if string starts with '>'
def is_greater(element: str) -> bool:
    return element[0] == ">" 

In [None]:
print("is_float()")
print(is_float("<1.234"))
print(is_float(">1.234"))
print(is_float("-1.234"))

In [None]:
# Some test applications
print("is_less()")
print(is_less("<1.234"))
print(is_less(">1.234"))
print(is_less("1.234"))
print("is_greater()")
print(is_greater("<1.234"))
print(is_greater(">1.234"))
print(is_greater("1.234"))
print("is_float()")
print(is_float("<1.234"))
print(is_float(">1.234"))
print(is_float("1.234"))

In [None]:
# Apply the tests and create Boolean indexes
%time idx_mess_is_float   = df_qual["messergebnis_c"].apply(is_float)
%time idx_mess_is_less    = df_qual["messergebnis_c"].apply(is_less)
%time idx_mess_is_greater = df_qual["messergebnis_c"].apply(is_greater)

In [None]:
# Print records which are neither less nor greater nor float -> should be empty data frame
assert df_qual[~idx_mess_is_less & ~idx_mess_is_greater & ~idx_mess_is_float].shape[0] == 0

# Dataframe should be empty
print(df_qual[~idx_mess_is_less & ~idx_mess_is_greater & ~idx_mess_is_float])

In [None]:
# res = (~idx_mess_is_less & ~idx_mess_is_greater & ~idx_mess_is_float).value_counts()
res = (idx_mess_is_less | idx_mess_is_greater | idx_mess_is_float).value_counts()
res

## Convert measurement results to float. Fill the limit column.

In [None]:
%time df_qual.loc[idx_mess_is_less,"messergebnis_num"] = df_qual.loc[idx_mess_is_less,"messergebnis_c"].str[1:]
%time df_qual.loc[idx_mess_is_less,"limit"] = "<"

%time df_qual.loc[idx_mess_is_greater,"messergebnis_num"] = df_qual.loc[idx_mess_is_greater,"messergebnis_c"].str[1:]
%time df_qual.loc[idx_mess_is_greater,"limit"] = ">"

%time df_qual.loc[idx_mess_is_float,"messergebnis_num"] = df_qual.loc[idx_mess_is_float,"messergebnis_c"].astype(float)
%time df_qual.loc[idx_mess_is_float,"limit"] = "="

In [None]:
print("Different values for column 'limit'")
print(df_qual["limit"].value_counts())

In [None]:
df_qual[idx_mess_is_greater][["messergebnis_c", "messergebnis_num", "limit"]].head()

In [None]:
df_qual[idx_mess_is_less][["messergebnis_c", "messergebnis_num", "limit"]].head()

In [None]:
df_qual[idx_mess_is_float][["messergebnis_c", "messergebnis_num", "limit"]].head()

In [None]:
# Reason for not being float? XOR: A ^ B
#idx = (~idx_mess_is_float ^ idx_mess_is_less) # These are non-floats which are be less at the same time => greater
#df_qual[idx]

In [None]:
# Reason for not being float? XOR
#idx = (~idx_mess_is_float ^ idx_mess_is_greater)
#df_qual[idx]

In [None]:
df_qual[df_qual["messergebnis_num"]<0]

## SQLAlchemy performance tests

`df.to_sql()` uses the `SQLalchemy library`. This library provides a SQL database API for a lot of different database management systems (DBMS), e.g. PostgreSQL, Microsoft SQL Server, etc. SQLAlchemy uses DBMS specific low level drivers such as `psycopg2` for PostgreSQL to connect to the different database systems. The following connection strings are used to connect to PostgreSQL (PG) using the psycopg22 driver (the default PG driver):

`engine = sqlalchemy.create_engine("postgresql://env_master:xxxxxx@localhost/env_db")`

`engine = sqlalchemy.create_engine("postgresql+psycopg2://env_master:xxxxxx@localhost/env_db")`


In [None]:
import sqlalchemy

### The following performance tests to not differ significantly. 

In [None]:
# the default to_sql() / sqlalchemy method using psycopg2 (default PG driver) ...
# on my laptop:
# Approx. Wall time: 5min 35s 

engine = sqlalchemy.create_engine("postgresql+psycopg2://env_master:xxxxxx@localhost/env_db")

%time df_qual.to_sql(con=engine, name="gw_meas", schema="gw", if_exists="replace")

In [None]:
# other attempts to speed up ...
# on my laptop:
# Approx. Wall time: 5min 35s
# => no improvement
engine = sqlalchemy.create_engine("postgresql+psycopg2://env_master:xxxxxx@localhost/env_db", \
                                  executemany_mode='values', \
                                  executemany_values_page_size=10000, \
                                  executemany_batch_page_size=500)

# %time df_qual.to_sql(con=engine, name="gw_meas", schema="gw", if_exists="replace", method="multi")
%time df_qual.to_sql(con=engine, name="gw_meas", schema="gw", if_exists="replace")

### The following attempt using `method="multi"` fails with `psycopg2`! 

The parameter `method="multi"` seems to be effective with the `msodbc` driver for MS SQL Server. In `psycopg2` it causes problems.  

In [None]:
# the default to_sql() / sqlalchemy method using psycopg2 (default PG driver) ...
# on my laptop:
# Wall time: FAILED! MANUALLY INTERRUPTED AFTER 10:00 MINS.

engine = sqlalchemy.create_engine("postgresql+psycopg2://env_master:xxxxxx@localhost/env_db")

# %time df_qual.to_sql(con=engine, name="gw_meas", schema="gw", if_exists="replace", chunksize=1000)
%time df_qual.to_sql(con=engine, name="gw_meas", schema="gw", if_exists="replace", method="multi")

## Exercises

1) Add the PostGIS table `gw.gw_stations` as vector layer to QGIS.

2) Use df.to_sql() to upload the table with the catalog (file `katalog_stoff.csv` in the data directory) of the analyzed quantities (substances, physico-chemical parameters, e.g. NO3- concentation (nitrate), pH, air temperature (can be neg.), etc.)

3) Add the catalog with municipalities (file `katalog_gemeinde.csv`)

4) SQL: Create a view joining the gw station table with gw meas table and gw parameter table. (A bit difficult. We have not discussed it yet.)

5) Create a reduced view for nitrate only joining the gw station table with gw meas table and gw parameter table.

6) Try to get the station-nitrate table into QGIS using the PostGIS interface.

SQL: Before you create the views create primary keys for the tables. i.e. `(messstelle_id)` for `gw_stations`, 
`(messstelle_id, stoff_nr, pna_datum)` for `gw_meas`.