# Implementazione del task 3 - From TimeSeries to Events

In generale, un dataset contiene due tipi di dati temporali (su un dominio del tempo $\mathbb{T}$):

- Dati basati su **serie temporali**, che possono essere visti come un mapping $ts: \mathbb{T} \rightarrow \mathbb{R}^m$ (ad esempio, le calorie nel dataset *fitbit*).

- Dati basati su **eventi**, che possono essere visti come un sottoinsieme di $E\subseteq \mathbb{T}\times \mathbb{R}^m \times \mathbb{L}^n$ dove per definizione $E$ non copre necessariamente tutti i possibili timestamp in $\mathbb{T}$ (ad esempio, gli esercizi nel dataset *fitbit*).

In [1]:
# IMPORT
import nbimporter
import ETLBasics_t1 as task1
import ProfilingBasics_t2 as task2
import datetime
import dateutil as du
import numpy as np
import pandas as pd

In [2]:
# VARIABILI
PATH = './pmdata/'
people = [1,2,3]

In [None]:
# IPER-PARAMETRI

# raggruppamento task 3
K_GROUP_DAY = 1
K_GROUP_HOUR = 1

### Implementazione del task 1

In [3]:
sedentary_minutes = task1.sedentary_minutes_to_df(PATH, people)
sleep_0, sleep_1 = task1.sleep_to_df(PATH, people)
exercise_0, exercise_1 = task1.exercise_to_df(PATH, people)
lightly_active_minutes = task1.lightly_active_minutes_to_df(PATH, people)
time_in_heart_rate_zones = task1.time_in_heart_rate_zones_to_df(PATH, people)
moderately_active_minutes = task1.moderately_active_minutes_to_df(PATH, people)
very_active_minutes = task1.very_active_minutes_to_df(PATH, people)
resting_heart_rate = task1.resting_heart_rate_to_df(PATH, people)
srpe = task1.srpe_to_df(PATH, people)
wellness = task1.wellness_to_df(PATH, people)
injury = task1.injury_to_df(PATH, people)

try:
    steps = pd.read_pickle("dataframes/steps.pkl").loc[people]
    distance = pd.read_pickle("dataframes/distance.pkl").loc[people]
    calories = pd.read_pickle("dataframes/calories.pkl").loc[people]
    heart_rate = pd.read_pickle("dataframes/heart_rate.pkl").loc[people]
except:
    steps = task1.steps_to_df(PATH, people)
    distance = task1.distance_to_df(PATH, people)
    calories = task1.calories_to_df(PATH, people)
    heart_rate = task1.heart_rate_to_df(PATH, people)

## From TimeSeries to Events

Ai fini dell'analisi che vogliamo implementare, gli eventi sono i più adatti ad essere rappresentati come transazioni temporali. Tuttavia, vogliamo prendere in considerazione nelle nostre transazioni temporali anche le informazioni provenienti dalle serie temporali. Un modo intuitivo per fare ciò consiste nel trasformare la serie temporale $ts: \mathbb{T} \rightarrow \mathbb{R}^m$ in un insieme di eventi $E_{ts} \subseteq \mathbb{T} \times \mathbb{R}^m \times \mathbb{L}^n$ dove $(t, r_1, \ldots, r_m, l_1, \ldots, l_n)$ rappresenta un comportamento specifico di $ts$ intorno al tempo $t$ (ad esempio, un intervallo di tempo di lunghezza $5$ minuti centrato in $t$).

