## Imports

In [None]:
import os
import pandas as pd
from app.DataSpliter import GranularDataSplitter
from app.transformers.smoother import Smoother
from app.DataLoader import DataLoader
from app.visualization_utils import draw_plotly
import numpy as np
import matplotlib.pyplot as plt
from typing import Literal


In [None]:
%load_ext autoreload
%autoreload 2

## Data Loading


In [None]:
#don't use snowplow data
def read_files():
    """Load every labelled CSV recording except the snow-plow ones.

    Lists the ``data/labeled/HUAWEI_MATE`` directory, skips entries that do
    not end in ``.csv`` or whose name contains ``snow_plow``, prints the path
    of each remaining file and loads it through ``DataLoader``.

    Returns:
        A list with the result of ``load_transform_data()`` for each accepted
        file, in the order ``os.listdir`` returned them.
    """
    path = "data/labeled/HUAWEI_MATE"
    dataframes = []
    for file in os.listdir(path):
        # Guard clause: only non-snow-plow CSV files are of interest.
        if not file.endswith(".csv") or "snow_plow" in file:
            continue
        csv_path = f'{path}/{str(file)}'
        print(csv_path)
        dataframes.append(DataLoader(csv_path).load_transform_data())
    return dataframes
# Load all labelled recordings once; the cells below reuse this list.
labeled_dataframes = read_files()


## Data Smoothing and Granularization

In [None]:

# Key of the granular frame that is extracted from every recording.
split_on = "Orientation"
def split_data_granular(dataframe_list : list[pd.DataFrame], split_on :str):
    """Extract one granular frame from each recording.

    Args:
        dataframe_list: Recordings to split.
        split_on: Key looked up in the splitter's ``granular_data`` mapping
            after ``split_into_granular()`` has run.

    Returns:
        A list with a copy of ``granular_data[split_on]`` per input frame,
        in input order.
    """
    def extract(frame):
        # One splitter per recording; copy so the result is detached from it.
        splitter = GranularDataSplitter(frame)
        splitter.split_into_granular()
        return splitter.granular_data[split_on].copy()

    return [extract(frame) for frame in dataframe_list]

def smooth_data(dataframe_list : list[pd.DataFrame], sensor:str, window_size:int):
    """Smooth a single sensor column in each frame.

    Args:
        dataframe_list: Frames to smooth.
        sensor: Name of the column handed to the smoother.
        window_size: Window size forwarded to ``smooth_data_backward_ma``
            (presumably a backward moving average, judging by the method
            name - confirm in ``Smoother``).

    Returns:
        A list with the smoother's result for every input frame, in input
        order.
    """
    return [
        Smoother(frame).smooth_data_backward_ma(window_size=window_size, columns=[sensor])
        for frame in dataframe_list
    ]

# Extract the granular frame selected by `split_on` from every recording,
# then smooth its "roll" column with a window of 10.
orientation_dfs = split_data_granular(labeled_dataframes, split_on=split_on)
smoothed_orientation_dfs = smooth_data(orientation_dfs, sensor="roll", window_size=10)

In [None]:
# Inspect the smoothed frame at index 1.
smoothed_orientation_dfs[1]

In [None]:
# Plot every unsmoothed orientation frame.
for data in orientation_dfs:
    draw_plotly(data, "Orientation", granular=True)


In [None]:
# Plot every smoothed orientation frame for comparison with the plots above.
for data in smoothed_orientation_dfs:
    draw_plotly(data, "Orientation", granular=True)


## Gradient Descent

In [None]:
#! get rid of the NaNs that come from smoothing

In [None]:
# () gradient descent with momentum? min max approach

# Example of loading data into a DataFrame
# df = pd.read_csv('your_data.csv')
# Assumes that the "roll" column exists and that the index is correct
def plot_positions(series, positions, step, curves = None, find_maxima = None):
    """Plot ``series`` as a line and mark selected samples on it.

    Args:
        series: Values to plot; it is also indexed with ``positions`` and
            ``curves`` to obtain the marker heights.
        positions: Sample indices drawn as red markers.
        step: Step label inserted into the figure title.
        curves: Optional sample indices drawn as green markers.
        find_maxima: ``None`` for a plain title; otherwise the title also
            says ``maxima`` (truthy) or ``minima`` (falsy).
    """
    if find_maxima is None:
        title = f'Positions at step {step}'
    else:
        extremum = "maxima" if find_maxima else "minima"
        title = f'Positions at step {step} for {extremum}'

    # Marker layers drawn on top of the line, in drawing order.
    layers = [(positions, 'red', 'Positions')]
    if curves is not None:
        layers.append((curves, 'green', 'Curves'))

    plt.figure(figsize=(12, 6))
    plt.plot(series, label='Series', color='blue')
    for points, colour, label in layers:
        plt.scatter(points, series[points], color=colour, alpha=0.6, label=label)
    plt.xlabel('Index')
    plt.ylabel('Value')
    plt.title(title)
    plt.legend()
    plt.show()

