# Analysis

## Imports, load the data and inspect it

In [69]:
# Imports
import os
import csv
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from typing import List, Tuple, Dict, Union
from statistics import variance
import unittest

def load_all_data(file_path: str) -> List[Tuple[str, str, datetime]]:
    """Load all event data from a CSV file."""
    all_data = []
    with open(file_path, 'r') as file:
        csv_reader = csv.reader(file)
        next(csv_reader)  # Skip header
        for line in csv_reader:
            user_id, event_type, event_time = line
            event_time = datetime.fromisoformat(event_time.replace("Z", "+00:00"))
            all_data.append((user_id, event_type.upper(), event_time))
    return all_data

data = load_all_data('datapao_homework_2023.csv')
data[:3]

[('2e5d8815-4e59-4302-99c0-6fc9593a2eef',
  'GATE_IN',
  datetime.datetime(2023, 1, 31, 8, 18, 36, tzinfo=datetime.timezone.utc)),
 ('a8c60645-aef4-4b4e-aefb-65e242536c2f',
  'GATE_IN',
  datetime.datetime(2023, 1, 31, 8, 43, 41, tzinfo=datetime.timezone.utc)),
 ('0b99d382-ea52-4a1d-8e9e-218933c0d7b8',
  'GATE_IN',
  datetime.datetime(2023, 1, 31, 8, 43, 47, tzinfo=datetime.timezone.utc))]

## 1. Calculate the amount of time and the number of days each person has spent in the office in February. Write results to a CSV, in the format (user_id, time, days, average_per_day, rank)

In [70]:
def filter_and_sort_data(data_source: Union[str, List[Tuple[str, str, datetime]]]) -> List[Tuple[str, str, datetime]]:
    """Filter and sort event data for the month of February."""
    if isinstance(data_source, str):
        all_data = load_all_data(data_source)
        feb_data = [entry for entry in all_data if entry[2].month == 2]
    else:
        feb_data = [entry for entry in data_source if entry[2].month == 2]
    feb_data.sort(key=lambda x: (x[0], x[2]))
    return feb_data

def calculate_user_metrics(feb_data: List[Tuple[str, str, datetime]]) -> Tuple[Dict[str, timedelta], Dict[str, set], Dict[str, List[Tuple[datetime, datetime]]]]:
    """Calculate metrics like time spent, unique days, and sessions for each user."""
    user_time = defaultdict(timedelta)
    user_days = defaultdict(set)
    user_sessions = defaultdict(list)

    for i in range(len(feb_data) - 1):
        user_id, event_type, event_time = feb_data[i]
        next_user_id, next_event_type, next_event_time = feb_data[i + 1]

        if user_id == next_user_id and event_type == "GATE_IN" and next_event_type == "GATE_OUT":
            time_diff = next_event_time - event_time
            user_time[user_id] += time_diff
            user_days[user_id].add(event_time.date())

            if user_sessions[user_id]:
                last_session_end = user_sessions[user_id][-1][1]
                if (event_time - last_session_end).seconds // 3600 >= 2:
                    user_sessions[user_id].append((event_time, next_event_time))
                else:
                    user_sessions[user_id][-1] = (user_sessions[user_id][-1][0], next_event_time)
            else:
                user_sessions[user_id].append((event_time, next_event_time))

    return user_time, user_days, user_sessions