Notiamo infatti che nei dati grezzi di alcune serie temporali la granularità è per *minuto*; questo significa che la serie temporale può essere opportunamente trasformata in una serie temporale con una granularità più grossolana (15 minuti, mezz'ora, 1 ora) aggregando i dati nello stesso gruppo tramite il calcolo di media e deviazione standard.

In [4]:
# NOTA: non tutti i DataFrame sono serie temporali poiché in alcuni sono presenti attributi categorici.
#       Questi DataFrame sono quindi già in forma di Evento.
def is_ts(df):
    '''
    Secondo la definizione teorica una serie temporale è composta di soli attributi continui.
    La condizione non è sufficiente perché la serie temporale ha anche i valori equamente distanziati nel tempo. 
    '''
    return len(task2.get_continuous_attributes(df)) == len(df.columns)

def from_ts_to_event_based_data(df, aggregation_level='HOUR', k=1, addit_cols=True):
    '''
    Trasformiamo la serie temporale in evento andando ad aggregare il dataframe in base ad 'aggregation_level'.
    Le colonne sono continue (dato che abbiamo una serie temporale) per cui andiamo a calcolare media e std in
    modo da ridurre considerevolemente il numero di righe all'interno del dataframe.
    '''
    k_aggregate_map = {'DAY': 'MONTH', 'HOUR': 'DAY', 'MINUTE': 'HOUR'}

    df = df.copy()
    columns = list(df.columns) # manteniamo gli attributi continui
    df['p_id'] = [x[0] for x in list(df.index)]
    
    if aggregation_level == 'DAY':
        df['DAY'] = [str(x[1])[0:10] for x in list(df.index)]
        
        # Con k>1 uso 3 colonne per fare il raggruppamento: serve quindi la colonna del 'livello superiore'.
        # Ad esempio, 'hour' ha come livello superiore 'day' mentre 'day' ha 'month'.
        # Tale informazione viene perciò recuperata dalla variabile k_aggregate_map inizializzata all'inizio della funzione.
        if k > 1:
            # In int(x[8:10])-1 il -1 serve perché i giorni cominciano per 1.
            # Se dunque aggreghiamo per k=2 osserviamo che 1//2-->0 e 2//2-->1.
            # Cioè il primo bin è in difetto di 1.
            df[f'{k}-DAY'] = df['DAY'].apply(lambda x: (int(x[8:10])-1)//k)
            df[k_aggregate_map['DAY']] = [str(x[1])[0:7] for x in list(df.index)]

    elif aggregation_level == 'HOUR':
        df['HOUR'] = [str(x[1])[0:13] for x in list(df.index)]
        
        if k > 1:
            df[f'{k}-HOUR'] = df['HOUR'].apply(lambda x: int(x[11:13])//k)
            df[k_aggregate_map['HOUR']] = [str(x[1])[0:10] for x in list(df.index)]
    
    elif aggregation_level == 'MINUTE':
        df['MINUTE'] = [str(x[1])[0:16] for x in list(df.index)]
        
        if k > 1:
            df[f'{k}-MINUTE'] = df['MINUTE'].apply(lambda x: int(x[14:16])//k)
            df[k_aggregate_map['MINUTE']] = [str(x[1])[0:13] for x in list(df.index)]
    
    
    def std(x): return np.std(x)
    
    agg_parameter = {}
    for c in columns: # per ogni attributo continuo
        agg_parameter[c] = ['mean', std]  # lista di funzioni da usare per il raggruppamento dei dati
    
    
    # Se specifico un k allora raggruppo secondo un'ulteriore colonna (aggregation_level // k)
    if k > 1:
        df_agg = df.groupby(['p_id', k_aggregate_map[aggregation_level], f'{k}-{aggregation_level}']).agg(agg_parameter, axis="columns")
        if aggregation_level == 'DAY':
            df_agg[aggregation_level] = [ f'{x[1]}-{(k*x[2]+1):02d}' for x in list(df_agg.index)]
        elif aggregation_level == 'HOUR':
            df_agg[aggregation_level] = [ f'{x[1]} {(k*x[2]):02d}' for x in list(df_agg.index)]
        else:
            df_agg[aggregation_level] = [ f'{x[1]}:{(k*x[2]):02d}' for x in list(df_agg.index)]
    else:
        df_agg = df.groupby(['p_id', aggregation_level]).agg(agg_parameter, axis="columns")
    
    for c in columns: # per gli attributi continui
        df_agg[c+'_mean'] = df_agg[c]['mean']
        df_agg[c+'_std'] = df_agg[c]['std']
        df_agg = df_agg.drop([c], axis=1)

    # A questo punto abbiamo un multi-indice nato dall'aggregation function: lo rimuoviamo e teniamo solo p_id
    if k > 1:
        df_agg = df_agg.reset_index()
        df_agg = df_agg.drop([k_aggregate_map[aggregation_level], f'{k}-{aggregation_level}'], axis=1, level=0)
        df_agg = df_agg.set_index('p_id')
    else:
        df_agg = df_agg.reset_index(level=aggregation_level)

    # Rinominiamo le colonne (per il clustering)
    df_agg.columns = list(map(''.join, df_agg.columns))

    # Colonne addizionali per livello DAY o minore
    if addit_cols:
        if aggregation_level != 'DAY':
            df_agg['hour'] = df_agg[aggregation_level].apply(lambda x: int(x[11:13]))
            df_agg['half_day'] = df_agg['hour'].apply(lambda x: int(x/12))
            df_agg['eight_part_day'] = df_agg['hour'].apply(lambda x: int(x/3))

        # queste colonne andranno aggiunte a tutti
        df_agg['week_day'] = df_agg[aggregation_level].apply(lambda x: du.parser.parse(x).strftime('%A'))
        df_agg['weekend']= df_agg['week_day'].apply(lambda x: True if x in ['Saturday', 'Sunday'] else False)

    return df_agg

## Esempi

In base alla struttura dei dati profilata nel task 2 possiamo capire quali parametri usare per aggregare diverse serie temporali. Ad esempio, 2 giorni per estrarre eventi dalla serie della frequenza cardiaca a riposo e 4 ore per estrarre eventi dalla serie delle calorie.

In [5]:
from_ts_to_event_based_data(resting_heart_rate,'DAY',2)

Unnamed: 0_level_0,DAY,resting_heart_rate_mean,resting_heart_rate_std,resting_heart_rate_error_mean,resting_heart_rate_error_std,week_day,weekend
p_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1,2019-11-01,53.311285,0.429788,6.789773,0.001349,Friday,False
1,2019-11-03,53.766582,0.544559,6.787383,0.000148,Sunday,True
1,2019-11-05,53.231150,0.972039,6.787120,0.000016,Tuesday,False
1,2019-11-07,53.216032,0.126029,6.787090,0.000002,Thursday,False
1,2019-11-09,52.730650,0.310029,6.787087,0.000000,Saturday,True
...,...,...,...,...,...,...,...
3,2020-03-23,55.573131,0.443683,7.411790,0.321360,Monday,False
3,2020-03-25,28.361014,28.361014,3.599927,3.599927,Wednesday,False
3,2020-03-27,29.594325,29.594325,3.460969,3.460969,Friday,False
3,2020-03-29,58.274641,0.769284,7.026996,0.121592,Sunday,True


In [6]:
from_ts_to_event_based_data(calories,'HOUR',4).iloc[0:20]

Unnamed: 0_level_0,HOUR,calories_mean,calories_std,hour,half_day,eight_part_day,week_day,weekend
p_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,2019-11-01 00,1.394667,0.03349,0,0,0,Friday,False
1,2019-11-01 04,2.48425,2.340558,4,0,1,Friday,False
1,2019-11-01 08,2.334208,1.465262,8,0,2,Friday,False
1,2019-11-01 12,3.441208,2.783621,12,1,4,Friday,False
1,2019-11-01 16,3.540792,2.966757,16,1,5,Friday,False
1,2019-11-01 20,3.509458,2.89922,20,1,6,Friday,False
1,2019-11-02 00,1.39525,0.034604,0,0,0,Saturday,True
1,2019-11-02 04,1.449875,0.314837,4,0,1,Saturday,True
1,2019-11-02 08,2.812583,2.351379,8,0,2,Saturday,True
1,2019-11-02 12,4.463208,3.249349,12,1,4,Saturday,True
