# ETL
*Extract-transform-load*

This notebook performs the data engineering steps required for Met-ML training and evaluation:

- loads the FLUXNET CSVs
- splits the sites into training and validation groups
- fits the transformers on the training sites
- saves the preprocessed data and transformers for use in the next steps of the project

In [None]:
# Install python-snappy, openpyxl and intake-parquet from conda-forge into the
# active environment (versions are not pinned).
# NOTE(review): presumably openpyxl backs the pd.read_excel call further down — confirm.
!mamba install -y -c conda-forge python-snappy openpyxl intake-parquet

In [None]:
# %load_ext lab_black

# Reload imported modules automatically before each cell is executed.
%load_ext autoreload
%autoreload 2

# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [None]:
import os
import intake
import pandas as pd
import numpy as np
import xarray as xr

from tqdm import tqdm
from joblib import dump

from sklearn.compose import ColumnTransformer

import matplotlib.pyplot as plt
import cartopy.crs as ccrs
from dask.distributed import Client

import fsspec


from met_ml.data import cat
from met_ml.train.fluxnet_etl import load_fluxnet, get_meta, make_lookback
from met_ml.train.models import fit_transformers, transform_df


# Root URL under which every ETL artifact below is written; override it with
# the PANGEO_SCRATCH environment variable.
# NOTE(review): the default ends with "/" and the cells below build paths as
# f'{SCRATCH}/metml/...', so the resulting URLs contain "//" — confirm intended.
SCRATCH = os.environ.get("PANGEO_SCRATCH", "s3://pangeo-scratch/jhamman/")

# Value passed as `lookback=` to make_lookback when building the tensors.
lookback = 90

# Feature names selected for the x_* stores.
train_vars = [
    "P",
    "t_min",
    "t_max",
    "t",
    "lat",
    "elev",
]

# Feature names selected for the y_* stores.
target_vars = [
    "SW_IN_F",
    "LW_IN_F",
    "PA_F",
    "RH",
]

In [None]:
# Create a dask.distributed client with 8 workers (no scheduler address is
# given); the bare `client` on the last line renders its summary as cell output.
client = Client(n_workers=8)
client

In [None]:
# TODO: put this dataset in cat
# also, there may be a new version of this...?

# Read the site-metadata workbook, key it by (SITE_ID, VARIABLE) and keep only
# the DATAVALUE column.
all_site_meta = (
    pd.read_excel("../met_ml/data/FLX_AA-Flx_BIF_LATEST.xlsx")
    .set_index(["SITE_ID", "VARIABLE"])
    .loc[:, "DATAVALUE"]
)

all_site_meta.head()

In [None]:
# Load the FLUXNET data via the met_ml helper, passing `cat` (imported from
# met_ml.data) and the site metadata read above.
df = load_fluxnet(cat, all_site_meta)

In [None]:
# Show the count() of each column as a quick completeness check
# (assumes df is a pandas DataFrame, where count() excludes missing values).
df.count()

In [None]:
# Unique values of the first index level of the metadata (SITE_ID).
all_sites = all_site_meta.index.get_level_values(0).unique()
# Build a per-site metadata table; the plot below relies on its `lat`/`lon` columns.
meta = get_meta(all_site_meta)
meta = pd.DataFrame.from_dict(meta, orient="index")
# Hold out every 5th site, ordered by latitude, as the validation set.
test_meta = meta.sort_values(["lat"])[::5]

# Map of site locations on a Robinson projection.
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(1, 1, 1, projection=ccrs.Robinson())
# Every site in `meta` is drawn first (this includes the validation sites) ...
ax.scatter(meta.lon, meta.lat, transform=ccrs.PlateCarree(), label="Training Sites")
# ... then the validation sites are drawn again on top in red.
ax.scatter(
    test_meta.lon,
    test_meta.lat,
    c="r",
    transform=ccrs.PlateCarree(),
    label="Validation Sites",
)
ax.set_global()
ax.stock_img()
ax.coastlines()
ax.gridlines()
ax.legend()

In [None]:
# Preview the first rows of the loaded data.
df.head()

In [None]:
# Align the metadata rows with the first index level of `df`, so `meta` is
# indexed by exactly the keys present in the loaded data.
# NOTE(review): `test_meta` was selected before this step and is not reindexed.
meta = meta.reindex(df.index.levels[0])

