In [14]:
%matplotlib notebook
from src.main.mando_preprocessing import linear_resample
import numpy as np
import matplotlib.pyplot as plt
import os
import json
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import pandas as pd
import matplotlib.dates as mdates
import re

In [16]:
def load_data(path):
    """Load the weight recording stored in a subject directory.

    The directory is searched for files whose name starts with ``weights_``
    and ends with ``.txt``. When several files match, the alphabetically
    first one is used so the choice is reproducible (``os.listdir`` returns
    entries in arbitrary order).

    Parameters:
    - path (str): Directory that contains the weight file.

    Returns:
    - pd.Series: The 'weight' column read from the selected file.

    Raises:
    - ValueError: If no matching weight file exists in ``path``.
    """
    candidates = sorted(
        name for name in os.listdir(path)
        if name.startswith('weights_') and name.endswith('.txt')
    )
    if not candidates:
        raise ValueError(f"No weight files found in directory: {path}")

    file_path = os.path.join(path, candidates[0])
    # The file is read without a header row; its values land in a column named 'weight'.
    weights = pd.read_csv(file_path, header=None, names=['weight'])
    # NOTE(review): leading samples used to be trimmed by hand here
    # (weights = weights[45:]; "for 7,10,14,15,16 -> 50,20,23,25,45",
    # presumably subject ids -> offsets). No trimming is applied now.
    return weights['weight']

In [None]:
def moving_average(data, window_size):
    """Return the sliding-window mean of ``data`` for windows of ``window_size`` samples.

    Only full windows are averaged, so the result has
    ``len(data) - window_size + 1`` entries.
    """
    # Prefix sums with a leading zero: the sum of any window is then the
    # difference of two entries that lie window_size apart.
    padded = np.insert(data, 0, 0)
    running_total = np.cumsum(padded)
    window_sums = running_total[window_size:] - running_total[:-window_size]
    return window_sums / window_size

In [None]:
def create_decreasing_curve(data):
    """Return a copy of ``data`` in which no sample exceeds the one before it.

    Any sample larger than its (already adjusted) predecessor is replaced by
    that predecessor, so the curve can only fall or stay flat. The input is
    not modified.
    """
    curve = np.copy(data)
    for position in range(1, len(curve)):
        floor = curve[position - 1]
        # A rise would mean weight was added; clamp it to the previous level.
        if curve[position] > floor:
            curve[position] = floor
    return curve

In [None]:
def process_meal_data(raw_data, max_decrease=80, stability_range=3):
    """
    Hold the last accepted weight until a new sample looks like a settled reading.

    A sample becomes the new accepted weight when both conditions hold:
    it is at most ``max_decrease`` below the last accepted weight (rises are
    therefore accepted as well), and the following sample differs from it by
    at most ``stability_range``. The final sample has no successor and is
    never accepted; the last accepted weight is repeated for it.

    Parameters:
    raw_data (sequence): The raw weight samples, indexable by position.
    max_decrease (number): Largest accepted drop from the last accepted weight (default 80).
    stability_range (number): Largest accepted difference to the next sample (default 3).

    Returns:
    list: One accepted weight per input sample; empty for empty input.
    """
    # Guard: without it raw_data[0] below raises IndexError on empty input.
    if len(raw_data) == 0:
        return []

    processed_data = []
    last_valid_weight = raw_data[0]  # Start from the first measurement
    final_index = len(raw_data) - 1

    for i in range(len(raw_data)):
        if i < final_index:  # The last sample has no successor to check stability against
            current_weight = raw_data[i]
            next_weight = raw_data[i + 1]

            drop_is_plausible = last_valid_weight - current_weight <= max_decrease
            reading_is_stable = abs(next_weight - current_weight) <= stability_range
            if drop_is_plausible and reading_is_stable:
                last_valid_weight = current_weight

        processed_data.append(last_valid_weight)

    return processed_data

