In [None]:
#hide
import sys
sys.path.append("..")
%load_ext autoreload
%autoreload 2

In [None]:
#default_exp data

# Data

> Methods to load time series data from motion files

In [None]:
#export
from fastcore.all import *
from timeseries.all import *
import pandas as pd
import numpy as np
from mlchaos.utils import df_slicer
import seaborn as sns

In [None]:
# Root directory under which the `data/` folder used by the examples below is looked up
base_path = Path.home()

## Motions defined in files Poincare.plt and index.plt

We call a pair `(fname_poincare, fname_index)` an `item`. From the Poincare file we get the time series data (the x, or independent variable), and from the index file we get the label (the y, or dependent variable).

### Load motion data from poincare file (independent variables)

In [None]:
#export
def load_poincare_maps(fname):
    """Load the data from a Poincare map.

    The file is read as whitespace-separated columns `time`, `x` and `y`.
    The sequence length is the number of distinct values in the `time` column;
    the remaining columns are cut by `df_slicer` into consecutive,
    non-overlapping windows of that length (window == stride).

    Returns a numpy array with a shape (n_orbits, 2, sequence_length).
    The time column is removed from the data, since it is always multiples of 2*pi.
    """
    # Raw string: `\s` is not a valid escape sequence in a plain string literal
    df = pd.read_table(fname, sep=r'\s+', names=['time', 'x', 'y'])
    seq_length = len(set(df['time']))
    df = df.drop(columns='time')
    nparr = df_slicer(df, w=seq_length, s=seq_length)
    # Swap the last two axes so that channels come before the sequence axis
    return nparr.transpose([0, 2, 1])

In [None]:
fname = base_path/'data/fli_rotnum/model_1/eps=0.01/Poincare.plt'
foo = load_poincare_maps(fname)
# Raw table (time column included), read independently to check the loader against it.
# The separator is a raw string so that `\s` is not treated as a string escape.
bar = pd.read_table(fname, sep=r'\s+', names=['time', 'x', 'y'])

In [None]:
# The last axis of the loaded array must span every distinct time stamp of the raw table
seq_len = len(set(bar.time))
test_eq(foo.shape[2], seq_len)

In [None]:
# One item per block of `seq_len` consecutive rows of the raw table
test_eq(len(foo), len(bar)/seq_len)

In [None]:
# The first item must equal the first `seq_len` rows of (x, y), laid out channels-first
test_eq(foo[0], bar.drop(columns='time').iloc[:seq_len].to_numpy().T)

### Load index of the motion from index file (dependent variable)

In [None]:
#export
def load_index_file(fname, index_col=7, uncertainty_index=-1.):
    """Return the index (label) column of a whitespace-separated index file.

    `fname`: path of the index file (read without a header row).
    `index_col`: position of the column to read, or a collection of positions.
    `uncertainty_index`: value assigned to a row when several columns are
    requested and they do not all hold the same value for that row.

    Returns a 1-D numpy array with one entry per row of the file. With a single
    column it holds that column's values; with several columns, each entry is the
    common value of the row, or `uncertainty_index` if the columns disagree.
    """
    usecols = [index_col] if np.isscalar(index_col) else list(index_col)
    table = pd.read_table(fname,
                          sep=r'\s+',  # raw string: `\s` is not a valid string escape
                          header=None,
                          usecols=usecols)
    # `read_table(squeeze=True)` was removed in pandas 2.0; squeezing the column
    # axis afterwards gives the same result (a Series when only one column was read)
    indices = table.squeeze('columns').values
    if indices.ndim > 1:
        indices = np.array([row[0] if np.all(row == row[0]) else uncertainty_index
                            for row in indices])
    return indices

In [None]:
# Index file stored in the same folder as the Poincare map loaded above
fname = base_path/'data/fli_rotnum/model_1/eps=0.01/index_fli_freqana.plt'
foo = load_index_file(fname)

In [None]:
# With the default single column, the result is a flat numpy array
test_eq(type(foo), np.ndarray)
test_eq(foo.ndim, 1)