In [None]:
def split_by_meta(df, keys, test_keys):
    """Partition per-key selections of ``df`` into training and validation lists.

    Parameters
    ----------
    df : object supporting ``df.loc[key]``
        Source of the per-key selections.
    keys : iterable
        Keys to look up, visited in the given order.
    test_keys : container
        Keys whose selection goes to the validation list; every other key
        goes to the training list.

    Returns
    -------
    tuple of (list, list)
        ``(train, val)``, each holding ``df.loc[key]`` for its keys in the
        order they appear in ``keys``.
    """
    train_parts, val_parts = [], []

    for key in keys:
        # Pick the destination list first, then do the lookup once.
        bucket = val_parts if key in test_keys else train_parts
        bucket.append(df.loc[key])

    return train_parts, val_parts


def qc(da):
    """Print ``"nans found"`` when ``da`` contains at least one null value.

    Parameters
    ----------
    da : object exposing ``isnull().sum()``
        Data to check; the summed null count is compared against zero.

    Returns
    -------
    None
        The function only reports; it does not modify or reject the data.
    """
    missing_count = da.isnull().sum()
    if not (missing_count > 0):
        return
    print("nans found")


# Split the per-site selections into training and validation groups.
train, val = split_by_meta(df, meta.index.to_list(), test_meta.index.to_list())


# Fit the transformers on the training group only.
trans = fit_transformers(train)


# Build the tensor for the LSTM (including a lookback dimension) for each
# group, then write its feature (x) and target (y) parts to zarr stores.
for name, df_list in {"train": train, "val": val}.items():
    da = xr.concat(
        [make_lookback(transform_df(trans, d), lookback=lookback) for d in df_list],
        dim="samples",
    )
    da.name = name
    print(name, da.shape)
    qc(da)

    # x: the training features, all lookback steps
    mapper = fsspec.get_mapper(f'{SCRATCH}/metml/etl/x_{name}.zarr')
    (
        da.to_dataset(name='x')
        .sel(features=train_vars)
        .chunk({'samples': 10000})
        .to_zarr(mapper, mode='w', consolidated=True)
    )

    # y: the target features at the last lookback position only
    mapper = fsspec.get_mapper(f'{SCRATCH}/metml/etl/y_{name}.zarr')
    (
        da.to_dataset(name='y')
        .sel(features=target_vars)
        .isel(lookback=-1)
        .chunk({'samples': 10000})
        .to_zarr(mapper, mode='w', consolidated=True)
    )
    

In [None]:
def plot_vars(df):
    """Plot every column of ``df`` in its own panel of a one-row figure.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns are drawn one per panel, left to right, each
        titled with its column name. The column name is also printed.

    Returns
    -------
    None

    Notes
    -----
    ``squeeze=False`` makes ``plt.subplots`` always return a 2-D array of
    axes. Without it a single-column frame yields a bare ``Axes`` object,
    which cannot be iterated by ``zip``.
    """
    fig, axes = plt.subplots(
        ncols=len(df.columns),
        nrows=1,
        sharex=True,
        figsize=(22, 4),
        squeeze=False,
    )

    # axes has shape (1, ncols); flatten it so it pairs one-to-one with columns.
    for ax, (key, s) in zip(axes.ravel(), df.items()):
        print(key)
        s.plot(ax=ax)
        ax.set_title(key)


# Take the first entry of the training list and plot its untransformed
# feature and target columns.
d = train[0]
plot_vars(d[train_vars])
plot_vars(d[target_vars])

In [None]:
# Apply the fitted transformers to the same frame and plot the same columns
# again, for comparison with the untransformed plots.
td = transform_df(trans, d)
plot_vars(td[train_vars])
plot_vars(td[target_vars])

In [None]:
# TODO: use ONNX for this
# Serialize the fitted transformers with joblib to a file under SCRATCH.
with fsspec.open(f'{SCRATCH}/metml/etl/fluxnet_all_transformers.joblib', mode='wb') as f:
    dump(trans, f)  # save for later

In [None]:
# Write the loaded FLUXNET frame to CSV under SCRATCH, then preview it.
with fsspec.open(f'{SCRATCH}/metml/etl/fluxnet.csv', mode='w') as f:
    df.to_csv(f)
df.head()

In [None]:
# Write the site metadata (reindexed to the sites in `df`) to CSV under
# SCRATCH, then preview it.
with fsspec.open(f'{SCRATCH}/metml/etl/meta.csv', mode='w') as f:
    meta.to_csv(f)
meta.head()

In [None]:
# Write the validation-site metadata to CSV under SCRATCH, then preview it.
with fsspec.open(f'{SCRATCH}/metml/etl/test_meta.csv', mode='w') as f:
    test_meta.to_csv(f)
test_meta.head()

In [None]:
# Display the fitted transformers object returned by fit_transformers.
trans