In [7]:
#| default_exp handlers.ospar

# OSPAR 

> Data pipeline (handler) to convert OSPAR data ([source](https://odims.ospar.org/en/)) to `NetCDF` format or `Open Refine` format.  

The OSPAR Environment [database](https://odims.ospar.org/en/) is provided as a Microsoft Access database. 
`Mdbtools` (https://github.com/mdbtools/mdbtools) can be used to convert the tables of the Microsoft Access database to `.csv` files on Unix-like OS.

Example steps:
1. Download data.
2. Install mdbtools via VScode Terminal 

    ```
    sudo apt-get -y install mdbtools
    ````

3. Install unzip via VScode Terminal 

    ```
    sudo apt-get -y install unzip
    ````

4. In VS code terminal, navigate to the marisco data folder

    ```
    cd /home/marisco/downloads/marisco/_data/accdb/opar_data
    ```

5. Unzip MORS_ENVIRONMENT.zip 

    ```
    unzip MORS_ENVIRONMENT.zip 
    ```

6. Run preprocess.sh to generate the required data files

    ```
    ./preprocess.sh MORS_ENVIRONMENT.zip
    ````
7. Content of 'preprocess.sh' script.
    ```
    #!/bin/bash

    # Example of use: ./preprocess.sh MORS_ENVIRONMENT.zip
    unzip $1
	dbname=$(ls *.accdb *.mdb)
    mkdir csv
    for table in $(mdb-tables -1 "$dbname"); do
        echo "Export table $table"
        mdb-export "$dbname" "$table" > "csv/$table.csv"
    done```

## Packages import

In [8]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [9]:
#| export
import pandas as pd # Python package that provides fast, flexible, and expressive data structures.
import numpy as np
from tqdm import tqdm # Python Progress Bar Library
from functools import partial # Function which Return a new partial object which when called will behave like func called with the positional arguments args and keyword arguments keywords
import fastcore.all as fc # package that brings fastcore functionality, see https://fastcore.fast.ai/.
from pathlib import Path # This module offers classes representing filesystem paths
from dataclasses import asdict
import re # provides regular expression matching operations

from marisco.utils import (has_valid_varname, match_maris_lut, Match)
from marisco.callbacks import (Callback, Transformer, EncodeTimeCB, SanitizeLonLatCB)
from marisco.metadata import (GlobAttrsFeeder, BboxCB, DepthRangeCB, TimeRangeCB, ZoteroCB, KeyValuePairCB)
from marisco.configs import (nc_tpl_path, cfg, cache_path, cdl_cfg, Enums, lut_path,nuc_lut_path,
                             species_lut_path, bodyparts_lut_path, unit_lut_path, detection_limit_lut_path)
from marisco.serializers import NetCDFEncoder

from collections.abc import Callable
import warnings
from marisco.netcdf_to_csv import (LookupTimeFromEncodedTime, GetSampleTypeCB,
                                   LookupNuclideByIdCB, ConvertLonLatCB, LookupUnitByIdCB,
                                   LookupValueTypeByIdCB, LookupSpeciesByIdCB, 
                                   LookupBodypartByIdCB, LookupSedimentTypeByIdCB)                                  
from marisco.serializers import OpenRefineCsvEncoder

In [10]:
warnings.filterwarnings('ignore')

***

##  MARIS NetCDF 
When MARISCO is installed, it uses `cdl.toml` to create the `maris-template.nc`, which acts as a standardized template for MARIS NetCDF files. The `cdl.toml` is a configuration file listing all the variables allowed in the NetCDF4 files. The contents of the cdl.toml can be retrieved with the function `cdl_cfg()`.  

Retrieving the keys of the `cdl_cfg()`.

In [11]:
print (cdl_cfg()['vars'].keys())

dict_keys(['defaults', 'bio', 'sed', 'suffixes'])


Printing the contents of all keys

In [12]:
print (cdl_cfg()['vars']['defaults'].keys())
print (cdl_cfg()['vars']['bio'].keys())
print (cdl_cfg()['vars']['sed'].keys())
print (cdl_cfg()['vars']['suffixes'].keys())

dict_keys(['data_provider_sample_id', 'lon', 'lat', 'smp_depth', 'tot_depth', 'time', 'area', 'sample_notes', 'measurement_notes'])
dict_keys(['bio_group', 'species', 'body_part'])
dict_keys(['sed_type'])
dict_keys(['uncertainty', 'detection_limit', 'volume', 'salinity', 'temperature', 'filtered', 'counting_method', 'sampling_method', 'preparation_method', 'unit'])


***

## MARIS Open Refine 

Currently, updates to the MARIS database are facilitated through a standardized CSV file using Open Refine. Description of the variables included in this CSV file are provided at [Maris](https://maris.iaea.org/help/1132).

Here we define the fname_in and fname_out variables. These variables are paths which are defined as relative paths. These paths are relative to 
the current working directory. Note that fname_in refers to the csv folder that contains the OSPAR data. fname_out defines the path and filename for the NetCDF output.

***

## MARIS Open Refine CSV & MARIS NetCDF variable relationship. 

The table below lists the MARIS variables in both MARIS Open Refine and MARIS NetCDF formats. Each variable's presence in both formats for the seawater (``sea``), biota (``bio``), and sediment (``sed``) groups is indicated with a checkmark (``✓``).


<style>
  table {
    width: 100%;
    border-collapse: collapse
  }

  td,
  th {
    border: 1px solid #000;
    padding: 5px;
    text-align: center
  }

  th {
    background-color: #f2f2f2
  }

  .open-refine {
    background-color: #fff;
    color: black;
    text-align: center

  }

  .netcdf {
    background-color: #e6e6e6;
    color: black;
    text-align: center
  }
</style>
<table>
  <thead>
    <tr>
      <th class="open-refine">Open Refine Variables</th>
      <th class="open-refine">sea</th>
      <th class="open-refine">bio</th>
      <th class="open-refine">sed</th>
      <th class="netcdf">sea</th>
      <th class="netcdf">bio</th>
      <th class="netcdf">sed</th>
      <th class="netcdf">NetCDF Variables</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td class="open-refine">Sample type</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">*Included as netcdf.group*</td>
    </tr>
    <tr>
      <td class="open-refine">Latitude decimal</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">lat</td>
    </tr>
    <tr>
      <td class="open-refine">Longitude decimal</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">lon</td>
    </tr>
    <tr>
      <td class="open-refine">Sampling start date</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">time</td>
    </tr>
    <tr>
      <td class="open-refine">Sampling start time</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">time</td>
    </tr>
    <tr>
      <td class="open-refine">Sampling end date</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Sampling end time</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Nuclide</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">nuclide</td>
    </tr>
    <tr>
      <td class="open-refine">Value type</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">detection_limit</td>
    </tr>
    <tr>
      <td class="open-refine">Unit</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">unit</td>
    </tr>
    <tr>
      <td class="open-refine">Activity or MDA</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">value</td>
    </tr>
    <tr>
      <td class="open-refine">Uncertainty</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">uncertainty</td>
    </tr>
    <tr>
      <td class="open-refine">Sampling depth</td>
      <td class="open-refine">✓</td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">smp_depth</td>
    </tr>
    <tr>
      <td class="open-refine">Top</td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Bottom</td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Species</td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="open-refine"></td>
      <td class="netcdf"></td>
      <td class="netcdf">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf">species</td>
    </tr>
    <tr>
      <td class="open-refine">Body part</td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="open-refine"></td>
      <td class="netcdf"></td>
      <td class="netcdf">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf">body_part</td>
    </tr>
    <tr>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="netcdf"></td>
      <td class="netcdf">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf">bio_group</td>
    </tr>
    <tr>
      <td class="open-refine">Salinity</td>
      <td class="open-refine">✓</td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">salinity</td>
    </tr>
    <tr>
      <td class="open-refine">Temperature</td>
      <td class="open-refine">✓</td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">temperature</td>
    </tr>
    <tr>
      <td class="open-refine">Filtered</td>
      <td class="open-refine">✓</td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">filtered</td>
    </tr>
    <tr>
      <td class="open-refine">Mesh size</td>
      <td class="open-refine">✓</td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Quality flag</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Sediment type</td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf">✓</td>
      <td class="netcdf">sed_type</td>
    </tr>
    <tr>
      <td class="open-refine">Dry weight</td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Wet weight</td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Dry/wet ratio</td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Station ID</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Sample ID</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">data_provider_sample_id</td>
    </tr>
    <tr>
      <td class="open-refine">Total depth</td>
      <td class="open-refine">✓</td>
      <td class="open-refine"></td>
      <td class="open-refine"></td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">tot_depth</td>
    </tr>
    <tr>
      <td class="open-refine">Profile or transect ID</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Sampling method</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">sampling_method</td>
    </tr>
    <tr>
      <td class="open-refine">Preparation method</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">preparation_method</td>
    </tr>
    <tr>
      <td class="open-refine">Drying method</td>
      <td class="open-refine"></td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
      <td class="netcdf"></td>
    </tr>
    <tr>
      <td class="open-refine">Counting method</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">counting_method</td>
    </tr>
    <tr>
      <td class="open-refine">Sample notes</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">sample_notes<sup>*1</sup></td>
    </tr>
    <tr>
      <td class="open-refine">Measurement notes</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="open-refine">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">✓</td>
      <td class="netcdf">measurement_notes<sup>*1</sup></td>
    </tr>
  </tbody>
</table>

<sup>*1</sup> The MARIS NetCDF does not currently support strings of variable length (i.e. vlen string data type).

***

## Define variables

1. **fname_in** - is the path to the folder containing the HELCOM data in CSV format. The path can be defined as a relative path. 

2. **fname_out_nc** - is the path and filename for the NetCDF output.The path can be defined as a relative path. 

3. **zotero_key** - is used to retrieve attributes related to the dataset from [Zotero](https://www.zotero.org/). The MARIS datasets include a [library](https://maris.iaea.org/datasets) available on [Zotero](https://www.zotero.org/groups/2432820/maris/library). 
4. **ref_id** - ref_id is used in the MARIS dataset to link to the reference source. 


In [13]:
# | export
fname_in = '../../_data/accdb/ospar/csv'
fname_out = '../../_data/output/191-OSPAR-2024.nc'
fname_out_csv = '../../_data/output/191-OSPAR-2024.csv'
zotero_key ='LQRA4MMK'
ref_id = 191

***

## Utils

In [14]:
#| export
def load_data(src_dir):
    "Load OSPAR data and return them as an individual dataframe by sample type"
    '''
    Load data from the measurement file found in the src_dir (i.e. fname_in).
    Returns a dictionary of pandas' dataframes. The key to the dictionary is 
    the sample type (i.e lut_smp_type)
    '''    
    dfs = {}
    lut_smp_type = {'Seawater data': 'seawater', 'Biota data': 'biota'}
    for k, v in lut_smp_type.items():
        fname_meas = k + '.csv' # measurement (i.e. radioactivity) information and sample information     
        df = pd.read_csv(Path(src_dir)/fname_meas, encoding='unicode_escape')
        dfs[v] = df
    return dfs

***

## Load data

`dfs` is a dictionary of dataframes created from the Helcom dataset located at the path `fname_in`. The data to be included in each dataframe is sorted by sample type. Each dictionary is defined with a key equal to the sample type. 

In [15]:
#|eval: false
dfs = load_data(fname_in)
print(dfs.keys())
print(f"Seawater cols: {dfs['seawater'].columns}")
print(f"Biota cols: {dfs['biota'].columns}")

dict_keys(['seawater', 'biota'])
Seawater cols: Index(['ID', 'Contracting Party', 'RSC Sub-division', 'Station ID',
       'Sample ID', 'LatD', 'LatM', 'LatS', 'LatDir', 'LongD', 'LongM',
       'LongS', 'LongDir', 'Sample type', 'Sampling depth', 'Sampling date',
       'Nuclide', 'Value type', 'Activity or MDA', 'Uncertainty', 'Unit',
       'Data provider', 'Measurement Comment', 'Sample Comment',
       'Reference Comment'],
      dtype='object')
Biota cols: Index(['ID', 'Contracting Party', 'RSC Sub-division', 'Station ID',
       'Sample ID', 'LatD', 'LatM', 'LatS', 'LatDir', 'LongD', 'LongM',
       'LongS', 'LongDir', 'Sample type', 'Biological group', 'Species',
       'Body Part', 'Sampling date', 'Nuclide', 'Value type',
       'Activity or MDA', 'Uncertainty', 'Unit', 'Data provider',
       'Measurement Comment', 'Sample Comment', 'Reference Comment'],
      dtype='object')


Show the structure of the `seawater` dataframe:

In [16]:
#|eval: false
dfs['seawater'].head()

Unnamed: 0,ID,Contracting Party,RSC Sub-division,Station ID,Sample ID,LatD,LatM,LatS,LatDir,LongD,...,Sampling date,Nuclide,Value type,Activity or MDA,Uncertainty,Unit,Data provider,Measurement Comment,Sample Comment,Reference Comment
0,1,Belgium,8.0,Belgica-W01,WNZ 01,51.0,22.0,31.0,N,3.0,...,27/01/2010,137Cs,<,0.2,,Bq/l,SCKCEN,,,
1,2,Belgium,8.0,Belgica-W02,WNZ 02,51.0,13.0,25.0,N,2.0,...,27/01/2010,137Cs,<,0.27,,Bq/l,SCKCEN,,,
2,3,Belgium,8.0,Belgica-W03,WNZ 03,51.0,11.0,4.0,N,2.0,...,27/01/2010,137Cs,<,0.26,,Bq/l,SCKCEN,,,
3,4,Belgium,8.0,Belgica-W04,WNZ 04,51.0,25.0,13.0,N,3.0,...,27/01/2010,137Cs,<,0.25,,Bq/l,SCKCEN,,,
4,5,Belgium,8.0,Belgica-W05,WNZ 05,51.0,24.0,58.0,N,2.0,...,26/01/2010,137Cs,<,0.2,,Bq/l,SCKCEN,,,


Show the structure of the `biota` dataframe:

In [17]:
#|eval: false
dfs['biota'].head()

Unnamed: 0,ID,Contracting Party,RSC Sub-division,Station ID,Sample ID,LatD,LatM,LatS,LatDir,LongD,...,Sampling date,Nuclide,Value type,Activity or MDA,Uncertainty,Unit,Data provider,Measurement Comment,Sample Comment,Reference Comment
0,96793,United Kingdom,5,Hunterston,2200086,55,43,31.0,N,4,...,31/12/2021,"239,240Pu",=,0.351,0.066,Bq/kg f.w.,SEPA-Scottish Environment Protection Agency,,"PLZ. Annual bulk of 2 samples, representative ...",
1,96822,United Kingdom,6,Chapelcross,2200081,54,58,8.0,N,3,...,31/12/2021,99Tc,=,39.0,15.0,Bq/kg f.w.,SEPA-Scottish Environment Protection Agency,,PLZ,
2,96823,United Kingdom,7,Dounreay,2200093,58,33,57.0,N,3,...,31/12/2021,"239,240Pu",=,0.0938,0.018,Bq/kg f.w.,SEPA-Scottish Environment Protection Agency,,"Sandside Bay. Annual bulk of 4 samples, repre...",
3,96824,United Kingdom,7,Dounreay,2200089,58,37,7.0,N,3,...,31/12/2021,"239,240Pu",=,1.54,0.31,Bq/kg f.w.,SEPA-Scottish Environment Protection Agency,,"Brims Ness. Annual bulk of 4 samples, represe...",
4,96857,United Kingdom,10,Torness,2100074,55,57,53.0,N,2,...,31/12/2021,99Tc,=,16.0,6.0,Bq/kg f.w.,SEPA-Scottish Environment Protection Agency,,"Thornton Loch. Annual bulk of 2 samples, repre...",


***

## Data transformation pipeline for NetCDF.

### Data transformation pipeline utils

``CompareDfsAndTfm`` compares the original dataframes to the transformed dataframe. A dictionary of dataframes, ``tfm.dfs_dropped``, is created to include the data present in the original dataset but absent from the transformed data. ``tfm.compare_stats`` provides a quick overview of the number of rows in both the original dataframes and the transformed dataframe.

In [18]:
# | export
class CompareDfsAndTfm(Callback):
    "Create a dfs of dropped data. Data included in the DFS not in the TFM"
    def __init__(self, dfs_compare):
        fc.store_attr()

    def __call__(self, tfm):
        tfm.dfs_dropped={}
        tfm.compare_stats={}
        for grp in tfm.dfs.keys():
           
            # get the index values in dfs (i.e. dfs_compare) not in tfm.dfs. 
            index_diff=self.dfs_compare[grp].index.difference(tfm.dfs[grp].index)
            tfm.dfs_dropped[grp] = self.dfs_compare[grp].loc[index_diff]

            tfm.compare_stats[grp]= {'Number of rows in dfs :' : len(self.dfs_compare[grp].index),
                                     'Number of rows in tfm.dfs:' : len(tfm.dfs[grp].index),
                                     'Number of dropped rows:' : len(tfm.dfs_dropped[grp].index),
                                     'Number of rows in tfm.dfs + Number of dropped rows:' : len(tfm.dfs[grp].index) + len(tfm.dfs_dropped[grp].index)
                                    }

***

### Normalize nuclide names

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``nuclide``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Nuclide``.*

#### Remap OSPAR nuclide names to MARIS nuclide names

The ``maris-template.nc`` file, generated from ``cdl.toml`` during the installation of the Marisco package, specifies the nuclides allowed in the MARIS NetCDF file. Here, we define a function get_unique_nuclides() that compiles a list of unique nuclides from each dataframe within the dictionary of dataframes ``dfs``, which were created from the OSPAR data.

In [19]:
#| export
def get_unique_nuclides(dfs, name= 'NUCLIDE'):
    "Get list of unique radionuclide types measured across samples."
    nuclides = []
    for k in dfs.keys():
        nuclides += dfs[k][name].unique().tolist()
    # remove duplicates from nuclides list.
    nuclides=list(set(nuclides))
    return nuclides

Return the unique nuclides in the ``dfs``. 

In [20]:
get_unique_nuclides(dfs, name= 'Nuclide')

[nan,
 'CS-137',
 'Cs-137',
 '239, 240 Pu',
 '238Pu',
 '241Am',
 'Cs-134',
 '210Po',
 '239,240Pu',
 '99Tc',
 '210Pb',
 '137Cs',
 '228Ra',
 'RA-226',
 'RA-228',
 'CS-134',
 '3H',
 '226Ra']

Check if the unique nuclides in the ``dfs``. are valid in the ``maris-template.nc`` file.

In [21]:
has_valid_varname(get_unique_nuclides(dfs,name= 'Nuclide'), nc_tpl_path())

"nan" variable name not found in MARIS CDL
"CS-137" variable name not found in MARIS CDL
"Cs-137" variable name not found in MARIS CDL
"239, 240 Pu" variable name not found in MARIS CDL
"238Pu" variable name not found in MARIS CDL
"241Am" variable name not found in MARIS CDL
"Cs-134" variable name not found in MARIS CDL
"210Po" variable name not found in MARIS CDL
"239,240Pu" variable name not found in MARIS CDL
"99Tc" variable name not found in MARIS CDL
"210Pb" variable name not found in MARIS CDL
"137Cs" variable name not found in MARIS CDL
"228Ra" variable name not found in MARIS CDL
"RA-226" variable name not found in MARIS CDL
"RA-228" variable name not found in MARIS CDL
"CS-134" variable name not found in MARIS CDL
"3H" variable name not found in MARIS CDL
"226Ra" variable name not found in MARIS CDL


False

Return the Nuclide names defined in the ``maris-template.nc``.

In [22]:
#| false
def get_nuc_lut():
    df = pd.read_excel(nuc_lut_path())
    return df[['nuclide_id','nc_name']]
get_nuc_lut()

Unnamed: 0,nuclide_id,nc_name
0,-1,
1,0,
2,1,h3
3,2,be7
4,3,c14
...,...,...
129,138,tbeta40k
130,139,fe55
131,140,ce144_pr144_tot
132,141,pu240_pu239_ratio


The format of many nuclide names in the ``OSPAR`` dataset do not match those listed in the `maris-template.nc`. To address this, we have created a lookup table, ``varnames_lut_updates``, which will be used to correct the incompatible nuclide names within the dictionary of dataframes (i.e., ``dfs``) to ensure they are consistent with the maris-template.nc.

In [23]:
#| export

varnames_lut_updates = {
    '3H' : 'h3',
    'Cs-134' : 'cs134',
    '210Po' : 'po210',
    '99Tc' : 'tc99',
    '239, 240 Pu' : 'pu239_240_tot',
    '226Ra': 'ra226',
    'CS-134': 'cs134',
    '228Ra': 'ra228',
    'RA-226': 'ra226',
    '238Pu': 'pu238',
    '239,240Pu': 'pu239_240_tot',
    '241Am': 'am241',
    '137Cs': 'cs137',
    'CS-137': 'cs137',
    'RA-228': 'ra228',
    'Cs-137': 'cs137',
    '210Pb': 'pb210'
        }

Function `get_varnames_lut` returns a dictionary of nuclide names. This dictionary includes the `NUCLIDE` names from the dataframes in dfs, along with corrections specified in `varnames_lut_updates`.

In [24]:
#| export
def get_varnames_lut(dfs, lut=varnames_lut_updates):
    lut = {n: n for n in set(get_unique_nuclides(dfs, name='Nuclide'))}
    lut.update(varnames_lut_updates)
    return lut

Create a callback that remaps the nuclide names in the dataframes within dfs to the updated names in `varnames_lut_updates`.

In [25]:
# | export
class RemapRdnNameCB(Callback):
    "Remap to MARIS radionuclide names."
    def __init__(self,
                 fn_lut=partial(get_varnames_lut, lut=varnames_lut_updates)):
        fc.store_attr()

    def __call__(self, tfm):
        lut = self.fn_lut(tfm.dfs)
        for grp in tfm.dfs.keys():
            tfm.dfs[grp]['Nuclide'].replace(lut, inplace=True)
            
            # Keep rows where 'Nuclide' is not nan. 
            tfm.dfs[grp] = tfm.dfs[grp][tfm.dfs[grp][['Nuclide']].notna().any(axis='columns')]


Apply the transformer for callbacks `LowerStripRdnNameCB` and `RemapRdnNameCB`. Then, print the unique nuclides for each dataframe in the dictionary dfs.

In [26]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[RemapRdnNameCB(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()

print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print('seawater nuclides: ')
print(tfm.dfs['seawater']['Nuclide'].unique())
print('biota nuclides: ')
print(tfm.dfs['biota']['Nuclide'].unique())

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18310  15314
Number of dropped rows:                                  546      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 

seawater nuclides: 
['cs137' 'pu239_240_tot' 'ra226' 'ra228' 'tc99' 'h3' 'po210' 'pb210']
biota nuclides: 
['pu239_240_tot' 'tc99' 'cs137' 'ra226' 'ra228' 'pu238' 'am241' 'cs134'
 'h3' 'pb210' 'po210']


After applying correction to the nuclide names check that all nuclide in the dictionary of dataframes are valid. Returns `True` if all are valid.

In [27]:
#|eval: false
has_valid_varname(get_unique_nuclides(tfm.dfs, name='Nuclide'), nc_tpl_path())

True

***

### Parse time

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: `time`.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variables: `Sampling start date` and `Sampling start time`.*

Create a callback that remaps the time format in the dictionary of dataframes (i.e. `%m/%d/%y %H:%M:%S`):

In [28]:
#| export
class ParseTimeCB(Callback):
    def __call__(self, tfm):
        for k in tfm.dfs.keys():
            # drop nan values
            tfm.dfs[k] = tfm.dfs[k][tfm.dfs[k]['Sampling date'].notna()]            
            tfm.dfs[k]['time'] = pd.to_datetime(tfm.dfs[k]['Sampling date'], 
                                                format='%d/%m/%Y')
                

Apply the transformer for callbacks `RemapRdnNameCB` and `ParseTimeCB`. Then, print the `time` data for `seawater`.

In [29]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[RemapRdnNameCB(),
                            ParseTimeCB()])
tfm()
print(tfm.dfs['seawater']['time'][:5])

0   2010-01-27
1   2010-01-27
2   2010-01-27
3   2010-01-27
4   2010-01-26
Name: time, dtype: datetime64[ns]


***

### Encode time (seconds since ...)

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``time``*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variables: `Sampling start date` and `Sampling start time`*

`EncodeTimeCB` converts the HELCOM `time` format to the MARIS `time` format.

In [30]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg(), verbose = True),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

### Normalize uncertainty

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``uncertainty``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: `Uncertainty`.*

For each sample type in the OSPAR dataset, the uncertainty is given as an expanded uncertainty (k=2). See [OSPAR reporting guidelines](https://mcc.jrc.ec.europa.eu/documents/OSPAR/Guidelines_forestimationof_a_%20measurefor_uncertainty_in_OSPARmonitoring.pdf). The OSPAR uncertainty is normalized to standard uncertainty (k=1).   

Function `unc_exp2stan` coverts uncertainty from expanded uncertainty (k=2) to standard uncertainty (k=1).

In [31]:
#| export
# Make measurement and uncertainty units consistent
def unc_exp2stan(df, unc_col):
    k = 2 
    return df.apply(lambda row: row[unc_col]/k, axis=1)

NormalizeUncCB callback normalizes the uncertainty by converting from expanded uncertainty to standard uncertainty. 

In [32]:
#| export
class NormalizeUncCB(Callback):
    "Convert from expanded uncertainty to standard uncertainty"
    def __init__(self, 
                 fn_convert_unc=unc_exp2stan):
        fc.store_attr()

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            tfm.dfs[grp]['Uncertainty'] = self.fn_convert_unc(tfm.dfs[grp], 'Uncertainty')

Apply the transformer for callbacks `RemapRdnNameCB`, `ParseTimeCB` and `NormalizeUncCB()`. Then, print the value (i.e. activity per unit ) and standard uncertainty for each sample type.

In [33]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB()])
tfm()
print(tfm.dfs['seawater'][['Activity or MDA', 'Uncertainty']][:5])
print(tfm.dfs['biota'][['Activity or MDA', 'Uncertainty']][:5])


   Activity or MDA  Uncertainty
0             0.20          NaN
1             0.27          NaN
2             0.26          NaN
3             0.25          NaN
4             0.20          NaN
   Activity or MDA  Uncertainty
0           0.3510        0.033
1          39.0000        7.500
2           0.0938        0.009
3           1.5400        0.155
4          16.0000        3.000


***

### Lookup transformations 

#### Lookup MARIS function 

`get_maris_lut` performs a lookup of data provided in `data_provider_lut` against the MARIS lookup (`maris_lut`) using a fuzzy matching algorithm based on Levenshtein distance. The `get_maris_lut` is used to correct the HELCOM data to a standard format for MARIS. 

In [34]:
#|export
def get_maris_lut(df_biota : pd.DataFrame,
                  fname_cache : str, # For instance 'species_ospar.pkl'
                  data_provider_name_col : str, # Data provider lookup column name of interest
                  maris_lut : Callable, # Function retrieving MARIS source lookup table
                  maris_id : str, # Id of MARIS lookup table nomenclature item to match
                  maris_name : str, # Name of MARIS lookup table nomenclature item to match
                  unmatched_fixes = {},
                  as_dataframe = False,
                  overwrite = False
                 ):
    fname_cache = cache_path() / fname_cache
    lut = {}
    maris_lut = maris_lut()

    if overwrite or (not fname_cache.exists()):        
        df = pd.DataFrame({data_provider_name_col : df_biota[data_provider_name_col].unique()})
        
        for _, row in tqdm(df.iterrows(), total=len(df)):
            # Fix if unmatched
            has_to_be_fixed = row[data_provider_name_col] in unmatched_fixes       
            name_to_match = unmatched_fixes[row[data_provider_name_col]] if has_to_be_fixed else row[data_provider_name_col]
            
            # Match
            result = match_maris_lut(maris_lut, str(name_to_match), maris_id, maris_name)
            match = Match(result.iloc[0][maris_id], result.iloc[0][maris_name], 
                        row[data_provider_name_col], result.iloc[0]['score'])
            lut[row[data_provider_name_col]] = match
            
        fc.save_pickle(fname_cache, lut)
    else:
        lut = fc.load_pickle(fname_cache)

    if as_dataframe:
        df_lut = pd.DataFrame({k: asdict(v) for k, v in lut.items()}).transpose()
        df_lut.index.name = 'source_id'
        return df_lut.sort_values(by='match_score', ascending=False)
    else:
        return lut

***

#### Lookup : Biota species

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``species``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: `Species`.*

Create `unmatched_fixes_biota_species` to correct the spelling of names that are unmatched in the ``OSPAR`` dataset. 

For now, leave unmatched empty and return uncorrected species lut. 

In [35]:
#|export
# key equals name in dfs['biota']. 
# value equals replacement name to use in match_maris_lut (i.e. name_to_match)
unmatched_fixes_biota_species = {}

In [36]:
#|eval: false
# ignore nan values for now. 
#tfm.dfs['biota']=tfm.dfs['biota'][tfm.dfs['biota']['Species'].notna()]
species_lut_df = get_maris_lut(df_biota=tfm.dfs['biota'], 
                                fname_cache='species_ospar.pkl', 
                                data_provider_name_col='Species',
                                maris_lut=species_lut_path,
                                maris_id='species_id',
                                maris_name='species',
                                unmatched_fixes=unmatched_fixes_biota_species,
                                as_dataframe=True,
                                overwrite=True)
# Show `maris_species_lut` where `match_type` is not a perfect match ( i.e. not equal 0).
species_lut_df[species_lut_df['match_score'] > 1]

  0%|          | 0/156 [00:00<?, ?it/s]

100%|██████████| 156/156 [00:28<00:00,  5.50it/s]


Unnamed: 0_level_0,matched_id,matched_maris_name,source_name,match_score
source_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
RHODYMENIA PSEUDOPALAMATA & PALMARIA PALMATA,1426,Lomentaria catenata,RHODYMENIA PSEUDOPALAMATA & PALMARIA PALMATA,31
"Mixture of green, red and brown algae",814,Mercenaria mercenaria,"Mixture of green, red and brown algae",26
Solea solea (S.vulgaris),161,Loligo vulgaris,Solea solea (S.vulgaris),12
SOLEA SOLEA (S.VULGARIS),161,Loligo vulgaris,SOLEA SOLEA (S.VULGARIS),12
CERASTODERMA (CARDIUM) EDULE,274,Cerastoderma edule,CERASTODERMA (CARDIUM) EDULE,10
Cerastoderma (Cardium) Edule,274,Cerastoderma edule,Cerastoderma (Cardium) Edule,10
MONODONTA LINEATA,1213,Ophiothrix lineata,MONODONTA LINEATA,9
NUCELLA LAPILLUS,363,Mugil cephalus,NUCELLA LAPILLUS,9
DICENTRARCHUS (MORONE) LABRAX,424,Dicentrarchus labrax,DICENTRARCHUS (MORONE) LABRAX,9
Pleuronectiformes [order],411,Pleuronectiformes,Pleuronectiformes [order],8


Update `unmatched_fixes_biota_species` to correct for entries that are unmatched in the ``OSPAR`` dataset. 

In [37]:
#|export
# LookupBiotaSpeciesCB filters 'Not available'. 
unmatched_fixes_biota_species = {'RHODYMENIA PSEUDOPALAMATA & PALMARIA PALMATA': 'Not available', # mix
 'Mixture of green, red and brown algae': 'Not available', #mix 
 'Solea solea (S.vulgaris)': 'Solea solea',
 'SOLEA SOLEA (S.VULGARIS)': 'Solea solea',
 'CERASTODERMA (CARDIUM) EDULE': 'Cerastoderma edule',
 'Cerastoderma (Cardium) Edule': 'Cerastoderma edule',
 'MONODONTA LINEATA': 'Phorcus lineatus',
 'NUCELLA LAPILLUS': 'Not available', # Droped. In worms 'Nucella lapillus (Linnaeus, 1758)', 
 'DICENTRARCHUS (MORONE) LABRAX': 'Dicentrarchus labrax',
 'Pleuronectiformes [order]': 'Pleuronectiformes',
 'RAJIDAE/BATOIDEA': 'Not available', #mix 
 'PALMARIA PALMATA': 'Not available', # Dropped. In worms 'Palmaria palmata (Linnaeus) F.Weber & D.Mohr, 1805',
 'Sepia spp.': 'Sepia',
 'Rhodymenia spp.': 'Rhodymenia',
 'unknown': 'Not available',
 'RAJA DIPTURUS BATIS': 'Dipturus batis',
 'Unknown' : 'Not available',
 'Flatfish' : 'Not available',
 'FUCUS SPP.' : 'FUCUS',
 'Patella sp.' : 'Patella',
 'Gadus sp.' : 'Gadus',
 'FUCUS spp' : 'FUCUS',
 'Tapes sp.' : 'Tapes',
 'Thunnus sp.' : 'Thunnus',
 'RHODYMENIA spp' : 'RHODYMENIA',
 'Fucus sp.' : 'Fucus',
 'PECTINIDAE' : 'Not available', # Droped. In worms as PECTINIDAE is a family.
 'PLUERONECTES PLATESSA' : 'Pleuronectes platessa',
 'Gaidropsarus argenteus' : 'Gaidropsarus argentatus', 
 }

In [38]:
#|eval: false
species_lut_df = get_maris_lut(df_biota=tfm.dfs['biota'], 
                                fname_cache='species_ospar.pkl', 
                                data_provider_name_col='Species',
                                maris_lut=species_lut_path,
                                maris_id='species_id',
                                maris_name='species',
                                unmatched_fixes=unmatched_fixes_biota_species,
                                as_dataframe=True,
                                overwrite=True)
# Show `maris_species_lut` where `match_type` is not a perfect match ( i.e. not equal 0).
species_lut_df[species_lut_df['match_score'] > 1]

  0%|          | 0/156 [00:00<?, ?it/s]

100%|██████████| 156/156 [00:28<00:00,  5.55it/s]


Unnamed: 0_level_0,matched_id,matched_maris_name,source_name,match_score
source_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,719,Ensis,,4


For ``Species`` included ``unmatched_fixes_biota_species`` with a value of ``Not available`` or ``Species`` with a value of NaN use  ``Biological group`` variable for the ``Species``.

Return a list of unique 'Biological group' variables

In [39]:
tfm.dfs['biota']['Biological group'].unique()

array(['Molluscs', 'Seaweed', 'Fish', 'FISH', 'seaweed', 'SEAWEED',
       'molluscs', 'fish', 'MOLLUSCS'], dtype=object)

Update unmatched_fixes_biota_species to include the corrections for `Biological group`.

In [40]:
unmatched_fixes_biota_species.update({
# Biological group corrections
'Molluscs' : 'Mollusca',
'Seaweed' : 'Seaweed',
'Fish' : 'Pisces',
'FISH' : 'Pisces',
'seaweed' : 'Seaweed',
'SEAWEED' : 'Seaweed',
'molluscs' : 'Mollusca',
'fish' : 'Pisces',
'MOLLUSCS' : 'Mollusca' })

In [41]:
#|eval: false
species_lut_df = get_maris_lut(df_biota=tfm.dfs['biota'], 
                                fname_cache='species_ospar.pkl', 
                                data_provider_name_col='Species',
                                maris_lut=species_lut_path,
                                maris_id='species_id',
                                maris_name='species',
                                unmatched_fixes=unmatched_fixes_biota_species,
                                as_dataframe=True,
                                overwrite=True)
# Show `maris_species_lut` where `match_type` is not a perfect match ( i.e. not equal 0).
species_lut_df[species_lut_df['match_score'] > 1]

  0%|          | 0/156 [00:00<?, ?it/s]

100%|██████████| 156/156 [00:27<00:00,  5.59it/s]


Unnamed: 0_level_0,matched_id,matched_maris_name,source_name,match_score
source_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,719,Ensis,,4


`get_maris_species` defines a partial function of `get_maris_lut`, with predefined arguments  for species lookup.

In [42]:
#| export
get_maris_species = partial(get_maris_lut, 
                fname_cache='species_ospar.pkl', 
                data_provider_name_col='Species',
                maris_lut=species_lut_path,
                maris_id='species_id',
                maris_name='species',
                unmatched_fixes=unmatched_fixes_biota_species,
                as_dataframe=False,
                overwrite=False)

`LookupBiotaSpeciesCB` applies the corrected `biota` `species` data obtained from the `get_maris_species` function to the `biota` dataframe in the dictionary of dataframes, `dfs`.

In [43]:
#| export
class LookupBiotaSpeciesCB(Callback):
    """
    Biota species remapped to MARIS db:
    """
    def __init__(self, fn_lut, unmatched_dict): fc.store_attr()
    def __call__(self, tfm):
        lut = self.fn_lut(tfm.dfs['biota']) 
        unmatched_fixes_biota_species=self.unmatched_dict
        # For ``Species`` included ``unmatched_fixes_biota_species`` with a value of ``Not available`` or ``Species`` with a value of NaN, replace ``Species`` variable with ``Biological group`` variable.  
        na_biota_species = [k for k,v in unmatched_fixes_biota_species.items() if v == 'Not available']
        tfm.dfs['biota']['Species'] = tfm.dfs['biota'].apply(lambda row: row.loc['Biological group'] if (row.loc['Species'] in na_biota_species or pd.isna(row.loc['Species'])) else row.loc['Species'], axis=1)
        # Correct unmatched 
        tfm.dfs['biota']['Species'] = tfm.dfs['biota']['Species'].apply(lambda x: unmatched_fixes_biota_species[x] if x in unmatched_fixes_biota_species.keys() else x)
        # Perform lookup 
        tfm.dfs['biota']['species'] = tfm.dfs['biota']['Species'].apply(lambda x: lut[x].matched_id)

Apply the transformer for callbacks `RemapRdnNameCB`, `ParseTimeCB`,  `NormalizeUncCB()` and `LookupBiotaSpeciesCB()`. Then, print the unique `species` for the `biota` dataframe.

In [44]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species), 
                            CompareDfsAndTfm(dfs)
                            ])
tfm()

print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

KeyError: 'FUCUS'

***

#### Lookup : Biota tissues

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``body_part``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: `Body part`.*

##### Correct OSPAR `Body Part` labelled as `Whole`

In [None]:
#| export
whole_animal_plant = {'whole' : ['Whole','WHOLE', 'WHOLE FISH', 'Whole fisk', 'Whole fish'],
                      'Whole animal' : ['Molluscs','Fish','FISH','molluscs','fish','MOLLUSCS'],
                      'Whole plant' : ['Seaweed','seaweed','SEAWEED'] }

In [None]:
#| export
class CorrectWholeBodyPartCB(Callback):
    """
    Update bodypart labeled as 'whole' to either 'Whole animal' or 'Whole plant'.
    """
    
    def __init__(self, wap=whole_animal_plant): fc.store_attr()
    
    def __call__(self, tfm):        
        tfm.dfs['biota'] = self.correct_whole_body_part(tfm.dfs['biota'],self.wap)

    def correct_whole_body_part(self, df, wap):
        whole_list= wap['whole']
        animal_list = wap['Whole animal']
        plant_lst = wap['Whole plant']
        df['body_part'] = df['Body Part']   
        df['body_part'].loc[(df['body_part'].isin(whole_list)) & (df['Biological group'].isin(animal_list))] = 'Whole animal'
        df['body_part'].loc[(df['body_part'].isin(whole_list)) & (df['Biological group'].isin(plant_lst))] = 'Whole plant'
        return df

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

print(tfm.dfs['biota']['body_part'].unique())

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 

['SOFT PARTS' 'GROWING TIPS' 'Whole plant' 'Whole animal' 'WHOLE ANIMAL'
 'FLESH WITHOUT BONES' 'WHOLE PLANT' 'Soft Parts' 'Whole without head'
 'Cod medallion' 'Muscle' 'Mix of muscle and whole fish without liver'
 'Flesh' 'FLESH WITHOUT BONE' 'UNKNOWN' 'FLESH' 'FLESH WITH SCALES' 'HEAD'
 'Flesh without bones' 'Soft parts' 'whole plant' 'LIVER' 'MUSCLE']


#### Lookup : Biota tissues

Return a DataFrame of  unmatched OSPAR ``body_part``( i.e. ``source_name``).

In [None]:
#|eval: false
unmatched_fixes_biota_tissues = {}
tissues_lut_df = get_maris_lut(df_biota=tfm.dfs['biota'], 
                                fname_cache='tissues_ospar.pkl', 
                                data_provider_name_col='body_part',
                                maris_lut=bodyparts_lut_path,
                                maris_id='bodypar_id',
                                maris_name='bodypar',
                                unmatched_fixes=unmatched_fixes_biota_tissues,
                                as_dataframe=True,
                                overwrite=True)
# List unmatched OSPAR tissues.
tissues_lut_df[tissues_lut_df['match_score'] >= 1]

  0%|          | 0/23 [00:00<?, ?it/s]

100%|██████████| 23/23 [00:00<00:00, 80.60it/s]


Unnamed: 0_level_0,matched_id,matched_maris_name,source_name,match_score
source_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
Mix of muscle and whole fish without liver,52,Flesh without bones,Mix of muscle and whole fish without liver,27
Whole without head,52,Flesh without bones,Whole without head,10
Cod medallion,8,Exoskeleton,Cod medallion,9
UNKNOWN,12,Skin,UNKNOWN,5
FLESH,42,Leaf,FLESH,3
Flesh,42,Leaf,Flesh,3
FLESH WITHOUT BONE,52,Flesh without bones,FLESH WITHOUT BONE,1


Read Maris tissue lut to assist with the creation of dictionary of unmatched tissues, ``unmatched_fixes_biota_tissues``.

In [None]:
#|eval: false
marisco_lut_df = pd.read_excel(bodyparts_lut_path())
marisco_lut_df

Unnamed: 0,bodypar_id,bodypar,bodycode,groupcode
0,-1,Not applicable,,
1,0,(Not available),0,0
2,1,Whole animal,WHOA,WHO
3,2,Whole animal eviscerated,WHOEV,WHO
4,3,Whole animal eviscerated without head,WHOHE,WHO
...,...,...,...,...
57,56,Growing tips,GTIP,PHAN
58,57,Upper parts of plants,UPPL,PHAN
59,58,Lower parts of plants,LWPL,PHAN
60,59,Shells/carapace,SHCA,SKEL


Create a dictionary of unmatched tissues to allow for  correction.

In [None]:
#|export
unmatched_fixes_biota_tissues = {
'Mix of muscle and whole fish without liver' : 'Not available', # Drop
 'UNKNOWN' : 'Not available',
 'Whole without head' : 'Whole animal eviscerated without head', # Drop? eviscerated? ,
 'Cod medallion' : 'Whole animal eviscerated without head',
 'FLESH' : 'Flesh without bones', 
 'Flesh' : 'Flesh without bones', 
 'FLESH WITHOUT BONE' : 'Flesh without bones'
}

Return a DataFrame of unmatched OSPAR ``body_part``.

In [None]:
#|eval: false
# For now ignore row in the dfs['biota]['body_part'] where corresponding unmatched_fixes_biota_tissues value is 'Not available' or na. . 
not_available_list=[k for k,v in unmatched_fixes_biota_tissues.items() if v == 'Not available']
tfm.dfs['biota'] = tfm.dfs['biota'][~tfm.dfs['biota']['body_part'].isin(not_available_list)]
tfm.dfs['biota']=tfm.dfs['biota'][tfm.dfs['biota']['body_part'].notna()]
                
tissues_lut_df = get_maris_lut(df_biota=tfm.dfs['biota'], 
                                fname_cache='tissues_ospar.pkl', 
                                data_provider_name_col='body_part',
                                maris_lut=bodyparts_lut_path,
                                maris_id='bodypar_id',
                                maris_name='bodypar',
                                unmatched_fixes=unmatched_fixes_biota_tissues,
                                as_dataframe=True,
                                overwrite=True)
# List unmatched OSPAR tissues.
tissues_lut_df[tissues_lut_df['match_score'] >= 1]

  0%|          | 0/21 [00:00<?, ?it/s]

100%|██████████| 21/21 [00:00<00:00, 78.94it/s]


Unnamed: 0_level_0,matched_id,matched_maris_name,source_name,match_score
source_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1


In [None]:
#| export
class LookupBiotaBodyPartCB(Callback):
    """
    Update bodypart id based on MARIS dbo_bodypar.xlsx:
        - 3: 'Whole animal eviscerated without head',
        - 12: 'Viscera',
        - 8: 'Skin'
    """
    def __init__(self, fn_lut, unmatched_fixes_biota_tissues): fc.store_attr()
    def __call__(self, tfm):
        lut = self.fn_lut(df_biota=tfm.dfs['biota'])      
        # Perform lookup. If nan or 'Not available' then let 'body_part' equal -1.
        not_available_list=[k for k,v in unmatched_fixes_biota_tissues.items() if v == 'Not available']
        tfm.dfs['biota']['body_part'] = tfm.dfs['biota']['body_part'].apply(lambda x: -1 if (pd.isna(x) or x in not_available_list) else lut[x].matched_id)

In [None]:
#|eval: false
get_maris_bodypart=partial(get_maris_lut, 
                            fname_cache='tissues_ospar.pkl', 
                            data_provider_name_col='body_part',
                            maris_lut=bodyparts_lut_path,
                            maris_id='bodypar_id',
                            maris_name='bodypar',
                            unmatched_fixes=unmatched_fixes_biota_tissues,
                            as_dataframe=False,
                            overwrite=False)
tissues_lut_df.head()

Unnamed: 0_level_0,matched_id,matched_maris_name,source_name,match_score
source_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
SOFT PARTS,19,Soft parts,SOFT PARTS,0
Flesh,52,Flesh without bones,Flesh,0
LIVER,25,Liver,LIVER,0
whole plant,40,Whole plant,whole plant,0
Soft parts,19,Soft parts,Soft parts,0


Apply the transformer for callbacks `RemapRdnNameCB`, `ParseTimeCB`,  `NormalizeUncCB()`, `LookupBiotaSpeciesCB(get_maris_species)` and `LookupBiotaBodyPartCB()`. Then, print the `Body Part` and `body_part` for the `biota` dataframe.

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                        	CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print(tfm.dfs['biota']['body_part'].unique())

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 

[19 56 40  1 52  3 34 -1 60 13 25]


***

#### Lookup : Biogroup

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``bio_group``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: Biogroup is not included.*

`get_biogroup_lut` reads the file at `species_lut_path()` and from the contents of this file creates a dictionary linking `species_id` to `biogroup_id`.

In [None]:
#| export
def get_biogroup_lut(maris_lut):
    species = pd.read_excel(maris_lut)
    return species[['species_id', 'biogroup_id']].set_index('species_id').to_dict()['biogroup_id']

`LookupBiogroupCB` applies the corrected `biota` `bio group` data obtained from the `get_maris_lut` function to the `biota` dataframe in the dictionary of dataframes, `dfs`.

In [None]:

#| export
class LookupBiogroupCB(Callback):
    """
    Update biogroup id  based on MARIS dbo_species.xlsx
    """
    def __init__(self, fn_lut): fc.store_attr()
    def __call__(self, tfm):
        lut = self.fn_lut()
        tfm.dfs['biota']['bio_group'] = tfm.dfs['biota']['species'].apply(lambda x: lut[x])

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')


                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

#### Lookup : Units

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``unit``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Unit``.*

In [None]:
#|eval: false
tfm.dfs['seawater']['Unit'].unique()

array(['Bq/l', nan, 'Bq/L', 'BQ/L'], dtype=object)

In [None]:
#|eval: false
tfm.dfs['biota'][ 'Unit'].unique()

array(['Bq/kg f.w.', 'Bq/kg.fw', 'Bq/kg fw', 'Bq/kg f.w'], dtype=object)

In [None]:
#|eval: false
units_df=pd.read_excel(unit_lut_path())
units_df

Unnamed: 0,unit_id,unit,unit_sanitized,ordlist,Unnamed: 4,Unnamed: 5,Unnamed: 6
0,-1,Not applicable,Not applicable,,,,
1,0,NOT AVAILABLE,NOT AVAILABLE,0.0,,,
2,1,Bq/m3,Bq per m3,1.0,Bq/m3,,Bq/m<sup>3</sup>
3,2,Bq/m2,Bq per m2,2.0,,,
4,3,Bq/kg,Bq per kg,3.0,,,
5,4,Bq/kgd,Bq per kgd,4.0,,,
6,5,Bq/kgw,Bq per kgw,5.0,,,
7,6,kg/kg,kg per kg,6.0,,,
8,7,TU,TU,7.0,,,
9,8,DELTA/mill,DELTA per mill,8.0,,,


In [None]:
#| export
# Define unit names renaming rules
renaming_unit_rules = {'Bq/l': 1, #'Bq/m3'
                       'Bq/L': 1,
                       'BQ/L': 1,
                       'Bq/kg f.w.': 5, # Bq/kgw
                       'Bq/kg.fw' : 5,
                       'Bq/kg fw' : 5,
                       'Bq/kg f.w' : 5 
                       } 

**Niall's Comment:** Assign a value of 1 to all 'unit' entries for 'seawater' (i.e., Bq/m3). Note that many rows for OSPAR 'seawater' are missing 'Unit' data. Additionally, the density of seawater (approximately 1.03 g/cm3) was not considered when converting units.

In [None]:
#| export
class LookupUnitCB(Callback):
    def __init__(self,
                 lut=renaming_unit_rules):
        fc.store_attr()
    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            # For seawater replace nan with 'Bq/l'. 
            if grp == 'seawater':
                tfm.dfs[grp]['Unit'] = tfm.dfs[grp]['Unit'].fillna(value='Bq/l')
            # Perform lookup  
            tfm.dfs[grp]['unit'] = tfm.dfs[grp]['Unit'].apply(lambda x: self.lut[x])

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')


                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

#### Lookup : Detection limit or Value type

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``detection_limit``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine foramt variable: ``Value type``.*

`get_detectionlimit_lut` reads the file at `detection_limit_lut_path()` and from the contents of this file creates a dictionary linking `name` to `id`.
| id | name | name_sanitized |
| :-: | :-: | :-: |
|-1|Not applicable|Not applicable|
|0|Not Available|Not available|
|1|=|Detected value|
|2|<|Detection limit|
|3|ND|Not detected|
|4|DE|Derived|

In [None]:
#| export 
def get_detectionlimit_lut():
    df = pd.read_excel(detection_limit_lut_path(), usecols=['name','id'])
    return df.set_index('name').to_dict()['id']

In [None]:
# | export
class LookupDetectionLimitCB(Callback):
    "Remamp activity value, activity uncertainty and detection limit to MARIS format."
    def __init__(self , lut):
        fc.store_attr()

    def __call__(self, tfm):
        lut = self.lut()        
        for grp in tfm.dfs.keys():
            # Copy 'Value type' col 
            tfm.dfs[grp]['detection_limit'] = tfm.dfs[grp]['Value type']
            # Fill nan values with 'Not Available'.
            tfm.dfs[grp]['detection_limit'] = tfm.dfs[grp]['detection_limit'].fillna('Not Available') 
            # Fill values that are not in the lut (e.g. >) with 0
            tfm.dfs[grp].loc[~tfm.dfs[grp]["detection_limit"].isin(list(lut.keys())), "detection_limit"] = 'Not Available'
            # Apply rules.
            # Fill values with '=' if both a value and uncertainty are not nan and detection_limit is 0.
            condition = ((tfm.dfs[grp]['Activity or MDA'].notna()) & (tfm.dfs[grp]['Uncertainty'].notna())) & (tfm.dfs[grp]["detection_limit"] == 'Not Available' )
            tfm.dfs[grp].loc[condition, 'detection_limit']= '='
            # Perform lookup
            tfm.dfs[grp]['detection_limit'] = tfm.dfs[grp]['detection_limit'].apply(lambda x: lut[x])

Apply the transformer for callbacks `RemapRdnNameCB`, `ParseTimeCB`,  `NormalizeUncCB()`, `LookupBiotaSpeciesCB(get_maris_species)`, `LookupBiotaBodyPartCB(get_maris_bodypart)`, `LookupSedimentCB(get_maris_sediments)`, `LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())`, `LookupUnitCB()` and `LookupDetectionLimitCB`. Then, print the unique `detection_limit` for the `seawater` dataframe.

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

#### Lookup : Method

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; *NetCDF4 format variables: ``counting_method``, ``sampling_method`` and ``preparation_method``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Sampling method``,	``Preparation method`` and ``Counting method``.*

> 'Method' is not provided in the OSPAR data.

***

### Data provider sample id

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``data_provider_sample_id``*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Sample ID``*

>  MARIS NetCDF4 format for variable type ``data_provider_sample_id`` does not support vlen strings.

In [None]:
# | export
class RemapDataProviderSampleIdCB(Callback):
    "Remap key to MARIS data_provider_sample_id format."
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            # data_provider_sample_id
            tfm.dfs[grp]['data_provider_sample_id'] = tfm.dfs[grp]['ID']


In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

### Filtered

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``filtered``*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Filtered``*

> 'Filtered' is not provided in the OSPAR data.

***

#### ~~Lookup : Area~~

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``area``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: Area is not included*

TODO : Write callback for area. Will I use the marineregions.org API to complete lookup? Or use 'RSC Sub-division'.

***

### Sample Notes

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``sample_notes``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Sample notes
``*

In [None]:
# | export
class RemapSampleNotesIdCB(Callback):
    "Remap 'Sample Comment' to MARIS sample_notes format."
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            # sample_notes
            tfm.dfs[grp]['sample_notes'] = tfm.dfs[grp]['Sample Comment']


In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

### Measurement Notes

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``measurement_notes``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Measurement notes``*

In [None]:
# | export
class RemapMeasurementNotesIdCB(Callback):
    "Remap 'Measurement Comment' to MARIS measurement_notes format."
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            # measurement_notes
            tfm.dfs[grp]['measurement_notes'] = tfm.dfs[grp]['Measurement Comment']


In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            RemapMeasurementNotesIdCB(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

### Station ID 

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: Station ID is not included.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Station ID``*

>  MARIS NetCDF4 format does not include Station ID.

In [None]:
# | export
class RemapStationIdCB(Callback):
    "Remap Station ID to MARIS format."
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            tfm.dfs[grp]['station_id'] = tfm.dfs[grp]['Station ID']


In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            RemapMeasurementNotesIdCB(),
                            RemapStationIdCB(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

### Profile ID, Transect ID or Sequence ID

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: Profile ID, Transect ID or Sequence ID is not included.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variable: ``Profile or transect ID``*

> 'Profile ID', 'Transect ID' or 'Sequence ID' is not provided in the OSPAR data. Remove from handler.

***

### Dry to wet ratio

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: DW% is not included.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variables: ``Dry/wet ratio``.*

> 'Dry to wet ratio' is not provided in the OSPAR data. Remove from handler.

***

### Capture Coordinates

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variables: ``lon``  and ``lat``*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variables: ``Longitude decimal`` and ``Latitude decimal``.*

In [None]:
# | export
class FormatCoordinates(Callback):
    "Convert Longitude and Latitude values to DDD.DDDDD°"
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            tfm.dfs[grp]['lat'] = np.where(tfm.dfs[grp]['LatDir'].isin(['S']), ((tfm.dfs[grp]['LatD'] + tfm.dfs[grp]['LatM']/60 + tfm.dfs[grp]['LatS'] /(60*60))* (-1)), (tfm.dfs[grp]['LatD'] + tfm.dfs[grp]['LatM']/60 + tfm.dfs[grp]['LatS'] /(60*60)))
            tfm.dfs[grp]['lon'] = np.where(tfm.dfs[grp]['LongDir'].isin(['W']), ((tfm.dfs[grp]['LongD'] + tfm.dfs[grp]['LongM']/60 + tfm.dfs[grp]['LongS'] /(60*60))* (-1)), (tfm.dfs[grp]['LongD'] + tfm.dfs[grp]['LongM']/60 + tfm.dfs[grp]['LongS'] /(60*60)))

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            RemapMeasurementNotesIdCB(),
                            RemapStationIdCB(),
                            FormatCoordinates(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

### Sanitize coordinates

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variables: ``lon``  and ``lat``*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variables: ``Longitude decimal`` and ``Latitude decimal``.*

Sanitize coordinates drops a row when both longitude & latitude equal 0 or data contains unrealistic longitude & latitude values. Converts longitude & latitude `,` separator to `.` separator."

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            RemapMeasurementNotesIdCB(),
                            RemapStationIdCB(),
                            FormatCoordinates(),
                            SanitizeLonLatCB(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

### Sanitize value

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*NetCDF4 format variable: ``value``.*

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*Open Refine format variables: ``Activity or MDA``.*

In [None]:
# | export
class SanitizeValue(Callback):
    "Sanitize value. Remove blank entries."
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            # Keep rows where value (i.e. 'Activity or MDA') is not 'nan'.
            tfm.dfs[grp] = tfm.dfs[grp][tfm.dfs[grp][[ 'Activity or MDA']].notna()]


In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            RemapMeasurementNotesIdCB(),
                            RemapStationIdCB(),
                            FormatCoordinates(),
                            SanitizeLonLatCB(),
                            SanitizeValue(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



***

### Review DFS and TFM data

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            RemapMeasurementNotesIdCB(),
                            RemapStationIdCB(),
                            FormatCoordinates(),
                            SanitizeLonLatCB(),
                            SanitizeValue(),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



In [None]:
seawater_review=tfm.dfs_dropped['seawater']
biota_review=tfm.dfs_dropped['biota']

In [None]:
seawater_review.head()

Unnamed: 0,ID,Contracting Party,RSC Sub-division,Station ID,Sample ID,LatD,LatM,LatS,LatDir,LongD,...,Sampling date,Nuclide,Value type,Activity or MDA,Uncertainty,Unit,Data provider,Measurement Comment,Sample Comment,Reference Comment
16799,97147,,,,,,,,,,...,,,,,,,,,,
16800,97148,,,,,,,,,,...,,,,,,,,,,
16801,97149,,,,,,,,,,...,,,,,,,,,,
16802,97150,,,,,,,,,,...,,,,,,,,,,
16803,97151,,,,,,,,,,...,,,,,,,,,,


***

### Columns of interest and rename for NetCDF

> Column names are standardized to MARIS NetCDF format (i.e. PEP8 ). 

In [None]:
tfm.dfs['biota'].columns

Index(['ID', 'Contracting Party', 'RSC Sub-division', 'Station ID',
       'Sample ID', 'LatD', 'LatM', 'LatS', 'LatDir', 'LongD', 'LongM',
       'LongS', 'LongDir', 'Sample type', 'Biological group', 'Species',
       'Body Part', 'Sampling date', 'Nuclide', 'Value type',
       'Activity or MDA', 'Uncertainty', 'Unit', 'Data provider',
       'Measurement Comment', 'Sample Comment', 'Reference Comment', 'time',
       'species', 'body_part', 'bio_group', 'unit', 'detection_limit',
       'data_provider_sample_id', 'sample_notes', 'measurement_notes',
       'station_id', 'lat', 'lon'],
      dtype='object')

In [None]:
#| export
# Define columns of interest (keys) and renaming rules (values).
def get_renaming_rules_netcdf():
    vars = cdl_cfg()['vars']
    return {('seawater','biota', 'sediment') : {    
                                                        ## DEFAULT
                                                        'lat' : vars['defaults']['lat']['name'] ,
                                                        'lon' : vars['defaults']['lon']['name'] ,
                                                        'time' : vars['defaults']['time']['name'],
                                                        'Nuclide' : 'nuclide',
                                                        'unit' : vars['suffixes']['unit']['name'],
                                                        #'station_id' : 'data_provider_station_id',
                                                        #'data_provider_sample_id' : vars['defaults']['data_provider_sample_id']['name'],
                                                        #'profile_or_transect_id' : 'profile_id',
                                                        'detection_limit' : vars['suffixes']['detection_limit']['name']
                                                        #'Sampling method' : 'sampling_method'
                                                        #'Preparation method' : 'preparation_method'
                                                        #'Counting method' : 'counting_method'
                                                        #'Sample notes' : 'sample_notes'
                                                        #'Measurement notes' : 'measurement_notes'
                                                    },
                  ('seawater',) : {
                                ## SEAWATER
                                'Activity or MDA': 'value',
                                'Uncertainty': vars['suffixes']['uncertainty']['name'],
                                #'TDEPTH': vars['defaults']['tot_depth']['name'],
                                'Sampling depth': vars['defaults']['smp_depth']['name'],
                                #'SALIN' : vars['suffixes']['salinity']['name'],
                                #'TTEMP' : vars['suffixes']['temperature']['name'],
                                #'FILT' : vars['suffixes']['filtered']['name']
                                },
                  ('biota',) : { 
                                ## BIOTA
                                'Activity or MDA': 'value',
                                'Uncertainty' : vars['suffixes']['uncertainty']['name'],
                                'species' : vars['bio']['species']['name'],
                                'body_part' : vars['bio']['body_part']['name'],
                                'bio_group' : vars['bio']['bio_group']['name'],
                                #'SDEPTH' : vars['defaults']['smp_depth']['name'],
                                #'DW%' : 'dry_wet_ratio'
                                #'Drying Method' : drying_method
                                
                                }
                    }

Open Refine data format includes additional data that is not available in NetCDF format. Here we select columns of interest for Open Refine and standardize columns names to MARIS NetCDF format. 

> For Open Refine CSV column names are standardized to MARIS NetCDF format. 

In [None]:
#| export
# Define columns of interest (keys) and renaming rules (values).
def get_renaming_rules_openrefine():
    vars = cdl_cfg()['vars']
    return {('seawater','biota', 'sediment') : {    
                                                        ## DEFAULT
                                                        'lat' : vars['defaults']['lat']['name'] ,
                                                        'lon' : vars['defaults']['lon']['name'] ,
                                                        'time' : vars['defaults']['time']['name'],
                                                        'Nuclide' : 'nuclide',
                                                        'unit' : vars['suffixes']['unit']['name'],
                                                        'station_id' : 'data_provider_station_id',
                                                        'data_provider_sample_id' : vars['defaults']['data_provider_sample_id']['name'],
                                                        #'profile_or_transect_id' : 'profile_id',
                                                        'detection_limit' : vars['suffixes']['detection_limit']['name'],
                                                        #'Sampling method' : 'sampling_method'
                                                        #'Preparation method' : 'preparation_method'
                                                        #'Counting method' : 'counting_method'
                                                        'sample_notes' : 'sample_notes',
                                                        'measurement_notes' : 'measurement_notes',
                                                    },
                  ('seawater',) : {
                                ## SEAWATER
                                'Activity or MDA': 'value',
                                'Uncertainty': vars['suffixes']['uncertainty']['name'],
                                #'TDEPTH': vars['defaults']['tot_depth']['name'],
                                'Sampling depth': vars['defaults']['smp_depth']['name'],
                                #'SALIN' : vars['suffixes']['salinity']['name'],
                                #'TTEMP' : vars['suffixes']['temperature']['name'],
                                #'FILT' : vars['suffixes']['filtered']['name']
                                },
                  ('biota',) : { 
                                ## BIOTA
                                'Activity or MDA': 'value',
                                'Uncertainty' : vars['suffixes']['uncertainty']['name'],
                                'species' : vars['bio']['species']['name'],
                                'body_part' : vars['bio']['body_part']['name'],
                                'bio_group' : vars['bio']['bio_group']['name'],
                                #'SDEPTH' : vars['defaults']['smp_depth']['name'],
                                #'DW%' : 'dry_wet_ratio'
                                #'Drying Method' : drying_method
                                
                                }
                    }

In [None]:

#| export
class SelectAndRenameColumnCB(Callback):
    def __init__(self,
                 fn_renaming_rules,
                ):
        fc.store_attr()
    def __call__(self, tfm):
        renaming = self.fn_renaming_rules()
        for grp in tfm.dfs.keys():            
            # get columns related to the grp (e.g. 'biota').
            coi = [v for k, v in renaming.items() if grp in k]
            # Join cols of interest
            coi_rename = {}
            for d in coi:
                for k, v in d.items(): 
                    coi_rename[k]=v
            # list cols
            cols = list(coi_rename.keys()) 
            # select cols in df 
            tfm.dfs[grp] = tfm.dfs[grp].loc[:, cols]
            # Rename cols
            tfm.dfs[grp].rename(columns=coi_rename, inplace=True)
            

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            RemapMeasurementNotesIdCB(),
                            RemapStationIdCB(),
                            FormatCoordinates(),
                            SanitizeLonLatCB(),
                            SanitizeValue(),
                            SelectAndRenameColumnCB(get_renaming_rules_netcdf),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



In [None]:
tfm.dfs['biota']

Unnamed: 0,lat,lon,time,nuclide,_unit,_dl,value,_unc,species,body_part,bio_group
0,,,,,,,0.3510,,,,
1,,,,,,,39.0000,,,,
2,,,,,,,0.0938,,,,
3,,,,,,,1.5400,,,,
4,,,,,,,16.0000,,,,
...,...,...,...,...,...,...,...,...,...,...,...
15309,,,,,,,838.0000,,,,
15310,,,,,,,0.0180,,,,
15311,,,,,,,0.2100,,,,
15312,,,,,,,0.5600,,,,


***

### Reshape: long to wide

Convert data from long to wide and rename columns to comply with NetCDF format.

In [None]:
#| export
class ReshapeLongToWide(Callback):
    "Convert data from long to wide with renamed columns."
    def __init__(self, columns=['nuclide'], values=['value']):
        fc.store_attr()
        # Retrieve all possible derived vars (e.g 'unc', 'dl', ...) from configs
        self.derived_cols = [value['name'] for value in cdl_cfg()['vars']['suffixes'].values()]
    
    def renamed_cols(self, cols):
        "Flatten columns name"
        return [inner if outer == "value" else f'{inner}{outer}'
                if inner else outer
                for outer, inner in cols]

    def pivot(self, df):
        # Among all possible 'derived cols' select the ones present in df
        derived_coi = [col for col in self.derived_cols if col in df.columns]
        
        df=df.reset_index()
        
        idx = list(set(df.columns) - set(self.columns + derived_coi + self.values))
        
        # Create a fill_value to replace NaN values in the columns used as the index in the pivot table.
        # Check if num_fill_value is already in the dataframe index values. If num_fill_value already exists
        # then increase num_fill_value by 1 until a value is found for num_fill_value that is not in the dataframe. 
        num_fill_value = 99999999999999
        while (df[idx] == num_fill_value).any().any():
            num_fill_value += 1
        # Fill in nan values for each col found in idx. 
        for col in idx:   
            if pd.api.types.is_numeric_dtype(df[col]):
                fill_value = num_fill_value
            if pd.api.types.is_string_dtype(df[col]):
                fill_value = 'NOT AVAILABLE'
                
            df[col]=df[col].fillna(fill_value)

        pivot_df=df.pivot_table(index=idx,
                              columns=self.columns,
                              values=self.values + derived_coi,
                              fill_value=np.nan,
                              aggfunc=lambda x: x
                              ).reset_index()
        
        pivot_df.index.name = 'sample'
        pivot_df=pivot_df.reset_index('sample')
        
        # Replace fill_value  with  np.nan
        pivot_df[idx]=pivot_df[idx].replace({'NOT AVAILABLE': np.nan,
                                             num_fill_value : np.nan})
        return (pivot_df)

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            tfm.dfs[grp] = self.pivot(tfm.dfs[grp])
            tfm.dfs[grp].columns = self.renamed_cols(tfm.dfs[grp].columns)

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),                             
                            NormalizeUncCB(),
                            LookupBiotaSpeciesCB(get_maris_species, unmatched_fixes_biota_species),
                            CorrectWholeBodyPartCB(),
                            LookupBiotaBodyPartCB(get_maris_bodypart, unmatched_fixes_biota_tissues),
                            LookupBiogroupCB(partial(get_biogroup_lut, species_lut_path())),
                            LookupUnitCB(renaming_unit_rules),
                            LookupDetectionLimitCB(get_detectionlimit_lut),
                            RemapDataProviderSampleIdCB(),
                            RemapSampleNotesIdCB(),
                            RemapMeasurementNotesIdCB(),
                            RemapStationIdCB(),
                            FormatCoordinates(),
                            SanitizeLonLatCB(),
                            SanitizeValue(),
                            SelectAndRenameColumnCB(get_renaming_rules_netcdf),
                            ReshapeLongToWide(), 
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                                 0      0
Number of dropped rows:                                18856  15314
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



In [None]:
tfm.dfs['seawater']

Unnamed: 0,sample,lon,smp_depth,time,index,lat


***

## Review OSPAR uncertainty

In [None]:
#|eval: false
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[RemapRdnNameCB(),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg(), verbose = True),
                            CompareDfsAndTfm(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

                                                    seawater  biota
Number of rows in dfs :                                18856  15314
Number of rows in tfm.dfs:                             18308  15314
Number of dropped rows:                                  548      0
Number of rows in tfm.dfs + Number of dropped r...     18856  15314 



TODO: Review OSPAR uncertainty, especially in cases where the uncertainty significantly exceeds the measured value. Although it's possible for uncertainty to be greater than the measured value, consider what implications this has for the analysis.

In [None]:
grp='seawater'
print ('Number of rows where uncertainty is greater than value for seawater:')
print(tfm.dfs[grp][tfm.dfs[grp]['Uncertainty'] > tfm.dfs[grp]['Activity or MDA']].shape[0])

Number of rows where uncertainty is greater than value for seawater:
88


In [None]:
grp='biota'
print ('Number of rows where uncertainty is greater than value for seawater:')
print(tfm.dfs[grp][tfm.dfs[grp]['Uncertainty'] > tfm.dfs[grp]['Activity or MDA']].shape[0])

Number of rows where uncertainty is greater than value for seawater:
100


In [None]:
grp='seawater'
#grp='biota'

In [None]:
tfm.dfs[grp][tfm.dfs[grp]['Uncertainty'] > tfm.dfs[grp]['Activity or MDA']][['Nuclide','Activity or MDA','Uncertainty']]

Unnamed: 0,Nuclide,Activity or MDA,Uncertainty
1158,cs137,0.002800,0.327600
1160,cs137,0.002900,0.336400
1162,cs137,0.002500,0.332500
1164,cs137,0.002500,0.345000
1166,cs137,0.003800,0.334400
...,...,...,...
15971,h3,2.332310,37.720058
15973,h3,0.777316,111.320048
15977,h3,0.155439,552.877118
18788,cs137,0.002460,0.400980
