## Data Cleaning Notebook  

This notebook cleans the data prepared by `prepare.ipynb`. Before running this notebook, make sure that `prepare.ipynb` has been executed, as it generates the necessary raw data in the `data/processed/` folder. You will need the name of the file created by `prepare.ipynb` (set in `prepared_data_file_name` below) to run this notebook.  

#### Current Scope  
- Cleans and preprocesses data for training.  
- Requires the processed data from `prepare.ipynb` in `data/processed/`.  
- Saves the cleaned data to `data/processed/` for use in training.  


In [1]:
import os
import json
import random
import statistics
import numpy as np
from collections import Counter
import plotly.graph_objs as go
import math

In [2]:
# Name of the JSON file written by prepare.ipynb; change it to match the file
# produced by your own run of that notebook.
prepared_data_file_name = "merged-prepared-data-2025-03-08-16:27:23.json"

# The file is looked up in <parent of the current working directory>/data/processed.
prepared_data_file_path = os.path.join(
    os.path.join(os.path.dirname(os.getcwd()), "data", "processed"),
    prepared_data_file_name,
)

# Sanity check: this must display True before continuing.
os.path.exists(prepared_data_file_path)

True

In [3]:
# Start with empty containers; `original_data` is replaced by the loaded
# samples just below.
original_data = []
cleaned_data = []

# Load the prepared samples. The context manager closes the handle even if
# JSON parsing raises, which the manual open()/close() pair did not guarantee.
with open(prepared_data_file_path, "r", encoding="utf-8") as file:
    original_data = json.load(file)

# Number of recorded points in each sample's "path", i.e. how many
# intermediate steps were taken between the initial and final mouse position.
intermediate_steps_lengths = [len(sample["path"]) for sample in original_data]

# Frequency of each path length, stored as a plain dict ordered by path length
# so it can be plotted left to right.
intermediate_steps_counts = dict(sorted(Counter(intermediate_steps_lengths).items()))

In [6]:
# Central tendency of the per-sample path lengths
mode_value = statistics.mode(intermediate_steps_lengths)
mean_value = statistics.mean(intermediate_steps_lengths)
median_value = statistics.median(intermediate_steps_lengths)

# Percentiles, all obtained from a single vectorised call
(
    first_quartile,
    third_quartile,
    ninetieth_percentile,
    ninety_fifth_percentile,
    ninety_ninth_percentile,
) = np.percentile(intermediate_steps_lengths, [25, 75, 90, 95, 99])

# Spread of the distribution (population standard deviation and variance,
# which is what NumPy computes by default)
std_dev = np.std(intermediate_steps_lengths)
variance = np.var(intermediate_steps_lengths)

In [7]:
# Plot how often each path length (number of intermediate steps) occurs.
fig = go.Figure()

# Frequency curve: x = number of steps, y = number of samples with that length
fig.add_trace(
    go.Scatter(
        x=list(intermediate_steps_counts.keys()),
        y=list(intermediate_steps_counts.values()),
        mode="lines+markers",
        name="Intermediate Steps Count",
        line=dict(color="royalblue", width=2),
        marker=dict(size=6, color="darkblue", symbol="circle"),
    )
)

# Statistics used by the annotations below
max_count = max(intermediate_steps_counts.values())
# Path length with the highest frequency (the first one when several tie)
max_steps = max(intermediate_steps_counts, key=intermediate_steps_counts.get)
mean_steps = statistics.mean(intermediate_steps_lengths)
median_steps = statistics.median(intermediate_steps_lengths)  # not drawn; kept for inspection

# Point at the peak of the distribution
fig.add_annotation(
    x=max_steps,
    y=max_count,
    text=f"Peak: {max_count} samples with {max_steps} steps",
    showarrow=True,
    arrowhead=1,
    ax=50,
    ay=-40,
)

# Mark the mean path length. The mean is a number of steps, i.e. a value on
# the x axis, so it is drawn as a vertical line spanning the frequency range
# (drawing it at y=mean_steps would compare it against sample counts).
fig.add_shape(
    type="line",
    x0=mean_steps,
    y0=0,
    x1=mean_steps,
    y1=max_count,
    line=dict(color="red", width=1, dash="dash"),
)