def get_curve_points(df:pd.DataFrame, turn: Literal["L", "R"] = None):
    """Return the positional indices of rows marked as a curve.

    Args:
        df: Frame with a ``Curve`` column.
        turn: When truthy, only rows whose ``Curve`` equals this value are
            selected; otherwise every row whose ``Curve`` differs from
            ``False`` is selected.

    Returns:
        A list of integer row positions.
    """
    curve_column = df['Curve']
    if turn:
        mask = curve_column == turn
    else:
        # Element-wise comparison on a Series; `is not False` would not work.
        mask = curve_column != False
    (hits,) = np.where(mask)
    return hits.tolist()
    # Gradient descent function for finding extrema

def gradient_descent(df, series, normalized_series, normalized_positions,  start_indices, learning_rate, steps, momentum, find_maxima, printing):
    """Move points along a 1-D signal with momentum gradient steps.

    Every point lives in normalized 0-1 coordinates. At each step the slope
    is estimated as a central difference between the samples one index ahead
    of and one index behind the point, the velocity is updated with momentum,
    and the point is moved and clamped to the 0-1 range.

    Args:
        df: Frame handed to ``get_curve_points`` so the curve markers can be
            drawn on the plots.
        series: Raw signal values; used for plotting and for its length.
        normalized_series: Signal values used for the slope estimate, indexed
            by integer sample index.
        normalized_positions: Starting points in 0-1 coordinates. The list is
            NOT modified; the search runs on a private copy so the same
            starting points can be reused for another run.
        start_indices: Unused; kept so existing callers keep working.
        learning_rate: Step size applied to the slope.
        steps: Number of iterations.
        momentum: Fraction of the previous velocity carried into the next step.
        find_maxima: Truthy to climb towards maxima, falsy to descend towards
            minima.
        printing: When truthy, plot the intermediate positions every 10 steps.

    Returns:
        The final positions as a list of integer sample indices.
    """
    # Private copy: mutating the caller's list would make a later run start
    # from this run's results instead of from the original starting points.
    current = list(normalized_positions)
    velocities = [0] * len(current)
    last = len(series) - 1
    # +1 climbs the slope (maxima), -1 descends it (minima).
    direction = 1 if find_maxima else -1

    for step in range(steps):
        for i, pos in enumerate(current):
            # Neighbouring sample indices, clamped to the ends of the signal.
            ahead_idx = int(min(1, pos + 1 / last) * last)
            behind_idx = int(max(0, pos - 1 / last) * last)

            grad = (normalized_series[ahead_idx] - normalized_series[behind_idx]) / 2

            # Update velocity and position with momentum.
            velocities[i] = momentum * velocities[i] + direction * learning_rate * grad
            # Keep the point inside the 0-1 range.
            current[i] = max(0, min(1, pos + velocities[i]))

        # Integer positions are only needed when an intermediate plot is drawn.
        if printing and step % 10 == 0:
            snapshot = [int(pos * last) for pos in current]
            plot_positions(series, snapshot, step, get_curve_points(df))

    positions = [int(pos * last) for pos in current]
    plot_positions(series, positions, "final", get_curve_points(df), find_maxima)
    return positions

def gradient_descent_full(df, start_indices, learning_rate=0.01, steps=1000, momentum=0.98, printing = False):
    """Search the ``roll`` column for minima and maxima from given start points.

    The signal and the start indices are both rescaled to the 0-1 range, the
    starting situation is plotted, and ``gradient_descent`` is then called
    twice with the same ``normalized_positions`` list: first for minima, then
    for maxima.

    Args:
        df: Frame with a ``roll`` column (and whatever ``get_curve_points``
            needs for plotting).
        start_indices: Integer sample indices to start from; must provide
            ``.copy()``.
        learning_rate: Forwarded to ``gradient_descent``.
        steps: Forwarded to ``gradient_descent``.
        momentum: Forwarded to ``gradient_descent``.
        printing: Forwarded to ``gradient_descent``.

    Returns:
        A ``(min_positions, max_positions)`` tuple with the results of the
        two runs.
    """
    roll = df['roll'].values
    last_index = len(roll) - 1

    # Rescale the signal to the 0-1 range.
    scaled_roll = (roll - roll.min()) / (roll.max() - roll.min())

    # Express the start indices in the same 0-1 coordinates.
    unit_positions = [index / last_index for index in start_indices]

    # Show where the points start before anything moves.
    plot_positions(roll, start_indices.copy(), "beginning", get_curve_points(df))

    def run(find_maxima):
        # Both runs share every argument except the search direction.
        return gradient_descent(df, roll, scaled_roll, unit_positions,  start_indices, learning_rate, steps, momentum, find_maxima, printing)

    min_positions = run(False)
    max_positions = run(True)
    return min_positions, max_positions