In [None]:
def process_meal_data_2(data, stability_range=3, max_decrease=70):
    """
    Filter meal weight data so that implausibly large or unsettled changes are held back.

    Each sample (from the second one on) is replaced by the previous processed
    sample when either it lies more than ``max_decrease`` below that previous
    sample, or the next raw sample differs from it by more than
    ``stability_range``. The last sample is only subject to the first check.

    Parameters:
    data (list or np.array): The raw meal weight data.
    stability_range (int): Allowed difference to the next raw sample for a value to count as stable (default 3).
    max_decrease (int): Maximum allowed drop relative to the previous processed sample (default 70).

    Returns:
    np.array: The processed meal weight data; inputs with fewer than two samples are returned as an array unchanged.
    """
    sample_count = len(data)
    if sample_count < 2:
        return np.array(data)  # Nothing to compare against

    cleaned = np.copy(data)
    final_index = sample_count - 1
    for idx in range(1, sample_count):
        previous_value = cleaned[idx - 1]
        too_steep = previous_value - cleaned[idx] > max_decrease

        unsettled = False
        if not too_steep and idx < final_index:
            # The next sample is still untouched at this point, so the raw value is compared.
            unsettled = abs(cleaned[idx] - data[idx + 1]) > stability_range

        if too_steep or unsettled:
            cleaned[idx] = previous_value

    return cleaned

In [None]:
def plot(raw_data, decreasing_data):
    """
    Draw the raw weights and the decreasing step-like weights in two stacked panels that share the x-axis.

    Parameters:
    - raw_data: The raw weight data to draw in the top panel.
    - decreasing_data: The decreasing step-like weight data to draw in the bottom panel.
    """
    _, (raw_ax, step_ax) = plt.subplots(2, 1, figsize=(15, 10), sharex=True)

    # Top panel: unprocessed samples
    raw_ax.plot(raw_data, label='Raw Data', alpha=0.5, color='green')
    raw_ax.set_title('Raw Weight Data')
    raw_ax.set_ylabel('Weight (grams)')
    raw_ax.legend()

    # Bottom panel: processed step-like curve; it carries the shared x label
    step_ax.plot(decreasing_data, label='Decreasing Step-like Data', color='red')
    step_ax.set_title('Decreasing Step-like Weight Data')
    step_ax.set_xlabel('Time (arbitrary units)')
    step_ax.set_ylabel('Weight (grams)')
    step_ax.legend()

    plt.tight_layout()  # Keep titles and labels from overlapping
    plt.show()


In [17]:
def plot_ly(data):
    """Show ``data`` as a green line trace in the first row of a two-row Plotly figure.

    The figure is created with two rows and a shared x-axis; only the first
    row receives a trace. The trace and figure are always titled as raw
    weight data, whatever series is passed in.
    """
    weight_trace = go.Scatter(
        y=data,
        mode='lines',
        name='Raw Data',
        opacity=0.5,
        line=dict(color='green'),
    )

    fig = make_subplots(rows=2, cols=1, shared_xaxes=True)
    fig.add_trace(weight_trace, row=1, col=1)

    # Axis label for the populated row
    fig.update_yaxes(title_text='Weight (grams)', row=1, col=1)

    # Fixed size and light theme
    fig.update_layout(height=700, width=1000, title_text='Raw Weight Data', template='plotly_white')

    fig.show()

In [None]:
def calculate_bite_sizes(decreasing_data, threshold=4):
    """
    Count the drops between consecutive samples that are large enough to be bites.

    A drop is the amount by which a sample lies below the sample before it.
    Only drops of at least ``threshold`` are reported.

    Parameters:
    - decreasing_data: The decreasing step-like weight data, indexable by position.
    - threshold (int): The minimum weight decrease to qualify as a bite.

    Returns:
    - int: The number of bites.
    - list: The size of each qualifying drop, in order of occurrence.
    """
    # Lazily yield every strict decrease between neighbouring samples.
    drops = (
        decreasing_data[idx - 1] - decreasing_data[idx]
        for idx in range(1, len(decreasing_data))
        if decreasing_data[idx] < decreasing_data[idx - 1]
    )
    bite_sizes = [drop for drop in drops if drop >= threshold]
    return len(bite_sizes), bite_sizes

