In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
import matplotlib.pyplot as plt
from nolds import dfa
import warnings
import matplotlib.patches as mpatches

In [None]:
# Load the accelerometer recording used to build the reference data set.
df_resampled = pd.read_csv('../data/Marion.lnk/MG_D.csv')

# Keep the original header (not used again in the visible cells), then impose
# the column names the rest of the notebook relies on. Assigning four names
# requires the file to have exactly four columns.
column_names = df_resampled.columns
df_resampled.columns = ['date_time', 'X', 'Y', 'Z']
print(f"New column names: {df_resampled.columns}")

# Convert 'date_time' to datetime format
df_resampled['date_time'] = pd.to_datetime(df_resampled['date_time'])


In [None]:
# Load the activity data (semicolon-separated; blanks after each delimiter are skipped)
activity = pd.read_csv('../data/activity_M.csv', sep=';', skipinitialspace=True)

# Remove spaces around column names
activity.columns = activity.columns.str.strip()

# Define the base date
base_date = pd.to_datetime("2024-04-15")

# Add the base date to recorded days: 'jour' is interpreted as a number of
# days to add to base_date.
# NOTE(review): presumably jour=1 is the first recording day (2024-04-16) — confirm.
activity['date'] = base_date + pd.to_timedelta(activity['jour'], unit='D')

# Function to clean and normalize time values
def clean_time(time_str):
    """
    Normalize a free-form time string to ``H:MM:SS``.

    - Removes every space (leading, trailing and inner) and ignores case
    - Converts 'h' to ':'  ('9h30' -> '9:30')
    - Pads missing or empty minute/second fields with "00"
      ('9' -> '9:00:00', '9h' -> '9:00:00', '9:30' -> '9:30:00')
    - Ignores invalid values

    Parameters:
    - time_str: Raw value read from the activity log.

    Returns:
    - The normalized string, or ``np.nan`` when the input is not a string,
      is blank, or is a stringified missing value ('nan', 'nat', 'none';
      this is what ``Series.astype(str)`` produces for missing cells).
      Strings with more than three ':'-separated fields are returned
      converted but unpadded so the caller's datetime parsing can reject them.
    """
    if not isinstance(time_str, str):
        return np.nan  # None, NaN and any other non-string input

    compact = "".join(time_str.split()).lower()
    if compact in ("", "nan", "nat", "none"):
        return np.nan  # Ignore empty values

    fields = compact.replace("h", ":").split(":")  # Convert format '9h30' -> ['9', '30']
    if len(fields) > 3:
        return ":".join(fields)  # Not a clock time; leave it for the parser to coerce

    hours = fields[0]
    # An empty field (as in '9h' -> ['9', '']) counts as zero.
    rest = [field or "00" for field in fields[1:]]
    rest += ["00"] * (2 - len(rest))  # Ensure HH:MM:SS format
    return ":".join([hours, *rest])

# Apply the correction to the 'start' and 'end' columns.
# astype(str) turns missing cells into the literal string 'nan' before
# clean_time sees them.
activity['debut'] = activity['debut'].astype(str).apply(clean_time)
activity['fin'] = activity['fin'].astype(str).apply(clean_time)

# Merge date and time to obtain a proper datetime format.
# errors='coerce' turns anything that does not match the format into NaT.
activity['debut'] = pd.to_datetime(activity['date'].astype(str) + " " + activity['debut'], format='%Y-%m-%d %H:%M:%S', errors='coerce')
activity['fin'] = pd.to_datetime(activity['date'].astype(str) + " " + activity['fin'], format='%Y-%m-%d %H:%M:%S', errors='coerce')

# Remove the temporary 'date' column if it is no longer needed
activity.drop(columns=['date'], inplace=True)

# Display the first rows of the updated DataFrame
print(activity.head())

