In [None]:
import pandas as pd
import csv

def read_csv(csv_raw_clock_file_path: str, data_types: dict, separator: str, quote_column: str) -> pd.DataFrame:
    """Loads the raw clock csv file into a pandas DataFrame.
    Quoting is disabled while parsing, so the cells of quote_column are kept verbatim;
    each of those cells is then replaced by the list of substrings found between double quotes.
    - csv_raw_clock_file_path: location of the csv file
    - data_types: mapping column name -> dtype, handed to pandas
    - separator: the field delimiter
    - quote_column: the column holding the double-quoted items"""

    def keep_raw(cell):
        # identity converter: leaves the cell text exactly as read
        return cell

    read_options = {
        "dtype": data_types,
        "sep": separator,
        "quoting": csv.QUOTE_NONE,
        "quotechar": '"',
        "converters": {quote_column: keep_raw},
    }
    table = pd.read_csv(csv_raw_clock_file_path, **read_options)

    # extract every "..." item of the cell into a list
    quoted_items = table[quote_column].str.findall(r'"([^"]*)"')
    table[quote_column] = quoted_items
    return table

# Load the clock export: "tzone" is read as a string, "outline" is parsed into a list of quoted items.
df = read_csv("...", {"tzone": str}, ",", "outline")
# Label the frame through a plain attribute.
# NOTE(review): presumably only informational; confirm the attribute is still needed after df is re-assigned below.
df.name = "Complete Clocks"

In [None]:
import re

def filter_tag(tags: str, excluded_tags_re: list) -> frozenset:
    """Turns a ":"-separated string of tags into a frozenset of tags.
    Empty pieces (e.g. produced by a leading or trailing ":") are ignored.
    A tag is left out when any pattern of excluded_tags_re matches it from its first character (re.match)."""
    kept = set()
    for candidate in tags.split(':'):
        if not candidate:
            continue
        # drop the tag as soon as one of the exclusion patterns matches it
        if any(re.match(pattern, candidate) for pattern in excluded_tags_re):
            continue
        kept.add(candidate)
    return frozenset(kept)

def add_frozen_tag_column(dframe: pd.DataFrame, single_string_tags_column: str ="tags", new_tags_column: str="frozen_tags", excluded_tags_re: list=["u", "e", "@[A-z]", "travel", "nil"], inPlace=False) -> pd.DataFrame:
    """Stores the tags of every row as a frozenset in a new column.
    - dframe[single_string_tags_column] is expected to hold strings of tags separated by ":".
    - new_tags_column receives filter_tag(tags, excluded_tags_re) for every row.
    When inPlace is False a copy of the DataFrame with the new column is returned;
    when inPlace is True dframe itself is modified and nothing is returned."""
    target = dframe if inPlace else dframe.copy()

    def to_frozen(raw_tags):
        return filter_tag(raw_tags, excluded_tags_re)

    target[new_tags_column] = target[single_string_tags_column].apply(to_frozen)

    if inPlace:
        return None
    return target
    
# inPlace defaults to False, so rebind df to the returned copy that carries the "frozen_tags" column.
df = add_frozen_tag_column(df)

In [None]:
# Keep a tuple version of every outline in a separate column.
df["frozen_tags_outline"] = df["outline"].apply(lambda x: tuple(x))

In [None]:
def parse_timestamp_string(timestamp: str, format="%Y-%m-%d %H:%M %z") -> pd.Timestamp:
    """Parses timestamp according to format and returns the result converted to UTC."""
    parsing_options = {"format": format, "utc": True}
    parsed = pd.to_datetime(timestamp, **parsing_options)
    return parsed

def datetime_to_minute_of_day(series: pd.Series, offset:int=0) -> pd.Series:
    """Maps every datetime of the series to its minute of the day (hour*60 + minute).
    The offset is added before wrapping the value into the range [0, 24*60)."""
    minutes_per_day = 24*60

    def shifted_minute(clock_time):
        minute_of_day = clock_time.hour*60 + clock_time.minute
        return (minute_of_day + offset) % minutes_per_day

    return series.dt.time.apply(shifted_minute)

def datetime_to_day_of_week(series: pd.Series) -> pd.Series:
    """Returns the day of the week of every datetime in the series."""
    datetime_accessor = series.dt
    return datetime_accessor.dayofweek

def datetime_to_week_of_year(series: pd.Series) -> pd.Series:
    """Returns the ISO calendar week of every datetime in the series."""
    iso_calendar = series.dt.isocalendar()
    return iso_calendar.week

def datetime_to_month(series: pd.Series) -> pd.Series:
    """Returns the month of every datetime in the series."""
    datetime_accessor = series.dt
    return datetime_accessor.month

def datetime_to_year(series: pd.Series) -> pd.Series:
    """Returns the year of every datetime in the series."""
    datetime_accessor = series.dt
    return datetime_accessor.year

from datetime import timedelta

def add_start_and_end_timestamp_columns(dframe: pd.DataFrame, ignore_tzone: bool=True, inPlace=True) -> pd.DataFrame:
    """Adds the begin, duration and end columns to the DataFrame.
    Columns read from dframe:
    - "date" and "start": joined with a space to build the begin timestamp text
    - "tzone": appended to the timestamp text only when ignore_tzone is False
    - "duration": interpreted as a number of minutes
    Columns written to dframe:
    - begin_datetime: the begin timestamp, parsed with parse_timestamp_string
    - duration_timedelta: the duration as a timedelta
    - end_datetime: begin_datetime + duration_timedelta
    Parameters:
    - ignore_tzone: if True, the "tzone" column is not used and the format "%Y-%m-%d %H:%M" is parsed
    - inPlace: if True, dframe is modified and nothing is returned; otherwise a modified copy is returned"""
    if not inPlace:
        dframe = dframe.copy()

    # All inputs are taken from the frame that was passed in (not from the notebook-level df),
    # so the function also works on copies and on other DataFrames.
    begin_text = dframe["date"] + " " + dframe["start"]
    if ignore_tzone:
        dframe["begin_datetime"] = parse_timestamp_string(begin_text, format="%Y-%m-%d %H:%M")
    else:
        dframe["begin_datetime"] = parse_timestamp_string(begin_text + " " + dframe["tzone"])

    dframe["duration_timedelta"] = pd.to_timedelta(dframe["duration"], unit="m")
    dframe["end_datetime"] = dframe["begin_datetime"] + dframe["duration_timedelta"]

    if not inPlace:
        return dframe

# inPlace defaults to True: the begin/duration/end columns are written to df and nothing is returned.
add_start_and_end_timestamp_columns(df)

In [None]:
def get_diff_outliers_description(cl: pd.Series) -> str:
    """Describes the gaps between consecutive sorted values of the Series.
    Returns a string reporting the mean and the standard deviation of those gaps."""
    ordered = cl.sort_values()
    gaps = ordered.diff()
    gap_mean = gaps.mean()
    gap_std = gaps.std()
    return f"Mean: {gap_mean}, Std: {gap_std}"

def extract_outliers_mask(cl: pd.Series, threshold_sd: int) -> pd.Series:
    """Flags the outliers of the Series.
    Returns a boolean Series that is True where the value lies more than
    threshold_sd standard deviations away from the mean of the Series."""
    centre = cl.mean()
    spread = cl.std()
    distance_from_centre = (cl - centre).abs()
    return distance_from_centre > threshold_sd * spread