# You would then call this function as before in the main function.


In [None]:
from scipy.signal import find_peaks, savgol_filter

def calculate_bite_sizes_advanced(decreasing_data, threshold=4, window_length=21, polyorder=3):
    """
    Detect bites as peaks in the rate of weight loss of the smoothed signal and
    report the weight removed around each peak.

    The data is smoothed with a Savitzky-Golay filter, the sample-to-sample
    loss of the smoothed curve is computed, and its local maxima are taken as
    bite locations. The series is then cut halfway between neighbouring
    peaks; the bite weight for a peak is the drop of the unsmoothed data
    across its segment. Segments whose drop is below ``threshold`` are
    discarded.

    Parameters:
    - decreasing_data: The decreasing step-like weight data (1-D).
    - threshold (int): The minimum weight decrease to qualify as a bite.
    - window_length (int): The length of the filter window. It is reduced to the
      largest odd length that fits when the data is shorter than the window.
    - polyorder (int): The order of the polynomial used to fit the samples.
      Smoothing is skipped when the (possibly reduced) window is not larger than it.

    Returns:
    - int: The number of bites.
    - list: The weight of each qualifying bite, in order of occurrence.
    """
    weights = np.asarray(decreasing_data, dtype=float)
    sample_count = weights.size
    if sample_count < 2:
        return 0, []

    # savgol_filter with mode='interp' rejects windows longer than the signal,
    # so short recordings get the largest odd window that still fits.
    effective_window = window_length
    if effective_window > sample_count:
        effective_window = sample_count if sample_count % 2 else sample_count - 1

    if effective_window > polyorder:
        smoothed_data = savgol_filter(
            weights, window_length=effective_window, polyorder=polyorder, mode='interp'
        )
    else:
        smoothed_data = weights  # Too few samples for the requested polynomial fit

    # Positive values mean the weight is falling between two samples.
    loss_rate = -np.diff(smoothed_data)

    # Keep only peaks of actual weight loss (non-negative height).
    peaks, _ = find_peaks(loss_rate, height=0)
    if len(peaks) == 0:
        return 0, []

    # Cut the series halfway between neighbouring peaks. Peak p describes the
    # step from sample p to p + 1, and each cut lies strictly after the left
    # peak and not after the right one, so every step belongs to one segment.
    midpoints = (peaks[:-1] + peaks[1:] + 1) // 2
    boundaries = np.concatenate(([0], midpoints, [sample_count - 1]))

    # Measure the bite on the unsmoothed data so smoothing does not distort the weight.
    segment_drops = weights[boundaries[:-1]] - weights[boundaries[1:]]
    bite_sizes = [drop for drop in segment_drops.tolist() if drop >= threshold]

    return len(bite_sizes), bite_sizes

# You would call this function in your main function after processing the data.


In [None]:
def calculate_bite_statistics(decreasing_data, threshold=4, min_bite_interval=10):
    """
    Collect bite sizes and spacing, ignoring bites that follow the previous one too quickly.

    A candidate bite is a drop of at least ``threshold`` between two
    consecutive samples. The first candidate is always accepted; later ones
    are accepted only if at least ``min_bite_interval`` samples have passed
    since the last accepted bite. The interval recorded for the first bite is
    its distance from the start of the data.

    Parameters:
    - decreasing_data: The decreasing step-like weight data, indexable by position.
    - threshold (int): The minimum weight decrease to qualify as a bite.
    - min_bite_interval (int): The minimum spacing between accepted bites, measured
      in sample positions (equals seconds only at one sample per second — TODO confirm).

    Returns:
    - int: The number of accepted bites.
    - list: The weight of each accepted bite.
    - float: The mean of the recorded intervals, or 0 when no bite was accepted.
    """
    accepted_sizes = []
    accepted_gaps = []
    # 0 doubles as "no bite accepted yet": real bite positions start at 1.
    previous_bite_at = 0

    for position in range(1, len(decreasing_data)):
        before = decreasing_data[position - 1]
        after = decreasing_data[position]
        if not after < before:
            continue

        drop = before - after
        if not drop >= threshold:
            continue

        gap = position - previous_bite_at
        is_first_bite = previous_bite_at == 0
        if is_first_bite or gap >= min_bite_interval:
            accepted_sizes.append(drop)
            accepted_gaps.append(gap)
            previous_bite_at = position

    mean_gap = np.mean(accepted_gaps) if accepted_gaps else 0
    return len(accepted_sizes), accepted_sizes, mean_gap

