### Survey QC

Last updated: 04/18/2025

Summary: This data pipeline was developed to produce accurate survey adherence statistics for the Fucito group at Yale. The overarching goal is to identify two figures, N and D. N, the numerator, is the count of *valid* surveys submitted. D, the denominator, is the count of *valid* surveys delivered. What counts as *valid* is defined by a set of rules agreed on in discussions between the Fucito and Beiwe teams.
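
As a minimal illustration of how the two figures combine, adherence is the ratio N / D. The function name `adherence_rate` and the counts below are hypothetical and not part of the pipeline.

```python
def adherence_rate(valid_submitted, valid_delivered):
    """Return N / D, or None when no valid surveys were delivered."""
    if valid_delivered == 0:
        return None
    return valid_submitted / valid_delivered

# Hypothetical counts for a single participant
rate = adherence_rate(45, 60)
print(rate)
```

Returning `None` for D = 0 is a design choice in this sketch so that participants with no valid deliveries are not reported as having 0% adherence.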

Requirements: The raw survey data, including both survey answers and survey timings, must already be downloaded to a directory named "raw_data" (the name can be changed via `base_dir`). Part 3b extracts information from the Survey Settings and Interventions files (available in the 'Edit this Study' section of the dashboard), so these should be up to date. Finally, part 3c calls the Beiwe notification history API, which requires a Keyring Studies file with valid access and secret keys.

#### Part 1: Identifying Mismatched Files

This section iterates through raw survey data (survey answers and survey timings) and identifies files that are missing their counterparts. Survey answers record the time of the submission in the csv file name, while survey timings have a row of data at the very end indicating that the user performed a submission action. This script records all submission times according to survey answers and, separately, according to survey timings. These two sets of times are then cross-checked to identify missing or erroneous files.
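
The sketch below shows, under simplifying assumptions, how a survey answers file name can be turned into a timestamp and compared against a survey timings timestamp using a one-minute tolerance. The helper names `parse_answers_filename` and `times_match` and the timings value are hypothetical; the file name follows the pattern seen in the raw data.

```python
from datetime import datetime, timedelta

def parse_answers_filename(file_name):
    """Convert a survey answers file name into a naive UTC datetime."""
    stem = file_name.rsplit(".", 1)[0]       # drop the file extension
    stamp = stem.replace("_", ":")[:-6]      # restore colons, drop the "+00:00" offset
    return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")

def times_match(answer_time, timing_time, tolerance=timedelta(minutes=1)):
    """Treat two submission times as the same event if they fall within the tolerance."""
    return abs(answer_time - timing_time) <= tolerance

answer_time = parse_answers_filename("2024-05-26 15_14_43+00_00.csv")
# Hypothetical 'UTC time' value from the submission row of a timings file
timing_time = datetime.strptime("2024-05-26T15:14:40.512", "%Y-%m-%dT%H:%M:%S.%f")

print(answer_time, times_match(answer_time, timing_time))
```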

In [None]:
# Uncomment the line below and use it to install any necessary packages.
#%pip install ...

In [1]:
import os
import csv
import pandas as pd
from pandas import json_normalize
import json
from datetime import datetime
from datetime import timedelta
import requests
import orjson
import data_summaries

In [2]:
def extract_submission_rows(file_path, survey_id):
    '''
    Inputs:
        file_path: the relative file path of the file to be examined
        survey_id: the survey of interest 
    Outputs:
        result: a list of UTC time strings, one per submission row found for survey_id; an empty list is returned if no submission row is found
    Behavior:
        This function scans a single survey timings file row by row and collects the UTC time of every row
        that records a submission for the given survey
    '''
    with open(file_path, mode='r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        result = []
        # Iterate the file by row
        for row in reader:
            event_value = row.get('event', '').strip().lower()
            question_id_value = row.get('question id', '').strip().lower()
            curr_survey_id = row.get('survey id', '').strip()
            # Check for a submission row of the requested survey and record its submission time
            if (event_value == 'submitted' or question_id_value == 'user hit submit') and curr_survey_id == survey_id:
                result.append(row.get('UTC time'))
        return result

In [3]:
def iterate_answer_files(survey_answers_dir, survey_id):
    '''
    Inputs:
        survey_answers_dir: the relative file path of the survey answers directory
        survey_id: the survey of interest 
    Outputs:
        answers_submissions: a df containing submission times and corresponding file paths generated from answer files
    Behavior:
        This function iterates through all of the survey answer files in a specific survey directory and generates a df object
        containing submission times and paths to the corresponding files
    '''
    answers_submissions = pd.DataFrame(columns=['Time', 'FilePath', 'Extension'])

    # Check if the survey answers directory exists for specific survey id
    survey_answers_survey_dir = os.path.join(survey_answers_dir, survey_id)
    if os.path.isdir(survey_answers_survey_dir):
        
        # Iterate through survey answers files and gather their submission time via the file name
        for file in os.listdir(survey_answers_survey_dir):
            file_path = os.path.join(survey_answers_survey_dir, file)
            submission_time = os.path.splitext(file)[0]
            extension = os.path.splitext(file)[1]
            new_submission = pd.DataFrame({ 
                'Time': [submission_time], 
                'FilePath': [file_path],
                'Extension': [extension]
            })
            answers_submissions = pd.concat([answers_submissions, new_submission])
            
    # Process the 'Time' column into a standard format
    answers_submissions['Time'] = answers_submissions['Time'].str.replace("_", ":", regex=False).str[:-6]
    answers_submissions['Time'] = pd.to_datetime(answers_submissions['Time'], format="%Y-%m-%d %H:%M:%S")

    # Sort by 'Time' also ensuring that .csv files are prioritized
    answers_submissions = answers_submissions.sort_values(by=["Time", "Extension"], key=lambda col: col.map(lambda x: (x != '.csv', x)))
    
    return answers_submissions

In [4]:
def iterate_timings_files(survey_timings_dir, survey_id):
    '''
    Inputs:
        survey_timings_dir: the relative file path of the survey timings directory
        survey_id: the survey of interest 
    Outputs:
        timings_submissions: a df containing submission times and corresponding file paths generated from timings files
    Behavior:
        This function iterates through all of the survey timings files in a specific survey directory and generates a df object
        containing submission times and paths to the corresponding files
    '''
    timings_submissions = pd.DataFrame(columns=['Time', 'FilePath'])

    # Check if the survey timings directory exists for specific survey id
    survey_timings_survey_dir = os.path.join(survey_timings_dir, survey_id)
    if os.path.isdir(survey_timings_survey_dir):
            
        # Iterate through survey timings files and call extract_submission_rows function to identify submission times
        for file in os.listdir(survey_timings_survey_dir):
            file_path = os.path.join(survey_timings_survey_dir, file)
            if os.path.isfile(file_path):
                result = extract_submission_rows(file_path, survey_id)
                if result:
                    new_submission = pd.DataFrame({
                        'Time': result,
                        'FilePath': [file_path] * len(result) 
                    })
                    timings_submissions = pd.concat([timings_submissions, new_submission]) 

    # Process the 'Time' column into a standard format and sort by it
    timings_submissions['Time'] = pd.to_datetime(timings_submissions['Time'], format="%Y-%m-%dT%H:%M:%S.%f").dt.floor('s')
    timings_submissions = timings_submissions.sort_values(by="Time")
    
    return timings_submissions

In [5]:
def compare_submission_logs(answers_submissions, timings_submissions):
    '''
    Inputs:
        answers_submissions: a df containing submission times and corresponding file paths generated from answer files
        timings_submissions: a df containing submission times and corresponding file paths generated from timings files
    Outputs:
        matched: a count of the number of matched files identified with the two given logs
        unmatched: a count of the number of unmatched files identified with the two given logs
    Behavior:
        This function compares the answers and timings submission logs and flags any unmatched submission files
    '''
    # Initialize two index variables to traverse the logs and trackers of matched and unmatched files
    answer_i = 0
    timing_i = 0
    matched = 0
    unmatched = 0
    
    # Iterate through both lists at the same time looking for mismatches
    while answer_i < answers_submissions.shape[0] and timing_i < timings_submissions.shape[0]:
        time_diff = abs(answers_submissions['Time'].iloc[answer_i] - timings_submissions['Time'].iloc[timing_i])
        # Allow an acceptable difference of 1 minute between submission times of answers and timings
        if time_diff <= pd.Timedelta(minutes=1):
            matched += 1
            answer_i += 1
            timing_i += 1
            
        # If differences larger than one minute are identified, there must be unmatched files
        else:
            if answers_submissions['Time'].iloc[answer_i] > timings_submissions['Time'].iloc[timing_i]:
                print(f"Unmatched file found at {timings_submissions['FilePath'].iloc[timing_i]}")
                unmatched += 1
                timing_i += 1
            else:
                print(f"Unmatched file found at {answers_submissions['FilePath'].iloc[answer_i]}")
                unmatched += 1
                answer_i += 1

    # Handle any remaining entries in either df
    if answer_i < answers_submissions.shape[0]:
        for index in range(answer_i, answers_submissions.shape[0]):
            unmatched += 1
            print(f"Unmatched file found at {answers_submissions['FilePath'].iloc[index]}")

    if timing_i < timings_submissions.shape[0]:
        for index in range(timing_i, timings_submissions.shape[0]):
            unmatched += 1
            print(f"Unmatched file found at {timings_submissions['FilePath'].iloc[index]}")

    return matched, unmatched

In [6]:
def identify_unmatched_files(base_dir):
    '''
    Inputs:
        base_dir: the directory containing the raw data
    Behavior:
        This function runs the compare_submission_logs function on all participants and surveys to identify all unmatched files
    '''
    # Initialize two variables for tracking total # of matched files and # of unmatched files
    matched = 0
    unmatched = 0
    
    # Iterate through each user_id directory
    for user_id in os.listdir(base_dir):
        user_dir = os.path.join(base_dir, user_id)

        # Skip stray files so that paths from a previous participant are never reused
        if not os.path.isdir(user_dir):
            continue

        # Paths to the survey_timings and survey_answers directories
        survey_timings_dir = os.path.join(user_dir, 'survey_timings')
        survey_answers_dir = os.path.join(user_dir, 'survey_answers')

        # Get the survey_ids from each directory, defaulting to an empty set when it is missing
        survey_timings_ids = set(os.listdir(survey_timings_dir)) if os.path.isdir(survey_timings_dir) else set()
        survey_answers_ids = set(os.listdir(survey_answers_dir)) if os.path.isdir(survey_answers_dir) else set()

        # Get the union of survey ids from survey answers and timings
        all_survey_ids = survey_timings_ids | survey_answers_ids
            
        # Iterate through each survey_id
        for survey_id in all_survey_ids:
    
            # Call the iterator functions to look through the files and extract submission times
            answers_submissions = iterate_answer_files(survey_answers_dir, survey_id)
            timings_submissions = iterate_timings_files(survey_timings_dir, survey_id)
    
            # Call the comparison function to identify any unmatched survey files
            temp_matched, temp_unmatched = compare_submission_logs(answers_submissions, timings_submissions)
            matched += temp_matched
            unmatched += temp_unmatched
            
    print(f"Number of matched files identified: {matched}")
    print(f"Number of unmatched files identified: {unmatched}")

In [7]:
# This command deletes any .ipynb_checkpoints which can interfere with file iteration
!rm -rf $(find . -type d -name .ipynb_checkpoints)

base_dir = "raw_data"
identify_unmatched_files(base_dir)

Unmatched file found at raw_data/4ggpbwfj/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2024-05-26 15_14_43+00_00.csv
Unmatched file found at raw_data/4ggpbwfj/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2024-08-05 14_28_04+00_00.csv
Unmatched file found at raw_data/4ggpbwfj/survey_answers/9A3eA4U3kFK7ICiYMkPb352v/2024-05-27 14_30_38+00_00.csv
Unmatched file found at raw_data/4ggpbwfj/survey_answers/9A3eA4U3kFK7ICiYMkPb352v/2024-06-17 20_04_05+00_00.csv
Unmatched file found at raw_data/4ggpbwfj/survey_answers/9A3eA4U3kFK7ICiYMkPb352v/2024-08-05 14_26_05+00_00.csv
Unmatched file found at raw_data/wihq7mla/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2024-03-25 13_02_04+00_00.csv
Unmatched file found at raw_data/wihq7mla/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2024-03-26 13_22_45+00_00.csv
Unmatched file found at raw_data/wihq7mla/survey_answers/9A3eA4U3kFK7ICiYMkPb352v/2024-09-23 14_57_13+00_00.csv
Unmatched file found at raw_data/enylyud4/survey_answers/9A3eA4U3kFK7ICiYMkPb352v/2024-03-25 13_06_14+00

#### Part 2: Identifying Duplicate Answers

This section traverses through each participant's survey answers data and identifies duplicate files using MD5 hashing.
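
As a minimal, self-contained sketch of the idea, files with byte-identical contents produce the same MD5 digest, so grouping paths by digest exposes duplicates. The helper names `md5_of_file` and `group_duplicates` and the file contents are hypothetical.

```python
import hashlib
import os
import tempfile
from collections import defaultdict

def md5_of_file(path):
    """Return the 32-character hexadecimal MD5 digest of a file's bytes."""
    with open(path, "rb") as handle:
        return hashlib.md5(handle.read()).hexdigest()

def group_duplicates(paths):
    """Group paths by digest and keep only groups with more than one file."""
    groups = defaultdict(list)
    for path in paths:
        groups[md5_of_file(path)].append(path)
    return [group for group in groups.values() if len(group) > 1]

# Write three small hypothetical files, two of which are identical
with tempfile.TemporaryDirectory() as tmp:
    contents = {"a.csv": b"q1,yes\n", "b.csv": b"q1,yes\n", "c.csv": b"q1,no\n"}
    paths = []
    for name, data in contents.items():
        path = os.path.join(tmp, name)
        with open(path, "wb") as handle:
            handle.write(data)
        paths.append(path)
    duplicate_names = [
        sorted(os.path.basename(p) for p in group)
        for group in group_duplicates(paths)
    ]

print(duplicate_names)
```

MD5 is used here only as a fast content fingerprint, not for security.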

In [8]:
def hash_file_contents(file_path):
    '''
    Inputs:
        file_path: the file path of the file to be hashed
    Behavior:
        This function reads a csv file and hashes its contents into a 32-character hexadecimal string that can help identify duplicates
    '''
    import hashlib
    hasher = hashlib.md5()
    with open(file_path, 'rb') as file:
        buf = file.read()
        hasher.update(buf)
    return hasher.hexdigest()

def identify_duplicate_answers(base_dir, target_survey_id=None):
    '''
    Inputs:
        base_dir: the name of the directory containing the raw data
        target_survey_id: the survey ID to check for duplicate answers, if None it checks all surveys.
    Behavior:
        This function compares all the survey answer files of a specified survey within each participant and identifies any duplicates
    '''

    # Initialize a dictionary to store all hashes
    all_hashes = {}
    
    # Iterate through each user_id directory
    for user_id in os.listdir(base_dir):
        user_dir = os.path.join(base_dir, user_id)
        
        if os.path.isdir(user_dir):
            # Path to the survey_answers directory
            survey_answers_dir = os.path.join(user_dir, 'survey_answers')
            
            # Check if the survey_answers directory exists
            if os.path.exists(survey_answers_dir):
                # Get the survey_ids to check
                if target_survey_id:
                    survey_ids_to_check = [target_survey_id]
                else:
                    survey_ids_to_check = os.listdir(survey_answers_dir)
                
                # Iterate through each survey_id
                for survey_id in survey_ids_to_check:
                    # Path to the specific survey_id directory
                    survey_answers_survey_dir = os.path.join(survey_answers_dir, survey_id)
                    
                    if os.path.isdir(survey_answers_survey_dir):
                        # Gather all .csv file paths in the directory
                        file_paths = [
                            os.path.join(survey_answers_survey_dir, file)
                            for file in os.listdir(survey_answers_survey_dir)
                            if os.path.isfile(os.path.join(survey_answers_survey_dir, file)) and file.endswith('.csv')
                        ]
                        
                        # Key each file by participant and content hash so that duplicates
                        # are only flagged within the same participant
                        for file_path in file_paths:
                            # Generate hash of the file contents
                            file_hash = hash_file_contents(file_path)
                            all_hashes.setdefault((user_id, file_hash), []).append(file_path)

    duplicates_output = []
    
    # Prepare duplicate groups for output
    for file_group in all_hashes.values():
        if len(file_group) > 1:
            joined_paths = "\n".join(file_group)
            # Print duplicates to the console
            print(f"Duplicate files found:\n{joined_paths}\n")
            duplicates_output.append(f"Duplicate files found:\n{joined_paths}\n\n")

In [9]:
identify_duplicate_answers(base_dir, target_survey_id="PmZQCMHU8cAhIdZshFuipCPi")

#### Part 3: Identifying Unexpected Submissions

This section of the script attempts to flag any instances of *unexpected* survey submissions. There are two categories of submissions that have been deemed *unexpected*: multiple submissions for any 24-hour period$^1$, and submissions outside of the designated survey periods. 

$^1$Eligible submission periods may be less than 24 hours based on the deployment of the next survey iteration.
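
The footnote can be illustrated with a short sketch: each eligible period ends 24 hours after delivery or at the next delivery, whichever comes first. The function name `window_ends` and the delivery times are hypothetical.

```python
from datetime import datetime, timedelta

def window_ends(delivery_times, max_window=timedelta(hours=24)):
    """Given sorted delivery times for one participant, return the end of each eligible period."""
    ends = []
    for i, delivered in enumerate(delivery_times):
        end = delivered + max_window
        # Truncate the period if the next survey is delivered before 24 hours have passed
        if i + 1 < len(delivery_times):
            end = min(end, delivery_times[i + 1])
        ends.append(end)
    return ends

# Hypothetical deliveries: the second arrives 23 hours after the first
deliveries = [
    datetime(2024, 3, 25, 13, 0),
    datetime(2024, 3, 26, 12, 0),
    datetime(2024, 3, 28, 12, 0),
]
ends = window_ends(deliveries)
print(ends)
```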

##### Part 3a: Generating a Log of Answers

The first task is to generate a log of all survey answers for all participants.

In [10]:
def generate_answers_log(base_dir, survey_id=None):
    '''
    Inputs:
        base_dir: the directory containing the raw data
        survey_id: the survey_id to be filtered, if none is specified, all surveys are considered
    Output:
        full_log: the df object containing all submission dates and filepaths for each user
    Behavior:
        This function iterates through all users and generates one large object filled with 
        information about participants' survey answer times
    '''
    # Generate a df object to hold all submission times for participants
    full_log = pd.DataFrame(columns=['BeiweID', 'Date', 'TimestampUTC', 'FilePath'])

    # Iterate through each user_id directory
    for user_id in os.listdir(base_dir):
        user_dir = os.path.join(base_dir, user_id)

        # Skip stray files and participants without a survey_answers directory
        survey_answers_dir = os.path.join(user_dir, 'survey_answers')
        if not os.path.isdir(survey_answers_dir):
            continue

        # Get the survey_ids available for this participant
        survey_answers_ids = set(os.listdir(survey_answers_dir))

        # Only consider specified survey_id
        if survey_id is not None:
    
            # Call the iterator functions to look through the files and extract submission times
            answers_submissions = iterate_answer_files(survey_answers_dir, survey_id)
            answers_submissions = answers_submissions[answers_submissions['Extension'] == ".csv"]

            # Add the new submissions to the existing log
            new_submission = pd.DataFrame({
                'BeiweID': [user_id] * len(answers_submissions),
                'Date': answers_submissions['Time'].dt.date,
                'TimestampUTC': answers_submissions['Time'],
                'FilePath': answers_submissions['FilePath']
            })
            full_log = pd.concat([full_log, new_submission]) 
        
        else:
            # Iterate through each survey
            for survey in survey_answers_ids:

                # Call the iterator functions to look through the files and extract submission times
                answers_submissions = iterate_answer_files(survey_answers_dir, survey)
                answers_submissions = answers_submissions[answers_submissions['Extension'] == ".csv"]

                # Add the new submissions to the existing log
                new_submission = pd.DataFrame({
                    'BeiweID': [user_id] * len(answers_submissions),
                    'Date': answers_submissions['Time'].dt.date,
                    'TimestampUTC': answers_submissions['Time'],
                    'FilePath': answers_submissions['FilePath']
                })
                full_log = pd.concat([full_log, new_submission])
                
    full_log = full_log.sort_values(by=["BeiweID", "TimestampUTC"])
    return full_log

In [11]:
!rm -rf $(find . -type f -name .DS_Store)
base_dir = "raw_data"
log = generate_answers_log(base_dir, survey_id="PmZQCMHU8cAhIdZshFuipCPi")
log.to_csv('answers_log_full.csv', index=False)

# Generate a csv containing Beiwe IDs and their survey submission counts
counts = log.groupby('BeiweID').size()
counts = counts.reset_index(name='SurveyCount')
counts.to_csv('final_counts.csv', index=False)



##### Part 3b: Generating an Expected Schedule

Next, we need to generate an expected schedule of diary notifications based on each participant's enrollment date and the Diary survey's relative schedule.
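
A minimal sketch of the calculation, assuming a relative schedule expressed as day offsets from enrollment and 90-day bursts: each expected date is the enrollment date plus the offset. The function name `expected_dates`, the enrollment date, and the offsets are hypothetical.

```python
from datetime import date, timedelta

def expected_dates(enrollment_date, relative_days, burst_length=90):
    """Turn relative deployment days into absolute dates with a burst number."""
    rows = []
    for day in sorted(relative_days):
        rows.append({
            "RelativeDay": day,
            "Burst": day // burst_length + 1,   # days 0-89 fall in burst 1, 90-179 in burst 2, ...
            "CalculatedDate": enrollment_date + timedelta(days=day),
        })
    return rows

# Hypothetical participant enrolled on 2024-03-25 with surveys on days 1, 2 and 91
schedule_rows = expected_dates(date(2024, 3, 25), [1, 2, 91])
for row in schedule_rows:
    print(row)
```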

In [12]:
def read_interventions_file(file_path):
    '''
    Inputs:
        file_path: the path to the interventions json
    Output:
        enrollment_log: the df object containing every participant's enrollment date
    Behavior:
        This function looks through the interventions json to determine enrollment dates for all participants. 
        THIS FUNCTION NEEDS TO BE CUSTOMIZED TO A SPECIFIC STUDY.
    '''
    # Initialize an empty list to hold user data
    enrollment_log = []

    # Read the interventions json file
    with open(file_path, 'r') as file:
        interventions = json.load(file)
        # Iterate for each individual
        for user in interventions:
            user_data = interventions[user]
            key = next(iter(user_data.keys()))
            # Record the Beiwe ID and enrollment date of the individual
            enrollment_log.append({
                'BeiweID': user,
                'EnrollmentDate': user_data[key]['Enrollment date']
            })
    return pd.DataFrame(enrollment_log)

def read_settings_file(file_path):
    '''
    Inputs:
        file_path: the path to the settings json
    Output:
        notification_days: a list containing the relative deployment dates from the enrollment date
    Behavior:
        This function looks through the settings json to determine which relative dates the diary survey is deployed on.
        THIS FUNCTION NEEDS TO BE CUSTOMIZED TO A SPECIFIC STUDY.
    '''
    # Initialize an empty list to hold the relative deployment days
    notification_days = []

    # Read the settings json file
    with open(file_path, 'r') as file:
        settings = json.load(file)
        # Access the relative schedule of the Diary survey
        timings = settings['surveys'][6]['relative_timings']
        for timing in timings:
            # Record the relative deployment day of the survey
            notification_days.append(timing[1])

    notification_days.sort()
    return notification_days

def generate_expected_survey_schedule():
    '''
    Output:
        full_schedule: the df object containing all expected notification dates for all participants
    Behavior:
        This function calls the two methods above to look through study jsons and then generates an 
        expected survey schedule for each participant
    '''
    # Gather enrollment data from the intervention file
    interventions_file = "m4z54N5SU7Eqq2LbwmxQd2UN_intervention_data.json"
    interventions = read_interventions_file(interventions_file)

    # Gather scheduling data using the settings file
    settings_file = "Yale_Fucito_Young_Adult_Alcohol_-_Live_Study_surveys_and_settings.json"
    settings = read_settings_file(settings_file)

    # Initiate a new list to track the full expected schedule of surveys for all participants
    full_schedule = []
    # Iterate for each participant
    for index, row in interventions.iterrows():
        # Extract the Beiwe ID and enrollment date
        enrollment_date = pd.Timestamp(row['EnrollmentDate'])
        beiwe_id = row['BeiweID']

        # Iterate for each deployment day of the Diary relative schedule
        for day in settings:
            # Calculate the absolute date of the deployment
            new_date = enrollment_date + timedelta(days=day)
            burst = day // 90 + 1
            # Record this absolute date
            full_schedule.append({
                'BeiweID': beiwe_id,
                'EnrollmentDate': enrollment_date,
                'RelativeDay': day,
                'Burst': burst,
                'CalculatedDate': new_date
            })
            
    full_schedule = pd.DataFrame(full_schedule)
    full_schedule = full_schedule.sort_values(by=["BeiweID", "RelativeDay"])
    return full_schedule

In [13]:
dates = generate_expected_survey_schedule()
dates.to_csv('notification_dates_log.csv', index=False)

# Update final counts csv with enrollment dates and final survey dates
counts = pd.read_csv("final_counts.csv")
last_dates = dates.groupby('BeiweID').last().reset_index()
counts = counts.merge(
            last_dates[['BeiweID', 'EnrollmentDate', 'CalculatedDate']],
            how='left',
            left_on=['BeiweID'],
            right_on=['BeiweID']
        )
counts.rename(columns={'CalculatedDate': 'LastSurvey'}, inplace=True)
counts.to_csv('final_counts.csv', index=False)

##### Part 3c: Extracting Notification Times

Using the notification API we can determine the absolute timings of survey notifications (which is important because deployments are localized to the participant's timezone).
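
To see why the absolute timings matter, the sketch below converts the same local delivery time to UTC for two participants in different timezones. Fixed UTC offsets are a simplifying assumption (real timezones also shift with daylight saving time), and the function name `local_to_utc` and the 9:00 delivery time are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def local_to_utc(local_naive, utc_offset_hours):
    """Interpret a naive local time at a fixed UTC offset and return the naive UTC equivalent."""
    local_tz = timezone(timedelta(hours=utc_offset_hours))
    return local_naive.replace(tzinfo=local_tz).astimezone(timezone.utc).replace(tzinfo=None)

# The same hypothetical 9:00 local delivery for two participants
scheduled_local = datetime(2024, 3, 25, 9, 0)
utc_for_offset_minus_4 = local_to_utc(scheduled_local, -4)
utc_for_offset_minus_7 = local_to_utc(scheduled_local, -7)

print(utc_for_offset_minus_4, utc_for_offset_minus_7)
```

The two participants receive the survey three hours apart in absolute terms, so their eligible submission periods must be computed from the delivered timestamps rather than from the calendar date alone.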

In [14]:
def call_notifications_api(beiwe_id, access_key, secret_key):
    '''
    Inputs:
        beiwe_id: the user whose notification history to access
        access_key: API access key from the keyring file
        secret_key: API secret key from the keyring file 
    Output:
        notification_history: a df containing the participant's notification deployment history for the Diary survey
    Behavior:
        This function calls the notification API to get a specific participant's notification history.
        The API object is then converted into a df and filtered for original deployments of Diary surveys.
    '''

    # Make a post request to the get-participant-notification-history/v1 endpoint, including the api key,
    # secret key, and participant_id as post parameters.
    endpoint = "https://studies.beiwe.org/get-participant-notification-history/v1"
    t_start = datetime.now()
    print("Starting request at", t_start, flush=True)
    response = requests.post(
        endpoint,
        
        # refine your parameters here
        data={
            "participant_id": beiwe_id,
            "access_key": access_key,
            "secret_key": secret_key           
        },
        allow_redirects=False,
    )
    t_end = datetime.now()
    print("Request completed at", t_end.isoformat(), "duration:", (t_end - t_start).total_seconds(), "seconds")
    
    status_code = response.status_code
    raw_output = response.content
    
    # Sanity checking to make sure the request worked
    print("http status code:", response.status_code)
    
    assert status_code != 400, \
        "400 usually means you are missing a required parameter, or something critical isn't passing some checks.\n" \
        "Check your access key and secret key, if there is a study id make sure it is 24 characters long."
    
    assert status_code != 403, \
        "Permissions Error, you are not authenticated to view data on this study."
    
    assert status_code != 404, \
        "404 means that the entity you have specified does not exist. Check details like study_id, patient_id, etc."
    
    assert response.status_code != 301, \
        "Encountered HTTP redirect, you may have forgotten the s in https. first 100 bytes of response:\n" \
        f"{raw_output[:100]}"
    
    assert response.content != b"", "No data was returned by the server..."
    
    print("Testing whether it is valid json...")
    try:
        json_response = orjson.loads(response.content)
        print("JSON successfully loadded into variable `json_response`")
    except orjson.JSONDecodeError:
        print("Not valid JSON - which may or may not be an issue! Here is the raw output of the first 100 bytes:")
        print(raw_output[:100])
        json_response = None

    try:
        # Filter out notifications for non-Diary surveys and resends/non-relative deployments
        notifications_data = json_normalize(json_response['PmZQCMHU8cAhIdZshFuipCPi'])
        # Copy the filtered rows so the column assignments below do not act on a view
        notifications_data = notifications_data[(notifications_data['type'] == 'relative') & (~notifications_data['resend'])].copy()
        
        # Create new columns for Beiwe ID, the date and the UTC time of delivery
        notifications_data['id'] = beiwe_id
        notifications_data['date'] = notifications_data['scheduled_time'].apply(lambda x: x.split('T')[0])
        notifications_data['timestamp_UTC'] = notifications_data['timestamp'].apply(lambda x: pd.to_datetime(x, utc=True))
        notifications_data['timestamp_UTC'] = notifications_data['timestamp_UTC'].dt.tz_convert(None)
    except (KeyError, TypeError) as e:
        # TypeError covers the case where the response was not valid JSON and json_response is None
        print(f"{type(e).__name__}: {str(e)} - The expected data wasn't found in the API response.")
        return pd.DataFrame(columns=['id','date', 'timestamp_UTC'])
    return notifications_data

In [15]:
def match_notifications_to_schedule(notification_dates_file):
    '''
    Inputs:
        notification_dates_file: the csv file containing the expected notification schedule, as generated in Part 3b
    Output:
        schedule: a df containing everything in the notification_dates_file with added columns for 
        delivery times (DeliveredUTC) and truncation times (TruncatedUTC, the end of that deployment's eligible submission period)
    Behavior:
        This function adds information about delivery times from the notification API to the notification schedule file
    '''
    # Read the notification_dates_file, convert it into a df and initialize new columns
    schedule = pd.read_csv(notification_dates_file)
    schedule['timestamp_UTC'] = None
    
    # Extract the unique users from this file
    users = schedule['BeiweID'].unique()
    
    # Read the keyring_studies file to get the API access and secret keys
    kr = data_summaries.read_keyring("keyring_studies.py")
    access_key = kr.get("ACCESS_KEY")
    secret_key = kr.get("SECRET_KEY")

    # Iterate for all users
    for user in users:
        # Collect the user's notification history
        notifications_data = call_notifications_api(user, access_key, secret_key)

        # Update the schedule df with the notification history
        updated_schedule = schedule.merge(
            notifications_data[['id','date', 'timestamp_UTC']],
            how='left',
            left_on=['BeiweID','CalculatedDate'],
            right_on=['id','date']
        )
        schedule.loc[schedule['BeiweID'] == user, 'timestamp_UTC'] = updated_schedule['timestamp_UTC_y']

    # Rename the timestamp column and add a column for the end of the survey's eligible period 
    schedule = schedule.rename(columns={'timestamp_UTC': 'DeliveredUTC'})
    schedule['TruncatedUTC'] = schedule['DeliveredUTC'] + timedelta(hours=24)

    for i in range(len(schedule) - 1):  
        if schedule['BeiweID'].iloc[i] == schedule['BeiweID'].iloc[i + 1]:
            current_truncated = schedule['TruncatedUTC'].iloc[i]
            next_delivered = schedule['DeliveredUTC'].iloc[i + 1]
            
            if pd.notna(current_truncated) and pd.notna(next_delivered):
                schedule.at[schedule.index[i], 'TruncatedUTC'] = min(current_truncated, next_delivered)

    return schedule

In [16]:
schedule = match_notifications_to_schedule("notification_dates_log.csv")
schedule.to_csv("notification_dates_log_with_deliveries.csv", index = False)

Starting request at 2025-04-18 16:53:17.308693
Request completed at 2025-04-18T16:53:17.690086 duration: 0.381393 seconds
http status code: 200
Testing whether it is valid json...
JSON successfully loadded into variable `json_response`
Starting request at 2025-04-18 16:53:17.722619
Request completed at 2025-04-18T16:53:18.022895 duration: 0.300276 seconds
http status code: 200
Testing whether it is valid json...
JSON successfully loadded into variable `json_response`
Starting request at 2025-04-18 16:53:18.057073
Request completed at 2025-04-18T16:53:18.365979 duration: 0.308906 seconds
http status code: 200
Testing whether it is valid json...
JSON successfully loadded into variable `json_response`
Starting request at 2025-04-18 16:53:18.402184
Request completed at 2025-04-18T16:53:18.690975 duration: 0.288791 seconds
http status code: 200
Testing whether it is valid json...
JSON successfully loadded into variable `json_response`
Starting request at 2025-04-18 16:53:18.713146
Request c

##### Part 3d: Identify Unexpected Submissions

Finally, we can cross-reference the submissions log against the notifications log to find two kinds of unexpected surveys as mentioned above:
1. Submissions outside of the expected schedules
2. Multiple submissions in one notification period
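
To make the two cases concrete, here is a small sketch of the classification rule in plain Python. It is illustrative only: `classify_submissions`, the label strings, and the toy timestamps are hypothetical and do not appear elsewhere in the pipeline. A submission counts toward a window when it falls strictly between the delivery time and the end of the eligible period; the first submission in a window is kept and any later one is flagged as extra.

```python
from datetime import datetime

def classify_submissions(windows, submissions):
    """
    windows: list of (delivered, truncated) datetime pairs for one participant
    submissions: list of submission datetimes for the same participant
    Returns one label per submission, in input order.
    """
    seen = [0] * len(windows)  # submissions counted so far in each window
    labels = []
    for ts in submissions:
        # Index of the first window that strictly contains the timestamp, if any
        match = next(
            (i for i, (start, end) in enumerate(windows) if start < ts < end),
            None,
        )
        if match is None:
            labels.append("outside")
        else:
            seen[match] += 1
            labels.append("valid" if seen[match] == 1 else "extra")
    return labels

# Made-up example: two consecutive windows and three submissions
windows = [
    (datetime(2025, 1, 1, 14), datetime(2025, 1, 2, 9)),
    (datetime(2025, 1, 2, 9), datetime(2025, 1, 3, 9)),
]
submissions = [
    datetime(2025, 1, 1, 15),  # first answer in window 1
    datetime(2025, 1, 1, 20),  # second answer in window 1
    datetime(2025, 1, 5, 12),  # no window contains this time
]
labels = classify_submissions(windows, submissions)
print(labels)  # ['valid', 'extra', 'outside']
```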

In [17]:
def match_answers_to_notifications(notifications_file, answers_file, counts_file):
    '''
    Inputs:
        notifications_file: the csv file containing the expected and delivered notification schedule, as generated in Parts 3b/3c
        answers_file: the csv file containing the answer submission log, as generated in Part 3a
        counts_file: the csv file containing the submission counts for each participant
    Output:
        schedule: a df containing everything in the notifications file with an additional
        column (SurveysSubmitted) for the number of submissions within each notification window
        counts: a df that adds counts of outside and double submissions, plus a CheckManually flag, to the counts file
        filtered_answers: a df of the answer submissions kept after filtering, each tagged with its Burst
    Behavior:
        This function cross-checks the two input logs and flags any unexpected submissions
    '''
    # Read the notification log csv
    schedule = pd.read_csv(notifications_file)
    schedule['DeliveredUTC'] = schedule['DeliveredUTC'].apply(lambda x: pd.to_datetime(x))
    schedule['TruncatedUTC'] = schedule['TruncatedUTC'].apply(lambda x: pd.to_datetime(x))
    schedule['SurveysSubmitted'] = 0

    # Read the answers log csv
    answers = pd.read_csv(answers_file)

    # Read the final counts csv
    counts = pd.read_csv(counts_file)
    counts["DoubleSubmissions"] = 0
    counts["OutsideSubmissions"] = 0
    counts["CheckManually"] = False

    # Create a list for the filtered answers log
    filtered_answers = []

    outside, extra = 0,0

    # Iterate each row of the answers log
    for index, row in answers.iterrows():
        # Extract the Beiwe ID, timestamp and file path
        timestamp = pd.Timestamp(row['TimestampUTC'])
        date = row['Date']
        beiwe_id = row['BeiweID']
        file_path = row['FilePath']

        # Look for the notification delivery window that contains the answer submission
        mask = (
            (schedule['BeiweID'] == beiwe_id) &
            (schedule['DeliveredUTC'] < timestamp) & 
            (schedule['TruncatedUTC'] > timestamp)
        )

        # Indicator whether this is a valid row, and the burst of its matching window (if any)
        valid_row = True
        burst = None

        if schedule.loc[mask].empty:
            # No notification window contains this submission, so it has no burst
            # and must not be carried into the filtered log
            valid_row = False
            if not schedule.loc[(schedule['BeiweID'] == beiwe_id) & (schedule['CalculatedDate'] == date)].empty:
                # A survey was scheduled for this date, so the notification history itself is suspect
                counts.loc[counts['BeiweID'] == beiwe_id, 'CheckManually'] = True
                print(f"[CHECK MANUALLY] Issue with notification history: {file_path}")
            else:
                counts.loc[counts['BeiweID'] == beiwe_id, 'OutsideSubmissions'] += 1
                print(f"Submission outside of diary period: {file_path}")
            outside += 1
        else:
            burst = schedule.loc[mask, 'Burst'].iloc[0]

        # Count this submission against its window (a no-op when no window matched)
        schedule.loc[mask, 'SurveysSubmitted'] += 1

        # Any submission after the first in the same window is an unexpected extra
        if (schedule.loc[mask, 'SurveysSubmitted'] > 1).any():
            valid_row = False
            counts.loc[counts['BeiweID'] == beiwe_id, 'DoubleSubmissions'] += 1
            print(f"Unexpected extra submission: {file_path}")
            extra += 1

        if valid_row:
            row['Burst'] = burst
            filtered_answers.append(row)

    print(f"Total submissions outside of diary periods: {outside}")
    print(f"Total unexpected extra submissions within a notification period: {extra}")
    filtered_answers = pd.DataFrame(filtered_answers, columns=['BeiweID','Date','TimestampUTC','FilePath','Burst'])
    return schedule, counts, filtered_answers

In [18]:
notifications_file = "notification_dates_log_with_deliveries.csv"
answers_file = "answers_log_full.csv"
counts_file = "final_counts.csv"
schedule, counts, answers = match_answers_to_notifications(notifications_file, answers_file, counts_file)
schedule.to_csv("notification_dates_log_with_deliveries_and_submissions.csv", index = False)

answers = answers[['BeiweID','Date','Burst','TimestampUTC','FilePath']]
answers.to_csv("answers_log_filtered.csv", index = False)

counts = counts.sort_values(by=['LastSurvey', 'BeiweID'])
counts = counts[['BeiweID','EnrollmentDate', 'LastSurvey', 'SurveyCount', 'DoubleSubmissions', 'OutsideSubmissions', 'CheckManually']]
counts.to_csv("final_counts.csv", index = False)

Submission outside of diary period: raw_data/11hfsajc/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2025-02-02 14_31_35+00_00.csv
Submission outside of diary period: raw_data/1bllhfi7/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2024-04-01 19_40_51+00_00.csv
Submission outside of diary period: raw_data/1bllhfi7/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2024-07-02 00_57_30+00_00.csv
Submission outside of diary period: raw_data/1czziou5/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2025-02-17 04_35_05+00_00.csv
Unexpected extra submission: raw_data/1ib9r56g/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2025-01-22 00_34_11+00_00.csv
Submission outside of diary period: raw_data/1ib9r56g/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2025-02-03 21_05_03+00_00.csv
Submission outside of diary period: raw_data/1sshhk6u/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2025-03-04 23_25_22+00_00.csv
Submission outside of diary period: raw_data/1w8gybjj/survey_answers/PmZQCMHU8cAhIdZshFuipCPi/2024-11-25 05_42_08+00_00.csv
Submission outs