def extract_outliers_by_group_mask(dframe: pd.DataFrame, group_column: str, value_column: str, threshold_sd: int) -> pd.Series:
    """Flags the outliers of value_column separately within every group of group_column.
    Returns a Series aligned with dframe that is True where the value lies more than
    threshold_sd standard deviations away from the mean of its own group."""

    def is_far_from_group_mean(values):
        distance = (values - values.mean()).abs()
        return distance > threshold_sd * values.std()

    grouped_values = dframe.groupby(group_column)[value_column]
    return grouped_values.transform(is_far_from_group_mean)

# Rows whose duration lies more than 3 standard deviations from the mean of their "tags" group.
outliers_mask = extract_outliers_by_group_mask(df, "tags", "duration_timedelta", 3)

In [None]:
# Preview the first rows flagged by outliers_mask.
df[outliers_mask].head()

In [None]:
def extract_problematic_clocks_mask(dframe: pd.DataFrame) -> pd.DataFrame:
    """Builds a DataFrame of boolean masks, one column per kind of problem, aligned with dframe:
    - negative_duration_mask: end_datetime is before begin_datetime
    - zero_duration_mask: end_datetime equals begin_datetime
    - outliers: duration_timedelta lies more than 4 standard deviations from the mean of its "tags" group"""
    begin = dframe["begin_datetime"]
    end = dframe["end_datetime"]

    problematic = pd.DataFrame()

    # one column per mask, with the value equal to that column's mask
    problematic["negative_duration_mask"] = end < begin
    problematic["zero_duration_mask"] = end == begin

    # outliers beyond 4 sd within each "tags" group; computed on the frame that was passed in
    # (not on the notebook-level df) so the mask always matches dframe
    problematic["outliers"] = extract_outliers_by_group_mask(dframe, "tags", "duration_timedelta", 4)

    return problematic

# One boolean column per problem type (negative duration, zero duration, outliers) for every row of df.
problematic_indices = extract_problematic_clocks_mask(df)

def print_problematic_indices_statistics(dframe: pd.DataFrame, problematic_indices: pd.DataFrame) -> None:
    """Prints how many rows are problematic overall and per problem column.
    - dframe: the DataFrame the masks refer to; its length is the denominator of the overall percentage
    - problematic_indices: a DataFrame of boolean columns, one per kind of problem
    The first printed line counts the rows flagged by at least one column;
    each following line reports the count and the share of rows flagged by one column."""

    # count the rows with any true value in the columns
    excluded = problematic_indices.any(axis=1)
    total_excluded = excluded.sum()

    column_justification = 25
    value_justification = 5

    # the share is computed against the frame that was passed in, not against the notebook-level df
    total_rows = len(dframe)
    print(f"Problematic from {total_rows}".rjust(column_justification) + ":  " + f"{total_excluded}, ".rjust(value_justification) + f"{total_excluded/total_rows:.2%}")
    for col in problematic_indices.columns:
        # padding number of problematic indices with spaces
        print(f"{col}".rjust(column_justification) + ":" + f"{problematic_indices[col].sum()}".rjust(value_justification) + f", {problematic_indices[col].mean():.2%}")

# Report the overall and per-column counts of problematic rows in df.
print_problematic_indices_statistics(df, problematic_indices)

In [None]:
def extract_problematic_clocks(dframe: pd.DataFrame, problematic_indices: pd.DataFrame, problematic_column: str) -> pd.DataFrame:
    """Restricts the DataFrame to the rows flagged in problematic_indices[problematic_column]."""
    row_selector = problematic_indices[problematic_column]
    return dframe.loc[row_selector]

# Rows of df whose end_datetime equals their begin_datetime.
problematic_clocks = extract_problematic_clocks(df, problematic_indices, "zero_duration_mask")

In [None]:
def remove_problematic_indices(dframe: pd.DataFrame, inPlace=True) -> pd.DataFrame:
    """Drops every row flagged by at least one column of extract_problematic_clocks_mask(dframe).
    With inPlace=True the rows are dropped from dframe itself and nothing is returned;
    with inPlace=False dframe is left untouched and the remaining rows are returned."""

    # True for the rows that are problematic in any of the columns
    problematic_rows = extract_problematic_clocks_mask(dframe).any(axis=1)

    if not inPlace:
        return dframe[~problematic_rows]

    dframe.drop(index=dframe.index[problematic_rows], inplace=True)

# inPlace defaults to True: the problematic rows are dropped from df itself.
remove_problematic_indices(df)

In [None]:
import matplotlib.pyplot as plt
        
def plot_histogram(series: pd.Series, bins: int=100, disable_xticks=False, title:str = "") -> plt.Figure:
    """Plots the distribution of the series and returns the (closed) figure.
    - If the series has a timedelta dtype, its values are converted to minutes and a density histogram is drawn.
    - If the series is of type float64 or int64, a density histogram of the values is drawn.
    - Otherwise, a bar chart of the value counts of the series is drawn.
    Parameters:
    - bins: the number of bins of the histogram (ignored for the value-counts bar chart)
    - disable_xticks: if True, the xticks of the value-counts bar chart are removed;
      useful when only the shape of the distribution is needed
    - title: the title of the plot; no title is set when empty"""

    fig, ax = plt.subplots()

    # Comparing the dtype with the pd.Timedelta class never matches a timedelta64 column,
    # so the dtype is tested with the pandas type-check helper instead.
    if pd.api.types.is_timedelta64_dtype(series):
        (series.dt.total_seconds()/60).plot.hist(bins=bins, edgecolor="black", density=True, ax=ax)
    elif series.dtype == "float64" or series.dtype == "int64":
        series.plot.hist(bins=bins, edgecolor="black", density=True, ax=ax)
    else:  # plot value counts
        series.value_counts().plot(kind="bar", edgecolor="black", ylabel="Occurrences", ax=ax)
        if disable_xticks:
            ax.set_xticks([])

    if title:
        ax.set_title(title)

    # the figure is closed so it is not drawn twice; the caller displays the returned object
    plt.close(fig)
    return fig
    
# Distribution of the "duration" column of df, drawn with 50 bins.
plot_histogram(df["duration"], title="Duration histogram", bins=50)

In [None]:
def get_formatted_time(df: pd.DataFrame, time_column: str, format: str, inPlace=False) -> pd.DataFrame:
    """Parses time_column into datetimes using the format string.
    With inPlace=False a converted copy is returned and df is left untouched;
    with inPlace=True the column of df itself is replaced and nothing is returned."""
    target = df if inPlace else df.copy()
    target[time_column] = pd.to_datetime(target[time_column], format=format)
    if inPlace:
        return None
    return target

In [None]:
# outlines are list of nested (sub)headings, used to specify the hierarchy of the log

def get_exact_outline_mask(df: pd.DataFrame, outline: list) -> pd.DataFrame:
    """Builds a mask that is True for the rows whose 'outline' value equals the given outline.
    - df: the DataFrame, with an 'outline' column
    - outline: the full outline to compare with, given as a list"""

    def is_same_outline(row_outline):
        return row_outline == outline

    return df['outline'].apply(is_same_outline)

def get_any_outline_mask(df: pd.DataFrame, outline: str) -> pd.DataFrame:
    """Builds a mask that is True for the rows whose 'outline' value contains the given heading.
    - df: the DataFrame, with an 'outline' column
    - outline: the heading to look for, given as a string"""

    def contains_heading(row_outline):
        return outline in row_outline

    return df['outline'].apply(contains_heading)