In [None]:
class InactivityDetector:
    """
    Flags inactive epochs in tri-axial accelerometer data and plots them per day
    together with the logged activities.
    """

    def __init__(self, df, activity, epoch_size=1, inactivity_threshold='dynamic', delta_threshold=0.01, gravity_threshold=0.9):
        """
        Initializes the InactivityDetector.
        
        Parameters:
        - df: DataFrame containing accelerometer data (columns: ['date_time', 'X', 'Y', 'Z']).
        - activity: DataFrame containing activity logs with start and end times
          (columns used: 'debut', 'fin', 'activite').
        - epoch_size: Time window (in seconds) for aggregation.
        - inactivity_threshold: Acceleration threshold for inactivity detection ('dynamic' or fixed value).
        - delta_threshold: Threshold for acceleration variation.
        - gravity_threshold: Maximum distance between the mean acceleration and 1
          for an epoch to count as "at rest".
          NOTE(review): this presumes the axes are expressed in g — confirm.
        """
        self.df = df
        self.activity = activity
        self.epoch_size = epoch_size
        self.inactivity_threshold = inactivity_threshold
        self.delta_threshold = delta_threshold
        self.gravity_threshold = gravity_threshold
        self.processed_data = None
        self._prepare_data()

    def _prepare_data(self):
        """
        Computes overall acceleration and prepares the data.

        Side effects: 'acceleration' and 'timestamp' are written onto the
        DataFrame passed by the caller; the remaining columns are added to the
        sorted copy stored in self.df. The 'debut' and 'fin' columns of the
        caller's activity DataFrame are converted to datetime in place.
        """
        self.df['acceleration'] = np.sqrt(self.df['X']**2 + self.df['Y']**2 + self.df['Z']**2)
        self.df['timestamp'] = pd.to_datetime(self.df['date_time'])
        self.df = self.df.sort_values('timestamp')
        self.df['epoch'] = self.df['timestamp'].dt.floor(f'{self.epoch_size}s')
        self.df['date'] = self.df['timestamp'].dt.date
        # Sample-to-sample change of the acceleration magnitude (NaN for the first row)
        self.df['delta_acc'] = self.df['acceleration'].diff().abs()

        self.activity['debut'] = pd.to_datetime(self.activity['debut'])
        self.activity['fin'] = pd.to_datetime(self.activity['fin'])

    def detect_inactivity(self):
        """
        Detects inactivity periods using an acceleration threshold and acceleration variation threshold.

        An epoch is inactive when its mean acceleration is below the threshold,
        or when its mean variation is below delta_threshold while its mean
        acceleration lies within gravity_threshold of 1.

        Returns:
        - DataFrame with one row per (epoch, date) and the columns
          'acceleration', 'delta_acc', 'inactive' and 'inactive_group'
          (a counter that increases each time 'inactive' changes value).
          The same object is stored in self.processed_data.
        """
        grouped = self.df.groupby(['epoch', 'date']).agg({'acceleration': 'mean', 'delta_acc': 'mean'}).reset_index()

        if self.inactivity_threshold == 'dynamic':
            # Half a standard deviation below the mean, never lower than 0.1
            mean_acc = grouped['acceleration'].mean()
            std_acc = grouped['acceleration'].std()
            threshold = max(0.1, mean_acc - 0.5 * std_acc)
        else:
            threshold = self.inactivity_threshold

        grouped['inactive'] = ((grouped['acceleration'] < threshold) |
                               ((grouped['delta_acc'] < self.delta_threshold) &
                                (np.abs(grouped['acceleration'] - 1) < self.gravity_threshold)))
        grouped['inactive_group'] = (grouped['inactive'] != grouped['inactive'].shift()).cumsum()

        self.processed_data = grouped
        return grouped

    def visualize_inactivity_per_day(self):
        """
        Displays inactivity periods and activities with better visibility.

        Shows one Plotly figure per day: the epoch-averaged acceleration curve,
        the inactive runs as a translucent red fill, and every logged activity
        starting that day as a colored rectangle with a label.
        Runs detect_inactivity() first if it has not been run yet.
        """
        if self.processed_data is None:
            self.detect_inactivity()

        color_map = px.colors.qualitative.Set1
        activity_types = self.activity['activite'].unique()
        activity_colors = {activity: color_map[i % len(color_map)] for i, activity in enumerate(activity_types)}

        for day in self.processed_data['date'].unique():
            daily_data = self.processed_data[self.processed_data['date'] == day]
            # Vertical extent shared by every activity rectangle of the day
            y_low = daily_data['acceleration'].min()
            y_high = daily_data['acceleration'].max()
            fig = go.Figure()

            # Acceleration curve
            fig.add_trace(go.Scatter(
                x=daily_data['epoch'], y=daily_data['acceleration'],
                mode='lines', name='Acceleration', line=dict(color='blue', width=2)
            ))

            # Inactivity periods in transparency
            inactive_added = False  # Avoids repeating the legend entry
            for _, group in daily_data.groupby('inactive_group'):
                if group['inactive'].iloc[0]:
                    fig.add_trace(go.Scatter(
                        x=group['epoch'], y=group['acceleration'],
                        mode='lines', fill='tozeroy',
                        fillcolor='rgba(255, 0, 0, 0.2)',
                        line=dict(width=0),
                        name='Inactive Periods' if not inactive_added else None,
                        showlegend=not inactive_added  # Show the legend entry only once
                    ))
                    inactive_added = True

            # Adding activities in the background as colored zones
            for _, row in self.activity[self.activity['debut'].dt.date == day].iterrows():
                activity_color = activity_colors.get(row['activite'], 'rgba(0, 100, 255, 0.3)')
                fig.add_shape(
                    type="rect", x0=row['debut'], x1=row['fin'],
                    y0=y_low, y1=y_high,
                    fillcolor=activity_color, line=dict(width=0), opacity=0.3
                )
                fig.add_annotation(
                    x=row['debut'], y=y_high,
                    text=row['activite'], showarrow=False,
                    font=dict(size=10, color="black"),
                    bgcolor="rgba(255,255,255,0.7)"
                )

            fig.update_layout(
                title=f"Inactivity Periods & Activities - {day}",
                xaxis_title="Time", yaxis_title="Acceleration",
                hovermode="x", template="plotly_white"
            )
            fig.show()


# Run the detector on the reference recording with 1-second epochs, then show
# one figure per day.
detector = InactivityDetector(df_resampled, activity, epoch_size=1, inactivity_threshold='dynamic', delta_threshold=0.01, gravity_threshold=0.9)
detector.detect_inactivity()
detector.visualize_inactivity_per_day()

In [None]:
# Silence FutureWarning messages from here on (process-wide filter).
warnings.simplefilter(action='ignore', category=FutureWarning)
class DFAAnalysis:
    """
    Sliding-window DFA exponent over the active part of a recording, with a
    per-day Plotly view that overlays the logged activities.
    """

    def __init__(self, df, activity, inactivity_data, window_size=300, step_size=50):
        """
        Initializes the DFAAnalysis class.
        
        Parameters:
        - df: DataFrame containing time-series accelerometer data
          (columns used: 'epoch', 'timestamp', 'acceleration').
        - activity: DataFrame with activity logs; rows whose 'activite' is
          "Inactif" are excluded.
        - inactivity_data: DataFrame indicating inactivity periods
          (columns used: 'epoch', 'inactive').
        - window_size: Size (in samples) of the moving window for DFA calculation.
        - step_size: Step size (in samples) for sliding window analysis.
        """
        self.df = df
        self.activity = activity[activity['activite'] != "Inactif"]  # Exclude inactivity periods
        self.inactivity_data = inactivity_data
        self.window_size = window_size
        self.step_size = step_size
        self.dfa_results = {}
        self.dfa_min = None
        self.dfa_max = None

    def filter_active_periods(self):
        """
        Filters out inactive periods from the dataset.

        Returns:
        - The rows of self.df whose epoch is not flagged inactive in
          self.inactivity_data, without the 'inactive' column. Epochs that are
          missing from inactivity_data are kept (treated as active).
        """
        self.df['epoch'] = pd.to_datetime(self.df['epoch'])
        self.inactivity_data['epoch'] = pd.to_datetime(self.inactivity_data['epoch'])

        active_df = self.df.merge(self.inactivity_data[['epoch', 'inactive']], on='epoch', how='left')

        # When self.df already carries an 'inactive' column the merge yields
        # 'inactive_x' / 'inactive_y'; keep the one coming from inactivity_data.
        if 'inactive_y' in active_df.columns:
            active_df.rename(columns={'inactive_y': 'inactive'}, inplace=True)
        if 'inactive_x' in active_df.columns:
            active_df.drop(columns=['inactive_x'], inplace=True)

        # Unmatched epochs are NaN after the left merge; NaN compares unequal to
        # True, so they count as active. The result is always a boolean mask,
        # which keeps `~` a logical negation even for object-dtype columns.
        inactive = active_df['inactive'].eq(True)
        return active_df[~inactive].drop(columns=['inactive'])  # Keep only active periods

    def compute_dfa(self):
        """
        Computes the DFA exponent over active periods, excluding inactivity.

        Fills self.dfa_results with, per day, the window start timestamps and
        the matching exponents, and stores the overall minimum and maximum in
        self.dfa_min / self.dfa_max (left untouched when no window fits).
        Windows are taken over the active samples of a day placed end to end,
        so a window may span a removed inactive stretch.
        """
        self.df['date'] = self.df['timestamp'].dt.date
        unique_dates = self.df['date'].unique()
        # The filter does not depend on the day: run it once.
        active_df = self.filter_active_periods()
        dfa_all_values = []

        for day in unique_dates:
            active_daily_df = active_df[active_df['date'] == day]

            series = active_daily_df['acceleration'].values
            stamps = active_daily_df['timestamp']
            dfa_values = []
            time_stamps = []

            # "+ 1" keeps the last window that fits exactly at the end of the series.
            for i in range(0, len(series) - self.window_size + 1, self.step_size):
                alpha = dfa(series[i:i + self.window_size])
                dfa_values.append(alpha)
                time_stamps.append(stamps.iloc[i])
                dfa_all_values.append(alpha)

            self.dfa_results[day] = {'timestamps': time_stamps, 'dfa_values': dfa_values}

        # Set a common scale for all plots
        if dfa_all_values:
            self.dfa_min = min(dfa_all_values)
            self.dfa_max = max(dfa_all_values)

    def visualize_dfa_plotly(self):
        """Displays DFA exponent over time with activity background using Plotly (one figure per day)."""
        activity_types = self.activity['activite'].unique()
        color_map = sns.color_palette("husl", n_colors=len(activity_types))
        activity_colors = {act: f'rgba({int(r*255)}, {int(g*255)}, {int(b*255)}, 0.5)'
                           for act, (r, g, b) in zip(activity_types, color_map)}

        for day, results in self.dfa_results.items():
            fig = go.Figure()

            # Plot DFA exponent
            fig.add_trace(go.Scatter(
                x=results['timestamps'], y=results['dfa_values'],
                mode='lines+markers', name='DFA Exponent'
            ))

            # Add background activities
            daily_activities = self.activity[self.activity['debut'].dt.date == day]
            for _, row in daily_activities.iterrows():
                fig.add_shape(
                    type='rect', x0=row['debut'], x1=row['fin'],
                    y0=self.dfa_min, y1=self.dfa_max,  # Fixed scale
                    fillcolor=activity_colors.get(row['activite'], 'rgba(100,100,100,0.3)'),
                    line=dict(width=0), opacity=0.5
                )
                fig.add_annotation(
                    x=row['debut'], y=self.dfa_max,
                    text=row['activite'], showarrow=False, font=dict(size=10, color='black'),
                    bgcolor='rgba(255,255,255,0.7)'
                )

            fig.update_layout(
                title=f'DFA Exponent Over Time - {day}',
                xaxis_title='Time', yaxis_title='DFA Exponent',
                hovermode='x unified', template='plotly_white',
                yaxis=dict(range=[self.dfa_min, self.dfa_max])  # Apply common scale
            )
            fig.show()

