In [1]:
#| default_exp utils

In [2]:
#| hide
%load_ext autoreload
%autoreload 2

# Utils

> Utilities used in the rest of the notebooks

In [3]:
#| export
from dvats.imports import *
from fastcore.all import *
import wandb
import pickle
import pandas as pd
import numpy as np
#import tensorflow as tf
import torch.nn as nn
from fastai.basics import *
import time

## Printing

In [4]:
#| export
import sys
import datetime

In [5]:
#| export
def print_flush(
    mssg            : str,
    print_to_path   : bool  = False,
    print_path      : str   = "~/data/logs/logs.txt",
    print_mode      : str   = 'a',
    verbose         : int   = None,
    print_time      : bool  = False,
    print_both      : bool  = False,
    **kwargs        # print args
):
    mssg_ = ""
    if verbose is not None:
        mssg_ += f"[{verbose}] "
    if print_time: 
        now = datetime.datetime.now()
        mssg_ += now.strftime('%Y-%m-%d %H:%M:%S') + f".{now.microsecond // 1000:03d}"
        mssg_ += " | "
    mssg_ += mssg
    if print_to_path:
        print_path = os.path.expanduser(print_path)
        with open(print_path, print_mode) as f:
            print(mssg_, file=f, **kwargs)
    if (not print_to_path) or (print_to_path and print_both):
        if print_both: mssg_ += " | " + print_path
        print(mssg_, **kwargs)
    sys.stdout.flush()

## Generate random time series dataframe

In [6]:
#| export
def generate_TS_df(rows, cols):
    "Generates a dataframe containing a multivariate time series, where each column \
    represents a variable and each row a time point (sample). The timestamp is in the \
    index of the dataframe, and it is created with a even space of 1 second between samples"
    index = np.arange(pd.Timestamp.now(),
                      pd.Timestamp.now() + pd.Timedelta(rows-1, 'seconds'),
                      pd.Timedelta(1, 'seconds'))
    data = np.random.randn(len(index), cols)
    return pd.DataFrame(data, index=index)

In [7]:
#| hide
df = generate_TS_df(3, 5)

In [8]:
#| hide
test_eq(df.shape, (3, 5))

##  pandas Dataframe utilities

### Normalize columns

In [9]:
#| export
def normalize_columns(df:pd.DataFrame):
    "Normalize columns from `df` to have 0 mean and 1 standard deviation"
    mean = df.mean()
    std = df.std() + 1e-7
    return (df-mean)/std

In [10]:
#| hide
foo = generate_TS_df(3, 3)
foo.describe()

Unnamed: 0,0,1,2
count,3.0,3.0,3.0
mean,0.153647,0.053706,-0.232063
std,0.79058,1.939948,2.088105
min,-0.747919,-1.703933,-2.610741
25%,-0.133795,-0.987042,-0.997468
50%,0.480328,-0.270151,0.615804
75%,0.60443,0.932525,0.957275
max,0.728532,2.135201,1.298747


In [11]:
#| hide
bar = normalize_columns(foo)
bar.describe()

Unnamed: 0,0,1,2
count,3.0,3.0,3.0
mean,0.0,1.850372e-17,-1.850372e-17
std,1.0,0.9999999,1.0
min,-1.140385,-0.9060235,-1.139156
25%,-0.363584,-0.5364821,-0.3665548
50%,0.413217,-0.1669407,0.4060464
75%,0.570192,0.4530117,0.569578
max,0.727168,1.072964,0.7331096


In [12]:
#| hide
test_close(bar.describe().loc['mean'].values, np.repeat(0.0, len(bar.columns)))

In [13]:
#| hide
test_close(bar.describe().loc['std'].values, np.repeat(1.0, len(bar.columns)))

### Remove constant columns

In [14]:
#| export
def remove_constant_columns(df:pd.DataFrame):
    return df.loc[:, (df != df.iloc[0]).any()]

In [15]:
#| hide
foo = generate_TS_df(3, 3)
foo['constant'] = [0.0]*len(foo)
foo

Unnamed: 0,0,1,2,constant
2024-11-22 10:22:42.225099,-0.672069,-0.662154,-0.734045,0.0
2024-11-22 10:22:43.225099,-0.702293,-0.344449,0.477274,0.0
2024-11-22 10:22:44.225099,0.562166,0.79614,-0.337126,0.0


In [16]:
#| hide
bar = remove_constant_columns(foo)
bar

Unnamed: 0,0,1,2
2024-11-22 10:22:42.225099,-0.672069,-0.662154,-0.734045
2024-11-22 10:22:43.225099,-0.702293,-0.344449,0.477274
2024-11-22 10:22:44.225099,0.562166,0.79614,-0.337126


In [17]:
#| hide
column_diff = set(foo.columns) - set(bar.columns)
test_eq_type(column_diff, set(['constant']))

## Create wandb artifact containing just the reference to an object pass as argument

In [18]:
#| export
class ReferenceArtifact(wandb.Artifact):
    default_storage_path = Path('data/wandb_artifacts/') # * this path is relative to Path.home()
    "This class is meant to create an artifact with a single reference to an object \
    passed as argument in the contructor. The object will be pickled, hashed and stored \
    in a specified folder."
    @delegates(wandb.Artifact.__init__)
    def __init__(self, obj, name, type='object', folder=None, **kwargs):
        super().__init__(type=type, name=name, **kwargs)
        # pickle dumps the object and then hash it
        hash_code = str(hash(pickle.dumps(obj)))
        folder = Path(ifnone(folder, Path.home()/self.default_storage_path))
        with open(f'{folder}/{hash_code}', 'wb') as f:
            pickle.dump(obj, f)
        self.add_reference(f'file://{folder}/{hash_code}')
        if self.metadata is None:
            self.metadata = dict()
        self.metadata['ref'] = dict()
        self.metadata['ref']['hash'] = hash_code
        self.metadata['ref']['type'] = str(obj.__class__)