### Load x and y at once from both the Poincare and the index file

In [None]:
#export
@delegates(to=load_index_file, but=['fname'])
def load_poincare_index_pair(fname_poincare, fname_index, **kwargs):
    """Load the x data from a Poincare file and the y data from the index file.

    Extra keyword arguments are forwarded to `load_index_file`.

    Returns a tuple of 2 numpy arrays:
    x : array with a shape (n_samples, n_channels, sequence_length);
        for the Poincare maps, n_channels is 2 (x and y)
    y : array with a shape (n_samples)
    """
    x = load_poincare_maps(fname_poincare)
    y = load_index_file(fname_index, **kwargs)
    return x, y

In [None]:
# Load x (Poincare map) and y (index) of the same dataset in a single call
fname_poincare = base_path/'data/fli_rotnum/model_1/eps=0.01/Poincare.plt'
fname_index = base_path/'data/fli_rotnum/model_1/eps=0.01/index_fli_freqana.plt'
foo, bar = load_poincare_index_pair(fname_poincare, fname_index)

In [None]:
# Both outputs are numpy arrays holding one entry per sample
test_eq(type(foo), np.ndarray)
test_eq(type(bar), np.ndarray)
test_eq(foo.shape[0], bar.shape[0])

### Create TSData object from a list of (Poincare.plt, index.plt) files

In [None]:
#export
class TSDataChaos(TSData):
    "`TSData` whose `x` (and optionally `y`) are loaded from Poincare map and index files."
    @classmethod
    @delegates(to=load_poincare_index_pair)
    def from_poincare_and_index_files(cls, fnames, **kwargs):
        """Build the object from (Poincare file, index file) pairs.

        `fnames` is a list of pairs (poincare_file, index_file), or a single pair.
        Extra keyword arguments are forwarded to `load_poincare_index_pair`.

        Sets on the returned object:
        - `x`, `y`: data and labels of all the pairs, concatenated along the first axis
        - `ds`: for each item, the position in `fnames` of the pair it was loaded from
        - `fnames`: list with the pairs that were loaded
        - `dsname`: list with the name of the parent folder of each Poincare file
        """
        self = cls(fnames)
        self.x = []
        self.y = []
        self.dsname = []
        self.fnames = []
        self.ds = []
        if isinstance(fnames, list):
            xs, ys, dss = [], [], []
            for i, (fn_poincare, fn_index) in enumerate(fnames):
                x, y = load_poincare_index_pair(fn_poincare, fn_index, **kwargs)
                xs.append(x)
                ys.append(y)
                dss.append(np.repeat(i, len(y))) # Map each item with the dataset it belongs to
                self.fnames.append((fn_poincare, fn_index))
                self.dsname.append(fn_poincare.parent.name)
            self.x = np.concatenate(xs)
            self.y = np.concatenate(ys)
            self.ds = np.concatenate(dss).squeeze()
        else:
            fn_poincare, fn_index = fnames
            self.fnames.append(fnames)
            self.dsname.append(fn_poincare.parent.name)
            self.x, self.y = load_poincare_index_pair(fn_poincare, fn_index, **kwargs)
            # Single dataset: every item belongs to dataset 0.
            # (`y` is not a local name in this branch, the labels live in `self.y`.)
            self.ds = np.repeat(0, len(self.y))
        return self

    @classmethod
    def from_poincare_maps(cls, fnames):
        """Build the object from Poincare maps only.

        `fnames` is a list of paths to Poincare maps, or a single path.
        No ys are provided, so `y` is set to None.
        """
        self = cls(fnames)
        self.x = []
        self.y = None
        self.dsname = []
        self.fnames = []
        if isinstance(fnames, list):
            xs = []
            for fn_poincare in fnames:
                xs.append(load_poincare_maps(fn_poincare))
                # There is no index file here, so only the Poincare path is stored
                self.fnames.append(fn_poincare)
                self.dsname.append(fn_poincare.parent.name)
            self.x = np.concatenate(xs)
        else:
            fn_poincare = fnames
            self.fnames.append(fn_poincare)
            self.dsname.append(fn_poincare.parent.name)
            self.x = load_poincare_maps(fn_poincare)
        return self

