In [None]:
# Notebook-wide settings; the values are the results obtained from E1.
# degree: spline degree handed to the spline fits, the coefficient-count helper
#         and the knot-vector helper further down in this notebook.
degree = 3
# compression_rate: passed to generate_coeff_counts to derive the number of coefficients.
compression_rate = 0.3
# eps: forwarded as `eps=` to fit_max_l1_spline by the 'L8 and L1' method.
eps = 1e-8

In [None]:
%%time
from utils.data import load_ucr_dataset, load_ucr_data_short_and_medium, load_gbnc_data, load_ucr_data_short

# load data
# Exactly one loader is active at a time; the commented-out calls are alternative inputs.
data = load_ucr_data_short_and_medium()
#data = load_ucr_dataset(48)
#data = load_gbnc_data()
# Every row of `data` is processed as one time series by the evaluation loop below.
print("number of time series:", len(data.index))


In [None]:
# Replace the existing index with 0..n-1 (the old index is dropped, not kept as a column).
# The evaluation loop uses the row index to print the processing progress.
data.reset_index(drop=True,inplace=True)
# Preview the first rows.
data.head()

In [None]:
def fit_curve_to_data(method: str, ts: [(float, float)], knots: [float], num_coeffs: int, degree=None):
    fitted_curve = None
    match method:
        case 'PAA':
            _, result = fit_max_spline(ts, knots, 0)
            fitted_curve = [evaluate_spline(knots, result, 0, x[0]) for x in ts]

        case 'PLA':
            _, result = fit_max_spline(ts, knots, 1)
            fitted_curve = [evaluate_spline(knots, result, 1, x[0]) for x in ts]

        case 'L8':
            if degree is None:
                degree = 3

            _, result = fit_max_spline(ts, knots, degree)
            fitted_curve = [evaluate_spline(knots, result, degree, x[0]) for x in ts]

        case 'L8 and L1':
            if degree is None:
                degree = 3

            _, result = fit_max_l1_spline(ts, knots, degree, eps=eps)
            fitted_curve = [evaluate_spline(knots, result, degree, x[0]) for x in ts]

        case 'LSQ':
            if degree is None:
                degree = 3

            result = fit_LSQ_spline(time_series=ts, knots=knots, degree=degree)
            fitted_curve = [evaluate_spline(knots, result, degree, x[0]) for x in ts]

        case 'DFT':
            result = fit_DFT(ts, num_coeffs)
            fitted_curve = calculate_inverse_DFT(len(ts), num_coeffs, result)

    return fitted_curve

In [None]:
def calculate_error_measures(data_points: [float], fitted_curve: [float]):
    """Compare a fitted curve with the data it approximates.

    Both sequences must have the same length (checked with an assertion).

    Returns a dict with three entries:
      'max_dist' - largest absolute difference between fit and data,
      'MSE'      - result of mean_squared_error(fitted_curve, data_points),
      'MAE'      - result of mean_absolute_error(fitted_curve, data_points).
    """
    assert len(data_points) == len(fitted_curve)

    # Point-wise deviation of the fit from the data.
    deviation = np.array(fitted_curve) - np.array(data_points)

    measures = {}
    measures['max_dist'] = np.max(np.abs(deviation))
    measures['MSE'] = mean_squared_error(fitted_curve, data_points)
    measures['MAE'] = mean_absolute_error(fitted_curve, data_points)
    return measures

# WITH OUTLIERS, WITHOUT PLOTS

In [None]:
%%time
import matplotlib.pyplot as plt
from utils.plot import add_fitted_curve_to_plot
from tsfel import mean_diff
from utils.data import remove_outliers, replace_outliers
from utils.spline import generate_coeff_counts, generate_knot_vector_from_coeff_count
from utils.fit import fit_max_spline, fit_max_l1_spline, fit_DFT, calculate_inverse_DFT, fit_LSQ_spline
from sklearn.metrics import mean_squared_error, mean_absolute_error
from utils.spline import evaluate_spline
import pandas as pd
import numpy as np

# Main experiment: fit every series with every method, once on the raw data and,
# if outliers were detected, once more on the series with replaced outliers.
# Plotting is intentionally left out of this run.
#
# Outputs consumed by later cells:
#   results                        - one dict per (series variant, method) with the error measures
#   ts_properties                  - one dict per series variant with descriptive statistics
#   num_rows                       - number of series in `data`
#   worse_without_outliers_counter - series whose 'L8' max. distance did not improve
#                                    after outlier replacement
results = []
num_rows = len(data.index)
print("num_rows", num_rows)
ts_properties = []
worse_without_outliers_counter = 0