# Call this function in your main function after creating the decreasing_weights curve


In [None]:
def dif(og, c):
    """Return the signed relative error ``(og - c) / og`` as a fraction.

    ``og`` is the reference value and ``c`` the calculated one. When ``og`` is
    zero (or otherwise falsy) the division is skipped and 0 is returned.
    """
    gap = og - c
    if og:
        return gap / og
    return 0  # Avoid dividing by a zero reference

In [None]:
def grid_search(data, og_bites, weight_thresholds, time_thresholds):
    """Try every threshold pair and keep the one whose bite count is closest to the reference.

    For each combination of weight and time threshold the bites are counted
    with ``calculate_bite_statistics`` and compared to ``og_bites`` through
    ``dif``. The pair with the smallest absolute relative error wins; the
    first such pair is kept on ties.

    Parameters:
    - data: The weight data passed on to ``calculate_bite_statistics``.
    - og_bites: The reference number of bites; nothing is evaluated when it is None.
    - weight_thresholds (iterable): Candidate values for ``threshold``.
    - time_thresholds (iterable): Candidate values for ``min_bite_interval``.

    Returns:
    - The smallest absolute relative error found (200 if no pair improved on that start value).
    - tuple: The winning ``(weight_threshold, time_threshold)``, or ``(None, None)``.
    """
    lowest_error = 200  # Start value; only errors below it are ever recorded
    winning_pair = (None, None)

    for weight_threshold in weight_thresholds:
        for time_threshold in time_thresholds:
            bite_count, _, _ = calculate_bite_statistics(
                data,
                threshold=weight_threshold,
                min_bite_interval=time_threshold,
            )

            if og_bites is None or bite_count is None:
                continue

            error_magnitude = abs(dif(og_bites, bite_count))
            if error_magnitude < lowest_error:
                lowest_error = error_magnitude
                winning_pair = (weight_threshold, time_threshold)

    return lowest_error, winning_pair

In [None]:
# Read the notebook configuration. The encoding is pinned to UTF-8 so the JSON
# is decoded the same way regardless of the platform's default locale encoding.
with open('../config/mando_config.json', encoding='utf-8') as config_file:
    config = json.load(config_file)
# Reference bite counts; other cells index this by zero-based subject number
# (bites[subject] is paired with the data folder subject + 1).
bites = config['ground_truth']['num_of_bites']

In [None]:
# Scores collected across runs; main() appends one value per call.
accuracy = list()
# NOTE(review): presumably subject indices to skip; no active code reads this list.
cursed = [1, 2, 4, 6, 9, 14, 15]
# Candidate grids for the parameter search: weight thresholds 0..8 and
# minimum bite intervals 0..9.
gram_thresholds = list(range(9))
time_thresholds = list(range(10))

In [None]:

# for subject in range(len(bites)):
def main(subject=18):
    """Run the bite analysis for one subject and show the processed curve.

    Steps: load the recording from ``../data/raw/<subject + 1>``, filter it
    with ``process_meal_data_2``, grid-search the bite thresholds against the
    reference count ``bites[subject]``, print the results and plot the
    filtered data.

    Parameters:
    - subject (int): Zero-based subject index (default 18, i.e. data folder 19).

    Side effects:
    - Appends ``1 - best error`` to the module-level ``accuracy`` list.
    - Prints the results and opens a Plotly figure.

    Relies on the module-level ``bites``, ``gram_thresholds``,
    ``time_thresholds`` and ``accuracy``.
    """
    print(f"Subject {subject + 1}")

    # Data folders are numbered from 1, subject indices from 0.
    path = f'../data/raw/{subject+1}'

    # 1. Load the data
    weights = load_data(path).to_numpy()

    # 2. Obtain the filtered step-like curve
    decreasing_data = process_meal_data_2(weights)

    # 3. Grid search for the thresholds whose bite count is closest to the reference.
    #    grid_search returns an absolute relative error, reported here as 1 - error.
    best_accuracy, best_params = grid_search(decreasing_data, bites[subject], gram_thresholds, time_thresholds)
    print(f"Best accuracy: {((1-abs(best_accuracy))) * 100:.2f} %")
    print(f"Best parameters: {best_params[0]}, {best_params[1]}")

    accuracy.append(1-best_accuracy)

    # 4. Recompute the bite statistics with the winning thresholds
    calc_bites, sizes, intervals = calculate_bite_statistics(decreasing_data, best_params[0], best_params[1])
    print(f"Number of bites advanced: {calc_bites}")
    print(f"Weight of each bite advanced: {sizes}")
    print(f"Average interval between bites: {intervals}")
    plot_ly(decreasing_data)


In [None]:
# Run the single-subject analysis defined in main().
main()

In [None]:
# Stand-alone check for the recording in folder 19, using fixed thresholds
# instead of the grid search. Folder 19 is paired with bites[18] below.
path = '../data/raw/19'

# 1. Load the data as a NumPy array
weights = load_data(path).to_numpy()    

# 2. Obtain the decreasing step-like curve.
#    This cell uses create_decreasing_curve, whereas main() filters with
#    process_meal_data_2, so the two cells analyse differently processed data.
decreasing_data = create_decreasing_curve(weights)
print(f"True number of bites: {bites[18]}")
# 3. Count bites with threshold=5 and min_bite_interval=1
calc_bites, sizes, intervals = calculate_bite_statistics(decreasing_data, 5, 1)
print(f"Number of bites advanced: {calc_bites}")
print(f"Weight of each bite advanced: {sizes}")
print(f"Average interval between bites: {intervals}")

In [18]:
# Load the recording in folder 19 (kept as the Series returned by load_data,
# not converted to NumPy) and plot it.
path = '../data/raw/19'
weights = load_data(path)
plot_ly(weights)
# linear_resample comes from src.main.mando_preprocessing; the meaning of its
# second argument (100) is not visible in this notebook — presumably a target
# rate or sample count; TODO confirm.
resampled_weights = linear_resample(weights, 100)

# Plot the resampled series for comparison with the plot above.
plot_ly(resampled_weights)

In [None]:
# Plot the resampled series again; needs resampled_weights from the resampling cell.
plot_ly(resampled_weights)

In [12]:
def plot_weights_data(data):
    """Plot the 'weight' column of ``data`` against its index with Matplotlib.

    Parameters:
    - data: An object exposing ``data.index`` and ``data['weight']``
      (presumably a pandas DataFrame — TODO confirm against the caller).

    NOTE(review): the x-axis is labelled "Samples" while its ticks are
    formatted as HH:MM:SS; the formatter is only meaningful if the index
    holds date/time values — confirm which one is intended.
    """
    fig, ax = plt.subplots(figsize=(10,6))
    ax.plot(data.index, data['weight'], label='weight', color='b')

    ax.set_xlabel("Samples")
    ax.set_ylabel("Weight values (g)")
    ax.set_title("Mandometer data")

    # Show only the time of day on the x-axis ticks
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))

    ax.grid()
    plt.show()

In [13]:
# plot_weights_data reads data['weight'] and data.index, so resampled_weights
# must support both — presumably what linear_resample returns; verify.
# It also needs resampled_weights from the resampling cell to have been run first.
plot_weights_data(resampled_weights)

<IPython.core.display.Javascript object>