# Usage: rebuild the detector with default settings and reuse its per-epoch
# table both as the DFA input and as the inactivity flags.
detector = InactivityDetector(df_resampled, activity)
detector.detect_inactivity()
inactivity_data = detector.processed_data

# DFAAnalysis reads a 'timestamp' column; the per-epoch table only has 'epoch'.
detector.processed_data['timestamp'] = detector.processed_data['epoch']
dfa_analysis = DFAAnalysis(detector.processed_data, activity, inactivity_data, window_size=300, step_size=50)
dfa_analysis.compute_dfa()
dfa_analysis.visualize_dfa_plotly()


In [None]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.graph_objects as go


# STEP 1: Compute the DFA exponent
class DFAAnalysis:
    """
    Sliding-window DFA exponent over the active part of a recording, with a
    per-day Plotly view that overlays the logged activities.
    """

    def __init__(self, df, activity, inactivity_data, window_size=300, step_size=50):
        """
        Initializes the DFAAnalysis class.
        
        Parameters:
        - df: DataFrame containing time-series accelerometer data
          (columns used: 'epoch', 'timestamp', 'acceleration').
        - activity: DataFrame with activity logs; rows whose 'activite' is
          "Inactif" are excluded.
        - inactivity_data: DataFrame indicating inactivity periods
          (columns used: 'epoch', 'inactive').
        - window_size: Size (in samples) of the moving window for DFA calculation.
        - step_size: Step size (in samples) for sliding window analysis.
        """
        self.df = df
        self.activity = activity[activity['activite'] != "Inactif"]  # Exclude inactivity periods
        self.inactivity_data = inactivity_data
        self.window_size = window_size
        self.step_size = step_size
        self.dfa_results = {}
        self.dfa_min = None
        self.dfa_max = None

    def filter_active_periods(self):
        """
        Filters out inactive periods from the dataset.

        Returns:
        - The rows of self.df whose epoch is not flagged inactive in
          self.inactivity_data, without the 'inactive' column. Epochs that are
          missing from inactivity_data are kept (treated as active).
        """
        self.df['epoch'] = pd.to_datetime(self.df['epoch'])
        self.inactivity_data['epoch'] = pd.to_datetime(self.inactivity_data['epoch'])

        active_df = self.df.merge(self.inactivity_data[['epoch', 'inactive']], on='epoch', how='left')

        # When self.df already carries an 'inactive' column the merge yields
        # 'inactive_x' / 'inactive_y'; keep the one coming from inactivity_data.
        if 'inactive_y' in active_df.columns:
            active_df.rename(columns={'inactive_y': 'inactive'}, inplace=True)
        if 'inactive_x' in active_df.columns:
            active_df.drop(columns=['inactive_x'], inplace=True)

        # Unmatched epochs are NaN after the left merge; NaN compares unequal to
        # True, so they count as active. The result is always a boolean mask,
        # which keeps `~` a logical negation even for object-dtype columns.
        inactive = active_df['inactive'].eq(True)
        return active_df[~inactive].drop(columns=['inactive'])  # Keep only active periods

    def compute_dfa(self):
        """
        Computes the DFA exponent over active periods, excluding inactivity.

        Fills self.dfa_results with, per day, the window start timestamps and
        the matching exponents, and stores the overall minimum and maximum in
        self.dfa_min / self.dfa_max (left untouched when no window fits).
        Windows are taken over the active samples of a day placed end to end,
        so a window may span a removed inactive stretch.
        """
        self.df['date'] = self.df['timestamp'].dt.date
        unique_dates = self.df['date'].unique()
        # The filter does not depend on the day: run it once.
        active_df = self.filter_active_periods()
        dfa_all_values = []

        for day in unique_dates:
            active_daily_df = active_df[active_df['date'] == day]

            series = active_daily_df['acceleration'].values
            stamps = active_daily_df['timestamp']
            dfa_values = []
            time_stamps = []

            # "+ 1" keeps the last window that fits exactly at the end of the series.
            for i in range(0, len(series) - self.window_size + 1, self.step_size):
                alpha = dfa(series[i:i + self.window_size])  # `dfa` is imported from nolds at the top of the notebook
                dfa_values.append(alpha)
                time_stamps.append(stamps.iloc[i])
                dfa_all_values.append(alpha)

            self.dfa_results[day] = {'timestamps': time_stamps, 'dfa_values': dfa_values}

        # Set a common scale for all plots
        if dfa_all_values:
            self.dfa_min = min(dfa_all_values)
            self.dfa_max = max(dfa_all_values)

    def visualize_dfa_plotly(self):
        """Displays DFA exponent over time with activity background using Plotly (one figure per day)."""
        activity_types = self.activity['activite'].unique()
        color_map = sns.color_palette("husl", n_colors=len(activity_types))
        activity_colors = {act: f'rgba({int(r*255)}, {int(g*255)}, {int(b*255)}, 0.5)'
                           for act, (r, g, b) in zip(activity_types, color_map)}

        for day, results in self.dfa_results.items():
            fig = go.Figure()

            # Plot DFA exponent
            fig.add_trace(go.Scatter(
                x=results['timestamps'], y=results['dfa_values'],
                mode='lines+markers', name='DFA Exponent'
            ))

            # Add background activities
            daily_activities = self.activity[self.activity['debut'].dt.date == day]
            for _, row in daily_activities.iterrows():
                fig.add_shape(
                    type='rect', x0=row['debut'], x1=row['fin'],
                    y0=self.dfa_min, y1=self.dfa_max,
                    fillcolor=activity_colors.get(row['activite'], 'rgba(100,100,100,0.3)'),
                    line=dict(width=0), opacity=0.5
                )
                fig.add_annotation(
                    x=row['debut'], y=self.dfa_max,
                    text=row['activite'], showarrow=False, font=dict(size=10, color='black'),
                    bgcolor='rgba(255,255,255,0.7)'
                )

            fig.update_layout(
                title=f'DFA Exponent Over Time - {day}',
                xaxis_title='Time', yaxis_title='DFA Exponent',
                hovermode='x unified', template='plotly_white',
                yaxis=dict(range=[self.dfa_min, self.dfa_max])
            )
            fig.show()