In [19]:
#| hide
foo = np.arange(10)
bar = ReferenceArtifact(obj=foo, name='foo', folder='.')
bar_path = Path(f'./{bar.metadata["ref"]["hash"]}')
test_eq(bar_path.exists(), True)
test_eq(bar.metadata['ref']['type'], "<class 'numpy.ndarray'>")

ValueError: Path "file://./-6264641716327866930" must be a valid file or directory path

When a reference artifact is used by one wandb run, we should have a method to get the original object from it

In [None]:
#| export
@patch
def to_obj(self:wandb.apis.public.Artifact):
    """Download the files of a saved ReferenceArtifact and get the referenced object. The artifact must \
    come from a call to `run.use_artifact` with a proper wandb run."""
    if self.metadata.get('ref') is None:
        print_flush(f'ERROR:{self} does not come from a saved ReferenceArtifact')
        return None
    original_path = ReferenceArtifact.default_storage_path/self.metadata['ref']['hash']
    path = original_path if original_path.exists() else Path(self.download()).ls()[0]
    with open(path, 'rb') as f:
        obj = pickle.load(f)
    return obj

Test with Reference artifact from a df

In [None]:
#| hide
foo = generate_TS_df(3, 3)
bar = ReferenceArtifact(obj=foo, name='test_reference_artifact')
bar.manifest.entries.values()

In [None]:
#| hide
test_eq(bar.name, 'test_reference_artifact')

In [None]:
#| hide
test_eq(bar.metadata['ref']['type'], str(type(foo)))

TODO: Test method `to_obj`

ReferenceArtifact with a numpy array

In [None]:
#| hide
foo = np.random.randn(5)
bar = ReferenceArtifact(obj=foo, name='test_reference_artifact')
bar.manifest.entries.values()

In [None]:
#| hide
test_eq(bar.metadata['ref']['type'], str(type(foo)))

In [None]:
#| export
import torch.nn as nn
class PrintLayer(nn.Module):
    def __init__(self):
        super(PrintLayer, self).__init__()

    def forward(self, x):
        # Do your print / debug stuff here
        print_flush(x.shape)
        return x

In [None]:
#| export
@patch
def export_and_get(self:Learner, keep_exported_file=False):
    """
        Export the learner into an auxiliary file, load it and return it back.
    """
    aux_path = Path('aux.pkl')
    self.export(fname='aux.pkl')
    aux_learn = load_learner('aux.pkl')
    if not keep_exported_file: aux_path.unlink()
    return aux_learn

### get_wandb_artifacts

In [None]:
#| export
def get_wandb_artifacts(project_path, type=None, name=None, last_version=True):
    """
        Get the artifacts logged in a wandb project.
        Input:
        - `project_path` (str): entity/project_name
        - `type` (str): whether to return only one type of artifacts
        - `name` (str): Leave none to have all artifact names
        - `last_version`: whether to return only the last version of each artifact or not

        Output: List of artifacts
    """
    public_api = wandb.Api()
    if type is not None:
        types = [public_api.artifact_type(type, project_path)]
    else:
        types = public_api.artifact_types(project_path)

    res = L()
    for kind in types:
        for collection in kind.collections():
            if name is None or name == collection.name:
                versions = public_api.artifact_versions(
                    kind.type,
                    "/".join([kind.entity, kind.project, collection.name]),
                    per_page=1,
                )
                if last_version: res += next(versions)
                else: res += L(versions)
    return list(res)

In [None]:
#| hide
foo = get_wandb_artifacts('wandb/artifacts-example', type='model')
test_eq(len(foo), 2)
foo = get_wandb_artifacts('wandb/artifacts-example', type='model', name='convnet')
test_eq(len(foo), 1)
foo = get_wandb_artifacts('wandb/artifacts-example', type='model', name='convnet', last_version=False)
test_eq(len(foo), 2)

### get_pickle_artifact

In [None]:
#| export
def get_pickle_artifact(filename):

    with open(filename, "rb") as f:
        df = pickle.load(f)
    
    return df

## Exec from feather

In [None]:
#| export
import pyarrow.feather as ft
import pickle

In [None]:
#| export
def exec_with_feather(function, path = None, verbose = 0, *args, **kwargs):
    result = None
    if not (path is None):
        if verbose > 0: print_flush(f"--> Exec with feather | reading input from {path}")
        input = ft.read_feather(path)
        if verbose > 0: print_flush(f"--> Exec with feather | Apply function {path}")
        result = function(input, *args, **kwargs)
        if verbose > 0: print_flush(f"Exec with feather --> ", {path})
    return result

In [None]:
#| export
def py_function(module_name, function_name, verbose = 0):
    try:
        function = getattr(__import__('__main__'), function_name)
    except:
        module = __import__(module_name, fromlist=[''])
        function = getattr(module, function_name)
    print_flush(f"py function: {function_name}: {function}")
    return function

In [None]:
#| hide
def suma(a,b,c): return a+b+c
foo = py_function("main", "suma", True)
print_flush(f"foo: {foo(1,1,1)}")

In [None]:
#| hide
function_name = "prepare_forecasting_data"
module_name = "tsai.data.preparation"
foo = py_function(module_name, function_name, True)
foo