for idx, row in data.iterrows():
    original_time_series = row['data']
    time_series_without_outliers = remove_outliers(row['data'])
    original_xs = [tup[0] for tup in original_time_series]
    time_series_with_replaced_outliers = replace_outliers(ts_without_outliers=time_series_without_outliers,
                                                          original_xs=original_xs)

    n_outliers = len(original_time_series) - len(time_series_without_outliers)

    print(f"{idx}: {row['dataset']} no. {row['num']}, {round(idx / num_rows * 100, 2)}% processed")

    # The raw series is always evaluated; the outlier-replaced variant only if
    # something was actually removed. Each entry is (series, number of outliers removed).
    ts_list = [(original_time_series, 0)]
    if n_outliers > 0:
        ts_list.append((time_series_with_replaced_outliers, n_outliers))

    # True for every original sample position that survived outlier removal.
    # Built once per series (it does not depend on the method); the set gives
    # constant-time membership tests instead of rebuilding it per sample.
    inlier_xs = {tup[0] for tup in time_series_without_outliers}
    is_inlier = [x in inlier_xs for x in original_xs]

    max_dist_with_outliers = None
    max_dist_without_outliers = None

    for ts, num_outliers_removed in ts_list:

        y_values = [tup[1] for tup in ts]

        # Absolute first and second differences of the values, computed once.
        y_series = pd.Series(y_values)
        abs_fst_derivative = y_series.diff().dropna().abs()
        abs_snd_derivative = y_series.diff().diff().dropna().abs()

        ts_properties.append({
            'dataset': row['dataset'],
            'num': row['num'],
            'num_data_pts': len(ts),
            'num_outliers_removed': num_outliers_removed,
            # Pass the values only: handing over the (x, y) tuples would make the
            # difference run across each tuple instead of along the series.
            'mean_diff': mean_diff(y_values),
            'max_fst_derivative': abs_fst_derivative.max(),
            'max_snd_derivative': abs_snd_derivative.max(),
            'avg_fst_derivative': abs_fst_derivative.mean(),
            'avg_snd_derivative': abs_snd_derivative.mean()
        })

        # The coefficient budget is derived from the original length and reduced by
        # the number of removed outliers, but never below what the degree requires.
        min_num_coeffs = degree + 1
        num_coeffs_count = generate_coeff_counts(len(original_time_series), degree, [compression_rate])[0]
        num_coeffs = max(min_num_coeffs, num_coeffs_count - num_outliers_removed)
        if num_coeffs_count - num_outliers_removed < min_num_coeffs:
            print("num_coeffs_count is",
                  num_coeffs_count - num_outliers_removed,
                  "but required is at least", min_num_coeffs)
            print("setting num_coeffs to", num_coeffs)
        knots = generate_knot_vector_from_coeff_count(degree=degree, num_coeffs=num_coeffs)

        for method in ['L8', 'L8 and L1', 'LSQ', 'PAA', 'PLA', 'DFT']:
            fitted_curve = fit_curve_to_data(method, ts, knots, num_coeffs, degree)

            if num_outliers_removed == 0:
                metrics = calculate_error_measures(data_points=y_values, fitted_curve=fitted_curve)
            else:
                # Score the outlier-replaced variant on the surviving samples only,
                # so replaced points do not influence the error measures.
                filtered_data_points = [y for y, keep in zip(y_values, is_inlier) if keep]
                filtered_fitted_curve = [y for y, keep in zip(fitted_curve, is_inlier) if keep]
                metrics = calculate_error_measures(data_points=filtered_data_points,
                                                   fitted_curve=filtered_fitted_curve)

            if method == 'L8':
                if num_outliers_removed == 0:
                    max_dist_with_outliers = metrics['max_dist']
                else:
                    max_dist_without_outliers = metrics['max_dist']
                    if (max_dist_with_outliers is not None
                            and max_dist_without_outliers >= max_dist_with_outliers):
                        # Outlier replacement did not help 'L8': count the series and
                        # record nothing more for this variant. This and all remaining
                        # methods would be discarded anyway, so stop fitting them.
                        worse_without_outliers_counter += 1
                        break

            results.append({
                'dataset': row['dataset'],
                'num': row['num'],
                'num_data_pts': len(ts),
                'num_outliers_removed': num_outliers_removed,
                'num_coeffs': num_coeffs,
                'method': method,
                'max_dist': metrics['max_dist'],
                'MSE': metrics['MSE'],
                'MAE': metrics['MAE']
            })

# EVALUATE RESULTS

In [None]:
# Number of series whose 'L8' maximum distance was not reduced by replacing outliers.
worse_without_outliers_counter

In [None]:
# Same count as a percentage of all processed series (num_rows), rounded to one decimal.
# NOTE(review): the denominator includes series in which no outliers were found - confirm this is intended.
print("time series worse without outliers:", round(worse_without_outliers_counter/num_rows * 100, 1), "%")

In [None]:
import pandas as pd

