In [None]:
# OPTIONAL: Load the "autoreload" extension so that code can change
%load_ext autoreload

# OPTIONAL: always reload modules so that as you change code in src, it gets loaded
%autoreload 2

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
from pathlib import Path


In [None]:
fmap_rainfall = Path(r"./tests/data/test_rainfalldata")
fmap_erosivity = Path(r"./tests/data/test_erosivitydata/") # Folder path where results are written to (see above).

# Erosivity calculation

Input is a rainfall time series:

In [None]:
from rfactor.process import load_rain_folder

In [None]:
all_rainfall = load_rain_folder(Path("../../tests/data/test_rainfalldata/"))

In [None]:
all_rainfall.shape

In [None]:
all_rainfall.head()

Algorithm works on year/station basis, so let's pick a year and a station for further development:

In [None]:
P01_001_2018 = all_rainfall[(all_rainfall["station"] == "P01_001") & 
                            (all_rainfall["datetime"].dt.year == 2018)].copy()

In [None]:
P01_001_2018.set_index("datetime")["rain_mm"].plot(figsize=(18, 4))

In [None]:
P01_001_2018.head()

In [None]:
# From datetime to minutes difference since start of the year - bit hacky
current_year = int(P01_001_2018["datetime"].dt.year.unique()[0])
start_shift = (P01_001_2018.iloc[0, 2] - pd.Timestamp(f"{current_year}-01-01")).total_seconds() /60.
P01_001_2018["min_since_start_year"] = P01_001_2018["datetime"].diff().dt.total_seconds().cumsum() /60. + start_shift
P01_001_2018.iloc[0, -1] = start_shift

In [None]:
P01_001_2018.head()

## The erosivity implementation

### 1. Define starting point the individual rain events

In [None]:
event_split = "6 hours"

In [None]:
P01_001_2018["event_start"] = False
P01_001_2018.loc[P01_001_2018["datetime"].diff() >= event_split, "event_start"] = True  # var c in matlab
P01_001_2018.loc[P01_001_2018.index[0], "event_start"] = True

In [None]:
P01_001_2018.head(10)

In [None]:
max_idx_distance = pd.Series(P01_001_2018[P01_001_2018["event_start"] == True].index).diff().max() #var max in matlab
max_idx_distance

__?? `max` is the maximum distance in terms of index difference -> also in terms of time the longest event?__

-> use is to define the size of the matrix in the next step to make sure the longest event fits;

### 2. Isolate the individual rain events -> I add an event idx

In [None]:
P01_001_2018["event_idx"] = P01_001_2018["event_start"].cumsum()

In [None]:
P01_001_2018

__What about lines 72-86 in matlab? add time steps with zero-rain values?__

-> probably not required if everything fits in groupby approach

### 3a. Cumulative rain for each event

In [None]:
P01_001_2018["event_rain_cum"] = P01_001_2018.groupby("event_idx")["rain_mm"].cumsum()

In [None]:
P01_001_2018.head()

### 3b. Rain energy

Example with a single event

In [None]:
temp = P01_001_2018[P01_001_2018["event_idx"] == 1].copy()

In [None]:
temp["rain_energy"] = 0.1112*((temp["rain_mm"]*6.)**0.31)*temp["rain_mm"]

__?? line 105, should it be j or 1? BUG?__

```
% j is currently 135 and NOT 1
pluviophase(5,j,i)=0.1112*(pluviophase(4,j,i)^0.31);  % berekenen van neerslagenergie per mm
```

__?? factor 6 on line 104 -> tight linked to 10min interval?__  YES IT IS!

In [None]:
temp["rain_energy"].sum() # to make comparible with matlab version add `- 0.034867`

In [None]:
temp.head()

In [None]:
def rain_energy_per_unit(rain):
    """"""
    rain_energy = 0.1112*((rain*6.)**0.31)*rain
    return rain_energy.sum()

In [None]:
rain_energy_per_unit(temp["rain_mm"])

Apply the calculation to the entire set of defined rain events:

In [None]:
P01_001_2018["event_energy"] = P01_001_2018.groupby("event_idx")["rain_mm"].transform(rain_energy_per_unit)

In [None]:
P01_001_2018.head(20)

### 4. Maximum intensity within 30 minutes

First work on a subset of the data:

In [None]:
temp = P01_001_2018[P01_001_2018["event_idx"] == 1].copy()

In [None]:
temp.head()

