# DTW with dtw-python

! NOTE!

2023-09-18 11:10:00: this code contains my attempts to use `dtw-python`, originally in [dynamic_time_warping](./dynamic_time_warping.ipynb). As the package appears to be intended primarily to return distance metrics for classification purposes (?), it proved difficult to extract the warping path, and I opted to use `dtwalign` instead.


This notebook explores the use of `dtw-python` for aligning multiple signals via DTW. It uses the representative-sample method developed in [identifying_most_similar_signal](./identifying_most_similar_signal.ipynb).


## dtw-python

To use `dtw-python`, we first identify the reference sample and then, for every sample in the set, call `dtw` with the sample as the query `x` and the reference as `y`. Each call returns a `dtw` object containing the alignment results for that sample.

In the `dtw-python` package there are a number of constraint parameters which control how the software behaves, including algorithm constraints for handling special cases.

DTW alignment generally works by matching multiple elements in the query signal to one element in the reference (compression?) or vice versa (stretching?). This behavior is governed by the `step_pattern` parameter.

Within the package, `index1` and the y-axis (where relevant) refer to the query, and `index2` and the x-axis refer to the reference.
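
To make `index1` and `index2` concrete, here is a minimal hand-rolled DTW in plain Python. It is a sketch, not the `dtw-python` implementation: it assumes an absolute-difference local cost and the three basic steps (diagonal, vertical, horizontal) with equal weights. The function name `simple_dtw` and the toy series are made up for illustration.

```python
def simple_dtw(query, reference):
    """Toy DTW: returns (index1, index2, distance) for two 1-D sequences."""
    n, m = len(query), len(reference)
    inf = float("inf")
    # cost[i][j]: cheapest cumulative cost of aligning query[:i+1] with reference[:j+1]
    cost = [[inf] * m for _ in range(n)]
    for i in range(n):
        for j in range(m):
            d = abs(query[i] - reference[j])
            if i == 0 and j == 0:
                cost[i][j] = d
                continue
            best = min(
                cost[i - 1][j - 1] if i > 0 and j > 0 else inf,
                cost[i - 1][j] if i > 0 else inf,
                cost[i][j - 1] if j > 0 else inf,
            )
            cost[i][j] = d + best

    # Backtrack from the last cell to the first to recover the warping path
    i, j = n - 1, m - 1
    index1, index2 = [i], [j]
    while i > 0 or j > 0:
        candidates = []
        if i > 0 and j > 0:
            candidates.append((cost[i - 1][j - 1], i - 1, j - 1))
        if i > 0:
            candidates.append((cost[i - 1][j], i - 1, j))
        if j > 0:
            candidates.append((cost[i][j - 1], i, j - 1))
        _, i, j = min(candidates)
        index1.append(i)
        index2.append(j)
    index1.reverse()
    index2.reverse()
    return index1, index2, cost[n - 1][m - 1]


# Hypothetical toy signals: the same peak, shifted by one sample
query = [0, 0, 1, 2, 1, 0]
reference = [0, 1, 2, 1, 0, 0]
index1, index2, distance = simple_dtw(query, reference)
print(list(zip(index1, index2)), distance)
```

The two index lists always have the same length, and each pair `(index1[k], index2[k])` says which query element is matched to which reference element at step `k` of the path.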


In [None]:
import pandas as pd
import numpy as np
from dtw import dtw
from wine_analysis_hplc_uv import definitions
import seaborn as sns
import matplotlib.pyplot as plt
from wine_analysis_hplc_uv.old_signal_processing.mindex_signal_processing import (
    SignalProcessor,
)

scipro = SignalProcessor()
df = pd.read_parquet(definitions.XPRO_YPRO_DOWNSAMPLED_PARQ_PATH)
df.head()

In [None]:
# the keys for the primary index in either format.

sw_index = ["samplecode", "wine"]

First identify the reference sample


In [None]:
# test whether scipro.most_correlated returns the expected value

reference = df.pipe(scipro.most_correlated)
reference

In [None]:
# check whether the samplecode returned by `scipro.most_correlated` is the expected sample

df[reference]

In [None]:
# Create an aggregate dataframe of DTW objects for the sampleset

# df =
df_obj = (
    df.pipe(
        lambda df: df.stack(["samplecode", "wine"]) if df.columns.nlevels == 3 else df
    )
    .reorder_levels(["samplecode", "wine", "mins"])
    .sort_index()
    .groupby(["samplecode", "wine"])
    .apply(lambda x: dtw(x=x, y=df.loc[:, reference]))
    .to_frame(name="dtw_obj")
)
df_obj

In [None]:
# extract index1 (the query warping indices) for each sample from the objects
df_index1 = (
    df_obj.groupby(sw_index)
    .apply(lambda x: x["dtw_obj"].values[0].index1)
    .to_frame(name="index1")
    .explode("index1")
    .assign(i=lambda x: x.groupby(sw_index).cumcount())
    .set_index("i", append=True)
    .rename_axis("vars", axis=1)
    .unstack(sw_index)
    .reorder_levels(axis=1, order=["samplecode", "wine", "vars"])
)
df_index1

As we can see, there are NaNs in all columns EXCEPT for torbreck struie 2. This is because that signal takes the most work to align to the reference, so its warping path is the longest; when the groups are placed side by side again, the rows not present in the other signals are filled with NA.


In [None]:
df_index1.plot()

How much NA exactly?


In [None]:
(df_index1.isna().groupby(sw_index, axis=1).sum().sum() / df_index1.shape[0] * 100)

Not an extreme amount to be honest. It appears that all the signals need significant work to align with the reference.
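
The padding effect can be reproduced without the real data. The sketch below uses three made-up warping paths of different lengths (the sample names `a`, `b`, `c` are hypothetical) and pads each to the longest one, which is roughly what happens when the exploded paths are unstacked into a single frame.

```python
# Hypothetical warping paths of different lengths for three samples
paths = {
    "a": [0, 1, 1, 2, 3],
    "b": [0, 1, 2, 3],
    "c": [0, 0, 1, 1, 2, 2, 3],
}

longest = max(len(p) for p in paths.values())

# Pad every path to the longest one, as a shared row index would
padded = {
    name: p + [float("nan")] * (longest - len(p)) for name, p in paths.items()
}

# Percentage of padded (missing) rows per sample
pct_missing = {
    name: 100 * (longest - len(p)) / longest for name, p in paths.items()
}
print(pct_missing)
```

Only the sample with the longest path has no missing rows, so the NA percentage is a rough proxy for how much shorter each path is than the longest one.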


It is not clear how to extract the aligned query series from the alignment process.

This [Stack Overflow](https://stackoverflow.com/questions/25735766/understanding-dynamic-time-warping) answer says that the `.indexn` attributes contain the mapping, which is also what the documentation says, but it is not obvious what I am supposed to do with them.

A bit of rationalisation - if the indexes are the warping functions, and they are given as integer vectors, then they must be the elementwise mapping of query to reference, i.e. index2 is the method of mapping 


Now do the same for index2


In [None]:
# extract index2 (the reference warping indices) for each sample from the objects
df_index2 = (
    df_obj.groupby(sw_index)
    .apply(lambda x: x["dtw_obj"].values[0].index2)
    .to_frame(name="index2")
    .explode("index2")
    .assign(i=lambda x: x.groupby(sw_index).cumcount())
    .set_index("i", append=True)
    .rename_axis("vars", axis=1)
    .unstack(sw_index)
    .reorder_levels(axis=1, order=["samplecode", "wine", "vars"])
)
df_index2

In [None]:
df_index2.plot()

Now join them


In [None]:
df_index1.join(df_index2).sort_index(level="samplecode", axis=1)

From [@giorgino_2009], index1 is $\phi_x(k)$, and index2 is $\phi_y(k)$. If we 'apply' them by indexing the original series with the vector, will we get the aligned signal?
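
As a small check of that idea, the sketch below applies a hand-written warping path to two toy series by integer-position indexing. The series and the path are illustrative values chosen by hand, not output from `dtw-python`.

```python
# Toy series and a hand-written warping path (illustrative values only)
query = [0, 0, 1, 2, 1, 0]
reference = [0, 1, 2, 1, 0, 0]
index1 = [0, 1, 2, 3, 4, 5, 5]  # positions in the query, phi_x(k)
index2 = [0, 0, 1, 2, 3, 4, 5]  # positions in the reference, phi_y(k)

# Apply each warping function by integer-position indexing
warped_query = [query[i] for i in index1]
warped_reference = [reference[j] for j in index2]

print(warped_query)
print(warped_reference)
```

Both warped series have the length of the path rather than the length of either input, which is why the indexed signal ends up longer than the original.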


So first plot the reference signal


In [None]:
# create aligned signal by indexing original series by index1

index1_154 = (df_index1.iloc[:, [0]]).dropna()

index1_154

In [None]:
original_154 = df.loc[:, ["154"]]
original_154

In [None]:
# index original 154 with corresponding index1
fig, ax = plt.subplots(1)
aligned_154 = original_154.iloc[index1_154.values.flatten()]
display(aligned_154)
aligned_154.plot(ax=ax)
df[reference].plot(ax=ax)
df["154"].plot(ax=ax)

## Producing an Aligned Tensor

Post-alignment we are expecting to output a tensor of sample series of the same length, aligned to the reference.


Now we've got 1015 rows and we should have 600. Presumably there are duplicates now, the result of stretching. One method, mentioned by @tomasi2004, would be to take the mean of the repeated time points in the warped signal.
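
The aggregation step can be sketched on toy data in plain Python. The `(time, value)` pairs below are made up; the point is only that repeated time labels collapse to one row each.

```python
from collections import defaultdict

# Illustrative warped series as (time, value) pairs.
# Time labels repeat wherever the signal was stretched.
warped = [(0.0, 1.0), (0.0, 1.0), (0.5, 3.0), (1.0, 2.0), (1.0, 2.0), (1.0, 2.0)]

# Collect all values that share a time label
groups = defaultdict(list)
for t, v in warped:
    groups[t].append(v)

# Replace each group of repeats with its mean
collapsed = {t: sum(vs) / len(vs) for t, vs in sorted(groups.items())}
print(collapsed)
```

Note that the repeats created by indexing carry identical values, so their mean is just the original value at that time point.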


In [None]:
aligned_154 = aligned_154.groupby("mins").mean()
aligned_154
# aligned_154.plot()

In [None]:
fig, ax = plt.subplots(1)
aligned_154.plot(ax=ax)
df[reference].plot(ax=ax)
df["154"].plot(ax=ax)
plt.tight_layout()
plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.0)

At least they are the same length now; however, the peak maximum is misaligned, which I would regard as the most vital landmark within the signals to align. Let's observe all of the samples.


In [None]:
# map each index1 integer position to the corresponding time value in df's index


def get_index(d):
    r = df.index[d["index1"]]
    return r


df_index1 = (
    df_index1.stack(["samplecode", "wine"])
    .assign(index1_td=lambda df: df.apply(get_index, axis=1))
    .rename_axis("warp_func", axis=1)
    .unstack(["samplecode", "wine"])
    .reorder_levels(["samplecode", "wine", "warp_func"], axis=1)
    .sort_index(axis=1)
)
df_index1

In [None]:
# get an index object with same levels and format as df

df_index1_mins_index = (
    df_index1.stack(["samplecode", "wine"])
    .rename({"index1_td": "mins"}, axis=1)
    .reset_index("i", drop=True)
    .set_index("mins", append=True)
    .index
)

df_index1_mins_index

In [None]:
# iterate through each sample and index with corresponding index1 then aggregate repeat time points
# fig, axs = plt.subplots(nrows=1, ncols=2)
a_df = (
    df.stack(["samplecode", "wine"])
    .reorder_levels(["samplecode", "wine", "mins"])
    .assign(
        aligned=lambda df: df.loc[df_index1_mins_index]
        .groupby(["samplecode", "wine", "mins"])
        .mean()
    )
    .reset_index()
    .rename({"value": "query"}, axis=1)
)
a_df

In [None]:
#

plot_df = (
    a_df.set_index("samplecode")
    .rename({"torbreck-struie": "tbs"})
    .reset_index()
    .assign(wine=lambda df: df.samplecode + "_" + df.wine)
    .set_index(["wine", "mins"])
    .drop("samplecode", axis=1)
    .melt(ignore_index=False)
    # .melt()
)
# df
#  .unstack(['samplecode','wine'])
#  .reorder_levels(['samplecode','wine','vars'],axis=1)
#  .sort_index(axis=1)

g = sns.FacetGrid(plot_df, col="vars")
g.map_dataframe(sns.lineplot, x="mins", y="value", hue="wine", legend=True)
g.add_legend()
display(plot_df)

As we can see, the expected aligned dataset is exactly the same as the original. This is because my approach of recompressing the stretched sections simply reverses the warping.
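
The reversal can be shown on toy data. The sketch below reuses a hand-written warping path (illustrative values, not `dtw-python` output) and a hypothetical helper `mean_by_key`. Averaging the warped query by its own positions returns the original query, whereas averaging by the reference positions in `index2` is one possible alternative that keeps the alignment. I have not tested the alternative on the real dataset.

```python
from collections import defaultdict


def mean_by_key(keys, values):
    """Average the values that share a key; return the means in key order."""
    groups = defaultdict(list)
    for k, v in zip(keys, values):
        groups[k].append(v)
    return [sum(groups[k]) / len(groups[k]) for k in sorted(groups)]


# Toy series and a hand-written warping path (illustrative values only)
query = [0, 0, 1, 2, 1, 0]
reference = [0, 1, 2, 1, 0, 0]
index1 = [0, 1, 2, 3, 4, 5, 5]
index2 = [0, 0, 1, 2, 3, 4, 5]

warped_query = [query[i] for i in index1]

# Grouping by the query's own positions undoes the warp
by_query_pos = mean_by_key(index1, warped_query)

# Grouping by the reference positions keeps the alignment
by_reference_pos = mean_by_key(index2, warped_query)

print(by_query_pos)
print(by_reference_pos)
```

In this toy case `by_query_pos` equals the original query, while `by_reference_pos` has the reference's length and its peak sits at the reference's peak position.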