# We work on the "roll" column of the DataFrame

# Step 1: pick random starting points
#np.random.seed(42)  # Seed the random generator for reproducible results
num_points = 10
start_indices =np.random.randint(1, len(smoothed_orientation_dfs[5]) , size=num_points)  # fixed alternative: np.array([800])

# Step 2: find the extrema with gradient descent
final_positions_min, final_positions_maximums = gradient_descent_full(smoothed_orientation_dfs[5], start_indices, printing=False)

# # Step 3: filter the most important extrema
# unique_positions, counts = np.unique(final_positions, return_counts=True)
# threshold = np.percentile(counts, 1)  # Example threshold
# important_extrema_indices = unique_positions[counts >= threshold]

# Result - DataFrame with the most important extrema
#important_extrema_smoothed_roll = smoothed_roll.iloc[final_positions]


## Select Best 

In [None]:
# # Step 3: Filter the most important extrema

def filter_extremes(final_positions, percentile=75):
    """Keep the positions that were reached most often.

    Counts how many times each distinct position occurs and keeps those
    whose count is at least the given percentile of all counts.

    Args:
        final_positions: Sequence of positions, possibly with repeats.
        percentile: Percentile (0-100) of the occurrence counts used as the
            cut-off. Defaults to 75, the previously hard-coded value.

    Returns:
        A sorted NumPy array of the retained distinct positions; empty when
        ``final_positions`` is empty.
    """
    unique_positions, counts = np.unique(final_positions, return_counts=True)
    # A percentile of no counts is undefined (NaN or an error, depending on
    # the NumPy version), so answer the empty case directly.
    if unique_positions.size == 0:
        return unique_positions
    threshold = np.percentile(counts, percentile)
    return unique_positions[counts >= threshold]

# unique_positions, counts = np.unique(final_positions_min, return_counts=True)
# threshold = np.percentile(counts, 75)  # Example threshold
# important_extrema_indices = unique_positions[counts >= threshold]

# Result - DataFrame with the most important extrema
#important_extrema_smoothed_roll = smoothed_roll.iloc[final_positions]
# Keep only the most frequently reached minima and maxima.
important_min = filter_extremes(final_positions_min)
important_max = filter_extremes(final_positions_maximums)

# Selected minima plotted together with the rows whose Curve is "L".
plot_positions(smoothed_orientation_dfs[5]['roll'].values, important_min, "Selected min_point", get_curve_points(smoothed_orientation_dfs[5], turn="L"))

# Selected maxima plotted together with the rows whose Curve is "R".
plot_positions(smoothed_orientation_dfs[5]['roll'].values, important_max, "Selected max_points", get_curve_points(smoothed_orientation_dfs[5], turn="R"))


In [None]:

def get_distance(df, positions, turn ):
    """Measure how far each position lies from its nearest curve row.

    Args:
        df: Frame with a ``Curve`` column and a ``roll`` column.
        positions: Integer row positions to evaluate; they are processed in
            ascending order.
        turn: Value of ``Curve`` that selects the rows to compare against.

    Returns:
        A ``(distances, val_diff)`` tuple of lists, ordered by ascending
        position. ``distances`` holds ``position - nearest_curve`` (signed)
        and ``val_diff`` the difference of the ``roll`` values at those two
        rows. When a position is equally far from two curve rows, the earlier
        one is used.

    Raises:
        IndexError: If positions are given but no row has ``Curve == turn``.
    """
    curves = np.sort(np.where(df['Curve'].isin([turn]))[0])
    positions = np.sort(positions)
    if len(positions) and not len(curves):
        raise IndexError(f"no rows with Curve == {turn!r} to measure distances against")

    # Extract the column once instead of on every loop iteration.
    roll = df['roll'].values
    last = len(curves) - 1

    distances = []
    val_diff = []
    # Both arrays are sorted, so the nearest curve index never moves backwards
    # from one position to the next; carry it over instead of rescanning.
    nearest = 0
    for position in positions:
        while nearest < last and abs(position - curves[nearest]) > abs(position - curves[nearest + 1]):
            nearest += 1
        curve = curves[nearest]
        distances.append(position - curve)
        val_diff.append(roll[position] - roll[curve])
    return distances, val_diff

In [None]:
# Compare the selected maxima with the "R" rows and the selected minima with
# the "L" rows of the same frame.
distances_R, val_diff_R = get_distance(smoothed_orientation_dfs[5], important_max, "R")
distances_L, val_diff_L = get_distance(smoothed_orientation_dfs[5], important_min, "L")


In [None]:
# Signed index offsets of the selected minima from their nearest "L" row.
distances_L

In [None]:
# Roll-value differences between the selected minima and their nearest "L" row.
val_diff_L

In [None]:
# Signed index offsets of the selected maxima from their nearest "R" row.
distances_R

In [None]:
# Roll-value differences between the selected maxima and their nearest "R" row.
val_diff_R