# Display floats with ten decimal places so small error values stay readable.
pd.set_option('display.float_format', '{:.10f}'.format)

# One row per recorded (series variant, method) result, exact duplicates removed.
e2_results = pd.DataFrame(results).drop_duplicates()

# Average each error measure over all recorded results of a method.
error_columns = ['max_dist', 'MSE', 'MAE']
mean_values = (
    e2_results
    .groupby('method')[error_columns]
    .mean()
    .reset_index()
)
mean_values

In [None]:
import matplotlib.pyplot as plt

# Plot the per-method means of all three error measures on one shared axis.
# The bars are drawn on top of each other; later ones are semi-transparent.
fig, ax = plt.subplots(figsize=(10, 6))

# (column, colour, alpha, legend label) in drawing order.
bar_specs = [
    ('max_dist', 'b', None, 'Mean Max Dist'),
    ('MAE', 'g', 0.7, 'Mean MAE'),
    ('MSE', 'r', 0.5, 'Mean MSE'),
]
for column, colour, alpha, label in bar_specs:
    ax.bar(mean_values['method'], mean_values[column], alpha=alpha, color=colour, label=label)

ax.set_xlabel('Method')
ax.set_ylabel('Mean Value')
ax.set_title('Mean max. distance, MSE, and MAE per method')
ax.legend()
ax.tick_params(axis='x', labelrotation=45)
fig.tight_layout()
plt.show()

In [None]:
# Attach the descriptive statistics of each series variant to its result rows,
# so errors and series properties can be analysed together later.
ts_props = pd.DataFrame(ts_properties).drop_duplicates()

# A series variant is identified by these four columns in both frames.
merge_keys = ['dataset', 'num', 'num_data_pts', 'num_outliers_removed']
e2 = e2_results.merge(ts_props, how='left', on=merge_keys)
e2

# comparison: outliers vs. no outliers (for each method)

In [None]:
def _mean_errors_per_method(frame):
    """Return the per-method mean of max_dist, MSE and MAE with 'method' as a regular column."""
    return frame.groupby('method').agg({'max_dist': 'mean', 'MSE': 'mean', 'MAE': 'mean'}).reset_index()


# Split the results by variant: rows with removed outliers vs. rows for the raw series.
# (The same split also works on e2_results instead of e2.)
df_no_outliers = e2[e2['num_outliers_removed'] > 0]
df_with_outliers = e2[e2['num_outliers_removed'] == 0]

# Means for the outlier-replaced variant get a "w/o o." suffix to keep the columns apart.
mean_values_no_outliers = _mean_errors_per_method(df_no_outliers).rename(
    columns={'max_dist': 'max_dist w/o o.', 'MSE': 'MSE w/o o.', 'MAE': 'MAE w/o o.'})

# Means for the raw series keep the plain column names.
mean_values_with_outliers = _mean_errors_per_method(df_with_outliers)

# One row per method with both sets of means side by side.
mean_values_combined = mean_values_with_outliers.merge(mean_values_no_outliers, on='method', how='outer')
mean_values_combined

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Grouped bar charts: for each error measure, the per-method mean with outliers
# next to the per-method mean after outlier replacement.
methods = mean_values_combined['method']
metrics = ['MSE', 'MAE', 'max_dist']

# Set the width of the bars
bar_width = 0.2
num_methods = len(methods)
r = np.arange(num_methods)

# Bar centres are the same in every subplot: the "with outliers" bar and,
# directly to its right, the "without outliers" bar.
r1 = r + bar_width / 2
r2 = r1 + bar_width
# Put each tick midway between the two bars of a method so the label refers to
# the pair rather than to the first bar only.
tick_positions = (r1 + r2) / 2

# Create one subplot per error measure
fig, axs = plt.subplots(len(metrics), figsize=(10, 15))

for ax, metric in zip(axs, metrics):
    metric_with_outliers = mean_values_combined[metric]
    metric_without_outliers = mean_values_combined[f'{metric} w/o o.']

    # Create the bar plots for this error measure
    ax.bar(r1, metric_with_outliers, color='b', width=bar_width, label='With Outliers')
    ax.bar(r2, metric_without_outliers, color='r', width=bar_width, label='Without Outliers')

    # Add labels and title
    ax.set_xlabel('Method', fontweight='bold')
    ax.set_ylabel(f'Mean {metric}', fontweight='bold')
    ax.set_xticks(tick_positions)
    ax.set_xticklabels(methods, rotation=45, ha='right')
    ax.set_title(f'Comparison of Mean {metric} with and without Outlier Removal')
    ax.legend()

# Adjust layout and show plot
plt.tight_layout()
plt.show()


In [None]:
# De-duplicated descriptive statistics collected per evaluated series variant.
ts_props

In [None]:
# De-duplicated error measures recorded per series variant and method.
e2_results