def get_index_outline(df: pd.DataFrame, outline: str, index: int) -> pd.DataFrame:
    """Builds a mask that is True for the rows whose 'outline' value has the given heading at the given position.
    - df: the DataFrame, with an 'outline' column
    - outline: the heading to look for, given as a string
    - index: the position to inspect; a negative index counts from the last element.
    Rows whose outline is too short for the index are marked False."""

    def heading_at_index_matches(row_outline):
        size = len(row_outline)
        # valid positions are -size .. size-1, as for regular sequence indexing
        if -size <= index < size:
            return row_outline[index] == outline
        return False

    return df['outline'].apply(heading_at_index_matches)

In [None]:
def get_subset_match_tags_mask(cl: pd.Series, tags) -> pd.Series:
    """Builds a mask that is True for the entries of the Series containing all the given tags.
    - cl: the Series of tag collections
    - tags: a single tag (string), a list of tags, or a set of tags"""
    # normalise a single tag or a list of tags to a set; other values are used as given
    wanted = {tags} if isinstance(tags, str) else (set(tags) if isinstance(tags, list) else tags)

    def has_all_wanted(entry):
        return wanted.issubset(entry)

    return cl.apply(has_all_wanted)

def remove_tags_from_column(cl: pd.Series, tags: set) -> pd.DataFrame:
    """Returns a Series where the given tags have been subtracted from every entry.
    - cl: the Series of tag sets
    - tags: a single tag (string), a list of tags, or a set of tags"""
    # normalise a single tag or a list of tags to a set; other values are used as given
    unwanted = {tags} if isinstance(tags, str) else (set(tags) if isinstance(tags, list) else tags)

    def without_unwanted(entry):
        return entry - unwanted

    return cl.apply(without_unwanted)

In [None]:
# TODO automate recognising periods of time

# periods of university lessons, as (start, end) date strings
uni = [
    # first listed academic year
    ("2022-09-06", "2022-10-15"),
    ("2022-10-26", "2022-12-07"),
    ("2023-02-06", "2023-03-11"),
    ("2023-03-21", "2023-04-08"),  # easter break
    ("2023-04-17", "2023-06-13"),
    # second listed academic year
    ("2023-09-06", "2023-10-14"),
    ("2023-11-03", "2023-12-07"),
    ("2024-02-05", "2024-03-09"),
    ("2024-03-20", "2024-03-29"),  # easter break
    ("2024-04-11", "2024-05-11"),
]

# university exam sessions, as (start, end) date strings
exa = [("2022-10-15", "2022-10-26"), 
        ("2022-12-07", "2022-12-21"), 
        ("2023-01-02", "2023-01-18"), 
        
        ("2023-03-11", "2023-03-21"), 
        ("2023-06-13", "2023-06-19"), 
        
        ("2023-10-14", "2023-10-31"),
        ("2023-12-07", "2023-12-21"),
        ("2023-12-27", "2024-01-30"),
        # end year fixed (was 2023): the session fills the gap between the lesson periods
        # ending on 2024-03-09 and starting on 2024-03-20
        ("2024-03-09", "2024-03-20"),
        ("2024-05-11", "2024-06-14"),
        ("2024-06-23", "2024-07-10")]


def get_period_mask(dframe: pd.DataFrame, periods: list, start_column: str, end_column: str) -> pd.DataFrame:
    """Builds a mask that is True for the rows falling inside at least one of the periods.
    A row falls inside a period (start, end) when its start_column value is >= start
    and its end_column value is < end."""
    inside_any_period = pd.Series(False, index=dframe.index)
    for period_start, period_end in periods:
        starts_late_enough = dframe[start_column] >= period_start
        ends_early_enough = dframe[end_column] < period_end
        inside_any_period = inside_any_period | (starts_late_enough & ends_early_enough)
    return inside_any_period

In [None]:
# shortcuts in simplifying the summary table
# note that the tag values should be exclusive to avoid double counting
# Structure: grouping name -> {summary column name -> list of original tags merged into it}.
tag_tree = {
    # default grouping
    "standard": {
        "Sleep": ["SWO", "SFR"],
        "Lessons": ["LES"],
        "Revision": ["REV", "EXM"],
        "Repetitive": ["BUR", "WRK", "TDY", "ORG", "REP"],
        "Projects": ["PRJ"],
        "Media": ["MDI"],
        "Social": ["CAL", "OUT", "EVE", "DOG"],
    },
    # currently the same entries as "standard"
    "censored": {
        "Sleep": ["SWO", "SFR"],
        "Lessons": ["LES"],
        "Revision": ["REV", "EXM"],
        "Repetitive": ["BUR", "WRK", "TDY", "ORG", "REP"],
        "Projects": ["PRJ"],
        "Media": ["MDI"],
        "Social": ["CAL", "OUT", "EVE", "DOG"],
    },
    # same as "standard" except that "REP" is not merged into "Repetitive"
    "uncensored": {
        "Sleep": ["SWO", "SFR"],
        "Lessons": ["LES"],
        "Revision": ["REV", "EXM"],
        "Repetitive": ["BUR", "WRK", "TDY", "ORG"],
        "Projects": ["PRJ"],
        "Media": ["MDI"],
        "Social": ["CAL", "OUT", "EVE", "DOG"],
    },
    # grouping restricted to study-related tags
    "study": {
        "Theory": ["R"],
        "Exercise": ["E"],
        "Projects": ["P"],
        "Exams": ["EXM"],
        "Lessons": ["LES"],
    },
    # finer grouping: most tags get a column of their own
    "extended": {
        "Sleep": ["SWO", "SFR"],
        "Lessons": ["LES"],
        "Revision": ["REV"],
        "Exams": ["EXM"],
        "Repetitive": ["REP"],
        "Bureaucracy": ["BUR"],
        "Work": ["WRK"],
        "Tidying": ["TDY"],
        "Organization": ["ORG"],
        "Projects": ["PRJ"],
        "Media": ["MDI"],
        "Times": ["TM"],
        "Calls": ["CAL"],
        "Going Out": ["OUT", "EVE", "DOG"],
    },
}

In [None]:
# merges the original tags of an event into a smaller set of new tags
def merge_tags(rule: dict, fset: frozenset) -> frozenset:
    """Collects the new tags whose list of original tags shares at least one tag with fset.
    - rule: entry of the tag_tree, associating each new tag with a list of original tags
    - fset: the original tags of an event"""
    return frozenset(
        new_tag
        for new_tag, original_tags in rule.items()
        if any(original in fset for original in original_tags)
    )

def merge_tags_column(dframe: pd.DataFrame, tags_matching: dict) -> pd.Series:
    """Applies merge_tags to every entry of the "frozen_tags" column.
    - dframe: the DataFrame, with a column "frozen_tags" containing the tags as a frozenset
    - tags_matching: an entry of the tag_tree, associating each new tag with a list of original tags"""

    def merge_row_tags(row_tags):
        return merge_tags(tags_matching, row_tags)

    return dframe["frozen_tags"].apply(merge_row_tags)

In [None]:
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder

def plot_2D_histogram(series_x: pd.Series, series_y: pd.Series, bins: int=100, formatter_x: callable=None, formatter_y: callable=None, title: str="") -> plt.Figure:
    """Draws the 2D histogram of two series and returns the (closed) figure.
    Each series is turned into numbers before plotting:
    - timedelta64[ns] series are converted to minutes
    - float64 and int64 series are used as they are
    - any other series is label-encoded; for the y series the encoded classes are used as tick labels
    Parameters:
    - bins: the number of bins handed to hist2d
    - formatter_x, formatter_y: optional (value, position) -> string functions for the axis ticks
    - title: the title of the plot; a default mentioning both series names is used when empty"""

    encoder = LabelEncoder()
    figure, axes = plt.subplots()

    def as_plottable(values):
        # returns the numeric values and whether they had to be label-encoded
        if values.dtype == "timedelta64[ns]":
            return (values.dt.total_seconds()/60), False
        if values.dtype == "float64" or values.dtype == "int64":
            return values, False
        return encoder.fit_transform(values), True

    x, _ = as_plottable(series_x)
    y, y_is_encoded = as_plottable(series_y)

    if y_is_encoded:
        # one tick per encoded class, labelled with the class itself
        axes.set_yticks(range(len(encoder.classes_)))
        axes.set_yticklabels(encoder.classes_)

    axes.hist2d(x, y, bins=bins, cmap="Blues")

    if not title:
        title = f"2D histogram of {series_x.name} and {series_y.name}"
    axes.set_title(title)
    axes.set_xlabel(series_x.name)
    axes.set_ylabel(series_y.name)

    for axis, formatter in ((axes.xaxis, formatter_x), (axes.yaxis, formatter_y)):
        if formatter:
            axis.set_major_formatter(plt.FuncFormatter(formatter))

    # white background behind the bins
    axes.set_facecolor("white")

    plt.close(figure)
    return figure

def minute_to_string_formatter(x: int, pos) -> str:
    """Formats a number of minutes as HH:MM, wrapping around after 24 hours.
    The pos argument is accepted for compatibility with tick formatters and is not used."""
    minutes_per_day = 24*60
    hours = int((x%minutes_per_day)/60)
    minutes = int(x%60)
    return f"{hours:02d}:{minutes:02d}"

def minute_to_string_offset_formatter(offset: int) -> str:
    """Builds a (value, position) formatter that subtracts the offset from the value
    and formats the result as HH:MM using minute_to_string_formatter.
    - offset: the offset to subtract from the value, in minutes"""

    def shifted_formatter(x, pos):
        return minute_to_string_formatter(x-offset, pos)

    return shifted_formatter
   
# select some events from the whole dataset: the rows whose frozen_tags contain "SLP"
selection = df[get_subset_match_tags_mask(df["frozen_tags"], "SLP")]

# hour of the day when the graph should start *60
offset = 6*60
# x axis: begin minute of the day shifted by (24*60 - offset); the x formatter removes the same shift for the tick labels
plot_2D_histogram(datetime_to_minute_of_day(selection["begin_datetime"], 24*60 - offset), selection["duration"], formatter_x=minute_to_string_offset_formatter(24*60 - offset), formatter_y=minute_to_string_offset_formatter(0), bins=20, title="Sleep duration by time of day")

In [None]:
def cast_to_list_str(df: pd.DataFrame) -> pd.DataFrame:
    """Replaces every element by the string representation of the list built from it."""

    def as_list_text(element):
        return str(list(element))

    return df.apply(as_list_text)

# Duration against the tags of the selection; the tags are turned into strings before plotting.
plot_2D_histogram(selection["duration"], cast_to_list_str(selection["frozen_tags"]), bins=15)

In [None]:
def plot_2D_scatterplot(series_x: pd.Series, series_y: pd.Series, type_identifiers: pd.Series, formatter_x: callable=None, formatter_y: callable=None) -> plt.Figure:
    """Draws a scatterplot of two series, coloured by type, and returns the (closed) figure.
    Each of the two series is turned into numbers before plotting:
    - timedelta64[ns] series are converted to minutes
    - float64 and int64 series are used as they are
    - any other series is label-encoded; for the y series the encoded classes are used as tick labels
    Parameters:
    - type_identifiers: the series giving the type of each event; its values are converted to strings
      and label-encoded to pick the colour of each point, and the encoded classes label the legend
    - formatter_x, formatter_y: optional (value, position) -> string functions for the axis ticks
    """

    encoder = LabelEncoder()
    figure, axes = plt.subplots()

    def as_plottable(values):
        # returns the numeric values and whether they had to be label-encoded
        if values.dtype == "timedelta64[ns]":
            return (values.dt.total_seconds()/60), False
        if values.dtype == "float64" or values.dtype == "int64":
            return values, False
        return encoder.fit_transform(values), True

    x, _ = as_plottable(series_x)
    y, y_is_encoded = as_plottable(series_y)

    if y_is_encoded:
        # one tick per encoded class, labelled with the class itself
        axes.set_yticks(range(len(encoder.classes_)))
        axes.set_yticklabels(encoder.classes_)

    def as_text(identifier):
        return str(identifier)

    # one colour code per distinct type
    colour_codes = encoder.fit_transform(type_identifiers.apply(as_text))
    scatter = axes.scatter(x, y, c=colour_codes, cmap="tab20", s=20)
    axes.set_facecolor("white")

    legend_labels = encoder.classes_
    legend_handles, _ = scatter.legend_elements(prop="colors", num=len(legend_labels))

    # the legend sits outside the axes so it can grow vertically to fit all labels
    plt.legend(legend_handles, legend_labels, title="Type", bbox_to_anchor=(1.05, 1), loc='upper left', fontsize='small')
    # leave room on the right so the whole legend is displayed
    plt.subplots_adjust(right=0.7)

    axes.set_title(f"2D scatterplot of {series_x.name} and {series_y.name}")
    axes.set_xlabel(series_x.name)
    axes.set_ylabel(series_y.name)

    for axis, formatter in ((axes.xaxis, formatter_x), (axes.yaxis, formatter_y)):
        if formatter:
            axis.set_major_formatter(plt.FuncFormatter(formatter))

    plt.close(figure)
    return figure

# Rows whose outline contains "Series" and whose frozen_tags contain "2024".
selection = df[get_any_outline_mask(df, "Series") & get_subset_match_tags_mask(df["frozen_tags"], "2024")]
# NOTE(review): the next two variables are not read anywhere in the visible code — confirm they are still needed.
dropped_tags_column = selection["frozen_tags"]
selection_restricted_tags_column = merge_tags_column(selection, tag_tree["standard"])

# Begin minute of the day (shifted by offset) against duration, coloured by outline.
plot_2D_scatterplot(datetime_to_minute_of_day(selection["begin_datetime"], offset), selection["duration"], selection["outline"], formatter_x=minute_to_string_offset_formatter(offset), formatter_y=minute_to_string_offset_formatter(0))

In [None]:
def plot_histogram_deltas(cl: pd.Series, exclude_outliers_sdev:int = 1000, bins: int=100, title: str="") -> plt.Figure:
    """Plots the histogram of the gaps between consecutive sorted values of the column.
    Gaps flagged by extract_outliers_mask are left out; timedelta gaps are expressed in days.
    Prints a message and returns nothing when no gap is left.
    - cl: the column containing the datetime objects
    - exclude_outliers_sdev: the number of standard deviations beyond which a gap is excluded
    - bins: the number of bins for the histogram
    - title: the title of the histogram"""
    gaps = cl.sort_values().diff()
    # the first sorted value has no predecessor, hence no gap
    gaps = gaps[gaps.notna()]

    is_outlier = extract_outliers_mask(gaps, exclude_outliers_sdev)
    kept_gaps = gaps[~is_outlier]

    if kept_gaps.empty:
        print("No regular values")
        return

    if kept_gaps.dtype == "timedelta64[ns]":
        seconds_per_day = 60*60*24
        kept_gaps = kept_gaps.dt.total_seconds()/seconds_per_day

    return plot_histogram(kept_gaps, bins=bins, title=title)