# STEP 2: Compare the DFA exponents across occurrences of the same activity

def compute_activity_correlation(dfa_analysis):
    """
    Computes the Pearson correlation between the DFA exponent series of the
    occurrences of a same activity and shows one heatmap per activity.

    Parameters:
    - dfa_analysis: object exposing `dfa_results` (per day: 'timestamps' and
      'dfa_values' lists) and `activity` (DataFrame with 'activite', 'debut',
      'fin').

    Activities with fewer than two occurrences are skipped. Returns None.
    """
    activity_dfa = {}

    # Collect the DFA exponents falling inside each logged occurrence
    for day, results in dfa_analysis.dfa_results.items():
        # A DatetimeIndex compares cleanly with a Timestamp even when the day
        # produced no DFA window (empty list).
        timestamps = pd.to_datetime(results['timestamps'])
        dfa_values = np.asarray(results['dfa_values'], dtype=float)

        todays_log = dfa_analysis.activity[dfa_analysis.activity['debut'].dt.date == day]
        for _, row in todays_log.iterrows():
            mask = np.asarray((timestamps >= row['debut']) & (timestamps <= row['fin']))
            if mask.any():
                activity_dfa.setdefault(row['activite'], []).append(dfa_values[mask])

    # One heatmap per activity that has at least 2 occurrences
    grid = np.linspace(0, 1, 100)
    for activity, dfa_series in activity_dfa.items():
        if len(dfa_series) < 2:
            continue

        # Optional normalization (z-score); constant series are left as they are
        normalized = []
        for series in dfa_series:
            spread = np.std(series)
            normalized.append((series - np.mean(series)) / spread if spread != 0 else series)

        # Resample every occurrence to 100 points so they can be correlated
        dfa_matrix = np.array([np.interp(grid, np.linspace(0, 1, len(series)), series) for series in normalized])
        corr_matrix = np.corrcoef(dfa_matrix)

        # Show the heatmap
        plt.figure(figsize=(8, 6))
        sns.heatmap(corr_matrix, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5)
        plt.title(f"Corrélation DFA - Activité: {activity}")
        plt.xlabel("Occurrence")
        plt.ylabel("Occurrence")
        plt.show()

# Rebuild the whole pipeline with the classes defined in this cell.
detector = InactivityDetector(df_resampled, activity)
detector.detect_inactivity()
inactivity_data = detector.processed_data

# DFAAnalysis reads a 'timestamp' column; the per-epoch table only has 'epoch'.
detector.processed_data['timestamp'] = detector.processed_data['epoch']
dfa_analysis = DFAAnalysis(detector.processed_data, activity, inactivity_data, window_size=300, step_size=50)
dfa_analysis.compute_dfa()
dfa_analysis.visualize_dfa_plotly()

# Correlation heatmaps
compute_activity_correlation(dfa_analysis)


In [None]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
from scipy.spatial.distance import cdist

def compute_activity_distance(dfa_analysis):
    """
    Computes the Euclidean distance between the DFA exponent series of the
    occurrences of a same activity and shows one heatmap per activity.

    Parameters:
    - dfa_analysis: object exposing `dfa_results` (per day: 'timestamps' and
      'dfa_values' lists) and `activity` (DataFrame with 'activite', 'debut',
      'fin').

    Activities with fewer than two occurrences are skipped. Returns None.
    """
    activity_dfa = {}

    # Collect the DFA exponents falling inside each logged occurrence
    for day, results in dfa_analysis.dfa_results.items():
        # A DatetimeIndex compares cleanly with a Timestamp even when the day
        # produced no DFA window (empty list).
        timestamps = pd.to_datetime(results['timestamps'])
        dfa_values = np.asarray(results['dfa_values'], dtype=float)

        todays_log = dfa_analysis.activity[dfa_analysis.activity['debut'].dt.date == day]
        for _, row in todays_log.iterrows():
            mask = np.asarray((timestamps >= row['debut']) & (timestamps <= row['fin']))
            if mask.any():
                activity_dfa.setdefault(row['activite'], []).append(dfa_values[mask])

    # One heatmap per activity that has at least 2 occurrences
    grid = np.linspace(0, 1, 100)
    for activity, dfa_series in activity_dfa.items():
        if len(dfa_series) < 2:
            continue  # Not enough occurrences to compare

        # Resample every occurrence to 100 points so the lengths match
        dfa_matrix = np.array([np.interp(grid, np.linspace(0, 1, len(series)), series) for series in dfa_series])

        # Pairwise Euclidean distance between occurrences
        dist_matrix = cdist(dfa_matrix, dfa_matrix, metric='euclidean')

        # Show the heatmap
        plt.figure(figsize=(8, 6))
        sns.heatmap(dist_matrix, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5)
        plt.title(f"Distance Euclidienne DFA - Activité: {activity}")
        plt.xlabel("Occurrence")
        plt.ylabel("Occurrence")
        plt.show()


