# Testing the rolling apply method

In [2]:
try:
    import pandopt as pdo
except:
    import sys, os
    sys.path.append('/'.join(os.getcwd().split('/')[:-2]))
    import pandopt as pdo
import pandas as pd
import numpy as np
import tqdm 
import pandas as pd
import numpy as np
import timeit
import functools
import time
import plotly.express as px
from typing import Callable, Dict
import polars as pl
from IPython.display import display


def agg_sum(z):
    """Return the sum of all values in window ``z`` (``np.sum``)."""
    return np.sum(z)


def agg_mean(z):
    """Return the arithmetic mean of all values in window ``z`` (``np.mean``)."""
    return np.mean(z)


def agg_max(z):
    """Return the largest value in window ``z`` (``np.max``)."""
    return np.max(z)


def agg_min(z):
    """Return the smallest value in window ``z`` (``np.min``)."""
    return np.min(z)


def agg_std(z):
    """Return the standard deviation of window ``z`` (``np.std``).

    ``np.std`` defaults to ``ddof=0`` (population std), unlike pandas'
    ``rolling(...).std()``, which defaults to ``ddof=1``.
    """
    return np.std(z)

def agg_custom(z):
    """Experimental custom aggregation (NOT HANDLED YET).

    Prints ``z.shape``, then returns ``median(z['C']) - std(log(z))`` when
    ``z['A', -1] > z['B', 0]``, and ``mean(z['C']) / 2 / std(z)`` otherwise.

    NOTE(review): the lookups ``z['A', -1]`` / ``z['B', 0]`` presumably mean
    "last value of column A" / "first value of column B". Indexing a plain
    NumPy array with a string raises IndexError, so this cannot run on raw
    ndarray windows — confirm the intended input type before using it.
    """
    print(z.shape)
    last_a = z['A', -1]
    first_b = z['B', 0]
    if not (last_a > first_b):
        return np.mean(z["C"]) / 2 / np.std(z)
    return np.median(z['C']) - np.std(np.log(z))



## Basic test and work setup


In [3]:
def timeit(method_func, df, method_name, check_metric):
    """Time one benchmark method on ``df`` and print a formatted result row.

    NOTE: this function shadows the ``timeit`` module imported at the top of
    the notebook, so ``timeit.default_timer`` is unreachable once it is defined.

    Args:
        method_func: callable taking ``df``, or ``False`` if the method was
            discarded (a placeholder row is printed and nothing is run).
        df: input DataFrame passed to ``method_func``.
        method_name: label printed in the first column.
        check_metric: applied to the result (converted with ``to_numpy`` when
            available) to produce the "compare metric" column.

    Returns:
        Elapsed wall-clock seconds, or ``None`` for a discarded method.
    """
    if method_func is False:
        print(f'{method_name:<15} {"NA":<15} {"NA":<15} {"Discarded":<10}')
        return None

    start = time.perf_counter()
    result = method_func(df)
    # The NumPy conversion happens before the clock stops, so its cost is part
    # of the reported time (presumably to force deferred results to
    # materialise — confirm).
    result_np = result.to_numpy() if hasattr(result, 'to_numpy') else result
    execution_time = time.perf_counter() - start

    if isinstance(result, pd.Series):
        result_shape_str = f"{result.shape[0]} x 1"
    elif hasattr(result, 'shape'):
        result_shape_str = " x ".join(map(str, result.shape))
    else:
        result_shape_str = "N/A"

    # Reuse the already-converted array instead of calling to_numpy() again.
    metric = check_metric(result_np)
    try:
        print(f'{method_name:<15} {result_shape_str:<15} {metric:<15.5f} {execution_time:<10.5f}')
    except (TypeError, ValueError):
        # Metrics that are not float-like (arrays, strings, None) reject the
        # ``.5f`` spec; stringify them so the fallback row itself cannot fail.
        print(f'{method_name:<15} {result_shape_str:<15} {str(metric):<15} {execution_time:<10}')
    return execution_time

def format_number(num):
    """
    Convert a numerical value into a human-readable format,
    adding suffixes like K, M, B, etc.

    Values are scaled by powers of 1000 and printed without decimals
    (``1000`` -> ``"  1K"``). Values of 1000T or more stay in T with one
    decimal (``1e15`` -> ``"1000.0T"``).
    """
    for unit in ['', 'K', 'M', 'B', 'T']:
        # Anything >= 999.5 would be printed as "1000" by the ``.0f`` format,
        # so move it to the next unit instead (999.7 -> "  1K").
        if abs(num) < 999.5:
            return f"{num:3.0f}{unit}"
        num /= 1000.0
    # The loop divided once more after reaching 'T'; scale back so the value
    # is really expressed in trillions.
    return f"{num * 1000:.1f}T"