# Gaps between consecutive start times of the selection, leaving out gaps beyond 4 standard deviations.
plot_histogram_deltas(selection["begin_datetime"], title="Distribution of the difference between the selection's start times", bins=20, exclude_outliers_sdev=4)

In [None]:
def get_extreme_difference_values(dframe: pd.DataFrame, column: str, threshold_sd: int) -> pd.DataFrame:
    """Returns the rows, sorted by the column, whose value in the column is flagged as an outlier.
    The DataFrame is sorted by the column first; extract_outliers_mask is then applied
    to the sorted column with threshold_sd standard deviations as the threshold."""
    ordered_rows = dframe.sort_values(column)
    is_outlier = extract_outliers_mask(ordered_rows[column], threshold_sd)
    return ordered_rows[is_outlier]

# Mean and standard deviation of the gaps between consecutive start times of the selection.
print(get_diff_outliers_description(selection["begin_datetime"]))
# Rows of the selection whose begin_datetime is flagged with a threshold of 1.9 standard deviations.
get_extreme_difference_values(selection, "begin_datetime", 1.9)

In [None]:
def sort_events(df: pd.DataFrame, sort_column: str, ascending: bool=True) -> pd.DataFrame:
    """Returns the events ordered by the values of sort_column.
    - ascending: smallest values first when True, largest first when False"""
    ordering = {"by": sort_column, "ascending": ascending}
    return df.sort_values(**ordering)

In [None]:
def extract_unique_tags(dframe: pd.DataFrame, frozen_tags_column:str) -> pd.Series:
    """Returns the distinct values found in the frozen_tags_column of the DataFrame."""
    tags_column = dframe[frozen_tags_column]
    return tags_column.unique()

def produce_unique_tags_strings(unique_tags:pd.Series) -> list:
    """Returns one string per collection of tags, made of its tags joined by a space."""
    return [" ".join(list(tags)) for tags in unique_tags]

def project_same_step(row: pd.Series, summary: pd.DataFrame, beginning_indices: pd.Series, matching_columns: pd.DataFrame):
    """Adds the duration of an event that starts and ends on the same step to the summary table.
    The step is looked up in beginning_indices and the destination columns in
    matching_columns["relevant_tags_strings"], both through the name of the row.
    Nothing is added when the step lies outside the summary table."""
    target_step = beginning_indices.loc[row.name]

    # events whose step is not a row of the summary table are ignored
    if not (0 <= target_step < len(summary)):
        return

    tag_columns = matching_columns.loc[row.name]["relevant_tags_strings"]
    summary.loc[target_step, tag_columns] += row["duration_timedelta"]

def project_different_steps(row: pd.Series, from_time: pd.Series, step_size: pd.Timedelta, summary: pd.DataFrame, beginning_indices: pd.Series, ending_indices: pd.Series, matching_columns: pd.DataFrame):
    """Projects the duration of the event on the summary table, for events starting and ending on different steps.
    Since the event may span multiple steps, the duration is assigned to the indices depending on the split.
    - row: the event; its name is used to look up the indices and columns, and its
      "begin_datetime" and "end_datetime" values are read
    - from_time: the start time of each step, indexed like the summary table
    - step_size: the amount added to every fully covered step
    - summary: the summary table, modified in place
    - beginning_indices, ending_indices: step index of the beginning and of the end of each event
    - matching_columns: looked up by row name; its "relevant_tags_strings" entry selects the summary columns to update
    Nothing is returned."""
    beginning_index = beginning_indices.loc[row.name]
    ending_index = ending_indices.loc[row.name]
    
    columns_with_matching_tags = matching_columns.loc[row.name]["relevant_tags_strings"]

    # the whole projection is skipped when beginning_index+1 is not a valid position of the summary
    if beginning_index >= -1 and beginning_index + 1 < len(summary):
        # adds the time between start and T1 
        # NOTE(review): this partial duration is written to row beginning_index+1 and the slice below
        # also starts at beginning_index+1 — confirm the partial time is not meant for row beginning_index.
        summary.loc[beginning_index+1, columns_with_matching_tags] += from_time[beginning_index+1] - row["begin_datetime"]

        if ending_index-1 < len(summary):
            # adds to all blocks covered fully the full duration
            # (label-based .loc slice: both ends are included)
            summary.loc[beginning_index+1:ending_index-1, columns_with_matching_tags] += step_size

        if ending_index >= 0 and ending_index < len(summary):
            # adds the time between T3 and end
            summary.loc[ending_index, columns_with_matching_tags] += row["end_datetime"] - from_time[ending_index]


def produce_summary_table(dframe: pd.DataFrame, start: str, end: str, step: str, frozen_tags_column:str = "frozen_tags", formatter=lambda x: x) -> pd.DataFrame:
    """Builds a table with one row per time step and one column per distinct tag combination,
    holding the time spent on that combination during the step.
    - dframe: the events; the columns "begin_datetime", "end_datetime", "duration_timedelta"
      and frozen_tags_column are read (some of them by the projection helpers)
    - start, end: the boundaries of the summary, parsed with parse_timestamp_string (default format)
    - step: the size of a step, parsed with pd.to_timedelta
    - frozen_tags_column: the column holding the tags of each event
    - formatter: applied to each distinct tags value before it is turned into a column name
    Returns the summary DataFrame: a "from_time" column with the start of each step,
    followed by one timedelta column per distinct tags value."""
    range_min = parse_timestamp_string(start)
    range_max = parse_timestamp_string(end)
    range_delta = range_max - range_min

    step_size = pd.to_timedelta(step)
    
    # number of whole steps that fit in the range
    steps = range_delta // step_size
    
    unique_tags = extract_unique_tags(dframe, frozen_tags_column)
    unique_tags_strings = produce_unique_tags_strings(map(formatter, unique_tags))

    # initialises the summary table with zero durations
    summary = pd.DataFrame(timedelta(0), index=range(steps), columns= ["from_time"] + unique_tags_strings)
    
    # sets the from_time column to the start time of each step
    summary["from_time"] = range_min + summary.index * step_size
    
    # exclude events starting after the end and ending before the start 
    filtered = dframe[~((dframe["begin_datetime"] > range_max) | (dframe["end_datetime"] < range_min))]
    
    # construct a dataframe with the tags and the tags as strings
    # - "relevant_tags": extract_unique_tags(dframe)
    # - "relevant_tags_strings": produce_unique_tags_strings(dframe, frozen_tags_column)
    tags_dataframe = pd.DataFrame(data=[unique_tags, unique_tags_strings]).T.rename(columns={0: "relevant_tags", 1: "relevant_tags_strings"})
    # for each row, selects the columns with the same tags
    matching_columns = filtered.apply(lambda row: tags_dataframe[tags_dataframe["relevant_tags"] == row[frozen_tags_column]], axis=1)

    # produces the indices for the beginning and ending of the events
    # (position within the range scaled to the number of steps, then cast to int)
    beginning_indices = (filtered["begin_datetime"] - range_min).div(range_delta).mul(steps).astype(int)
    ending_indices = (filtered["end_datetime"] - range_min).div(range_delta).mul(steps).astype(int)
    
    # events starting and ending on the same step    
    same_step = filtered[beginning_indices == ending_indices]
    same_step.apply(lambda row: project_same_step(row, summary, beginning_indices, matching_columns), axis=1)
        
    # events not starting and ending on the same step
    spanning_step = filtered[beginning_indices != ending_indices]
    spanning_step.apply(lambda row: project_different_steps(row, summary["from_time"], step_size, summary, beginning_indices, ending_indices, matching_columns), axis=1)

    return summary