# Run the comparison with the Euclidean distance

compute_activity_distance(dfa_analysis)

In [None]:
from dtw import accelerated_dtw

def compute_activity_dtw(dfa_analysis):
    """
    Computes the DTW distance between the DFA exponent series of the
    occurrences of a same activity and shows one heatmap per activity.

    Parameters:
    - dfa_analysis: object exposing `dfa_results` (per day: 'timestamps' and
      'dfa_values' lists) and `activity` (DataFrame with 'activite', 'debut',
      'fin').

    Activities with fewer than two occurrences are skipped. Returns None.
    """
    activity_dfa = {}

    # Collect the DFA exponents falling inside each logged occurrence
    for day, results in dfa_analysis.dfa_results.items():
        # A DatetimeIndex compares cleanly with a Timestamp even when the day
        # produced no DFA window (empty list).
        timestamps = pd.to_datetime(results['timestamps'])
        dfa_values = np.asarray(results['dfa_values'], dtype=float)

        todays_log = dfa_analysis.activity[dfa_analysis.activity['debut'].dt.date == day]
        for _, row in todays_log.iterrows():
            mask = np.asarray((timestamps >= row['debut']) & (timestamps <= row['fin']))
            if mask.any():
                activity_dfa.setdefault(row['activite'], []).append(dfa_values[mask])

    # One heatmap per activity that has at least 2 occurrences
    for activity, dfa_series in activity_dfa.items():
        if len(dfa_series) < 2:
            continue  # Not enough occurrences to compare

        # DTW distance matrix; the diagonal stays at 0
        n = len(dfa_series)
        dtw_matrix = np.zeros((n, n))

        # DTW with a symmetric point distance is itself symmetric: compute each
        # pair once and mirror it.
        for i in range(n):
            for j in range(i + 1, n):
                distance, _, _, _ = accelerated_dtw(dfa_series[i], dfa_series[j], dist='euclidean')
                dtw_matrix[i, j] = dtw_matrix[j, i] = distance

        # Show the heatmap
        plt.figure(figsize=(8, 6))
        sns.heatmap(dtw_matrix, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5)
        plt.title(f"Distance DTW DFA - Activité: {activity}")
        plt.xlabel("Occurrence")
        plt.ylabel("Occurrence")
        plt.show()


# Run the comparison with DTW

compute_activity_dtw(dfa_analysis)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tslearn.barycenters import dtw_barycenter_averaging
from tslearn.utils import to_time_series_dataset

def create_activity_motifs(dfa_analysis):
    """
    Builds one motif per activity by aggregating the DFA series of all its
    occurrences (over several days) into a DTW barycenter.

    Parameters:
    - dfa_analysis: object exposing `dfa_results` (per day: 'timestamps' and
      'dfa_values' lists) and `activity` (DataFrame with 'activite', 'debut',
      'fin').

    Returns:
    - dict mapping each activity with at least two occurrences to the value
      returned by `dtw_barycenter_averaging`. Activities with fewer
      occurrences are reported with a printed message and left out.
    """
    activity_motifs = {}

    # Collect the DFA segments of every occurrence, grouped by activity
    activity_dfa = {}
    for day, results in dfa_analysis.dfa_results.items():
        # A DatetimeIndex compares cleanly with a Timestamp even when the day
        # produced no DFA window (empty list).
        timestamps = pd.to_datetime(results['timestamps'])
        dfa_values = np.asarray(results['dfa_values'], dtype=float)

        # Attach to each activity the DFA values that fall inside its time span
        todays_log = dfa_analysis.activity[dfa_analysis.activity['debut'].dt.date == day]
        for _, row in todays_log.iterrows():
            mask = np.asarray((timestamps >= row['debut']) & (timestamps <= row['fin']))
            if mask.any():
                activity_dfa.setdefault(row['activite'], []).append(dfa_values[mask])

    # Compute the typical motif of each activity
    for activity, dfa_series in activity_dfa.items():
        if len(dfa_series) < 2:
            print(f"⚠️ Pas assez d'occurrences pour {activity}, motif non créé.")
            continue

        # Convert to the dataset format expected by tslearn
        dataset = to_time_series_dataset([np.array(series, dtype=float) for series in dfa_series])

        # DTW barycenter of all occurrences
        activity_motifs[activity] = dtw_barycenter_averaging(dataset)

    return activity_motifs

# Build the typical motifs
activity_motifs = create_activity_motifs(dfa_analysis)

# Show which activities received a motif
print(" Motifs par activité générés :", activity_motifs.keys())

In [None]:
def plot_activity_motifs(activity_motifs):
    """
    Draws every activity motif as one labelled curve on a single figure.

    Parameters:
    - activity_motifs: mapping from activity name to an array-like motif
      exposing `.ravel()`.
    """
    plt.figure(figsize=(10, 5))

    # One flattened curve per activity, in the mapping's own order
    for name in activity_motifs:
        curve = activity_motifs[name].ravel()
        legend_text = f"Activité: {name}"
        plt.plot(curve, label=legend_text)

    plt.title("Motifs types DFA par activité")
    plt.xlabel("Temps")
    plt.ylabel("Valeur DFA")
    plt.legend()
    plt.show()

# Plot the motifs built in the previous cell.
plot_activity_motifs(activity_motifs)

In [None]:
# Load the second recording (the data set on which activities are recognized).
# NOTE(review): the file name '................csv' looks like a placeholder —
# replace it with the real file before running.
df1 = pd.read_csv('../data/Marion.lnk/................csv')

# Keep the original header (not used again in the visible cells), then impose
# the column names the rest of the notebook relies on.
column_names = df1.columns
df1.columns = ['date_time', 'X', 'Y', 'Z']
print(f"New column names: {df1.columns}")

# Convert 'date_time' to datetime format
df1['date_time'] = pd.to_datetime(df1['date_time'])
print(df1.head())