def calculate_avg_time_and_rank(user_time: Dict[str, timedelta], user_days: Dict[str, set]) -> Tuple[Dict[str, float], Dict[str, int]]:
    """Calculate average time and ordinal rank for each user."""
    user_avg_time = {user: (user_time[user].seconds // 3600) / len(user_days[user]) for user in user_time}
    sorted_users = sorted(user_avg_time.items(), key=lambda x: x[1], reverse=True)
    user_rank = {user: rank + 1 for rank, (user, _) in enumerate(sorted_users)}
    return user_avg_time, user_rank

def write_to_csv(output_data: List[List[Union[str, int, float]]], header: List[str], file_path: str) -> None:
    """Write calculated metrics to a CSV file."""
    with open(file_path, 'w', newline='') as file:
        csv_writer = csv.writer(file)
        csv_writer.writerow(header)
        csv_writer.writerows(output_data)

def task_one_main(file_path: str, output_file_path: str) -> None:
    """Main function to execute Task 1: Calculate and write user metrics to a CSV."""
    feb_data = filter_and_sort_data(file_path)
    user_time, user_days, user_sessions = calculate_user_metrics(feb_data)
    user_avg_time, user_rank = calculate_avg_time_and_rank(user_time, user_days)

    output_data = [[user, user_time[user].seconds // 3600, len(user_days[user]), user_avg_time[user], user_rank[user]] for user in user_time]
    write_to_csv(output_data, ['user_id', 'time', 'days', 'average_per_day', 'rank'], output_file_path)

# Define the output directory and file path
output_directory = './output'
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

output_file_path_1 = os.path.join(output_directory, 'first.csv')

# Call the main function for Task 1
task_one_main('datapao_homework_2023.csv', output_file_path_1)

output_file_path_1

'./output\\first.csv'

In [71]:
# Unit Tests for Task 1

class TestTaskOne(unittest.TestCase):

    # Sample data for testing
    sample_data = [
        ('user_1', 'GATE_IN', datetime(2023, 2, 1, 8, 0, 0)),
        ('user_1', 'GATE_OUT', datetime(2023, 2, 1, 12, 0, 0)),
        ('user_1', 'GATE_IN', datetime(2023, 2, 1, 13, 0, 0)),
        ('user_1', 'GATE_OUT', datetime(2023, 2, 1, 18, 0, 0)),
        ('user_2', 'GATE_IN', datetime(2023, 2, 1, 9, 0, 0)),
        ('user_2', 'GATE_OUT', datetime(2023, 2, 1, 17, 0, 0))
    ]

    def test_filter_and_sort_data(self):
        self.assertEqual(filter_and_sort_data(self.sample_data), self.sample_data)

    def test_calculate_user_metrics(self):
        user_time, user_days, user_sessions = calculate_user_metrics(self.sample_data)
        self.assertEqual(user_time['user_1'].seconds // 3600, 9)
        self.assertEqual(user_time['user_2'].seconds // 3600, 8)
        self.assertEqual(len(user_days['user_1']), 1)
        self.assertEqual(len(user_days['user_2']), 1)

    def test_calculate_avg_time_and_rank(self):
        user_time = {'user_1': timedelta(hours=9), 'user_2': timedelta(hours=8)}
        user_days = {'user_1': {datetime(2023, 2, 1).date()}, 'user_2': {datetime(2023, 2, 1).date()}}
        user_avg_time, user_rank = calculate_avg_time_and_rank(user_time, user_days)
        self.assertEqual(user_avg_time['user_1'], 9.0)
        self.assertEqual(user_avg_time['user_2'], 8.0)
        self.assertEqual(user_rank['user_1'], 1)
        self.assertEqual(user_rank['user_2'], 2)

# Run the unit tests
unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestTaskOne))

...
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK


<unittest.runner.TextTestResult run=3 errors=0 failures=0>

## 2. Find out who had the longest work session in February. Write the result to a CSV in the format (user_id, session_length)

In [72]:
def calculate_longest_session(user_sessions: Dict[str, List[Tuple[datetime, datetime]]]) -> Dict[str, timedelta]:
    """Calculate the longest work session for each user."""
    return {user: max((session[1] - session[0] for session in sessions), default=timedelta(0))
            for user, sessions in user_sessions.items()}

def task_two_main(user_sessions: Dict[str, List[Tuple[datetime, datetime]]], output_file_path: str) -> None:
    """Main function to execute Task 2: Calculate and write longest work sessions to a CSV."""
    user_longest_session = calculate_longest_session(user_sessions)
    sorted_users_by_session = sorted(user_longest_session.items(), key=lambda x: x[1], reverse=True)
    output_second = [[user, longest_session.seconds // 3600] for user, longest_session in sorted_users_by_session]
    write_to_csv(output_second, ['user_id', 'session_length'], output_file_path)

# Define the output file path for Task 2
output_file_path_2 = os.path.join(output_directory, 'second.csv')

# Fetch the feb_data from the original data source or pre-filtered data
feb_data = filter_and_sort_data('datapao_homework_2023.csv')  # Replace with your actual file path

# Get user_sessions from calculate_user_metrics function
_, _, user_sessions = calculate_user_metrics(feb_data)

# Call the main function for Task 2
task_two_main(user_sessions, output_file_path_2)

output_file_path_2

'./output\\second.csv'

In [73]:
# Unit Tests for Task 2

class TestTaskTwo(unittest.TestCase):

    # Sample session data for testing
    sample_sessions = {
        'user_1': [
            (datetime(2023, 2, 1, 8, 0, 0), datetime(2023, 2, 1, 18, 0, 0)),  # Merged session
        ],
        'user_2': [
            (datetime(2023, 2, 1, 9, 0, 0), datetime(2023, 2, 1, 17, 0, 0))
        ]
    }

    def test_calculate_longest_session(self):
        longest_sessions = calculate_longest_session(self.sample_sessions)
        self.assertEqual(longest_sessions['user_1'].seconds // 3600, 10)
        self.assertEqual(longest_sessions['user_2'].seconds // 3600, 8)

# Run the unit tests for Task 2
unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestTaskTwo))

.
----------------------------------------------------------------------
Ran 1 test in 0.000s

OK


<unittest.runner.TextTestResult run=1 errors=0 failures=0>

## 3.1 Identify core working hours -- when are the most employees present?

This should help us best book face-to-face meetings and collaborative work.

In [74]:
def calculate_hourly_presence(feb_data: List[Tuple[str, str, datetime]]) -> Dict[int, int]:
    """Calculate the hourly presence of employees."""
    hourly_presence = Counter()
    for user_id, event_type, event_time in feb_data:
        if event_type == "GATE_IN":
            hourly_presence[event_time.hour] += 1
        elif event_type == "GATE_OUT":
            hourly_presence[event_time.hour] -= 1
    return hourly_presence

def calculate_cumulative_hourly_presence(hourly_presence: Dict[int, int]) -> Dict[int, int]:
    """Calculate the cumulative hourly presence of employees."""
    cumulative_presence = 0
    hourly_cumulative_presence = {}
    for hour in range(24):
        cumulative_presence += hourly_presence.get(hour, 0)
        hourly_cumulative_presence[hour] = cumulative_presence
    return hourly_cumulative_presence

# Run the functions for calculating core working hours
hourly_presence = calculate_hourly_presence(feb_data)
hourly_cumulative_presence = calculate_cumulative_hourly_presence(hourly_presence)
hourly_cumulative_presence

{0: -1,
 1: -2,
 2: -3,
 3: -3,
 4: -3,
 5: -3,
 6: -3,
 7: -3,
 8: 57,
 9: 131,
 10: 153,
 11: 189,
 12: 176,
 13: 180,
 14: 175,
 15: 179,
 16: 174,
 17: 173,
 18: 163,
 19: 137,
 20: 56,
 21: -3,
 22: -2,
 23: 0}

In [75]:
# Unit Tests for Core Working Hours Calculation

class TestCoreWorkingHours(unittest.TestCase):

    # Sample data for testing
    sample_data = [
        ('user_1', 'GATE_IN', datetime(2023, 2, 1, 8, 0, 0)),
        ('user_1', 'GATE_OUT', datetime(2023, 2, 1, 12, 0, 0)),
        ('user_1', 'GATE_IN', datetime(2023, 2, 1, 13, 0, 0)),
        ('user_1', 'GATE_OUT', datetime(2023, 2, 1, 18, 0, 0)),
        ('user_2', 'GATE_IN', datetime(2023, 2, 1, 9, 0, 0)),
        ('user_2', 'GATE_OUT', datetime(2023, 2, 1, 17, 0, 0))
    ]

    def test_calculate_hourly_presence(self):
        hourly_presence = calculate_hourly_presence(self.sample_data)
        self.assertEqual(hourly_presence[8], 1)
        self.assertEqual(hourly_presence[9], 1)
        self.assertEqual(hourly_presence[12], -1)
        self.assertEqual(hourly_presence[13], 1)
        self.assertEqual(hourly_presence[17], -1)
        self.assertEqual(hourly_presence[18], -1)

    def test_calculate_cumulative_hourly_presence(self):
        hourly_presence = calculate_hourly_presence(self.sample_data)
        hourly_cumulative_presence = calculate_cumulative_hourly_presence(hourly_presence)
        self.assertEqual(hourly_cumulative_presence[8], 1)
        self.assertEqual(hourly_cumulative_presence[9], 2)
        self.assertEqual(hourly_cumulative_presence[12], 1)
        self.assertEqual(hourly_cumulative_presence[13], 2)
        self.assertEqual(hourly_cumulative_presence[17], 1)
        self.assertEqual(hourly_cumulative_presence[18], 0)

# Run the unit tests for Core Working Hours Calculation
unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestCoreWorkingHours))

..
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK


<unittest.runner.TextTestResult run=2 errors=0 failures=0>

At 11 AM we have the highest employee presence, and it might be best to book larger meetings around that time.

The number of people present remains somewhat constant for multiple hours after that, so these hours might be best for any collaborative work sessions.

## 3.2 Flexibility index

Calculate the variance in the start and end times of each employee's work sessions. A higher variance indicates the employee has a very flexible schedule, and a lower variance indicates the employee has a strict pattern and routine.

The most flexible workers might be our first people to call when urgent things pop up outside more typical working hours.
The least flexible users are predictable, and their schedules might be the easiest to plan around and assign tasks based on.

In [76]:
def populate_start_end_times(user_sessions: Dict[str, List[Tuple[datetime, datetime]]]) -> Tuple[Dict[str, List[int]], Dict[str, List[int]]]:
    """Populate start and end times for each user's work sessions."""
    user_start_times = defaultdict(list)
    user_end_times = defaultdict(list)
    for user, sessions in user_sessions.items():
        for session_start, session_end in sessions:
            user_start_times[user].append(session_start.hour)
            user_end_times[user].append(session_end.hour)
    return user_start_times, user_end_times

def calculate_flexibility_index(user_start_times: Dict[str, List[int]], user_end_times: Dict[str, List[int]]) -> Dict[str, float]:
    """Calculate the flexibility index for each user."""
    user_flexibility_index = {}
    for user in user_start_times:
        start_var = variance(user_start_times[user]) if len(user_start_times[user]) > 1 else 0
        end_var = variance(user_end_times[user]) if len(user_end_times[user]) > 1 else 0
        user_flexibility_index[user] = start_var + end_var
    return user_flexibility_index

# Run the functions for calculating flexibility index
user_start_times, user_end_times = populate_start_end_times(user_sessions)
user_flexibility_index = calculate_flexibility_index(user_start_times, user_end_times)

# Sort the users by Flexibility Index
sorted_users_by_flexibility = sorted(user_flexibility_index.items(), key=lambda x: x[1], reverse=True)

# Display the top 3 most flexible and least flexible users
top_3_most_flexible_users = sorted_users_by_flexibility[:3]
top_3_least_flexible_users = sorted_users_by_flexibility[-3:]

top_3_most_flexible_users, top_3_least_flexible_users

([('1fcdd2f5-13ed-4ee8-b4ec-706654e68472', 57.82213438735178),
  ('bfa0f1f9-300e-41ad-bcff-0c3bd79595f1', 40.01307189542484),
  ('5dafb3b2-22aa-4039-a2f0-3cf711f84177', 33.97011494252874)],
 [('98ea63ee-bba1-4ea5-8ada-1d0a2dd1f6fd', 23.04),
  ('oab91046-1831-436d-a41d-da6ba1b2d385', 22.104761904761904),
  ('33c08c48-f50a-4c72-a975-f8572d65a8db', 22.026666666666664)])

In [77]:
# Unit Tests for Flexibility Index Calculation

class TestFlexibilityIndex(unittest.TestCase):

    # Sample session data for testing
    sample_sessions = {
        'user_1': [
            (datetime(2023, 2, 1, 8, 0, 0), datetime(2023, 2, 1, 18, 0, 0)),  # Merged session
            (datetime(2023, 2, 2, 9, 0, 0), datetime(2023, 2, 2, 17, 0, 0))
        ],
        'user_2': [
            (datetime(2023, 2, 1, 9, 0, 0), datetime(2023, 2, 1, 17, 0, 0)),
            (datetime(2023, 2, 2, 10, 0, 0), datetime(2023, 2, 2, 16, 0, 0))
        ]
    }

    def test_populate_start_end_times(self):
        start_times, end_times = populate_start_end_times(self.sample_sessions)
        self.assertEqual(start_times['user_1'], [8, 9])
        self.assertEqual(end_times['user_1'], [18, 17])
        self.assertEqual(start_times['user_2'], [9, 10])
        self.assertEqual(end_times['user_2'], [17, 16])

    def test_calculate_flexibility_index(self):
        start_times, end_times = populate_start_end_times(self.sample_sessions)
        flexibility_index = calculate_flexibility_index(start_times, end_times)
        self.assertEqual(flexibility_index['user_1'], 0.5 + 0.5)
        self.assertEqual(flexibility_index['user_2'], 0.5 + 0.5)

# Run the unit tests for Flexibility Index Calculation
unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestFlexibilityIndex))

..
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK


<unittest.runner.TextTestResult run=2 errors=0 failures=0>