In [1]:
%run gcal.ipynb

In [None]:
# imports
from abc import ABC, abstractmethod
from datetime import date, datetime, timedelta
from enum import Enum
import functools
import ipywidgets as widgets
from multiprocessing import Process
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [2]:
# set of calendar activities shared across visualizations
activities = {}

In [3]:
# configuration parameters

CALENDARS = ["Work", "Meetings"]  # set of calendars to visualize
SEPARATOR = " - "  # separator between category and activity
CACHE_FILE = "/tmp/token.pickle"  # local path to where token should be stored
CREDENTIALS_FILE = "/Users/anna/bin/google_calendar/credentials.json"  # path to credentials file for Google Calendar API

In [4]:
class TimePeriodType(Enum):
    """Time periods that can be analyzed."""

    DAILY = "daily"
    WEEKLY = "weekly"
    OVERALL = "overall"

    @classmethod
    def all_to_list(cls):
        """Return a list of all available time periods."""
        return [cls.DAILY.value, cls.WEEKLY.value, cls.OVERALL.value]


class TimePeriod:
    """Representation of a time period."""

    def __init__(self, start, end, type):
        self.start = start
        self.end = end
        self.type = type


class TimePeriods:
    """A set of multiple time periods."""

    def __init__(self, start, end, type):
        """Generate time periods between a `start` and `end` date based on the period type."""
        if type == TimePeriodType.DAILY:
            date_list = [
                start + timedelta(days=x) for x in range((end - start).days + 1)
            ]
            self.periods = [TimePeriod(d, d, type) for d in date_list]
        elif type == TimePeriodType.WEEKLY:
            self.periods = []
            current_date = start
            while current_date < end:
                day_diff = 6 - current_date.weekday()
                self.periods.append(
                    TimePeriod(
                        current_date, current_date + timedelta(days=day_diff), type
                    )
                )
                current_date = current_date + timedelta(days=day_diff + 1)
            self.periods.append(
                TimePeriod((self.periods[-1].end + timedelta(days=1)), end, type)
            )
        elif type == TimePeriodType.OVERALL:
            self.periods = [TimePeriod(start, end, type)]
        else:
            raise Exception(f"Unknown time period type {type}")

    def get_period(self, date):
        """Return the time period a date belongs to."""
        for period in self.periods:
            if date >= period.start and date <= period.end:
                return period

        return None


class AggregationType(Enum):
    """The way data should be aggregated and displayed."""

    TOTAL = "total"
    PERCENTAGE = "percentage"

    @classmethod
    def all_to_list(cls):
        """Return all available aggregation types."""
        return [cls.TOTAL.value, cls.PERCENTAGE.value]

In [None]:
class ActivityVisualization(ABC):
    """Implements a visualization for calendar data."""

    def __init__(self):
        self.activities = []
        self.start = date.today()
        self.end = date.today()
        self.calendars = []
        self.period = TimePeriodType.DAILY
        self.aggregation = AggregationType.TOTAL

    @property
    def title(self):
        return "Visualization"

    @property
    def description(self):
        return None

    def get_periods(self):
        return TimePeriods(self.start, self.end, TimePeriodType(self.period))

    def refresh(self, activities, start, end, calendars, period, aggregation):
        self.activities = activities
        self.start = start
        self.end = end
        self.calendars = calendars
        self.period = period
        self.aggregation = aggregation

        self.process()
        self.aggregate_data()
        self.plot()

    def aggregate_data(self):
        """Transform the data based on the aggregation."""
        dates = [d.start for d in self.get_periods().periods]

        if self.aggregation == AggregationType.PERCENTAGE.value:
            date_totals = {}
            for c, data in self.data.items():
                for i, val in enumerate(data):
                    if dates[i] not in date_totals:
                        date_totals[dates[i]] = val
                    else:
                        date_totals[dates[i]] += val

            data = {
                c: [
                    v / date_totals[dates[i]] * 100.0 if date_totals[dates[i]] else 0
                    for i, v in enumerate(data)
                ]
                for c, data in self.data.items()
            }

            self.data = data

    def plot(self):
        """
        Plot the visualization.

        Uses a stacked bar chart by default.
        """

        self.data["Date"] = [d.start for d in self.get_periods().periods]

        if len(list(self.activities.keys())) > 0:
            fig = px.bar(
                self.data,
                x="Date",
                y=list(self.data.keys()),
                labels={"value": "Time spent"},
                color_discrete_sequence=px.colors.qualitative.Dark24,
            )
            fig.update_layout(
                title=go.layout.Title(
                    text=f"{self.title} <br><sup>{self.description}</sup>",
                    xref="paper",
                    x=0,
                )
            )
            fig.layout.template = "plotly_dark"
            fig.show()
        else:
            layout = go.Layout(
                height=100,
                width=300,
                annotations=[
                    go.layout.Annotation(
                        text="No data to display",
                        xref="paper",
                        yref="paper",
                        font={"family": "Courier"},
                    )
                ],
            )
            fig = go.FigureWidget(data=[{"y": [2, 3, 1]}], layout=layout)
            fig.show()

        @abstractmethod
        def process(self):
            """Process the data."""
            raise NotImplementedError