In [None]:
class InactivityDetector:
    def __init__(self, df, activity, epoch_size=1, inactivity_threshold='dynamic', delta_threshold=0.01, gravity_threshold=0.9):
        """
        Detects inactivity periods from accelerometer data.

        Stores the settings, then prepares the data right away (see
        _prepare_data for the columns that get added).
        """
        self.df = df
        self.activity = activity
        self.epoch_size = epoch_size
        self.inactivity_threshold = inactivity_threshold
        self.delta_threshold = delta_threshold
        self.gravity_threshold = gravity_threshold
        self.processed_data = None
        self._prepare_data()

    def _prepare_data(self):
        """Prepares the data and computes the overall acceleration magnitude."""
        raw = self.df
        squared_norm = raw['X']**2 + raw['Y']**2 + raw['Z']**2
        # These two columns land on the DataFrame handed in by the caller
        raw['acceleration'] = np.sqrt(squared_norm)
        raw['timestamp'] = pd.to_datetime(raw['date_time'])

        # From here on the detector works on its own time-ordered copy
        ordered = raw.sort_values('timestamp')
        self.df = ordered
        ordered['epoch'] = ordered['timestamp'].dt.floor(f'{self.epoch_size}s')
        ordered['date'] = ordered['timestamp'].dt.date
        ordered['delta_acc'] = ordered['acceleration'].diff().abs()

        for bound in ('debut', 'fin'):
            self.activity[bound] = pd.to_datetime(self.activity[bound])

    def detect_inactivity(self):
        """Detects inactivity periods and returns the per-epoch table."""
        per_epoch = (
            self.df
            .groupby(['epoch', 'date'])
            .agg({'acceleration': 'mean', 'delta_acc': 'mean'})
            .reset_index()
        )
        level = per_epoch['acceleration']

        if self.inactivity_threshold == 'dynamic':
            # Half a standard deviation under the mean, floored at 0.1
            threshold = max(0.1, level.mean() - 0.5 * level.std())
        else:
            threshold = self.inactivity_threshold

        below_threshold = level < threshold
        barely_changing = per_epoch['delta_acc'] < self.delta_threshold
        near_one = np.abs(level - 1) < self.gravity_threshold
        per_epoch['inactive'] = below_threshold | (barely_changing & near_one)

        # A new group starts whenever the flag differs from the previous row
        flag = per_epoch['inactive']
        per_epoch['inactive_group'] = flag.ne(flag.shift()).cumsum()

        self.processed_data = per_epoch
        return per_epoch
    
# Inactivity detection on the second recording
detector = InactivityDetector(df1, activity)
detector.detect_inactivity()
# Attach the per-epoch flag to every raw sample, then keep the active samples only.
# NOTE(review): `~` needs a boolean column; this presumes every sample epoch
# has a match in processed_data (no NaN after the left merge) — confirm.
df1_active = detector.df.merge(detector.processed_data[['epoch', 'inactive']], on='epoch', how='left')
df1_active = df1_active[~df1_active['inactive']].copy()
print("Inactivité détectée et supprimée.")

In [None]:
from tslearn.barycenters import dtw_barycenter_averaging
from tslearn.utils import to_time_series_dataset

class DFAAnalysis:
    def __init__(self, df, window_size=300, step_size=50):
        """
        Sliding-window analysis over active periods only.

        Parameters:
        - df: DataFrame with 'date_time' (datetime) and 'acceleration' columns,
          already stripped of inactive samples by the caller.
        - window_size: Number of samples per window.
        - step_size: Number of samples between two window starts.
        """
        self.df = df
        self.window_size = window_size
        self.step_size = step_size
        self.dfa_results = {}

    def compute_dfa(self):
        """
        Computes one value per sliding window for every day of the DataFrame.

        Fills self.dfa_results with, per day, the window start timestamps
        ('timestamps') and the window values ('dfa_values'). Adds a 'date'
        column to self.df.

        NOTE(review): the value is currently the window's standard deviation,
        a stand-in for the real DFA exponent, so results are not on the same
        scale as exponents computed with `dfa` — confirm before comparing them.
        """
        self.df['date'] = self.df['date_time'].dt.date
        unique_dates = self.df['date'].unique()

        for day in unique_dates:
            daily_df = self.df[self.df['date'] == day]
            series = daily_df['acceleration'].values
            stamps = daily_df['date_time']
            dfa_values = []
            time_stamps = []

            # "+ 1" keeps the last window that fits exactly at the end of the series.
            for i in range(0, len(series) - self.window_size + 1, self.step_size):
                window = series[i:i + self.window_size]
                alpha = np.std(window)  # Temporary simplification of the DFA computation
                dfa_values.append(alpha)
                time_stamps.append(stamps.iloc[i])

            self.dfa_results[day] = {'timestamps': time_stamps, 'dfa_values': dfa_values}

# DFA computation on the active samples of the second recording
dfa_analysis = DFAAnalysis(df1_active)
dfa_analysis.compute_dfa()
print("DFA calculé sur les périodes actives.")


In [None]:
import numpy as np
import pandas as pd
from tslearn.metrics import dtw
from tslearn.utils import to_time_series

def compute_similarity_score(segment, motif):
    """
    Scores how close a DFA segment is to a typical motif using DTW.

    The DTW distance is divided by the larger of the two Euclidean norms and
    subtracted from 1. Returns 0 when that larger norm is not positive. The
    score is not clipped, so it can be negative.
    """
    dtw_cost = dtw(segment, motif)
    scale = max(np.linalg.norm(segment), np.linalg.norm(motif))
    if scale > 0:
        return 1 - (dtw_cost / scale)
    return 0

def recognize_activities(dfa_analysis, activity_motifs, similarity_threshold=0.8):
    """
    Label DFA values with the best-matching reference motif.

    Parameters:
    - dfa_analysis: object exposing ``dfa_results``, a dict mapping each day to
      ``{'timestamps': [...], 'dfa_values': [...]}``.
    - activity_motifs: dict mapping an activity label to its reference motif.
    - similarity_threshold: minimum similarity required to accept a match.

    Returns:
    A DataFrame with columns 'timestamp', 'activity' and 'similarity', one row
    per recognized value. The columns are present even when nothing matched,
    so callers can index them without a KeyError.
    """
    recognized_activities = []

    for results in dfa_analysis.dfa_results.values():
        for timestamp, segment in zip(results['timestamps'], results['dfa_values']):
            best_match = None
            # Starting at 0 means a match needs a strictly positive similarity.
            best_score = 0

            for activity, motif in activity_motifs.items():
                similarity = compute_similarity_score(segment, motif)
                if similarity > best_score and similarity >= similarity_threshold:
                    best_score = similarity
                    best_match = activity

            # Compare against None so a falsy label (e.g. 0 or "") is not dropped.
            if best_match is not None:
                recognized_activities.append({
                    'timestamp': timestamp,
                    'activity': best_match,
                    'similarity': best_score
                })

    return pd.DataFrame(recognized_activities, columns=['timestamp', 'activity', 'similarity'])

# Recognize activities (activity_motifs is defined outside this cell)
recognized_activities_df = recognize_activities(dfa_analysis, activity_motifs, similarity_threshold=0.8)