fig.add_annotation(
    x=mean_steps,
    y=max_count * 0.9,
    text=f"Mean: {mean_steps:.2f} steps",
    showarrow=False,
    font=dict(color="red"),
    xanchor="left",
)

# Summary box in the top right corner (paper coordinates)
stats_text = (
    f"<b>Statistics:</b><br>"
    f"Mean: {mean_value:.2f}<br>"
    f"Median: {median_value}<br>"
    f"Mode: {mode_value}<br>"
    f"Std Dev: {std_dev:.2f}<br>"
    f"Q1: {first_quartile:.0f}<br>"
    f"Q3: {third_quartile:.0f}<br>"
    f"90%: {ninetieth_percentile:.0f}<br>"
    f"95%: {ninety_fifth_percentile:.0f}<br>"
    f"99%: {ninety_ninth_percentile:.0f}"
)

fig.add_annotation(
    x=0.98,
    y=0.98,
    xref="paper",
    yref="paper",
    text=stats_text,
    showarrow=False,
    align="left",
    bgcolor="rgba(255, 255, 255, 0.8)",
    bordercolor="gray",
    borderwidth=1,
    font=dict(size=10),
    xanchor="right",
    yanchor="top",
)

# Title, axis labels, hover behaviour and general styling
fig.update_layout(
    title={
        "text": "Distribution of Intermediate Steps in Mouse Movement Data",
        "y": 0.95,
        "x": 0.5,
        "xanchor": "center",
        "yanchor": "top",
        "font": dict(size=18),
    },
    xaxis_title="Number of Intermediate Steps",
    yaxis_title="Frequency (Number of Samples)",
    hovermode="x unified",
    hoverlabel=dict(bgcolor="white", font_size=12, font_family="Arial"),
    template="plotly_white",
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
    margin=dict(l=60, r=60, t=80, b=60),
)

# Light grid on both axes
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor="lightgray")
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor="lightgray")

fig.show()

In [8]:
# Target path length: the average of the mode, the median and the third
# quartile, truncated to an integer. This is only an intuitive guess, but it
# has given better results than using the mean value.
suitable_steps = int(sum((mode_value, median_value, third_quartile)) / 3)
suitable_steps

39

In [9]:
def clean_equal_size(data):
    """
    Convert a path that already has the target length into [x, y] pairs.

    `data` is a non-empty sequence of mappings with "x" and "y" keys. The
    result always starts with the first point and ends with the last point,
    with every point strictly between them in order; consequently a
    one-point path yields that point twice.
    """
    start_point = [data[0]["x"], data[0]["y"]]
    # Everything except the two endpoints, which are handled separately.
    interior_points = [[point["x"], point["y"]] for point in data[1:-1]]
    end_point = [data[-1]["x"], data[-1]["y"]]
    return [start_point] + interior_points + [end_point]