In [None]:
#| export
def exec_with_feather_k_output(
    function_name   : str, 
    module_name     : str   = "main", 
    path            : str   = None, 
    k_output        : int   = 0, 
    verbose         : int   = 0, 
    time_flag       : bool  = False, 
    *args, 
    **kwargs
):
    result = None
    function = py_function(module_name, function_name,verbose)
    if time_flag: t_start = time.time()
    if not (path is None):
        if verbose > 0: 
            print_flush(f"--> Exec with feather | reading input from {path}")
        input = ft.read_feather(path)
        if verbose > 0: print_flush(f"--> Exec with feather | Apply function {path}")
        result = function(input, *args, **kwargs)[k_output]
    if time_flag:
        t_end = time.time()
        print_flush(f"Exec with feather | time: {t_end-t_start}")
    if verbose > 0: print_flush(f"Exec with feather --> {path}")
    return result

In [None]:
#| hide
enc_input = exec_with_feather_k_output(
            function_name = "prepare_forecasting_data",
            module_name   = "tsai.data.preparation",
            path = "/home/macu/data/wandb_artifacts/3967977247651105648",
            k_output        = 0,
            verbose         = 1,
            time_flag       = True,
            fcst_history    = 450
        )
enc_input

In [None]:
#| export
def exec_with_and_feather_k_output(function_name, module_name = "main", path_input = None, path_output = None, k_output = 0, verbose = 0, time_flag = False, *args, **kwargs):
    result = None
    function = py_function(module_name, function_name, verbose-1)
    if time_flag: t_start = time.time()
    if not (path_input is None):
        if verbose > 0: print_flush(f"--> Exec with feather | reading input from {path_input}")
        input = ft.read_feather(path_input)
        if verbose > 0: 
            print_flush(f"--> Exec with feather | Apply function {function_name} input type: {type(input)}")
        
        result = function(input, *args, **kwargs)[k_output]
        ft.write_feather(df, path, compression = 'lz4')
    if time_flag:
        t_end = time.time()
        print_flush(f"Exec with feather | time: {t_end-t_start}")
    if verbose > 0: print_flush(f"Exec with feather --> {path_output}")
    return path_output

## Time handling

In [None]:
#| export 
import time
from dataclasses import dataclass, field