# Display the results
def display_recognized_activities(recognized_activities_df):
    """
    Scatter-plot the recognized activities against time, one series per activity.

    Expects the 'timestamp' and 'activity' columns; opens a matplotlib figure
    and shows it.
    """
    import matplotlib.pyplot as plt

    activity_column = recognized_activities_df['activity']

    plt.figure(figsize=(12, 5))
    for label in activity_column.unique():
        matches = recognized_activities_df[activity_column == label]
        # The y value is the label itself, repeated once per detection.
        plt.scatter(matches['timestamp'], [label] * len(matches), label=label)

    plt.title("Activités reconnues sur le second jeu de données")
    plt.xlabel("Temps")
    plt.ylabel("Activité reconnue")
    plt.xticks(rotation=45)
    plt.legend()
    plt.show()

# Plot the recognized activities over time.
display_recognized_activities(recognized_activities_df)


In [None]:
def compute_similarity_score(segment, motif):
    """
    Return a DTW-based similarity between a DFA segment and a reference motif.

    Computed as 1 minus the DTW distance divided by the larger Euclidean norm
    of the two inputs; 0 is returned when that norm is not positive.
    """
    distance = dtw(segment, motif)
    largest_norm = max((np.linalg.norm(segment), np.linalg.norm(motif)))
    if not largest_norm > 0:
        return 0
    return 1 - (distance / largest_norm)

def recognize_activities(dfa_analysis, activity_motifs, similarity_threshold=0.8):
    """
    Label DFA values with the best-matching reference motif.

    Parameters:
    - dfa_analysis: object exposing ``dfa_results``, a dict mapping each day to
      ``{'timestamps': [...], 'dfa_values': [...]}``.
    - activity_motifs: dict mapping an activity label to its reference motif.
    - similarity_threshold: minimum similarity required to accept a match.

    Returns:
    A DataFrame with columns 'timestamp', 'activity', 'similarity' and 'date'
    (calendar day of the timestamp), one row per recognized value. The columns
    are present even when nothing matched, so callers can index them without
    a KeyError.
    """
    recognized_activities = []

    for results in dfa_analysis.dfa_results.values():
        for timestamp, segment in zip(results['timestamps'], results['dfa_values']):
            best_match = None
            # Starting at 0 means a match needs a strictly positive similarity.
            best_score = 0

            for activity, motif in activity_motifs.items():
                similarity = compute_similarity_score(segment, motif)
                if similarity > best_score and similarity >= similarity_threshold:
                    best_score = similarity
                    best_match = activity

            # Compare against None so a falsy label (e.g. 0 or "") is not dropped.
            if best_match is not None:
                recognized_activities.append({
                    'timestamp': timestamp,
                    'activity': best_match,
                    'similarity': best_score,
                    # pd.Timestamp() also accepts datetime / numpy.datetime64 values.
                    'date': pd.Timestamp(timestamp).date()
                })

    return pd.DataFrame(
        recognized_activities,
        columns=['timestamp', 'activity', 'similarity', 'date'],
    )

# Recognize activities (activity_motifs is defined outside this cell)
recognized_activities_df = recognize_activities(dfa_analysis, activity_motifs, similarity_threshold=0.8)

# Display the results: acceleration curve, inactivity periods and coloured bands for the activities
def display_recognized_activities_with_acceleration(df, recognized_activities_df, inactivity_detector):
    """
    Show one Plotly figure per day found in ``recognized_activities_df['date']``.

    Each figure overlays the day's acceleration trace (from ``df``), the
    inactive groups of ``inactivity_detector.processed_data`` as filled areas,
    and a 10-second coloured band plus a text label for every recognized
    activity.
    """
    palette = px.colors.qualitative.Set1
    activity_colors = {
        name: palette[idx % len(palette)]
        for idx, name in enumerate(recognized_activities_df['activity'].unique())
    }

    for day in recognized_activities_df['date'].unique():
        daily_data = df[df['date_time'].dt.date == day]
        daily_activities = recognized_activities_df[recognized_activities_df['date'] == day]
        processed = inactivity_detector.processed_data
        daily_inactivity = processed[processed['date'] == day]

        fig = go.Figure()

        # Acceleration curve
        fig.add_trace(go.Scatter(
            x=daily_data['date_time'],
            y=daily_data['acceleration'],
            mode='lines',
            name='Acceleration',
            line={'color': 'blue', 'width': 2},
        ))

        # Inactive periods as translucent fills; only the first one gets a legend entry
        legend_pending = True
        for _, group in daily_inactivity.groupby('inactive_group'):
            if not group['inactive'].iloc[0]:
                continue
            fig.add_trace(go.Scatter(
                x=group['epoch'],
                y=group['acceleration'],
                mode='lines',
                fill='tozeroy',
                fillcolor='rgba(255, 0, 0, 0.2)',
                line={'width': 0},
                name='Inactive Periods' if legend_pending else None,
                showlegend=legend_pending,
            ))
            legend_pending = False

        # Recognized activities as coloured bands spanning the day's acceleration range
        band_bottom = daily_data['acceleration'].min()
        band_top = daily_data['acceleration'].max()
        for timestamp, activity in zip(daily_activities['timestamp'], daily_activities['activity']):
            fig.add_shape(
                type="rect",
                x0=timestamp,
                x1=timestamp + pd.Timedelta(seconds=10),
                y0=band_bottom,
                y1=band_top,
                fillcolor=activity_colors.get(activity, 'rgba(0, 100, 255, 0.3)'),
                line={'width': 0},
                opacity=0.3,
            )
            fig.add_annotation(
                x=timestamp,
                y=band_top,
                text=activity,
                showarrow=False,
                font={'size': 10, 'color': "black"},
                bgcolor="rgba(255,255,255,0.7)",
            )

        fig.update_layout(
            title=f"Inactivity Periods, Acceleration & Recognized Activities - {day}",
            xaxis_title="Time",
            yaxis_title="Acceleration",
            hovermode="x",
            template="plotly_white",
        )
        fig.show()

# Draw one figure per recognized day; df1 and detector are defined outside this cell.
display_recognized_activities_with_acceleration(df1, recognized_activities_df, detector)