__- ?? Are the lines #L134-L144 effectively used in the code? In lines #L149-L150 the variables are overwritten (also warning by matlab itself__

__- ?? Why is on line #L159 a factor 2 required? As it gets multiplied again later?__

__- ?? Why is on line #L130 the lenght of the interval only 20minutes? If the concept is 20 + 5 min in each direction of the interval, one would assume 10' input data? Is this a valid assumption as I do not see any input check and other steps are based on checking the diff?__

__- ?? lines 132-133 can have side effects as well and - I think - only works as all series are prolonged during calculation (second dimension); maxprecip30min should not be overwritten as such__

A naive implementation reproducing the matlab implementation:

In [None]:
def maximum_intensity_matlab_clone(df):
    """Apply for a single_event method of Matlab implementation (Verstraete)"""
    
    current_year = df["datetime"].dt.year.unique()
    if not len(current_year) == 1:
        raise Exception("Data should all be in the same year.")

    df["minutes_since"] = (
        df["datetime"] - pd.Timestamp(f"{current_year[0]}-01-01")
    ).dt.total_seconds().values / 60    

    timestamps = df["minutes_since"].values
    rain = df["rain_mm"].values
    rain_cum = df["event_rain_cum"].values

    maxprecip_30min = 0.
    max_p30 = 0.

    if timestamps[-1] - timestamps[0] <= 30:
        maxprecip_30min = rain[0]*2  # *2 to mimick matlab

    for idx in range(len(df)-1):

        begin_30min = timestamps[idx]
        eind_30min = timestamps[idx] + 20
        begin_rain = rain_cum[idx] - rain[idx]
        if timestamps[idx + 1] > eind_30min: # next timestamp later than 30minutes interval - not effective
            precip_30min = rain[idx]        

        eind_rain = np.interp(eind_30min, timestamps, rain_cum)    
        precip_30min = eind_rain - begin_rain;

        if precip_30min > maxprecip_30min:
            maxprecip_30min = precip_30min

    return maxprecip_30min*2

In [None]:
max_intensity_event_matlab_clone = (P01_001_2018.groupby("event_idx")
 .apply(maximum_intensity_matlab_clone)
 .rename("max_30min_intensity_clone")
 .reset_index())

In [None]:
# Single measurement events
# P01_001_2018[P01_001_2018.groupby("event_idx")["station"].transform(len) == 1]

In [None]:
P01_001_2018 = pd.merge(P01_001_2018, max_intensity_event_matlab_clone, how="left")

In [None]:
P01_001_2018.head()

#### intermezzo _? Is it an option to change the algorithm on defining the max intensity ??_  

Alternative implementations to extract the max intensity of a 30min interval:

1. Rely on linear interpolation (cfr. matlab implementation) -> assume missing data is not zero

- interpolate linearly to interval (in this case 10min) of the data series (max value) 
- rolling interval of 30 minutes 
- take maximum of rolling interval sum of the rainfall within that period