In [None]:
# filter widgets

header = widgets.HTML(
    value="""
        <h1>Calendar Insights</h1>
    """
)

start_date = widgets.DatePicker(
    description="Start Date", disabled=False, value=date.today() - timedelta(days=28)
)

end_date = widgets.DatePicker(
    description="End Date", disabled=False, value=date.today()
)

calendars = widgets.SelectMultiple(
    options=CALENDARS, value=CALENDARS, description="Calendars", disabled=False
)

period = widgets.Dropdown(
    options=TimePeriodType.all_to_list(), value="daily", description="Period"
)

aggregation = widgets.Dropdown(
    options=AggregationType.all_to_list(), value="total", description="Aggregation"
)

separator = widgets.HTML(value="<br>")

In [5]:
def refresh_activities():
    """Pull in calendar activities based on filter values."""
    global activities
    activities = load_calendar_data(
        calendars.value,
        SEPARATOR,
        CREDENTIALS_FILE,
        CACHE_FILE,
        datetime.combine(start_date.value, datetime.min.time()),
        datetime.combine(end_date.value, datetime.min.time()),
    )

In [6]:
class CalendarOverTime(ActivityVisualization):
    @property
    def title(self):
        return "Time Spent in each Calendar"

    @property
    def description(self):
        return "Each calendar represents a high level category of activity."

    def process(self):
        data_by_calendar = {}
        periods = self.get_periods()

        for calendar, calendar_data in self.activities.items():
            data_by_period = {d.start: 0 for d in periods.periods}

            for activity in calendar_data:
                day_diff = activity.end.date() - activity.start.date()
                current_period = periods.get_period(activity.start.date())
                if current_period is None:
                    continue

                current_period = current_period.start
                current_date = activity.start

                for day in range(day_diff.days + 1):
                    if (
                        datetime.combine(
                            current_date + timedelta(days=1), datetime.min.time()
                        )
                        < activity.end
                    ):
                        data_by_period[current_period] += (
                            current_date + timedelta(days=1) - current_date
                        ).hours

                        next_period = periods.get_period(
                            (current_date + timedelta(days=1)).date()
                        )
                        if next_period is None:
                            continue

                        current_date = current_date + timedelta(days=1)
                        if next_period.start != current_period:
                            current_period = next_period.start
                    else:
                        data_by_period[current_period] += (
                            activity.end - current_date
                        ).seconds / 3600.0

            data_by_calendar[calendar] = list(data_by_period.values())

        self.data = data_by_calendar

In [7]:
class AreaOverTime(ActivityVisualization):
    @property
    def title(self):
        return "Time Spent in each Area"

    @property
    def description(self):
        return "Each area represents a specific project or context."

    def process(self):
        data_by_area = {}
        periods = self.get_periods()

        for calendar, calendar_data in self.activities.items():
            for activity in calendar_data:
                day_diff = activity.end.date() - activity.start.date()
                current_date = activity.start

                current_period = periods.get_period(activity.start.date())
                if current_period is None:
                    continue

                current_period = current_period.start

                if activity.name is None:
                    continue

                area = activity.name

                if area not in data_by_area:
                    data_by_area[area] = {d.start: 0 for d in periods.periods}

                for day in range(day_diff.days + 1):
                    if (
                        datetime.combine(
                            current_date + timedelta(days=1), datetime.min.time()
                        )
                        < activity.end
                    ):
                        data_by_area[area][current_period] += (
                            current_date + timedelta(days=1) - current_date
                        ).hours
                        next_period = periods.get_period(
                            (current_date + timedelta(days=1)).date()
                        )
                        if next_period is None:
                            continue

                        current_date = current_date + timedelta(days=1)
                        if next_period.start != current_period:
                            current_period = next_period.start
                    else:
                        data_by_area[area][current_period] += (
                            activity.end - current_date
                        ).seconds / 3600.0

        self.data = {category: list(d.values()) for category, d in data_by_area.items()}

In [8]:
class ContextSwitches(ActivityVisualization):
    @property
    def title(self):
        return "Total Number of Context Switches"

    @property
    def description(self):
        return "Total number of times a switch to a different activity happened."

    def process(self):
        context_switches = {d.start: 0 for d in self.get_periods().periods}

        for calendar, calendar_data in self.activities.items():
            for activity in calendar_data:
                day_diff = activity.end.date() - activity.start.date()
                current_date = activity.start

                current_period = self.get_periods().get_period(activity.start.date())
                if current_period is None:
                    continue

                current_period = current_period.start

                if activity.name is None:
                    continue

                for day in range(day_diff.days + 1):
                    if (
                        datetime.combine(
                            current_date + timedelta(days=1), datetime.min.time()
                        )
                        < activity.end
                    ):
                        context_switches[current_period] += 1
                        next_period = self.get_periods().get_period(
                            (current_date + timedelta(days=1)).date()
                        )
                        if next_period is None:
                            continue

                        current_date = current_date + timedelta(days=1)
                        if next_period.start != current_period:
                            current_period = next_period.start
                    else:
                        context_switches[current_period] += 1
        context_switches["Context Switches"] = list(context_switches.values())
        self.data = {}
        self.data["Context Switches"] = context_switches["Context Switches"]

    def aggregate_data(self):
        # just take the data as is
        pass