In [None]:
class DFAAnalysis:
    """
    Sliding-window DFA of accelerometer data, related to a log of activities.

    The DFA exponent is computed with ``nolds.dfa`` on active samples only and
    can then be plotted per day or grouped by logged activity.
    """

    def __init__(self, df, activity, inactivity_data, window_size=300, step_size=50):
        """
        Initializes the DFAAnalysis class.

        Parameters:
        - df: DataFrame containing accelerometer time-series data
          (columns read here: 'epoch', 'timestamp', 'acceleration').
        - activity: DataFrame containing activity logs
          (columns read here: 'activite', 'debut', 'fin').
        - inactivity_data: DataFrame indicating inactive periods
          (columns read here: 'epoch', 'inactive').
        - window_size: Number of points per DFA window.
        - step_size: Step size for the sliding DFA window.
        """
        self.df = df
        self.activity = activity
        self.inactivity_data = inactivity_data
        self.window_size = window_size
        self.step_size = step_size
        # Maps each calendar day to {'timestamps': [...], 'dfa_values': [...]}.
        self.dfa_results = {}
        self.activity_colors = self.generate_activity_colors()

    def generate_activity_colors(self):
        """Returns a dict mapping each distinct 'activite' value to a colour of the tab10 colormap."""
        unique_activities = self.activity['activite'].unique()
        color_palette = plt.colormaps.get_cmap("tab10")
        return {activity: color_palette(i / len(unique_activities)) for i, activity in enumerate(unique_activities)}

    def filter_active_periods(self):
        """
        Returns the rows of ``self.df`` that are not flagged inactive.

        Flags are looked up in ``self.inactivity_data`` by 'epoch'. Rows with
        no flag are treated as active. The 'inactive' column is removed from
        the result. Side effect: the 'epoch' columns of both input frames are
        converted to datetime in place.
        """
        self.df['epoch'] = pd.to_datetime(self.df['epoch'])
        self.inactivity_data['epoch'] = pd.to_datetime(self.inactivity_data['epoch'])

        active_df = self.df.merge(self.inactivity_data[['epoch', 'inactive']], on='epoch', how='left')

        # If self.df already carries an 'inactive' column, the merge suffixes both
        # copies; keep the one coming from inactivity_data ('_y').
        if 'inactive_y' in active_df.columns:
            active_df = active_df.rename(columns={'inactive_y': 'inactive'})
        if 'inactive_x' in active_df.columns:
            active_df = active_df.drop(columns=['inactive_x'])

        # Equivalent to "fillna(False), then keep == False", but without the chained
        # in-place fillna, which does not update the frame under pandas Copy-on-Write.
        flags = active_df['inactive']
        keep = flags.isna() | flags.eq(False)
        return active_df[keep].drop(columns=['inactive'])

    def compute_dfa(self):
        """
        Computes the DFA exponent over active periods using a sliding window.

        Results are stored per calendar day in ``self.dfa_results``; each value
        is stamped with the 'timestamp' of the first sample of its window.
        Side effect: a 'date' column is added to ``self.df``.
        """
        self.df['date'] = self.df['timestamp'].dt.date
        unique_dates = self.df['date'].unique()

        # The active subset does not depend on the day, so build it once rather
        # than re-running the merge for every day.
        active_df = self.filter_active_periods()

        for day in unique_dates:
            active_daily_df = active_df[active_df['date'] == day]
            series = active_daily_df['acceleration'].values
            window_starts_at = active_daily_df['timestamp']
            dfa_values = []
            time_stamps = []

            # "+ 1" keeps the last full window: without it a day holding exactly
            # window_size samples (or ending on a step boundary) loses a window.
            last_start = len(series) - self.window_size
            for start in range(0, last_start + 1, self.step_size):
                window = series[start:start + self.window_size]
                dfa_values.append(dfa(window))
                time_stamps.append(window_starts_at.iloc[start])

            self.dfa_results[day] = {'timestamps': time_stamps, 'dfa_values': dfa_values}

    def visualize_dfa(self):
        """
        Plots the DFA exponent over time for each day with activity periods highlighted.

        Every logged period of the day is shaded; the legend lists each
        activity once. All figures share the same y-range.
        """
        all_values = [val for res in self.dfa_results.values() for val in res['dfa_values']]
        # min()/max() raise on an empty list; skip the shared limits in that case.
        y_limits = (min(all_values), max(all_values)) if all_values else None

        for day, results in self.dfa_results.items():
            plt.figure(figsize=(12, 5))
            plt.plot(results['timestamps'], results['dfa_values'], marker='o', linestyle='-', label='DFA Exponent')

            legend_handles = []
            activity_labels = set()

            for _, row in self.activity.iterrows():
                if row['debut'].date() != day:
                    continue
                color = self.activity_colors[row['activite']]
                # Shade every period, including repeats of an activity on the same day.
                plt.axvspan(row['debut'], row['fin'], color=color, alpha=0.3)
                # The de-duplication applies to the legend only.
                if row['activite'] not in activity_labels:
                    legend_handles.append(mpatches.Patch(color=color, label=row['activite']))
                    activity_labels.add(row['activite'])

            plt.xlabel('Time')
            plt.ylabel('DFA Exponent')
            if y_limits is not None:
                plt.ylim(*y_limits)
            plt.title(f'DFA Exponent Over Time - {day}')
            plt.legend(handles=legend_handles, title="Activities", loc='upper right', bbox_to_anchor=(1.15, 1))
            plt.show()

    def correlate_dfa_with_activities(self):
        """
        Associates DFA values with recorded activities.

        Returns a dict mapping each 'activite' value to the list of DFA values
        whose timestamp lies within one of its [debut, fin] intervals
        (bounds included). A value inside several intervals is added to each.
        """
        activity_dfa_mapping = {activity: [] for activity in self.activity['activite'].unique()}

        for results in self.dfa_results.values():
            for timestamp, alpha in zip(results['timestamps'], results['dfa_values']):
                matching_activities = self.activity[(self.activity['debut'] <= timestamp) & (self.activity['fin'] >= timestamp)]
                for _, row in matching_activities.iterrows():
                    activity_dfa_mapping[row['activite']].append(alpha)

        return activity_dfa_mapping

    def plot_activity_dfa_distribution(self):
        """Plots the distribution of DFA exponent values for different activities as a box plot."""
        activity_dfa_mapping = self.correlate_dfa_with_activities()
        # Long format: one (Activity, DFA) row per value, as expected by seaborn.
        df_dfa = pd.DataFrame([(act, alpha) for act, alphas in activity_dfa_mapping.items() for alpha in alphas],
                              columns=['Activity', 'DFA'])

        plt.figure(figsize=(12, 6))
        sns.boxplot(data=df_dfa, x='Activity', y='DFA')
        plt.xticks(rotation=90)
        plt.title('DFA Exponent Distribution by Activity')
        plt.show()


# Usage: detect inactivity, then run and plot the DFA analysis.
# InactivityDetector is defined outside this cell.
detector = InactivityDetector(df_resampled, activity)
detector.detect_inactivity()
# inactivity_data is the same object as detector.processed_data (no copy is made),
# so the 'timestamp' column added on the next line is visible through both names.
inactivity_data = detector.processed_data
detector.processed_data['timestamp'] = detector.processed_data['epoch']
# The processed data is passed both as the series to analyse and as the inactivity flags.
dfa_analysis = DFAAnalysis(detector.processed_data, activity, inactivity_data, window_size=300, step_size=50)
dfa_analysis.compute_dfa()
dfa_analysis.visualize_dfa()
dfa_analysis.plot_activity_dfa_distribution()