In [None]:
#| export
@dataclass
class Time:
    time_start  : float =  None
    time_end    : float =  None
    time_total  : float =  0.0
    function    : str   =  ''

    def start(self, verbose = 0): 
        if verbose > 0: print_flush(f"--> Start: {self.function}")
        self.time_start = time.time()
        return self.time_start

    def end(self, verbose= 0):
        self.time_end = time.time()
        self.time_total = self.duration()
        if verbose > 0: print_flush(f"End: {self.function} -->")
        return self.time_end
        
    def duration(self):
        self.time_total=self.time_end - self.time_start
        return self.time_total
    def show(self, 
        verbose         : int = None,
        print_to_path   : bool = False,
        print_path      : str  = "~/data/logs/logs.txt",
        print_mode      : str  = 'a'
    ):
        if self.time_start is None: 
            print_flush(f"[{self.function}] Not started", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        elif self.time_end is None:
            print_flush(f"[{self.function}] Not ended | Start: ", self.time_start, print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        else:
            print_flush(f"[{self.function}] Start: {self.time_start} | End: {self.time_end} | Duration: {self.time_total} seconds", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        return self.time_total     

In [None]:
#| export
def funcname():
    """Get calling function name"""
    return inspect.stack()[1][3]

In [None]:
#| hide
# Timer basic example
foo = Time()
foo.start()
time.sleep(2) 
foo.end()
foo.show()
def foo(): return funcname()
foo()

## VSCode update path

In [None]:
#| export
#Function for making notebooks clearer
from IPython.display import clear_output, DisplayHandle
def update_patch(self, obj):
    clear_output(wait=True)
    self.display(obj)
    print_flush(f"... Enabling Vs Code execution ...")

In [None]:
#| hide
#from nbdev.export import notebook2script
#notebook2script()
beep(1)

# Styled printing

In [None]:
#| export
from IPython.display import display, HTML

In [None]:
#| export
def styled_print(text, color='black', size='16px', weight='normal'):
    html_text = f"<span style='color: {color}; font-size: {size}; font-weight: {weight};'>{text}</span>"
    display(HTML(html_text))

# Time Series

## Plots

In [None]:
#| export
def show_sequence(
    data         : List[ List [ float ] ] = None, 
    hide_rows    : bool = False, 
    hide_columns : bool = True
):
    """
    Show the sequence in a nice format similar to stumpy tutorials
    """
    df          = pd.DataFrame(data)
    styled_df   = df.style
    if hide_rows: 
        styled_df = styled_df.hide(axis='index')
    if hide_columns: 
        styled_df = styled_df.hide(axis='columns')
    styled_df = styled_df.set_table_styles([
        {'selector': '',
         'props': [('border', '2px solid black'),
                   ('text-align', 'center'),
                   ('font-family', 'Arial'),
                   ('border-collapse', 'collapse')]},
        {'selector': 'td',
         'props': [('border', '1px solid black'),
                   ('padding', '5px')]}
    ])
    display(styled_df)

In [None]:
#| export
def plot_with_dots(
    time_series             : List[float]    = None,
    xlabel                  : str            = 'Index (time)',
    ylabel                  : str            = 'Value',
    title                   : str            = 'Time series',
    sequence_flag           : bool           = True,
    show_sequence_before    : bool           = True, 
    hide_rows               : bool           = True,
    hide_columns            : bool           = False,
    show_title              : bool           = True,
    fontsize                : int            = 10,
    save_plot               : bool           = False,
    dots                    : bool           = True,
    figsize                 : Tuple[int, int]= (10, 6),
    plot_path               : str            = "./",
    plot_name               : str            = "",
    plot_format             : str            = "svg",
    plot_resolution         : int            = 1
  ) -> None:
    if sequence_flag and show_sequence_before: 
        show_sequence([time_series], hide_rows, hide_columns)
    n = len(time_series)
    x_coords = range(n)
    
    plt.figure(figsize=figsize)  # Crear la figura con el tamaño especificado
    
    if dots: 
        plt.plot(x_coords, time_series)
        plt.scatter(x_coords, time_series, color='red')
    else:
        plt.plot(x_coords, time_series, linestyle='-')
        
    if show_title: 
        plt.title(title, fontsize=fontsize)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    if save_plot:
        plot_path = os.path.expanduser(plot_path)
        if plot_name == "":
            plot_name = title
        plot_path = os.path.join(plot_path, f'{plot_name}.{plot_format}')
        plt.savefig(plot_path, format = plot_format)
    plt.show()
    if sequence_flag and not show_sequence_before:
        show_sequence([time_series], hide_rows, hide_columns)
    return None


In [None]:
#| hide
# Example following Stumpy's 13-length case
plt.close('all')
foo_data = np.array([0, 1, 3, 2, 9, 1, 14, 15, 1, 2, 2, 10, 7])
foo_title = title = "Example 1: Time series of length 13"
show_sequence([foo_data], hide_rows = True, hide_columns = False)
plot_with_dots(
    time_series             = foo_data,
    title                   = foo_title,
    sequence_flag           = False,
    fontsize                = 20,
    figsize                 = (10,3)
) 

## Downsampling

### Piecewise Aggregate Approximation

In [None]:
#| export
## -- Classes & types
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Callable

In [None]:
#| export
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import Pipeline

In [None]:
#| export
@dataclass
class Interpolator(BaseEstimator, TransformerMixin):
    method            : str  ='linear'
    n_segments        : int  = 1
    plot_original_data: bool = False
    plot_interpolated : bool = False
    verbose           : int  = 0
    
    def fit(self, X, y=None):
        return self

    def transform(self, X):
                
        if X.ndim == 1:
            X = X.reshape(1, -1)
        
        if self.plot_original_data:
            if self.verbose > 0: print_flush(f"Interpolator | Plot original data")
            for dim in range (X.ndim-1):
                if self.verbose > 1: print_flush(f"Interpolator | Plot original data dimension {dim}")
                plot_with_dots(
                    X[dim], 
                    sequence_flag = False, 
                    title = f'Original data | dim {dim}'
                )
                
        n_samples, n_features = X.shape
        if n_features % self.n_segments != 0 or n_features == self.n_segments:
            raise ValueError(
                f"The number of segments {self.n_segments} must divide (and be different of) the number of features {n_features} | Reminder: {n_features // self.n_segments}"
            )

        segment_size = n_features // self.n_segments
        interpolated_result = np.full_like(X, np.nan)

        if self.verbose > 0: print_flush(f"NFeatures: {n_features} | NSegments: {self.n_segments} | segment_size: {segment_size} | interpolated result ~ {interpolated_result.shape}")
        
        for i in np.arange(self.n_segments):
            start = i * segment_size 
            end = start + segment_size
            segment_mean = np.nanmean(X[:, start:end], axis=1)
            for j in np.arange(n_samples):
                nan_mask = np.isnan(X[j, start:end])
                interpolated_result[j, start:end][nan_mask] = segment_mean[j]
        res = np.where(np.isnan(X), interpolated_result, X)
        if self.plot_interpolated:
            for dim in range (X.ndim-1):
                plot_with_dots(
                    res[dim], 
                    sequence_flag = False, 
                    title = f'Interpolated data | dim {dim}'
                )
            
        return res

In [None]:
foo_data = np.array([1.0, 2.0, np.nan, 4.0, 5.0, np.nan, 7.0, 8.0])

foo_inter = Interpolator(
            method='polynomial', 
            n_segments         = 4,
            plot_interpolated  = True,
            plot_original_data = False,
            verbose            = 2
        )

foo = foo_inter.fit_transform(foo_data)[0]

In [None]:
#| export
@dataclass
class PAATransformer(BaseEstimator, TransformerMixin):
    n_segments       : int  = 1
    plot_aggregated  : bool = True
    verbose          : int  = 0

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        n_samples, n_features = X.shape
        if n_features <= self.n_segments:
            raise ValueError(f"The number of segments ({self.n_segments}) must be lower than the number of points ({n_features})")

        segment_size = n_features // ( self.n_segments + 1)
        remainder = n_features % ( self.n_segments + 1)

        if self.verbose > 0: 
            print_flush(f"NFeatures: {n_features} | NSegments: {self.n_segments} | Segment size: {segment_size} | Reminder: {remainder}")

        # Crear un array para los resultados
        result = np.zeros((n_samples, self.n_segments + 1))

        if self.verbose > 1: print_flush(f"Result ~ {result.shape}")

        # Procesar cada segmento
        for i in range(self.n_segments+1):
            start = i * segment_size + min(i, remainder)
            end = start + segment_size + (1 if i < remainder else 0)
            result[:, i] = np.mean(X[:, start:end], axis=1)

        if self.plot_aggregated:
            for dim in range (X.ndim-1):
                if self.verbose > 1:
                    print_flush(f"Plos res | Dim", {dim}, verbose = verbose)
                plot_with_dots(
                    result[dim], 
                    sequence_flag = False, 
                    title = f'Aggregated data | dim {dim}',
                    fontsize = 20,
                    save_plot = True
                )

        return result


In [None]:
#| hide
foo_data = np.array([1.0, 2.0, np.nan, 4.0, 5.0, np.nan, 7.0, 8.0])
foo_paa_pipeline = Pipeline([
    (
        # Step for interpolating NaNs in the original data
        'interpolator', 
        Interpolator(
            method='polynomial', 
            n_segments         = 4, 
            plot_interpolated  = True,
            plot_original_data = False,
            verbose            = 2
        )
    ),
    (
        # Step for applying Peicewise Aggregated Approximation
        'paa', PAATransformer(
            n_segments      = 3, 
            plot_aggregated = True, 
            verbose         = 2
        )
    )
])


foo = foo_paa_pipeline.fit_transform(foo_data)[0]

In [None]:
#| export
# Errors definitions
class DownsampleError(Exception):
    """Exception raised for errors in the downsample process."""
    def __init__(self, message="Invalid number of min/max points for the proposed time series. You must allow cropping and check the final length"):
        self.message = message
        super().__init__(self.message)
class DivisorsError(Exception):
    def __init__(self, message = "Invalid parameters"):
        self.message = message
        super().__init__(self.message)

In [None]:
#| export
def divisors(
    N : int, 
    min_val:int, 
    max_val:int, 
    verbose = 0
) -> List [ int ] : 
    if verbose > 0: print_flush(f"--> divisors | verbose: {verbose}", verbose = verbose)
    if verbose > 0: 
        print_flush(f"Looking for the divisors of {N} between {min_val} and {max_val}")
    if (N < 0 or min_val < 0):
        mssg = f"N, min_val, max_val {N}, {min_val}, {max_val} must be a positive integer (>0)"
        raise DivisorsError(mssg)
    elif ( min_val > max_val):
        mssg = f"min_val > max_val ({min_val} > {max_val}). Please take a look"
        raise DivisorsError(mssg)
    arr = np.arange(min_val,max_val+1)
    arr = arr[ N % arr == 0]
    if verbose > 0: print_flush(f"Found {len(arr)} divisors of {N} between {min_val} and {max_val}")
    return arr

def downsample_propose_crop_(
    N            : int, 
    min_points   : int, 
    max_points   : int, 
    verbose      : int  = 0,
    allow_crop   : bool = True,
    nearest_val  : bool = False,
    potential_val: int = 1
) -> int:
    if verbose > 0: 
        print_flush(f"Verbose: {verbose}")
        print_flush(f"Downsample Propose Crop | Prev N: {N}")
    all_divisors = divisors(
        N       = N, 
        min_val = min_points, 
        max_val = max_points,
        verbose = verbose-1
    )
    val = 0
    if len(all_divisors) == 0:
        if ( not nearest_val or potential_val < 1):
            raise ValueError("No valid divisors found for the given N within the min and max points range.")
    else:
        if ( nearest_val and potential_val > 0):
                val = min(all_divisors, key=lambda x: abs(x - potential_val))
        elif (divisors_flag):
            val = divisors(
                N       = N, 
                min_val = min_points, 
                max_val = max_points, 
                verbose = verbose-1
            )[-1]
    
    if (allow_crop):
        while (val < min_points and N > min_points): 
            N = N-1
            all_divisors = divisors(
                N       = N, 
                min_val = min_points, 
                max_val = max_points, 
                verbose = verbose-1
            )
            if len(all_divisors) > 0:
                if ( nearest_val and potential_val > 0):
                    val = min(all_divisors, key=lambda x: abs(x - potential_val))
    else: 
        raise DownsampleError()
        return -1
    if verbose > 0: print_flush(f"Downsample Propose Crop | Post N: {N} | Largest Divisor: {val}")
    return (val, N)

In [None]:
#| hide
#print_flush(downsample_propose_crop_(7397222, 10000, 20000, 1, False))
print_flush(downsample_propose_crop_(
    N = 7397222, 
    min_points = 10000, 
    max_points = 20000, 
    verbose = 2, 
    allow_crop = True, 
    nearest_val = True, 
    potential_val = 14500
))

In [None]:
#| export
def downsample(
    data  : List [ float ] = None,
    min_position : int  = 0,
    max_position : int  = -1, 
    min_points   : int  = 1,
    max_points   : int  = 10000,
    verbose      : int  = 1,
    show_plots   : bool = False,
    allow_crop   : bool = True
) -> Tuple [ List [ float ], float ]:  
    if max_points >= data.shape[0]: return data, 1
    if verbose > 1: print_flush(f"[ Downsample | Position ] Before | Pos ({min_position}, {max_position})")
    min_position = min_position if min_position > 0 else 0
    max_position = max_position if ( max_position > -1 and max_position < data.shape[0]) else data.shape[0]
    if verbose > 1: print_flush(f"[ Downsample | Position ] After | Pos ({min_position}, {max_position})")
    
    n_timestamps = max_position - min_position
    paa_factor   = np.maximum(1, n_timestamps // max_points)

    min_points   = max(1,min(min_points, data.shape[0]))
    max_points   = min(data.shape[0], min(max_points, max_position-min_position))

    if verbose > 1:
        print_flush(f"[ Downsample | downsample_propose_crop ] Max points: {max_points}")
        print_flush(f"[ Downsample | downsample_propose_crop ] Min points: {min_points}")
    
    
    min_points   = min(min_points, max_points)
    
    if verbose > 1:
        print_flush(f"[ Downsample | downsample_propose_crop ] N timestamps {n_timestamps}")
        print_flush(f"[ Downsample | downsample_propose_crop ] PAA factor: {paa_factor}")
        
        print_flush(f"[ Downsample | downsample_propose_crop ] allow_crop: {allow_crop}")

    potential_segments = np.floor(n_timestamps / paa_factor).astype(int)
    
    N = max_position-min_position
    
    if verbose > 1:
        print_flush(f"[ Downsample | downsample_propose_crop ] N: {N}")
        print_flush(f"[ Downsample | downsample_propose_crop ] potential_segments: {potential_segments}")
        
    n_segments, N = downsample_propose_crop_(
        N             = N, 
        min_points    = min_points,
        max_points    = max_points,
        verbose       = verbose-1,
        allow_crop    = allow_crop,
        nearest_val   = allow_crop, # If allow_crop, try to get as near of potential_segment as possible
        potential_val = potential_segments # The most desired one 
    ) 

    if allow_crop: 
        if verbose > 1: print_flush(f"[ Downsample | downsample_propose_crop ] Allow crop => change n_timestamp | Before {n_timestamps}")
        max_position = min_position + N
        if verbose > 1: print_flush(f"[ Downsample | downsample_propose_crop ] Allow crop => change n_timestamp | After {n_timestamps}")
    
    data = data[min_position:max_position]
    n_timestamps = data.shape[0]

    if verbose > 0: 
        print_flush(f"[ Downsample | downsample_propose_crop --> ] | N segments: {n_segments} | Data ~ {data.shape}")
        print_flush(f"[ Downsample | downsample_propose_crop --> ] | N = {N} | n_timestamps = {n_timestamps} | min_position {min_position} | max_position {max_position}")

    if n_timestamps < max_points: 
        if verbose > 0: 
            print_flush(f"[ Downsample ] n_timestamps {n_timestamps} < max_points {max_points}")
        return data, 1
        
    #| export
    paa_pipeline = Pipeline([
        (
            # Step for interpolating NaNs in the original data
            'interpolator', 
            Interpolator(
                method             = 'polynomial', 
                n_segments         = n_segments, 
                plot_original_data = show_plots,
                plot_interpolated  = show_plots
            )
        ),
        (
            # Step for applying Peicewise Aggregated Approximation
            'paa', PAATransformer(
                n_segments      = n_segments, 
                plot_aggregated = show_plots
            )
        )
    ])

    ts_paa = paa_pipeline.fit_transform(data[min_position:max_position])[0]
    if verbose > 0: 
        print_flush(f"Downsample | ts_paa~{len(ts_paa)}")
        print_flush(f"Downsample ------------------------>")
    return ts_paa, paa_factor


In [None]:
#| hide
foo_data = np.array([1.0, 2.0, np.nan, 4.0, 5.0, np.nan, 7.0, 8.0])
foo_data_2 = downsample(foo_data, min_points = 3, max_points = 5, verbose = 5, show_plots = True)
print_flush(foo_data_2)

In [None]:
#| hide
# Testing failed case
foo = np.random.rand(7397222)
downsample(foo, min_points = 10000, max_points = 20000, verbose = 5, show_plots = True)



## Sizes

### Automatic sequence length selection

#### Following ClaSP example: best sequence length
In a similar way to ClaSP algorithm, our algorithms take the window size, w as hyper-parameter (see https://github.com/aeon-toolkit/aeon/blob/main/examples/segmentation/segmentation_with_clasp.ipynb). A simple method for choosing the window size is the dominant frequency of the Fourier Transform.

In [None]:
from aeon.segmentation._clasp import ClaSPSegmenter, find_dominant_window_sizes
from aeon.datasets import load_electric_devices_segmentation
from aeon.visualisation import plot_series_with_change_points, plot_series_with_profiles

In [None]:
#| hide
? aeon.segmentation._clasp.find_dominant_window_sizes

In [None]:
#| hide
ts, period_size, true_cps = load_electric_devices_segmentation()
_ = plot_series_with_change_points(ts, true_cps, title="Electric Devices")
dominant_period_size = find_dominant_window_sizes(ts)
print_flush(f"Dominant Period {dominant_period_size}")
#| hide
clasp = ClaSPSegmenter(period_length=dominant_period_size, n_cps=5)
found_cps = clasp.fit_predict(ts)
profiles = clasp.profiles
scores = clasp.scores

_ = plot_series_with_profiles(
    ts,
    profiles,
    true_cps=true_cps,
    found_cps=found_cps,
    title="ElectricDevices",
)

### Best nsizes sequence lengths

In [None]:
#| export
def find_dominant_window_sizes_list_single_old(
        X            : List [ float ],
        nsizes       : int  = 1,
        offset       : float= 0.05, 
        min_distance : int  = 1,
        verbose      : int  = 0
    ) -> List [ int ]:

    if verbose > 0: print_flush(f"---> Find_dominant_window_sizes_list")
    if verbose > 1:
        print_flush( f"Find_dominant_window_sizes_list | X ~ {X.shape}" )
        print_flush( f"Find_dominant_window_sizes_list | Looking for - at most - the best {nsizes} window sizes")
        print_flush( f"Find_dominant_window_sizes_list | Offset {offset} max size: {offset*len(X)}")
    if verbose > 0: print_flush( "Find_dominant_window_sizes_list | --> Freqs")
        
    X = np.array(X)
    
    fourier = np.absolute(np.fft.fft(X))   
    freqs = np.fft.fftfreq(X.shape[0], 1)
    
    if verbose > 1: 
        print_flush( f"Find_dominant_window_sizes_list | Freqs {freqs} -->")
        print_flush( f"Find_dominant_window_sizes_list | coefs {fourier} -->")
    if verbose > 0: print_flush( f"Find_dominant_window_sizes_list | Freqs -->")

    coefs = []
    window_sizes = []

    for coef, freq in zip(fourier, freqs):
        if coef and freq > 0:
            coefs.append(coef)
            window_sizes.append(1 / freq)

    coefs = np.array(coefs)
    window_sizes = np.asarray(window_sizes, dtype=np.int64)
    
    if verbose > 0: 
        print_flush( "Find_dominant_window_sizes_list | Coefs and window_sizes -->")
        print_flush( "Find_dominant_window_sizes_list | --> Find and return valid window_sizes")

    idx = np.argsort(coefs)[::-1]
    
    if verbose > 1: 
        print_flush( f"Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 0 ... {idx}")
        
    sorted_window_sizes = window_sizes[idx]
    
    if verbose > 1: 
        print_flush( "Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 1 ...")

    # Find and return all valid window sizes
    valid_window_sizes = [
        int(window_size / 2) for window_size in sorted_window_sizes
        #if 20 <= window_size < int(X.shape[0] * offset)
        if 20 <= window_size < int(len(X) * offset)
    ]
    
    if verbose > 1: 
        print_flush( "Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 2 ...")

    # If no valid window sizes are found, return the first from sorted list
    if not valid_window_sizes:
        if verbose > 1: print_flush( f"Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 2a ... {nsizes}")
        sizes = [sorted_window_sizes[0] // 2][:nsizes]
    else:
        if verbose > 1: print_flush( f"Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 2b ... {nsizes}")
        sizes = valid_window_sizes[:nsizes]
        
    if verbose > 0: 
        print_flush( "Find_dominant_window_sizes_list | Find and return valid window_sizes -->")
    if verbose > 1:
        print_flush(f"Find_dominant_window_sizes_list | Sizes: {sizes}")
    if verbose > 0:
        print_flush( "Find dominant_window_sizes_list --->" )
    
    return sizes

In [None]:
#| export 
def select_separated_sizes(
    xs : List [ int ],
    min_distance : int = 1,
    nsizes          : int = 1
) -> List [ int ]:
    ys = []
    for window_size in xs:
        if not ys or abs(window_size - ys[-1]) >= min_distance:
            ys.append(window_size)
        if len(ys) == nsizes:
            break
    return ys

In [None]:
#| hide 
xs = [0, 1, 3, 5, 4, 14, 23, 10, 6, 9, 13, 18, 16, 11, 15, 19, 8, 2, 22, 26, 7, 28, 12, 24, 17, 38, 34, 25, 30, 20, 54, 49, 27, 31, 55, 39, 128, 52, 46, 36, 53, 21, 68, 50, 89, 56, 41, 117, 66, 67, 40, 60, 112, 96, 114, 87, 65, 88, 81, 64, 75, 57, 235, 135, 43, 58, 59, 86, 63, 70, 177, 62, 125, 93, 32, 162, 172, 143, 84, 110, 95, 100, 77, 35, 150, 161, 48, 51, 82, 164, 192, 29, 101, 42, 91, 73, 78, 215, 156, 230, 108, 45, 214, 248, 165, 145, 115, 90, 136, 80, 104, 160, 208, 94, 216, 140, 263, 85, 159, 126, 242, 148, 151, 218, 79, 168, 131, 270, 223, 265, 179, 271, 232, 186, 207, 111, 71, 191, 266, 204, 132, 250, 174, 129, 37, 210, 205, 166, 121, 189, 154, 201, 139, 190, 74, 175, 141, 118, 196, 197, 228, 123, 113, 149, 147, 217, 102, 182, 134, 173, 33, 240, 259, 105, 269, 171, 130, 243, 76, 195, 72, 245, 167, 198, 47, 97, 120, 137, 241, 153, 98, 219, 116, 187, 267, 238, 220, 124, 251, 260, 92, 106, 188, 169, 253, 133, 170, 44, 142, 194, 61, 213, 99, 193, 212, 200, 203, 234, 256, 262, 236, 69, 246, 225, 264, 222, 273, 122, 158, 258, 202, 119, 247, 244, 229, 231, 183, 233, 261, 181, 227, 224, 127, 138, 144, 226, 254, 103, 176, 237, 157, 155, 146, 83, 199, 255, 107, 252, 221, 180, 209, 184, 268, 257, 178, 206, 211, 185, 239, 109, 163, 249, 272, 152]
select_separated_sizes(xs, 5, 3)

In [None]:
#| export
def find_dominant_window_sizes_list_single(
        X            : List[float],
        nsizes       : int               = 1,
        offset       : float             = 0.05, 
        min_distance : int               = 1,    # Asegurar distancia mínima entre tamaños
        # Print options
        verbose      : int               = 0,
        print_to_path                   : bool          = False,
        print_path                      : str           = "~/data/logs/logs.txt",
        print_mode                      : str           = 'a'
    ) -> List[int]:

    if verbose > 0: print_flush( "---> Find_dominant_window_sizes_list", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
    if verbose > 1:
        print_flush( f"Find_dominant_window_sizes_list | X ~ {X.shape}", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        print_flush( f"Find_dominant_window_sizes_list | Looking for - at most - the best {nsizes} window sizes", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        print_flush( f"Find_dominant_window_sizes_list | Offset {offset} max size: {offset*len(X)}", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
    if verbose > 0: print_flush( "Find_dominant_window_sizes_list | --> Freqs", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        
    X = np.array(X)
    
    fourier = np.absolute(np.fft.fft(X))   
    freqs = np.fft.fftfreq(X.shape[0], 1)
    
    if verbose > 2: 
        print_flush( f"Find_dominant_window_sizes_list | Freqs {freqs} -->", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        print_flush( f"Find_dominant_window_sizes_list | coefs {fourier} -->", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
    if verbose > 0: print_flush( f"Find_dominant_window_sizes_list | Freqs -->", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )

    coefs = []
    window_sizes = []

    for coef, freq in zip(fourier, freqs):
        if coef and freq > 0:
            coefs.append(coef)
            window_sizes.append(1 / freq)

    coefs = np.array(coefs)
    window_sizes = np.asarray(window_sizes, dtype=np.int64)
    
    if verbose > 0: 
        print_flush( "Find_dominant_window_sizes_list | Coefs and window_sizes -->", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        print_flush( "Find_dominant_window_sizes_list | --> Find and return valid window_sizes", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )

    idx = np.argsort(coefs)[::-1]
    
    if verbose > 1: 
        print_flush(f"Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 0 ... {idx}", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        
    sorted_window_sizes = window_sizes[idx]
    
    if verbose > 1: 
        print_flush(f"Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 1 ...", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )

    # Find and return all valid window sizes
    valid_window_sizes = [
        int(window_size) for window_size in sorted_window_sizes
        if window_size < int(len(X) * offset)
    ]
    
    if verbose > 1: 
        print_flush( "Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 2 ...", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )

    # Ensure sizes separated at least at "min_distance" 
    sizes = select_separated_sizes(valid_window_sizes, min_distance, nsizes)

    # If no valid window sizes are found, return the first from sorted list
    if not sizes:
        if verbose > 1: print_flush(f"Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 2a ... {nsizes}", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        sizes = sorted_window_sizes[0][:nsizes]
    else:
        if verbose > 1: print_flush(f"Find_dominant_window_sizes_list | Find and return valid window_sizes | ... 2b ... {nsizes}", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )

    if verbose > 0: 
        print_flush(f"Find_dominant_window_sizes_list | Find and return valid window_sizes -->", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
    if verbose > 1:
        print_flush(f"Find_dominant_window_sizes_list | Sizes: {sizes}", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
    if verbose > 0:
        print_flush( "Find dominant_window_sizes_list --->", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
    return sizes


In [None]:
#| hide
steam_df = pd.read_csv("https://zenodo.org/record/4273921/files/STUMPY_Basics_steamgen.csv?download=1")
steam_df.head()

In [None]:
#| hide
foo = steam_df['steam flow']
dominant_period_size = find_dominant_window_sizes(
    foo, 
    offset = 0.05
)
print_flush(f"Dominant Period {dominant_period_size}")
dominant_period_sizes = find_dominant_window_sizes_list(
    foo, 
    nsizes = 5, 
    offset = 0.05,
    verbose = 1
)
print_flush(f"Dominant Period Sizes {dominant_period_sizes}")
dominant_period_sizes = find_dominant_window_sizes_list(
    foo, 
    nsizes = 5, 
    offset = 0.6,
    verbose = 1
)
print_flush(f"Dominant Period Sizes {dominant_period_sizes}")

In [None]:
#| export
def group_similar_sizes(vars_sizes, nsizes, tolerance=2):
    """
    Selects the best window sizes across multiple variables,
    ensuring no repetitions and that the sizes are sufficiently close.
    """
    indices = [0] * len(vars_sizes)  # Indices for each variable
    selected_sizes = []  # Selected window sizes

    while len(selected_sizes) < nsizes:
        # Get the smallest available size across all variables
        current_sizes = [vars_sizes[i][indices[i]] for i in range(len(vars_sizes)) if indices[i] < len(vars_sizes[i])]
        min_size = min(current_sizes)

        # Select sizes close to the minimum and avoid duplicates
        for i in range(len(vars_sizes)):
            if indices[i] < len(vars_sizes[i]) and abs(vars_sizes[i][indices[i]] - min_size) <= tolerance:
                if vars_sizes[i][indices[i]] not in selected_sizes:  # Avoid duplicates
                    selected_sizes.append(vars_sizes[i][indices[i]])
                indices[i] += 1  # Move to the next size for that variable

                if len(selected_sizes) >= nsizes:
                    break

        # End if no more sizes are left in any variable
        if all(idx >= len(vars_sizes[i]) for i, idx in enumerate(indices)):
            break

    # Remove duplicates from the selected sizes and return the first nsizes
    selected_sizes = list(dict.fromkeys(selected_sizes))  # Remove duplicates
    return selected_sizes[:nsizes]



In [None]:
#| export
def find_dominant_window_sizes_list(
        X,
        nsizes          : int   = 1,
        offset          : float = 0.05, 
        verbose         : int   = 0,
        min_distance    : int   = 1,
        #- Printing options for debugging
        print_to_path   : bool  = False,
        print_path      : str   = "~/data/logs/logs.txt",
        print_mode      : str   = 'a'
    ) -> List [ int ]:

    if verbose > 0:
        print_flush( f"---> Find_dominant_window_sizes_list", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose, print_time = print_to_path)
    
    if len(X.shape) == 1: 
        sizes = find_dominant_window_sizes_list_single(X,nsizes,offset, min_distance, verbose, print_to_path = print_to_path, print_path = print_path, print_mode = 'a')
    else: 
        if ( isinstance(X, pd.DataFrame ) ): X = X.values
        if verbose > 0: print_flush( f"Find_dominant_window_sizes_list | X ~ {X.shape}", print_to_path = print_to_path, print_path = print_path, print_mode = 'a', verbose = verbose, print_time = print_to_path)
        vars_sizes = []
        for var in range( X.shape[1] ):
            if verbose > 1: print_flush( f"Find_dominant_window_sizes_list | Get sizes for var {var}", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
            var_sizes = find_dominant_window_sizes_list_single(X[:, var], nsizes, offset, min_distance, verbose, print_to_path = print_to_path, print_path = print_path, print_mode = 'a')
            vars_sizes.append(var_sizes)
            if verbose > 1: 
                print_flush( f"Find_dominant_window_sizes_list | Get sizes for var {var} | {var_sizes}", print_to_path = print_to_path, print_path = print_path, print_mode = 'a', verbose = verbose, print_time = print_to_path)
        if verbose > 0: print_flush( f"Find_dominant_window_sizes_list | Grouping sizes", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
        sizes = group_similar_sizes(vars_sizes, nsizes, tolerance = 2)
        if verbose > 1:
            print_flush(f"find_dominant_window_sizes_list | Final selected window sizes: {sizes}", print_to_path = print_to_path, print_path = print_path, print_mode = 'a', verbose = verbose, print_time = print_to_path)
    if verbose > 0: print_flush( f"Find_dominant_window_sizes_list -->", print_to_path = print_to_path, print_path = print_path, print_mode = print_mode, verbose = verbose )
    return sizes