# Parameters of the summary: step size and the boundaries of the summarised range.
time_step_description = "7 days"
beginning_summary_str = "2023-01-01 00:00 +0200"
ending_summary_str = "2024-07-20 00:00 +0200"


# Wrap every note in a one-element tuple.
df["note_tag"] = df["note"].apply(lambda x: tuple([x]))

# Summarise the events whose outline contains "Series"; the columns are named after the last element of each outline.
filtered_df = df[get_any_outline_mask(df, "Series")]
summary = produce_summary_table(filtered_df, beginning_summary_str, ending_summary_str, time_step_description, frozen_tags_column="frozen_tags_outline", formatter=lambda x: tuple([x[-1]]))

In [None]:
def restrict_summary_to_non_zero_columns(summary: pd.DataFrame) -> pd.DataFrame:
    """Keeps only the columns of the summary table holding at least one value different from a zero timedelta."""
    differs_from_zero = summary != timedelta(0)
    columns_to_keep = differs_from_zero.any(axis=0)
    return summary.loc[:, columns_to_keep]

# Drop the columns of the summary that never differ from a zero timedelta.
summary = restrict_summary_to_non_zero_columns(summary)

In [None]:
# Display the names of the remaining summary columns.
summary.columns

In [None]:
def sum_columns_by_tag(raw: pd.DataFrame, columns_dict: dict, separator = " ", keep_columns = ["from_time"]):
    """Builds a new DataFrame whose columns are sums of columns of the raw dataset.
    - raw: the raw dataset, containing the columns to be summed and the columns to be kept
    - columns_dict: key-list pairs where the key is the name of the new column and the list contains tag strings.
    - separator: the string separating the tags inside the column names of raw
    - keep_columns: the columns of raw copied unchanged into the result
    Each new column is the row-wise sum of the raw columns whose name, split on the separator,
    contains any of the tag strings in the list.
    Raises KeyError when some of the keep_columns are missing from raw."""

    missing_columns = [kept_column for kept_column in keep_columns if kept_column not in raw.columns]
    if missing_columns:
        raise KeyError("keep_columns is defined but some of its elements are not in raw.columns", missing_columns)

    # the tags contained in the name of every raw column
    tags_per_column = raw.columns.to_series().str.split(separator)

    # the result starts empty, with the kept columns first and the new columns after them
    result = pd.DataFrame(columns= keep_columns + list(columns_dict.keys()))

    for kept_column in keep_columns:
        result[kept_column] = raw[kept_column]

    for new_column_name, wanted_tags in columns_dict.items():
        has_wanted_tag = tags_per_column.map(lambda column_tags: any([tag in column_tags for tag in wanted_tags]))
        result[new_column_name] = raw.loc[:, has_wanted_tag].sum(axis=1)

    return result
    
# Merge the summary columns following the "censored" grouping of the tag_tree.
column_grouping_selection = "censored"
summary = sum_columns_by_tag(summary, tag_tree[column_grouping_selection])

In [None]:
def convert_timedelta_columns_to_minutes(dframe: pd.DataFrame, inPlace=True) -> pd.DataFrame:
    """Converts the timedelta columns after the first one to minutes in float format
    and removes the columns that contain only missing values.
    - dframe: the DataFrame; its first column is never converted
    - inPlace: if True, dframe itself is modified and nothing is returned;
      otherwise dframe is left untouched and a converted copy is returned"""

    if not inPlace:
        dframe = dframe.copy()

    for col in dframe.columns[1:]:
        if pd.api.types.is_timedelta64_dtype(dframe[col]):
            # Convert timedelta to minutes
            dframe[col] = dframe[col].dt.total_seconds() / 60.0

    # Drop the empty columns on the object itself: re-assigning the result of dropna to the
    # local name would leave the caller's DataFrame unchanged when inPlace is True.
    dframe.dropna(axis=1, how='all', inplace=True)

    if not inPlace:
        return dframe
           
# inPlace is False: summary is left untouched and the converted copy is kept in summary_minutes.
summary_minutes = convert_timedelta_columns_to_minutes(summary, False)

In [None]:
def extract_data(dframe: pd.DataFrame) -> pd.DataFrame:
    """Keeps only the numeric columns of the DataFrame, discarding the columns of any other type."""
    numeric_columns = dframe.select_dtypes(include=["number"])
    return numeric_columns

data = extract_data(summary_minutes)

 Using the timeline series, 10 labels are sampled at uniform intervals,
    formatted as YYYY-MM-DD strings and used as x-axis labels.
    The y-axis labels are given by the column names of the DataFrame.

In [None]:
def produce_summary_graph(dframe: pd.DataFrame, timeline: pd.Series, title: str="") -> plt.Figure:
    """Draws every column of dframe as its own line over a shared x-axis.
    - dframe: the data to draw; the column at position n is shifted upwards by n, so the lines sit on separate rows
    - timeline: the values used for the x-axis
    - title: the title of the graph
    The y ticks are placed at the integer offsets and labelled with the column names.
    Returns the figure after closing it in pyplot.
    """
    column_names = dframe.columns
    fig, ax = plt.subplots()

    for row_offset, column_name in enumerate(column_names):
        shifted_values = dframe[column_name] + row_offset
        ax.plot(timeline, shifted_values, label=column_name)

    ax.set_yticks(range(len(column_names)))
    ax.set_yticklabels(column_names)

    ax.set(title=title, xlabel="Time", ylabel="Tags")

    plt.close(fig)
    return fig

produce_summary_graph(data, summary_minutes["from_time"])

In [None]:
def produce_feature_histogram(data: pd.DataFrame, column_selection: str, time_step: str, bins: int=50, exclude_zero_values:bool=True) -> plt.Figure:
    """Produces one histogram per column of the DataFrame, all on the same figure.
    - data: the DataFrame containing the data
    - column_selection: string description of how the columns were grouped in making the data dataframe
    - time_step: string description of the time step, used in the title and in the x-axis labels
    - bins: the number of bins to use in the histogram
    - exclude_zero_values: if True, the zero values are left out of the histograms
    Returns the figure holding the histograms, after closing it in pyplot."""
    # zeros are masked to NaN, and missing values are left out of each histogram by DataFrame.hist
    plotted = data.where(data != 0) if exclude_zero_values else data

    # DataFrame.hist creates its own figure when no ax is given, so the figure must be
    # taken from the returned axes: a figure created beforehand would stay empty
    axes = plotted.hist(figsize=(15, 7), bins=bins, edgecolor="black", density=True)
    fig = axes.flat[0].figure

    # set the title of the figure and make it visible
    fig.suptitle(f"Feature histogram for {column_selection} in steps of {time_step}")
    fig.subplots_adjust(hspace=0.8, wspace=0.2)

    for ax in axes.flat:
        ax.set_xlabel(f"Total duration in steps of {time_step}")
        ax.set_ylabel("Occurrences")

    plt.close(fig)
    return fig
        
produce_feature_histogram(data, column_grouping_selection, time_step_description)