def run_compare(
    methods: Dict[str, Callable],
    x_seconds: float = 1.5,
    start_ten_exponent: int = 1,
    check_metric: Callable = lambda x: (1 + np.abs(np.max(x) - np.min(x))) / np.median(x),
):
    """Benchmark DataFrame methods on random frames of growing size.

    For each exponent from ``start_ten_exponent`` up to 9, a float32 frame with
    ``10**exponent`` rows and columns A-D is generated. Every still-active method
    is timed with ``timeit``, which also prints one result row. A method that
    takes longer than ``x_seconds`` is discarded for the remaining sizes. This
    never happens on the first size, so a one-off compilation cost does not
    disqualify it. Finally, a log-log line chart of the timings is shown and the
    timing table is displayed.

    Args:
        methods: mapping of display name -> callable taking a DataFrame.
            The caller's mapping is not modified.
        x_seconds: time limit in seconds above which a method is discarded.
        start_ten_exponent: exponent of the first (smallest) row count.
        check_metric: applied to each result (as a NumPy array when possible)
            to produce the printed comparison value.

    Returns:
        DataFrame indexed by row count with one column per method, holding
        execution times in seconds. Sizes at which a method had already been
        discarded have no numeric time.
    """
    # Work on a copy so discarding a method does not mutate the caller's dict.
    active = dict(methods)
    execution_times = {key: {} for key in active}
    ten_exponent = start_ten_exponent
    # Sizes beyond 10**9 rows are more about SSD/RAM and lazy-loading options,
    # which this in-memory benchmark does not test.
    while ten_exponent < 10:
        df_size = int(10**ten_exponent)

        print(f'\ntesting {format_number(df_size)} rows')
        print(f'{"method_name":<15} {"result shape":<15} {"compare metric":<15} {"execution_time":<10}')

        pandas_df = pd.DataFrame(np.random.randn(df_size, 4), columns=['A', 'B', 'C', 'D']).astype(np.float32)

        iteration_discards = []
        for method_name, method_func in active.items():
            e_time = timeit(method_func, pandas_df, method_name, check_metric)
            # Never discard on the first size, so e.g. pandopt is not removed
            # because of its compilation time.
            if e_time and e_time > x_seconds and ten_exponent > start_ten_exponent:
                iteration_discards.append(method_name)
            execution_times[method_name][df_size] = e_time

        for method_name in iteration_discards:
            active[method_name] = False

        # Discarded entries stay in the dict (as False), so test their values
        # rather than the dict's emptiness.
        if all(func is False for func in active.values()):
            print("All methods exceeded the time limit. Ending tests.")
            break
        ten_exponent += 1

    results = pd.DataFrame(execution_times)
    results_long = results.reset_index().melt(id_vars=['index'], var_name='Method', value_name='Execution Time')
    results_long.rename(columns={'index': 'Rows'}, inplace=True)
    fig = px.line(results_long, x='Rows', y='Execution Time', color='Method', log_x=True, log_y=True, title='Execution Times by Method')
    fig.show()

    display(results)
    return results

## Comparison with rolling sum

In [None]:
# Define test methods
def pandas_apply(df):
    """Baseline: pandas ``rolling(25).apply`` with the Python-level ``agg_sum``.

    ``raw`` is left at its pandas default (False), so each window reaches
    ``agg_sum`` as a Series.
    """
    window = df.rolling(25)
    return window.apply(agg_sum)

def pandas_sum(df):
    """Reference: pandas' built-in rolling sum over a 25-row window."""
    rolled = df.rolling(window=25)
    return rolled.sum()

def pandopt_apply(df):
    """pandopt variant: wrap ``df`` in ``pdo.DataFrame`` and run the same rolling(25) apply."""
    wrapped = pdo.DataFrame(df)
    return wrapped.rolling(25).apply(agg_sum)

# Polars and NumPy are not benchmarked here: to my knowledge (I have not searched extensively), neither has a direct equivalent of this rolling apply.

methods = dict(
    pandas_apply=pandas_apply,
    pandas_sum=pandas_sum,
    pandopt_apply=pandopt_apply,
)

# Start the sweep at 10**2 rows; methods exceeding 1.5 s are dropped for later sizes.
run_compare(methods, start_ten_exponent=2, x_seconds=1.5)


testing 100 rows
method_name     result shape    compare metric  execution_time


## Rolling apply functionality added:

## Wider tests:

In [None]:

def measure_performance(df, func, window_size=3):
    """Time ``df.rolling(window_size).apply(func, raw=True)`` and checksum the result.

    ``pdo.DataFrame`` inputs (exact type) also get ``axis=0``.

    Returns:
        ``(checksum, elapsed_seconds, None)`` on success. The checksum is the
        sum of the result's rows that contain no NaN. If any step raises,
        returns ``(None, None, "ExceptionType: message")`` instead, so one
        failing case does not abort a whole sweep.
    """
    try:
        rolling = df.rolling(window=window_size)
        if type(df) is pdo.DataFrame:
            operation = functools.partial(rolling.apply, func, axis=0, raw=True)
        else:
            operation = functools.partial(rolling.apply, func, raw=True)
        # ``timeit`` is rebound to the benchmark helper defined earlier in this
        # notebook, so ``timeit.default_timer`` would raise AttributeError.
        # ``time.perf_counter`` is the clock ``timeit.default_timer`` aliases.
        start_time = time.perf_counter()
        result = operation()
        elapsed_time = time.perf_counter() - start_time
        return np.sum(result.dropna().values), elapsed_time, None
    except Exception as e:
        # Deliberately best-effort; keep the exception type so errors such as
        # KeyError (whose message is only the key) stay diagnosable.
        return None, None, f"{type(e).__name__}: {e}"

def run_tests(data_size, agg_funcs, n_iter=15, window_size=3):
    """Compare pandas vs pandopt rolling-apply speed over a range of frame sizes.

    Args:
        data_size: exclusive upper bound on the size exponent. Frames of
            ``10**1`` ... ``10**(data_size - 1)`` rows are tested (despite the
            name, this is not a row count).
        agg_funcs: aggregation callables passed to ``rolling(...).apply``.
        n_iter: repetitions per (size, function) pair. A fresh random float32
            frame with columns A-D is drawn for each repetition.
        window_size: rolling window length forwarded to ``measure_performance``.

    Returns:
        DataFrame with one row per (size, function, repetition), indexed by a
        descriptive key. It holds timings, checksums and error messages for
        both libraries.
    """
    results = {}
    test_type = 'rolling'
    total_tests = max(data_size - 1, 0) * len(agg_funcs)
    # ``with`` closes the progress bar even if a test raises.
    with tqdm.tqdm(total=total_tests, desc="Running Tests", ncols=100) as progress_bar:
        for test_num in range(1, data_size):
            df_size = int(10**test_num)
            for func in agg_funcs:
                for test_iter in range(n_iter):
                    pandas_df = pd.DataFrame(np.random.randn(df_size, 4), columns=['A', 'B', 'C', 'D']).astype(np.float32)
                    pandopt_df = pdo.DataFrame(pandas_df)
                    pandas_checksum, pandas_time, pandas_error = measure_performance(pandas_df, func, window_size)
                    pandopt_checksum, pandopt_time, pandopt_error = measure_performance(pandopt_df, func, window_size)

                    key = f"Size: 10^{test_num}, Func: {func.__name__}, Test: {test_type} - {test_iter}"
                    results[key] = {
                        "Size": df_size,
                        "function": func.__name__,
                        "test_type": test_type,
                        "test_iter": test_iter,
                        "Pandas Time (s)": pandas_time,
                        "Pandopt Time (s)": pandopt_time,
                        "Checksum Pandas": pandas_checksum,
                        "Checksum Pandopt": pandopt_checksum,
                        "Pandas Error": pandas_error,
                        "Pandopt Error": pandopt_error,
                    }

                # One tick per (size, function) pair, matching ``total_tests``.
                progress_bar.update(1)

    return pd.DataFrame.from_dict(results, orient='index')

# Aggregations to benchmark; data_size=8 sweeps frames from 10**1 to 10**7 rows.
agg_functions = [
    agg_sum,
    agg_mean,
    agg_max,
    agg_min,
    agg_std,
]
results_df = run_tests(agg_funcs=agg_functions, data_size=8)

In [None]:
# Both derived columns are "ratio - 1"; a negative 'time reduction' means
# pandopt was faster than pandas.
_pandas_time = results_df["Pandas Time (s)"]
_pandopt_time = results_df["Pandopt Time (s)"]
results_df['time reduction'] = _pandopt_time / _pandas_time - 1
# NOTE(review): because of the "- 1", a value of 1.0 here means pandopt was 2x
# faster; confirm whether the raw speed-up factor (no "- 1") was intended.
results_df['performance multiplicator'] = _pandas_time / _pandopt_time - 1
results_df

In [None]:
# Persist the benchmark table (indexed by the descriptive test key) for later analysis.
results_df.to_csv(path_or_buf='benchmark_rolling.csv')