In [10]:
def clean_smaller_size(data, suitable_steps):
    """
    Stretch a path that has fewer points than suitable_steps to exactly
    suitable_steps points.

    Every original point is kept, in order, at a slot spread evenly over
    0 .. suitable_steps - 1 (the first point at slot 0, the last point at the
    final slot). The empty slots between two neighbouring original points are
    filled by linear interpolation between them.

    Parameters:
        data: non-empty sequence of mappings with numeric "x" and "y" keys.
        suitable_steps: number of points the returned path must have.

    Returns:
        A list of suitable_steps [x, y] pairs. Original points keep their
        values unchanged; interpolated points are floats.

    Raises:
        ValueError: if data is empty or has more points than suitable_steps.
    """
    point_count = len(data)
    if point_count == 0:
        raise ValueError("data must contain at least one point")
    if point_count > suitable_steps:
        raise ValueError("data must not contain more points than suitable_steps")

    if point_count == 1:
        # Nothing to interpolate between: repeat the only known position.
        only_point = data[0]
        return [[only_point["x"], only_point["y"]] for _ in range(suitable_steps)]

    # Slot of each original point. Integer arithmetic keeps the slots exact,
    # and because point_count <= suitable_steps they are strictly increasing.
    last_slot = suitable_steps - 1
    last_point = point_count - 1
    anchor_slots = [i * last_slot // last_point for i in range(point_count)]

    processed_data = []
    for segment, (start, end) in enumerate(zip(data, data[1:])):
        gap = anchor_slots[segment + 1] - anchor_slots[segment]
        start_x, start_y = start["x"], start["y"]
        delta_x = end["x"] - start_x
        delta_y = end["y"] - start_y

        # The segment's own starting point, kept as it is ...
        processed_data.append([start_x, start_y])
        # ... followed by gap - 1 evenly spaced points towards the next one.
        for offset in range(1, gap):
            fraction = offset / gap
            processed_data.append(
                [start_x + delta_x * fraction, start_y + delta_y * fraction]
            )

    # The final position closes the path, kept as it is.
    processed_data.append([data[-1]["x"], data[-1]["y"]])
    return processed_data

In [11]:
def clean_larger_size(data, suitable_steps):
    """
    Thin a path that has more points than suitable_steps down to
    suitable_steps points.

    Each interior point is scored by the sum of its distances to the previous
    and to the next point. The interior points with the smallest scores (the
    ones that add the least movement) are dropped; ties are resolved in favour
    of dropping the earlier point. The first and the last point are never
    dropped.

    Parameters:
        data: sequence of mappings with numeric "x" and "y" keys.
        suitable_steps: number of points to keep.

    Returns:
        The surviving points, in their original order, as [x, y] pairs. The
        result has suitable_steps points whenever enough interior points
        exist to drop; if data is not longer than suitable_steps it is
        returned unchanged (converted to [x, y] pairs).
    """
    surplus = len(data) - suitable_steps
    if surplus <= 0:
        # Nothing to drop.
        return [[step["x"], step["y"]] for step in data]

    def neighbour_distance(index):
        # Distance to the next point plus distance to the previous point.
        current, before, after = data[index], data[index - 1], data[index + 1]
        return math.hypot(
            after["x"] - current["x"], after["y"] - current["y"]
        ) + math.hypot(current["x"] - before["x"], current["y"] - before["y"])

    # Rank the interior points by their score. The ranking is done on real
    # indices into `data` (1 .. len(data) - 2), so the positions selected for
    # removal line up with the points they were measured on.
    interior_by_score = sorted(range(1, len(data) - 1), key=neighbour_distance)
    positions_to_remove = set(interior_by_score[:surplus])

    return [
        [step["x"], step["y"]]
        for index, step in enumerate(data)
        if index not in positions_to_remove
    ]

In [12]:
# Randomise the sample order in place, then build the input/output pairs.
random.shuffle(original_data)
cleaned_data = {"input": [], "output": []}

for data in original_data:
    # Input: the initial position followed by the final position.
    input_data = data["initial"] + data["final"]
    current_path_movement_data = data["path"]
    path_length = len(current_path_movement_data)

    # Output: the path brought to exactly `suitable_steps` points, using the
    # cleaner that matches how its length compares with the target.
    if path_length == suitable_steps:
        cleaned_output_data = clean_equal_size(current_path_movement_data)
    elif path_length < suitable_steps:
        cleaned_output_data = clean_smaller_size(
            current_path_movement_data, suitable_steps
        )
    else:
        cleaned_output_data = clean_larger_size(
            current_path_movement_data, suitable_steps
        )

    cleaned_data["input"].append(input_data)
    cleaned_data["output"].append(cleaned_output_data)

In [None]:
# The output name records the chosen step count and the source file it was
# derived from.
cleaned_data_file_name = f"cleaned-data-{suitable_steps}-steps-{prepared_data_file_name}"

# Written to <parent of the current working directory>/data/processed.
cleaned_data_file_path = os.path.join(
    os.path.join(os.path.dirname(os.getcwd()), "data", "processed"),
    cleaned_data_file_name,
)
print(cleaned_data_file_path)

# Sanity check: this must display False, otherwise an existing file would be
# overwritten by the next cell.
os.path.exists(cleaned_data_file_path)

In [115]:
# Persist the cleaned data as JSON. The context manager flushes and closes the
# file even if serialisation raises, which the manual open()/close() pair did
# not guarantee.
with open(cleaned_data_file_path, "w", encoding="utf-8") as file:
    json.dump(cleaned_data, file)