In [None]:
def plot_steps_without_activity(data: pd.DataFrame, time_step: str, selection: str) -> plt.Figure:
    """Draws a bar chart with, for each column, the fraction of rows whose value is zero.
    - data: the DataFrame containing the data
    - time_step: string description of the time step, used in the titles
    - selection: string description of the selection of columns used in the data DataFrame
    The bars are sorted from the highest fraction to the lowest.
    Returns the figure after closing it in pyplot."""
    zero_counts = data.eq(0).sum()
    zero_fraction = zero_counts / len(data)
    ranked_fraction = zero_fraction.sort_values(ascending=False)

    fig = plt.figure(figsize=(7, 4))
    fig.suptitle(f"Sparsity of {selection} in steps of {time_step}")
    ax = fig.add_subplot(111)
    ax.grid(False)

    ranked_fraction.plot(
        kind='bar',
        ax=ax,
        title=f"{time_step} steps without any activity",
        grid=False,
        ylabel="Fraction of steps without activity",
        ylim=(0, 1),
        edgecolor="black",
    )

    plt.close(fig)
    return fig

plot_steps_without_activity(data, time_step_description, column_grouping_selection)

In [None]:
from sklearn.preprocessing import StandardScaler

def to_normalized_df(data: pd.DataFrame) -> pd.DataFrame:
    """Rescales a numeric DataFrame with a freshly fitted StandardScaler.
    Returns a new DataFrame with the same columns and the same index as data."""
    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(data)
    return pd.DataFrame(scaled_values, columns=data.columns, index=data.index)

data_norm = to_normalized_df(data)

In [None]:
import numpy as np
from scipy.cluster.hierarchy import linkage, leaves_list
import seaborn as sns

def fill_diagonal_na(dframe: pd.DataFrame, inPlace=True) -> "pd.DataFrame | None":
    """Fills the diagonal of the DataFrame with NaN values.
    - dframe: the DataFrame to fill; the cells at position (i, i) are overwritten, whatever the labels are
    - inPlace: if True, modifies the DataFrame in place and returns None, otherwise returns a modified copy of the DataFrame
    The columns are expected to be able to hold NaN (e.g. float columns)."""
    target = dframe if inPlace else dframe.copy()

    # Each cell is written through the DataFrame itself: writing through dframe.values
    # is silently lost whenever pandas returns a copy (e.g. columns of different dtypes)
    for position in range(min(target.shape)):
        target.iat[position, position] = np.nan

    if not inPlace:
        return target
    return None
    
def produce_correlation_heatmap(data: pd.DataFrame, method_name: str="pearson") -> plt.Figure:
    """Draws the correlation matrix of the columns of data as an annotated heatmap.
    - data: the DataFrame whose columns are correlated
    - method_name: the correlation method passed to DataFrame.corr
    Only the cells strictly above the main diagonal are shown.
    Returns the figure after closing it in pyplot."""
    fig, ax = plt.subplots(figsize=(5, 5))

    correlations = data.corr(method=method_name)
    # True strictly above the main diagonal, False on it and below it
    strictly_upper = np.triu(np.ones(correlations.shape, dtype=bool), k=1)
    upper_only = correlations.where(strictly_upper)

    sns.heatmap(
        upper_only,
        cmap="coolwarm",
        center=0,
        linewidths=2,
        annot=True,
        fmt=".2f",
        ax=ax,
    )

    plt.close(fig)
    return fig

produce_correlation_heatmap(data_norm)

In [None]:
# energy function

In [None]:
# cluster column can be in time or in values

In [None]:
def get_test_and_training_data(data_norm: pd.DataFrame, random_state=None) -> tuple:
    """Splits the data into 80% training and 20% test data.
    - data_norm: the DataFrame whose rows are split
    - random_state: optional seed forwarded to DataFrame.sample, to make the split reproducible
    The rows are shuffled once and then cut, so no row ends up in both parts.
    Returns the tuple (training data, test data)."""
    # a single shuffle followed by a positional cut keeps the two parts disjoint,
    # which two independent calls to sample cannot guarantee
    shuffled = data_norm.sample(frac=1.0, random_state=random_state)
    n_train = round(0.8 * len(shuffled))
    return shuffled.iloc[:n_train], shuffled.iloc[n_train:]

train_data, test_data = get_test_and_training_data(data_norm)

from sklearn.linear_model import LinearRegression

def model_linear_regression(train_data: pd.DataFrame, test_data: pd.DataFrame, column: str) -> tuple:
    """Fits a linear regression without intercept that predicts one column from all the others.
    - train_data: the data used to fit the model
    - test_data: the data used to score the fitted model
    - column: the name of the target column; every other column is used as a feature
    Returns the tuple (model, score), where score is the value of model.score on the test data."""
    regressor = LinearRegression(fit_intercept=False)

    train_features = train_data.drop(columns=[column])
    train_target = train_data[column]
    regressor.fit(train_features, train_target)

    test_features = test_data.drop(columns=[column])
    test_target = test_data[column]
    return regressor, regressor.score(test_features, test_target)

def model_regression(train_data: pd.DataFrame, test_data: pd.DataFrame) -> str:
    """Trains a linear regression model for each column in the training data and tests it on the test data.
    - train_data: the training data, containing the features and the target columns
    - test_data: the test data, containing the features and the target columns
    - Returns the string with the name of the column and the score for that column, one line per column."""
    report_lines = []
    # the columns come from train_data itself, not from a notebook global,
    # so the report always matches the data that was passed in
    for column in train_data.columns:
        _, score = model_linear_regression(train_data, test_data, column)
        report_lines.append(f"Column {column} has a score of {score:.2f}\n")
    return "".join(report_lines)
    
print(model_regression(train_data, test_data))    

In [None]:
from sklearn.neural_network import MLPRegressor

def model_multilayer_perceptron(train_data: pd.DataFrame, test_data: pd.DataFrame, column: str, shape: tuple=(100, 100)) -> tuple:
    """Trains a perceptron model on the training data and tests it on the test data.
    - train_data: the data used to fit the model
    - test_data: the data used to score the fitted model
    - column: the name of the target column; every other column is used as a feature
    - shape: the shape of the model, with the number of neurons of each hidden layer (passed as hidden_layer_sizes)
    The model is created with max_iter=10000 and random_state=0.
    Returns the model and the score."""
    model = MLPRegressor(max_iter=10000, random_state=0, hidden_layer_sizes=shape)
    model.fit(train_data.drop(columns=[column]), train_data[column])
    return model, model.score(test_data.drop(columns=[column]), test_data[column])


def model_perceptron(train_data: pd.DataFrame, test_data: pd.DataFrame) -> str:
    """Trains a perceptron model for each column in the training data and tests it on the test data.
    - train_data: the training data, containing the features and the target columns
    - test_data: the test data, containing the features and the target columns
    - Returns the string with the name of the column, the score for that column and the shape of the model, one line per column."""
    report_lines = []
    # the columns come from train_data itself, not from a notebook global,
    # so the report always matches the data that was passed in
    for column in train_data.columns:
        model, score = model_multilayer_perceptron(train_data, test_data, column)
        report_lines.append(f"{column} has a score of {score:.2f} with shape {model.hidden_layer_sizes}\n")
    return "".join(report_lines)

print(model_perceptron(train_data, test_data))