In [None]:
# A single (Poincare file, index file) pair, passed as a tuple rather than as a list of pairs
fnames = (base_path/'data/fli_rotnum/model_1/eps=0.01/Poincare.plt', 
            base_path/'data/fli_rotnum/model_1/eps=0.01/index_fli_freqana.plt')
foo = TSDataChaos.from_poincare_and_index_files(fnames)
foo

In [None]:
# The alternative constructor returns an instance of the subclass itself
test_eq(type(foo), TSDataChaos)

In [None]:
# `index_col` is forwarded down to `load_index_file`: read the labels from column 8 instead of the default 7
bar = TSDataChaos.from_poincare_and_index_files(fnames, index_col=8)
bar

In [None]:
# First time series of the dataset and its label
x0 = foo.x[0]
y0 = foo.y[0]
y0

In [None]:
# Labels read from different index columns are expected to differ
test_ne(foo.y, bar.y)

In [None]:
# Plot the first time series, using its label as the title
show_timeseries(x0, title=y0)

## Get items from a TSDataChaos object

In [None]:
#export
def get_motion_items(fnames):
    """Return the items of the `TSDataChaos` built from `fnames`.

    `fnames` is not an array of paths, but an array of tuples of paths. Each
    tuple has the path of the Poincare map and the path of the index file.
    The result is whatever `get_items()` returns for that object: a list of
    tuples, each one corresponding to a time series (numpy.ndarray) and a label.
    """
    return TSDataChaos.from_poincare_and_index_files(fnames).get_items()

### Show labelled initial condition map

In [None]:
#export
@delegates(sns.scatterplot, but=['data', 'x', 'y', 'hue', 'marker'])
def show_labelled_ic_map(poinc_maps:Union[torch.Tensor, np.ndarray]=None, 
                         lbls:list=None, **kwargs):
    """Show a scatter plot with the initial conditions (x0, y0) of each Poincare map
    in `poinc_maps`, coloured with the labels given in `lbls`.

    The argument `poinc_maps` expects a (n_items, n_channels, n_points) numpy array
    or tensor; the first point of each item is taken as its initial condition.
    Remaining keyword arguments are passed to `sns.scatterplot`, whose return
    value is returned.
    """
    # One row per item: first point of every channel, plus the label used for the hue
    ic_frame = pd.DataFrame(poinc_maps[:, :, 0], columns=['x0', 'y0'])
    ic_frame['class'] = lbls
    return sns.scatterplot(data=ic_frame, x='x0', y='y0', hue='class', marker='.', **kwargs)

In [None]:
# Mock data drawn from a seeded generator, so the example plots are identical on every run
mock_rng = np.random.default_rng(42)
# `foo` is kept as an alias of the mock array, as before
mock_poinc_map = foo = mock_rng.standard_normal((1000, 2, 100))
mock_lbls = mock_rng.choice(['chaotic', 'librational', 'rotational'], size=1000).tolist()

In [None]:
# NOTE: the data plotted here is mock (random) data, so the map has no physical meaning
show_labelled_ic_map(poinc_maps=mock_poinc_map, lbls=mock_lbls)

Extra keyword arguments are passed on to `sns.scatterplot`:

In [None]:
# `legend` is not a parameter of the function itself; it reaches `sns.scatterplot` through **kwargs
show_labelled_ic_map(poinc_maps=mock_poinc_map, lbls=mock_lbls, legend=False)

Test with a `TSDataChaos` object

In [None]:
# Build a TSDataChaos by hand: 'foo' is a dummy constructor argument,
# and `x` / `y` are then set directly to the mock data
mock_tdc = TSDataChaos('foo')
mock_tdc.x = mock_poinc_map
mock_tdc.y = mock_lbls

In [None]:
# The plotting function takes the maps and labels separately, so unpack them from the object
show_labelled_ic_map(poinc_maps=mock_tdc.x, lbls=mock_tdc.y, legend=None)