:-) works for different time series intervals; :-( interpolation error as linear interpolation induces errors (overestimation)

In [None]:
time_series_resampled = temp.resample("10Min", on="datetime")["rain_mm"].max() # missing 10min are now Nan
time_series_resampled_interp = time_series_resampled.interpolate(method='linear')
rolling_sum = time_series_resampled_interp.rolling("30Min").sum()
rolling_sum.max()

In [None]:
def maximum_intensity_interpolate(df, interval="30Min", input_freq="10Min"):
    """Apply for a single_event method with linear interpolation"""
    time_series_resampled = df.resample(input_freq, on="datetime")["rain_mm"].max()
    time_series_resampled_interp = time_series_resampled.interpolate(method='linear')
    rolling_sum = time_series_resampled_interp.rolling(interval).sum()
    return rolling_sum.max() * 2   # formula requires mm/hr, intensity is on half an hour

2. Resample with the available data directly

- using the data available, rolling/resample over 30min interval directly
- take the maximum of all intervals

(This is equivalent with the previous method, but instead of interpolation, missing intervals are assumed to have no rainfall.)

:-) straight forward using available data only; :-( probably an underestimation?

In [None]:
temp.rolling("30Min", on="datetime")["rain_mm"].sum().max()

In [None]:
def maximum_intensity_fillna(df, interval="30Min", input_freq="10Min"):
    """Apply for a single_event method with zero values for no value"""
    # formula requires mm/hr, intensity is on half an hour
    return df.rolling("30Min", on="datetime")["rain_mm"].sum().max()*2

For ilustration, the result would be the same as doing:

```
time_series_resampled = temp.resample("10Min", on="datetime")["rain_mm"].max() # missing 10min are now Nan
time_series_resampled_interp = time_series_resampled.fillna(value=0)
rolling_sum = time_series_resampled_interp.rolling("30Min").sum()
rolling_sum.max()
```

Let's compare these methods...

In [None]:
# run on the single event
(maximum_intensity_fillna(temp, interval="30Min", input_freq="10Min"), 
 maximum_intensity_interpolate(temp, interval="30Min", input_freq="10Min"))

Apply to the entire set of events:

In [None]:
max_intensity_event_interpolate = (P01_001_2018.groupby("event_idx")[["datetime", "rain_mm"]]
 .apply(maximum_intensity_interpolate)
 .rename("max_30min_intensity_interpolate")
 .reset_index())

max_intensity_event_fillna = (P01_001_2018.groupby("event_idx")[["datetime", "rain_mm"]]
 .apply(maximum_intensity_fillna)
 .rename("max_30min_intensity_fillna")
 .reset_index())

In [None]:
max_intensity_event_fillna.shape

In [None]:
P01_001_2018 = pd.merge(P01_001_2018, max_intensity_event_interpolate, how="left")
P01_001_2018 = pd.merge(P01_001_2018, max_intensity_event_fillna, how="left")

In [None]:
P01_001_2018.head(10)

The difference between the methods:

In [None]:
fig, ax = plt.subplots(figsize=(16, 8))
max_intensity_event_matlab_clone["max_30min_intensity_clone"].plot(ax=ax, label="matlab_clone", 
                                                                   linewidth=4, linestyle='-.')
max_intensity_event_interpolate["max_30min_intensity_interpolate"].plot(ax=ax, label="interpolate")
max_intensity_event_fillna["max_30min_intensity_fillna"].plot(ax=ax, label="fillna")
ax.legend()

In [None]:
P01_001_2018.head()

### 5. Gather results for each event

In [None]:
P01_001_2018.head()

Extract summary for each event:

In [None]:
columns = ["event_rain_cum", "max_30min_intensity_clone", "event_energy", "datetime"]
P01_001_2018_events = P01_001_2018.groupby("event_idx")[columns].agg({"datetime": "first",
                                                                      "event_rain_cum": "last", 
                                                                      "max_30min_intensity_clone": "last", 
                                                                      "event_energy": "last", 
                                                                     })
P01_001_2018_events

In [None]:
P01_001_2018_events["erosivity"] = P01_001_2018_events["event_energy"] * P01_001_2018_events["max_30min_intensity_clone"]
P01_001_2018_events.head(10)

Add cumulative value of the events combined:

In [None]:
P01_001_2018_events["all_events_cum"] = P01_001_2018_events["event_rain_cum"].shift(1, fill_value=0.).cumsum()

In [None]:
P01_001_2018_events.head()

__?? Is this shift(1) really required? - done to mimick the `cumul` var in matlab__

Add days from start of year (conform matlab implementation):

In [None]:
current_year = P01_001_2018_events["datetime"].dt.year.unique()
if not len(current_year) == 1:
    raise Exception("Data should all be in the same year.")

days_since_start = (
    (P01_001_2018_events["datetime"] - pd.Timestamp(f"{current_year[0]}-01-01")).dt.total_seconds()
    / 60.0
    / 1440.0
)

P01_001_2018_events["days_since_start_year"] = days_since_start

Exclude events with total rainfall lower than threshold:

In [None]:
MIN_CUMUL = 1.27

#P01_001_2018_events.loc[P01_001_2018_events["event_rain_cum"] < MIN_CUMUL, 
#                        ["erosivity", "all_events_cum", 
#                         "max_30min_intensity_clone", "days_since_start_year"]] = 0.

P01_001_2018_events_subset = P01_001_2018_events[P01_001_2018_events["event_rain_cum"] > MIN_CUMUL].copy()
P01_001_2018_events_subset

__?? Should all_events_cum include the events that were excluded as below threshold? A matter of order of execution that matters; if someone want to do cumul, this won't be reproducible anymore__

### 6. Cumulative erosivity

In [None]:
P01_001_2018_events_subset["erosivity_cum"] = P01_001_2018_events_subset["erosivity"].cumsum()

In [None]:
P01_001_2018_events_subset.head(15)

### Derived stats

### Berekenen van maandelijkse erosiviteit

__?? line 212 what about leap years ?__

__! the implementation is not returned by Matlab anyhow__

TO CHECK -> IS THIS correct interpretation of the description/implementation?

In [None]:
# Monthly
P01_001_2018_events_subset.resample("M", on="datetime")["erosivity"].sum().plot()

In [None]:
# Bi-weekly
P01_001_2018_events_subset.resample("SM", on="datetime")["erosivity"].sum().plot()

### Use the new package implementation

In [None]:
from rfactor import compute_erosivity, maximum_intensity_matlab_clone, maximum_intensity
from rfactor.rfactor import _compute_erosivity

In [None]:
# overview of all the station-year combinations
all_rainfall.groupby(["station", all_rainfall["datetime"].dt.year]).first().index

In [None]:
station_year = all_rainfall[(all_rainfall["station"] == "P01_003") & 
                            (all_rainfall["datetime"].dt.year == 2005)].copy()

In [None]:
# compare the algorithms in term of erosivity -- TODO- should be done for all together
new_alg = compute_erosivity(station_year, maximum_intensity)
clone_alg = compute_erosivity(station_year, maximum_intensity_matlab_clone)

fig, ax = plt.subplots(figsize=(5, 5))
ax.scatter(new_alg["erosivity"], clone_alg["erosivity"], alpha=0.5)
ax.set_xscale("log")
ax.set_yscale("log")

In [None]:
new_alg["erosivity_cum"].iloc[-1]

In [None]:
new_alg.resample("M", on="datetime")["erosivity"].sum()

### Apply to multiple station/years - (parallel application)

In [None]:
# for testing, smaller dataset
rainfall_subset = all_rainfall[all_rainfall["station"].isin(["P01_001", "P01_003", "P11_043"])]

Apply the erosivity method to all combinations of year and station:

In [None]:
from functools import partial

In [None]:
%%time

my_fun = partial(_compute_erosivity, intensity_method=maximum_intensity)

# overview of all the station-year combinations
all_erosivity = rainfall_subset .groupby(   # all_rainfall
    ["station", all_rainfall["datetime"].dt.year]
).apply(my_fun)

In [None]:
all_erosivity

This is an ideal use case for parallel execeution:

In [None]:
from joblib import Parallel, delayed
import multiprocessing as mp

In [None]:
%%time

#from functools import partial
#erosivity_apply = partial(erosivity, intensity_method=maximum_intensity) # missing the name columns :-(


def erosivity_apply(name, group):
    """Wrapper function for parallel execution of erosivity on groups"""
    df = _compute_erosivity(group, maximum_intensity)
    df[["station", "year"]] = name
    return df

grouped = rainfall_subset.groupby(["station", all_rainfall["datetime"].dt.year])
results = Parallel(n_jobs=mp.cpu_count() - 1)(delayed(erosivity_apply)(name, group) for name, group in grouped)
all_erosivity_par = pd.concat(results)

In [None]:
all_erosivity_par

__?? ok to use joblib or should we use the tqdm `process_map` function of current package?__

### Integration with legacy file/folder setup

1. Read rainfall from the original format  => from lots of files => single rain df
1. Push output to the original output

In [None]:
def _extract_metadata_from_file_path(file_path):
    """Get metadata from file format
    
    Parameter
    ---------
    file_path : pathlib.Path
    """
    return {"station": "_".join(file_path.stem.split("_")[:-1]), 
            "year": file_path.stem.split("_")[-1] }

def load_rain_data(file_path, station, year):
    """ Load rainfall data file -> adjusted current package 
    """
    raw_data = pd.read_csv(file_path, delimiter=" ", header=None, 
                           names=["minutes_since", "rain_mm"])
    raw_data["datetime"] = pd.Timestamp(f"{year}-01-01") + pd.to_timedelta(raw_data["minutes_since"], 
                                                                           unit="min")
    raw_data["station"] = station
    return raw_data

In [None]:
file_path = Path("../../tests/data/test_rainfalldata/P01_001_2018.txt")
station, year = _extract_metadata_from_file_path(file_path).values()
station, year

In [None]:
df = load_rain_data("../../tests/data/test_rainfalldata/P01_001_2018.txt", station, year)

For a large set of files - making the service compatible (need to put this into function):

In [None]:
folder_path = Path("../../tests/data/test_rainfalldata")

lst_df = []
for file_path in folder_path.glob("*.txt"):
    station, year = _extract_metadata_from_file_path(file_path).values()
    lst_df.append(load_rain_data(file_path, station, year))
    
all_rain = pd.concat(lst_df)
all_rain = all_rain.sort_values(["station", "minutes_since"])
all_rain.index = range(len(all_rain))
all_rain

In [None]:
from rfactor import compute_erosivity
compute_erosivity(rainfall_subset)

Compatibility with the old format in terms of erosivity output:

In [None]:
demo_rain = load_rain_data("../../tests/data/test_rainfalldata/P01_001_2018.txt", station, year)
demo_erosivity = compute_erosivity(df, maximum_intensity).head()

In [None]:
demo_erosivity["year"].unique()

In [None]:
from rfactor.process import write_erosivity_data

Works for a single and multiple station/year combinations:

In [None]:
write_erosivity_data(demo_erosivity, Path("/home/stijnvh/draft/"))

In [None]:
write_erosivity_data(all_erosivity_par, Path("/home/stijnvh/draft/"))

### New workflow proposal:

- Read in whatever files you want to read in; directly or legacy format ( -> not part of the package  ? sacha, ok)
- Filter resulting rain dataframe however you want (pandas is there for you)
- Apply the erosivity function to it
- Do with the erosivity data whatever you want; save as such or legacy format


_+ convenience function to load erosivity from legacy format  ? sacha required?

In [None]:
from rfactor import maximum_intensity_matlab_clone

In [None]:
from rfactor.process import load_rain_folder, write_erosivity_data, load_rain_file
from rfactor import compute_erosivity_parallel

In [None]:
load_rain_file(Path("../../tests/data/test_rainfalldata/P01_001_2018.txt"))

In [None]:
wf_rain = load_rain_folder(Path("../../tests/data/test_rainfalldata/"))
wf_rain

In [None]:
wf_rain_selected = wf_rain[(wf_rain["datetime"].dt.year.isin([2005, 2006])) & 
                           (wf_rain["station"].isin(['P09_012', 'P09_016']))]

In [None]:
wf_rain_selected.shape

In [None]:
rfactor = compute_erosivity_parallel(wf_rain_selected)

voor rapportage:

- rfactor for jaar
- resample halfmaandelijks ('SM') en maandelijk ('M')

In [None]:
rfactor.groupby(["station", "year"])["erosivity_cum"].last().reset_index() # r for jaar

In [None]:
write_erosivity_data(rfactor, Path("/home/stijnvh/draft/rfactors"))

In [None]:
df = load_rain_file(Path("../../tests/data/test_rainfalldata/P01_001_2018.txt"))
ei = compute_erosivity_parallel(df)
write_erosivity_data(ei, Path("/home/stijnvh/draft/"))

In [None]:
rfactor

What if we have another source of data? - 10min required...

In [None]:
import os
from pywaterinfo import Waterinfo
vmm = Waterinfo("vmm", token=os.environ.get("VMM_TOKEN")) 

In [None]:
rain_stations = vmm.get_timeseries_list(timeseriesgroup_id=192896)
rainfall = vmm.get_timeseries_values(ts_id=rain_stations["ts_id"][0], start="2019-01-01", end="2019-12-31")

In [None]:
waterinfo_demo = rainfall[["Timestamp", "Value"]].rename(columns={"Timestamp": "datetime", "Value": "rain_mm"})
waterinfo_demo = waterinfo_demo[waterinfo_demo['rain_mm'] > 0]

In [None]:
compute_erosivity(waterinfo_demo, intensity_method=maximum_intensity)

__CODA__ -> the rainfall statistics function...

In [None]:
from rfactor.process import compute_rainfall_statistics

In [None]:
all_rain["year"] = all_rain["datetime"].dt.year
all_rain.head()

In [None]:
compute_rainfall_statistics(all_rain.rename(columns={"rain_mm": "value"}))

Remaining questions?

- clip_erosivity_data function -> explain?
- is er een reden om het weg te schrijven in die specifieke vorm?
- noodzaak van de file overview met de considers? -> filter met pandas als vervanging?
- rainfall statistics -> "x", "y" ??

-- quick intermediate check --

In [None]:
# Check if the time difference works   - OK
diffs = np.loadtxt("tests/data/test_rainfalldata/P01_001_2018.txt", usecols=0)
np.testing.assert_allclose(P01_001_2018["diff"].values, diffs)