In [None]:
# Divide every row of data_norm by its row sum, then standardise the resulting columns again.
# NOTE(review): data_norm is already the output of to_normalized_df, so its row sums may be
# close to zero or negative — confirm that dividing by them is intended.
# NOTE(review): in this part of the notebook reduce_dimensions is called with data_norm,
# not with data_umap_norm — confirm which one should feed the UMAP reduction.
data_umap_norm = to_normalized_df(data_norm.div(data_norm.sum(axis=1), axis=0))

In [None]:
import umap

def reduce_dimensions(data: pd.DataFrame, n_components: int=3, n_neighbors: int=20, min_dist: float=0.1) -> pd.DataFrame:
    """Embeds the rows of data in a lower dimensional space using UMAP with the 'correlation' metric.
    - data: the DataFrame to reduce
    - n_components: the number of dimensions of the embedding
    - n_neighbors: passed to UMAP as n_neighbors
    - min_dist: passed to UMAP as min_dist
    Returns a DataFrame with the embedding, indexed like data."""
    umap_settings = {
        "n_neighbors": n_neighbors,
        "min_dist": min_dist,
        "n_components": n_components,
        "metric": 'correlation',
    }
    embedding = umap.UMAP(**umap_settings).fit_transform(data)
    return pd.DataFrame(embedding, index=data.index)

# Promising distance metrics for this data: cosine and correlation
# (reduce_dimensions currently hard-codes 'correlation').

# Parameters of the UMAP reduction; they are also written into the title so that
# every plot records the settings and the summary it was produced from.
n_dim_umap = 2
n_neighbors_umap = 20
min_dist_umap = 0.1
umap_title =  f"UMAP {n_dim_umap}, {n_neighbors_umap}, {min_dist_umap} from {beginning_summary_str} to {ending_summary_str} in {time_step_description}, {column_grouping_selection}"

# the embedding is computed from data_norm and keeps its index
umap_reduced_data = reduce_dimensions(data_norm, n_dim_umap, n_neighbors_umap, min_dist_umap)

In [None]:
def plot_scatterplot_umap(data_embedding: pd.DataFrame, labelling_values: pd.Series, title:str = "UMAP Reduction", cbar:bool =True) -> plt.Figure:
    """Plots the scatterplot of the UMAP reduction of the data.
    - data_embedding: the DataFrame containing the UMAP reduction, with 2 columns for a flat plot, otherwise its first 3 columns are drawn in 3D
    - labelling_values: the Series containing the values to use for colouring the points; numeric values are used as they are, any other type is encoded to integers with LabelEncoder
    - title: the title of the plot
    - cbar: if True, shows the colorbar
    Returns the scatter collection for a 2D embedding and the 3D axes otherwise.
    NOTE(review): the returned objects do not match the plt.Figure annotation; they are kept unchanged for the existing callers — confirm whether the figure should be returned instead."""

    # only the labels that are not already numeric need to be encoded
    # NOTE(review): LabelEncoder is expected to be imported from sklearn.preprocessing in an earlier cell — confirm
    if not pd.api.types.is_numeric_dtype(labelling_values):
        labelling_values = LabelEncoder().fit_transform(labelling_values)

    if data_embedding.shape[1] == 2:
        plt.figure()
        scatter = plt.scatter(data_embedding.iloc[:, 0], data_embedding.iloc[:, 1], c=labelling_values, s=5)
        if cbar:
            plt.colorbar(label='Label')
        plt.xlabel('UMAP 1')
        plt.ylabel('UMAP 2')
        plt.title(title)
        return scatter

    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    scatter = ax.scatter(data_embedding.iloc[:, 0], data_embedding.iloc[:, 1], data_embedding.iloc[:, 2], c=labelling_values, s=5)
    if cbar:
        # the 3D plot honours cbar as well, like the 2D one
        fig.colorbar(scatter, ax=ax, label='Label')
    ax.set_xlabel('UMAP 1')
    ax.set_ylabel('UMAP 2')
    ax.set_zlabel('UMAP 3')
    ax.set_title(title)

    return ax

plot_scatterplot_umap(umap_reduced_data, summary["from_time"].dt.strftime("%m"), umap_title)

In [None]:
from scipy.stats import percentileofscore

def get_last_event(group: pd.DataFrame, datetime: pd.Timestamp, compare_column:str) -> pd.Series:
    """Returns the last event of the group that comes strictly before the given datetime.
    - group: the DataFrame containing the events
    - datetime: the moment to compare against; only the events whose compare_column is smaller are considered
    - compare_column: the name of the column of group holding the values compared with datetime
    Returns the row of group with the largest compare_column value among the earlier ones.
    Raises ValueError if no event of the group comes before datetime."""
    earlier_values = group.loc[group[compare_column] < datetime, compare_column]
    if earlier_values.empty:
        # idxmax on an empty selection would also fail, but with a message that does not tell why
        raise ValueError(f"no event in the group has {compare_column} earlier than {datetime}")
    return group.loc[earlier_values.idxmax()]


def compare_event_to_group(row: pd.Series, group: pd.DataFrame) -> pd.DataFrame:
    """Compares one event with a group of events, returning a one-row DataFrame of summary statistics.
    - row: the event to compare to the group
    - group: the events used as reference
    - row and group should both have begin_datetime and duration
    - assumes that row["begin_datetime"] is greater than all group["begin_datetime"]
    The columns of the result are:
    - delta: the minutes between the begin of row and the begin of the last event of the group before it
    - delta_quantile, delta_mean: the position of delta among the deltas between consecutive events of the group, and their mean
    - delta_std: the distance of delta from that mean, in standard deviations
    - duration_quantile, duration_mean, duration_std: the same for the duration of row against the durations of the group"""

    previous_event = get_last_event(group, row["begin_datetime"], "begin_datetime")

    # minutes between the begins of consecutive events of the group, in chronological order
    ordered_begins = group["begin_datetime"].sort_values()
    gaps_minutes = (ordered_begins.diff().dt.total_seconds() / 60).dropna()
    row_gap_minutes = (row["begin_datetime"] - previous_event["begin_datetime"]).total_seconds() / 60

    durations = group["duration"]
    row_duration = row["duration"]

    # percentileofscore answers in percent, hence the division by 100 to get a quantile
    summary_values = {
        "delta": row_gap_minutes,
        "delta_quantile": percentileofscore(gaps_minutes, row_gap_minutes) / 100,
        "delta_mean": gaps_minutes.mean(),
        "delta_std": (row_gap_minutes - gaps_minutes.mean()) / gaps_minutes.std(),
        "duration_quantile": percentileofscore(durations, row_duration) / 100,
        "duration_mean": durations.mean(),
        "duration_std": (row_duration - durations.mean()) / durations.std(),
    }
    return pd.DataFrame(summary_values, index=[0])


# Example: compare one long "SLP" event with the other long "SLP" events.
# keep the rows of df selected by get_subset_match_tags_mask for "SLP"
sel = df[get_subset_match_tags_mask(df["frozen_tags"], "SLP")]
# keep the events longer than 4*60 — presumably minutes, i.e. 4 hours; confirm the unit of "duration"
sel = sel[sel["duration"] > 4*60]

# NOTE(review): sel is a filtered selection of df; sorting it in place may trigger a pandas SettingWithCopyWarning
sel.sort_values("begin_datetime", inplace=True)
# NOTE(review): iloc[-5] is the fifth event from the end, not the last one, and sel still contains it
# together with the later events, while compare_event_to_group documents the assumption that the row
# begins after every event of the group — confirm that this is intended
last_sel = sel.iloc[-5]

compare_event_to_group(last_sel, sel)