In [9]:
class Indicators(ActivityVisualization):
    @property
    def title(self):
        return "Quick overview of some select metrics"

    def _total_time_tracked(self, activities):
        """Total amount of time of activities that have been recorded."""
        total_time = 0
        for calendar, calendar_data in activities.items():
            for activity in calendar_data:
                total_time += activity.duration

        return total_time

    def _total_context_switches(self, activities):
        """Total numbe of context switches."""
        context_switches = 0
        for calendar, calendar_data in activities.items():
            for activity in calendar_data:
                day_diff = activity.end.date() - activity.start.date()
                current_date = activity.start

                if activity.name is None:
                    continue

                for day in range(day_diff.days + 1):
                    if (
                        datetime.combine(
                            current_date.date() + timedelta(days=1), datetime.min.time()
                        )
                        < activity.end
                    ):
                        context_switches += 1
                        current_date = current_date.date() + timedelta(days=1)
                    else:
                        context_switches += 1
        return context_switches

    def _total_areas(self, activities):
        """Total number of different areas recorded."""
        total_areas = []

        for calendar, calendar_data in activities.items():
            for activity in calendar_data:
                if activity.name not in total_areas:
                    total_areas.append(activity.name)

        return len(total_areas)

    def process(self):
        date_diff = (self.end - self.start).days + 1
        activities_prev_period = load_calendar_data(
            CALENDARS,
            SEPARATOR,
            CREDENTIALS_FILE,
            CACHE_FILE,
            datetime.combine(
                self.start - timedelta(days=date_diff), datetime.min.time()
            ),
            datetime.combine(self.end - timedelta(days=date_diff), datetime.min.time()),
        )

        self.data = {}

        self.data["context_switches"] = self._total_context_switches(activities)
        self.data["prev_context_switches"] = self._total_context_switches(
            activities_prev_period
        )

        self.data["time_tracked"] = self._total_time_tracked(activities)
        self.data["prev_time_tracked"] = self._total_time_tracked(
            activities_prev_period
        )

        self.data["number_of_areas"] = self._total_areas(activities)
        self.data["prev_number_of_areas"] = self._total_areas(activities_prev_period)

    def aggregate_data(self):
        # just take the data as is
        pass

    def plot(self):
        date_diff = (self.end - self.start).days + 1

        fig = make_subplots(
            rows=1,
            cols=3,
            specs=[[{"type": "domain"}, {"type": "domain"}, {"type": "domain"}]],
        )
        fig.layout.template = "plotly_dark"

        fig.add_trace(
            go.Indicator(
                mode="number+delta",
                value=self.data["context_switches"] / date_diff,
                title="Average Context Switches",
                delta={
                    "reference": self.data["prev_context_switches"] / date_diff,
                    "relative": True,
                    "position": "bottom",
                },
            ),
            row=1,
            col=1,
        )

        fig.add_trace(
            go.Indicator(
                mode="number+delta",
                value=self.data["time_tracked"] / 3600,
                title="Total Time Tracked [h]",
                delta={
                    "reference": self.data["prev_time_tracked"] / 3600,
                    "relative": True,
                    "position": "bottom",
                },
            ),
            row=1,
            col=2,
        )

        fig.add_trace(
            go.Indicator(
                mode="number+delta",
                value=self.data["number_of_areas"],
                title="Number of Areas Worked On",
                delta={
                    "reference": self.data["prev_number_of_areas"],
                    "relative": True,
                    "position": "bottom",
                },
            ),
            row=1,
            col=3,
        )

        fig.update_layout(height=300, width=1000)
        fig.show()

In [None]:
visualizations = [
    Indicators(),
    CalendarOverTime(),
    AreaOverTime(),
    ContextSwitches(),
]

In [10]:
def refresh(start_date, end_date, calendars, period, aggregation):
    """Refresh visualizations."""
    refresh_activities()

    # make sure status indicators are plotted first
    visualizations[0].refresh(
        activities=activities,
        start=start_date,
        end=end_date,
        calendars=calendars,
        period=period,
        aggregation=aggregation,
    )

    # plot visualizations in parallel
    for viz in visualizations:
        if not isinstance(viz, Indicators):
            Process(
                target=viz.refresh(
                    activities=activities,
                    start=start_date,
                    end=end_date,
                    calendars=calendars,
                    period=period,
                    aggregation=aggregation,
                )
            ).start()


# make filters interactive, call refresh function when changed
widget = widgets.interactive(
    refresh,
    start_date=start_date,
    end_date=end_date,
    calendars=calendars,
    period=period,
    aggregation=aggregation,
)
widget.update()
# display filters horizontally
controls = widgets.HBox(
    widget.children[:-1], layout=widgets.Layout(flex_flow="row wrap")
)
output = widget.children[-1]
display(widgets.VBox([header, controls, separator, output]))

interactive(children=(DatePicker(value=datetime.date(2022, 10, 26), description='Start Date'), DatePicker(valu…