In [None]:
import pandas as pd
import json
import datetime
import time

# Configuration

Specify the configuration file name that contains information about:

- surveys being aggregated (their name, data file path, and column grouping file path)
- study dates to project aggregated information on
- horizontal and vertical aggregation column list and grouping
- file path for storing horizontally and vertically aggregated information for the study period

Note: horizontal aggregation produces a fixed table in terms of columns and rows. It is filled based on the availability of data from the surveys being aggregated. For example, it has affect columns for all surveys (weekly, morning, midday1, midday2, and evening). If only weekly and evening surveys are included in the list of surveys being aggregated, only their relevant columns are aggregated.

Note: vertical aggregation produces a fixed table in terms of columns. However, rows vary depending on the surveys being aggregated.

In [None]:
# Text shown to the user when asking for the configuration file; it describes what the
# configuration file has to contain.
prompt = """\
Specify the **absolute path** of the configuration file containing information about:

- surveys being aggregated (their name, cleaned data file path, and column grouping file path)
- study dates to project aggregated information on
- horizontal and vertical aggregation column list and grouping
- file path for storing horizontally and vertically aggregated information for the study period

Note: horizontal aggregation produces a fixed table in terms of columns and rows. It is filled \
based on the availability of data from surveys being aggregated. For example, it has affect \
columns for all surveys (weekly, morning, midday1, midday2, and evening). If only weekly and \
evening surveys are included in the list of surveys being aggregated, only their relevant columns \
are being aggregated.

Note: vertical aggregation produces a fixed table in terms of columns. However, rows vary depending \
on the surveys being aggregated.

Example (find a sample in script-input repository): emaaggregation-config.json

Tips:

- Place your configuration files in the same directory as this notebook.
- Use a different configuration file for each different analysis rather than modifying a single configuration file.


"""
#config_file = 'emaaggregation-config.json'
# strip() so that whitespace typed or pasted around the path does not end up in the file name
config_file = input(prompt).strip()
print('using configurations specified in {}'.format(config_file))
#/Users/yasaman/UWEXP/analysis-scripts/surveys/emaaggregation-config.json

In [None]:
# Parse the JSON configuration file chosen above.
with open(config_file, 'r') as config_handle:
    config = json.load(config_handle)

# Pull out the settings used by the rest of the notebook; a missing key raises KeyError.
(institution,
 surveys,
 id_range,
 study_dates,
 h_aggregation_file,
 h_aggregation_column_grouping_file,
 v_aggregation_file,
 v_aggregation_column_grouping_file) = (
    config[key] for key in (
        'institution',
        'surveys',
        'id_range',
        'study_dates',
        'h_aggregation_file',
        'h_aggregation_column_grouping_file',
        'v_aggregation_file',
        'v_aggregation_column_grouping_file',
    )
)

In [None]:
# Echo the loaded configuration so it can be checked before any processing starts.
# The names are joined with ', ' so that no separator trails the last survey name.
survey_names = ', '.join(survey['name'] for survey in surveys)
# One indented line per file, each terminated by a newline (hence end='' when printing).
survey_files = ''.join('   ' + survey['data_file'] + '\n' for survey in surveys)
survey_grouping_files = ''.join('   ' + survey['column_grouping_file'] + '\n' for survey in surveys)
print('aggregating', survey_names, 'surveys of', institution, '...')
print('data is obtained from the following files respectively:')
print(survey_files, end='')
print('grouping of data columns is obtained from the following files respectively:')
print(survey_grouping_files, end='')
print('data is from ids in range', id_range[0], 'to', id_range[1], '.')
print('aggregation happens between '
      '{start[year]}-{start[month]:02d}-{start[day]:02d} and '
      '{end[year]}-{end[month]:02d}-{end[day]:02d}'.format(start=study_dates['start'],
                                                         end=study_dates['end']))
print('horizontal aggregation is stored in', h_aggregation_file)
print('the columns for horizontally aggregated data can be obtained/should match that listed in various groupings of file', h_aggregation_column_grouping_file)
print('vertical aggregation is stored in', v_aggregation_file)
print('the columns for vertically aggregated data can be obtained/should match that listed in various groupings of file', v_aggregation_column_grouping_file)

In [None]:
def process_affect(name, data, columns, dates_ids):
    """Project one survey's affect columns onto the fixed (date, PID) rows.

    Parameters
    ----------
    name : str
        Survey name; appended to every affect column as ``<column>_<name>``.
    data : DataFrame
        Survey data holding 'date', 'PID' and the requested affect columns.
    columns : list
        Names of the affect columns to keep.
    dates_ids : DataFrame
        The fixed rows, with 'date' and 'PID' columns.

    Returns
    -------
    DataFrame
        Indexed by ('date', 'PID') with one suffixed column per affect column; rows of
        ``dates_ids`` without a matching survey row hold missing values (left merge).
    """
    wanted = ['date', 'PID', *columns]
    # NOTE the index is deliberately set last, after the merge and the renaming. The original
    #      author observed that chaining set_index directly onto the merge turned 'date' into
    #      datetime instead of date, so the index no longer matched the other scales when
    #      concatenating.
    merged = dates_ids.merge(data[wanted], on=['date', 'PID'], how='left')
    suffixed = {col: col + '_' + name for col in columns}
    merged = merged.rename(columns=suffixed)
    #print('{} - affect - size of perpped data: {}'.format(name, merged.shape[0]))
    return merged.set_index(['date', 'PID'])

In [None]:
def process_discrimination(name, data, columns, dates_ids):
    """Derive a per-day 'discriminated' flag from a survey and project it onto the fixed rows.

    Parameters
    ----------
    name : str
        Survey name; for 'weekly' the derived date is moved one day back.
    data : DataFrame
        Survey data with a datetime 'start_date' column, 'PID' and the given columns.
    columns : list
        Discrimination columns of the survey. Must contain 'unfair_not' (ValueError
        otherwise) and may contain 'unfair_yesno'.
    dates_ids : DataFrame
        The fixed rows, with 'date' and 'PID' columns.

    Returns
    -------
    DataFrame
        Indexed by ('date', 'PID') with 'discriminated' ('YES' / 'NO' / missing) followed by
        the original discrimination columns.
    """
    # Work on an explicit copy: the assignments below add and modify columns, and doing that
    # on a bare column selection of the caller's frame triggers SettingWithCopyWarning.
    data = data[['start_date', 'PID', *columns]].copy()

    # shift the start date of surveys that are before 10am to the day before. these are late submissions
    # NOTE no survey can be started between midnight and 10am if the participant have not messed up
    #      with the link. surveys of UW phase I were all sent after 10am and were at most valid to
    #      answer before the midnight of the same day
    late = data['start_date'].dt.time < datetime.time(10, 0, 0)
    data.loc[late, 'start_date'] = data.loc[late, 'start_date'] - pd.Timedelta('12 hour')

    # call add_date based on the adjusted start_time
    add_date(data, 'start_date')

    # adjust the dates for weekly surveys to the day before
    if name == 'weekly':
        data['date'] = data['date'] - pd.Timedelta('1 day')

    cols_unfair = columns.copy()
    cols_unfair.remove('unfair_not')
    has_yesno = 'unfair_yesno' in cols_unfair  # weekly survey of UW phase I originally had this column
    if has_yesno:
        cols_unfair.remove('unfair_yesno')

    # True where at least one of the remaining unfair columns has a value
    any_unfair = data[cols_unfair].notnull().any(axis=1)
    if has_yesno:
        # NOTE: responses to weekly EMA's of UW phase I are only valid for discrimination analysis
        #      if unfair_yesno column is NULL while at least another unfair column is not NULL.
        #      Rows with a non-NULL unfair_yesno therefore get neither 'YES' nor 'NO'.
        yesno_missing = data['unfair_yesno'].isnull()
        unfair_reported = yesno_missing & any_unfair
        no_unfair_reported = yesno_missing & ~any_unfair
    else:
        unfair_reported = any_unfair
        no_unfair_reported = ~any_unfair

    data.loc[unfair_reported, 'discriminated'] = 'YES'
    data.loc[no_unfair_reported, 'discriminated'] = 'NO'

    cols = ['date', 'PID', 'discriminated']
    cols.extend(columns)

    data = pd.merge(dates_ids, data[cols], on=['date', 'PID'], how='left').set_index(['date', 'PID'])
    #print('{} - discrimination - size of perpped data: {}'.format(name, data.shape[0]))
    return data

In [None]:
def process_discrimination_cmu(name, data, columns, dates_ids):
    """Derive a per-day 'discriminated' flag from a CMU survey and project it onto the fixed rows.

    Parameters
    ----------
    name : str
        Survey name; for 'weekly' the derived date is moved one day back.
    data : DataFrame
        Survey data with a datetime 'start_date' column, 'PID' and the given columns.
    columns : list
        Discrimination columns of the survey; 'unfair_type' is read from ``data`` and is
        therefore expected to be among them.
    dates_ids : DataFrame
        The fixed rows, with 'date' and 'PID' columns.

    Returns
    -------
    DataFrame
        Indexed by ('date', 'PID') with 'discriminated' ('YES' / 'NO' / missing) followed by
        the original discrimination columns.
    """
    # Work on an explicit copy: the assignments below add and modify columns, and doing that
    # on a bare column selection of the caller's frame triggers SettingWithCopyWarning.
    data = data[['start_date', 'PID', *columns]].copy()

    # shift the start date of surveys that are before 9am to the day before. these are late submissions
    # NOTE: I have no idea about the timing of surveys for CMU phase II and I have not cleaned any data for
    #       being late according to such timing. I'm going to assume the values I see at midnight for evening
    #       surveys belong to the day before. 9am is used as the cutoff (instead of the 10am used for UW)
    #       as it seems CMU surveys were sent out at 9am. 9am is the time that I see in the data.
    # NOTE(review): an earlier version of this note said "8 instead of 10" while the code compares
    #       against 9:00 - confirm which cutoff is intended.
    late = data['start_date'].dt.time < datetime.time(9, 0, 0)
    data.loc[late, 'start_date'] = data.loc[late, 'start_date'] - pd.Timedelta('12 hour')

    # call add_date based on the adjusted start_time
    add_date(data, 'start_date')

    #per_date = data.groupby(by=['PID', 'date']).size()
    #per_date = per_date[per_date > 1]
    #print(per_date) # to find the late items that after adjustment become problematic as multiple submissions

    # adjust the dates for weekly surveys to the day before
    if name == 'weekly':
        data['date'] = data['date'] - pd.Timedelta('1 day')

    # unfair_type values strictly between 0 and 14 are flagged 'YES', exactly 14 is flagged 'NO';
    # any other value (including missing) leaves 'discriminated' empty
    unfair_reported = (data['unfair_type'] < 14) & (data['unfair_type'] > 0)
    no_unfair_reported = data['unfair_type'] == 14

    data.loc[unfair_reported, 'discriminated'] = 'YES'
    data.loc[no_unfair_reported, 'discriminated'] = 'NO'

    cols = ['date', 'PID', 'discriminated']
    cols.extend(columns)

    data = pd.merge(dates_ids, data[cols], on=['date', 'PID'], how='left').set_index(['date', 'PID'])
    #print('{} - discrimination - size of perpped data: {}'.format(name, data.shape[0]))
    return data

In [None]:
# Registry of how each scale is aggregated across surveys.
# A value of None means the scale cannot be processed. Otherwise the entry names the
# processing function and whether its per-survey results are concatenated side by side
# ('extend') or merged into a single set of columns with combine_first ('combine').
AGGREGATORS = dict.fromkeys([
    'time',
    'identifier',
    'affect',
    'discrimination',
    'social',
    'workload',
    'stress',
    'stress_forecast',
    'coping',
    'regulation',
    'resources',
    'community',
    'belonging',
    'substance',
    'activity',
    'sleep',
    'fatigue',
    'overall_day',
])
AGGREGATORS['affect'] = {'type': 'extend', 'function': process_affect}
# NOTE(review): the CMU variant is hard-wired here; process_discrimination is not referenced.
AGGREGATORS['discrimination'] = {'type': 'combine', 'function': process_discrimination_cmu}

In [None]:
def add_date(df, col2date):
    """Add a 'date' column holding the calendar day of a datetime column.

    The frame is modified in place: 'date' is the day part of ``df[col2date]`` stored as
    datetime64[ns], i.e. with the time of day dropped.

    Parameters
    ----------
    df : DataFrame
        Frame to extend; must contain ``col2date`` as a datetime column.
    col2date : str
        Name of the datetime column to take the day from.

    Returns
    -------
    DataFrame
        The same ``df`` object, for convenience.
    """
    calendar_days = df[col2date].dt.date
    df['date'] = calendar_days.astype('datetime64[ns]')
    return df

In [None]:
# reading in data: one data frame and one column grouping per configured survey, keyed by survey name
data = {}
grouping = {}
for survey in surveys:
    survey_frame = pd.read_csv(survey['data_file'],
                               parse_dates=['start_date', 'end_date', 'recorded_date'])
    # add_date adds the 'date' column in place and hands back the same frame
    data[survey['name']] = add_date(survey_frame, 'start_date')
    with open(survey['column_grouping_file'], 'r') as grouping_handle:
        grouping[survey['name']] = json.load(grouping_handle)

In [None]:
# setting up the fixed rows of aggregations
# The configured start and end are turned into datetime objects, field by field.
DATETIME_PARTS = ('year', 'month', 'day', 'hour', 'minute', 'second')
start_date = datetime.datetime(*[study_dates['start'][part] for part in DATETIME_PARTS])
end_date = datetime.datetime(*[study_dates['end'][part] for part in DATETIME_PARTS])

# One row per study day. The constant 'dummykey' exists only to allow a cross join later.
# NOTE(review): date_range keeps the time of day of start_date, so a start time other than
#               00:00:00 would presumably never match the midnight 'date' values of the
#               surveys - confirm the configuration always starts at midnight.
dates = pd.DataFrame({'date': pd.date_range(start=start_date, end=end_date, freq='D')})
dates = dates.assign(dummykey=1)

# Participants built from the configured id range.
# NOTE(review): this frame is overwritten a few lines below by the hard-coded list of ids, so
#               id_range currently has no effect on the aggregation - confirm this is intended.
participants = pd.DataFrame(list(range(id_range[0], id_range[1]+1)), columns=['PID'])
# Explicit list of participant ids that replaces the range above.
pids = [200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,217,218,220,224,225,226,228,229,230,231,232,
        233,234,235,236,237,239,240,241,242,243,244,245,246,247,248,249,250,251,252,254,255,256,257,259,260,261,262,
        263,264,265,266,267,268,269,270,271,272,274,275,276,277,278,279,281,282,283,284,285,289,290,291,292,293,294,
        295,296,297,298,300,301,302,303,304,305,306,307,308,309,311,312,314,315,318,319,321,322,324,325,326,327,328,
        329,330,331,332,333,334,336,337,338,339,342,343,344,350,352,355,359,360,361,362,363,365,368,369,370,373,374,
        376,378,379,380,381,383,386,387,388,390,391,393,394,395,396,397,398,401,402,409,410,412,413,414,415,416,417,
        419,422,423,424,427,430,431,432,434,435,437,438,439,442,444,445,448,450,451,452,453,454,457,460,461,462,463,
        465,466,472,473,474,476,477,479,483,484,485,487,489,491,492,497,500,506,507,510,512,513,514,515,517,518,519,
        521,522,523,524,525,528,531,532,538,539,540,541,542,551,554,555,559,563,572,577,581,586,594,595,607,609,613,
        615,617,629,631,644,653,655,659,662,664,665,668,669,670,671,672,673,686,693,698,703,706,711,714] # CMU phase II
participants = pd.DataFrame(pids, columns=['PID'])
# Constant key shared with the dates frame so the two can be cross joined.
participants['dummykey'] = 1

# Cross join through the shared constant key: one row per (date, participant) pair.
dates_ids = pd.merge(dates, participants, on='dummykey').drop(columns='dummykey')

# Horizontal Aggregation

In [None]:
# Load the horizontal column grouping; its keys are the scales that get aggregated.
with open(h_aggregation_column_grouping_file, 'r') as grouping_handle:
    aggregation_groups = json.load(grouping_handle)
# TO-DO loop through the items in horizontal column grouping and obtain a list of columns
# TO-DO create h_aggregated with those columns and dates_ids as index

In [None]:
# iterate through the scales that should be aggregated and call the processor function for each scale across surveys
scales = aggregation_groups.keys()
prepped_by_scale = {}
for scale in scales:
    # .get() so that a scale missing from AGGREGATORS is skipped like an unsupported (None)
    # one instead of aborting the whole aggregation with a KeyError
    spec = AGGREGATORS.get(scale)
    if spec is None:
        #print('procssing {} not supported'.format(scale))
        continue

    aggregator_type = spec['type']
    aggregator = spec['function']
    for survey_name, survey_data in data.items():
        if scale not in grouping[survey_name]:
            #print('scale {} does not exist in {}'.format(scale, survey_name))
            continue

        prepped = aggregator(survey_name, survey_data, grouping[survey_name][scale], dates_ids)

        if aggregator_type == 'extend':
            # every survey contributes its own columns; they are concatenated further below
            prepped_by_scale.setdefault(scale, []).append(prepped)
        elif aggregator_type == 'combine':
            # surveys share the same columns; earlier surveys win, later ones fill the gaps
            if scale in prepped_by_scale:
                prepped_by_scale[scale] = prepped_by_scale[scale].combine_first(prepped)
            else:
                prepped_by_scale[scale] = prepped

if not prepped_by_scale:
    raise ValueError('none of the configured scales could be aggregated from the given surveys')

# one frame per scale, all indexed by (date, PID), placed side by side
scale_frames = []
for scale, scale_result in prepped_by_scale.items():
    if AGGREGATORS[scale]['type'] == 'extend':
        scale_frames.append(pd.concat(scale_result, axis=1))
    else:
        scale_frames.append(scale_result)

aggregated = pd.concat(scale_frames, axis=1).reset_index()
# TO-DO insert aggregated into h_aggregated under appropriate columns

In [None]:
# Number the study days: a row dated exactly start_date gets day 1.
date_offset = pd.to_datetime(start_date) - pd.Timedelta(days=1)
# NOTE sorting by PID then date ensures that shifting along time works on values for the same person
aggregated = (
    aggregated
    .assign(day=(aggregated['date'] - date_offset).dt.days)
    .sort_values(by=['PID', 'date'], ascending=[True, True])
)

In [None]:
# Move 'day' so that it sits immediately after 'date', then store the horizontal aggregation.
columns = aggregated.columns.tolist()
columns.pop(columns.index('day'))
columns.insert(columns.index('date') + 1, 'day')
aggregated = aggregated[columns]
aggregated.to_csv(h_aggregation_file, index=False)

# Vertical Aggregation

In [None]:
# TO-DO implementation of vertical aggregation with fixed columns

<span style="color:red">refactor the code below (e.g. to use config files or looping instead of code repetition) </span>

In [None]:
# Input paths for the vertical aggregation: for each survey, the cleaned data file and the
# JSON file with its column grouping.
# NOTE(review): these paths are hard-coded (some are absolute paths on one machine) instead of
#               coming from the configuration file used for the horizontal aggregation.
weekly = 'results/emaprep/weekly-numVal-internalID_cleaned_DISC_SUBS.csv'
weekly_groups = '/Users/yasaman/UWEXP/script-input/emas/scale_grouping-columns-weeklyEMA.json'

evening = 'results/emaprep/evening-numVal-internalID_cleaned_DISC.csv'
evening_groups = '/Users/yasaman/UWEXP/script-input/emas/scale_grouping-columns-eveningEMA.json'

midday2 = '' # TO-DO
midday2_groups = '' # TO-DO

midday1 = '' # TO-DO
midday1_groups = '' # TO-DO

morning = '' # TO-DO
morning_groups = '' # TO-DO

# Output paths: the concatenated data of all surveys and the merged column grouping.
all_ = 'results/emaprep/all-numVal-internalID_cleaned_DISC_SUBS.csv'
all_groups = 'results/emaprep/scale_grouping-columns-cleaned_DISC_SUBS.json'

In [None]:
# Load the data and the column grouping of each survey.
# NOTE every name below holds a path before this cell runs and the loaded object afterwards,
#      so the cell that defines the paths has to be re-run before running this one again.
weekly = pd.read_csv(weekly)
with open(weekly_groups, 'r') as groups_handle:
    weekly_groups = json.load(groups_handle)

evening = pd.read_csv(evening)
with open(evening_groups, 'r') as groups_handle:
    evening_groups = json.load(groups_handle)

# TO-DO load the remaining surveys the same way once their paths are filled in
#midday2 = pd.read_csv(midday2)
#with open(midday2_groups, 'r') as groups_handle:
#   midday2_groups = json.load(groups_handle)

#midday1 = pd.read_csv(midday1)
#with open(midday1_groups, 'r') as groups_handle:
#   midday1_groups = json.load(groups_handle)

#morning = pd.read_csv(morning)
#with open(morning_groups, 'r') as groups_handle:
#   morning_groups = json.load(groups_handle)

In [None]:
#surveys = [weekly, evening, midday2, midday1, morning] #TO-DO
# NOTE(review): this rebinds `surveys`, which earlier in the notebook holds the survey entries
#               read from the configuration file; cells above that use it cannot be re-run
#               after this one without reloading the configuration.
surveys = [weekly, evening]
# Stack the survey frames into a single frame.
concatenated = pd.concat(surveys)

In [None]:
# Distinct scale-group names used by any of the loaded surveys.
# TO-DO also include the keys of midday2_groups, midday1_groups and morning_groups once loaded
groups = list(set([*weekly_groups.keys(), *evening_groups.keys()]))

In [None]:
# For every group, gather the columns listed under it by each survey and drop duplicates.
scale_groupings = {}
for group in groups:
    members = [
        column
        for survey_groups in (weekly_groups, evening_groups)
        if group in survey_groups
        for column in survey_groups[group]
    ]
    scale_groupings[group] = list(set(members))

In [None]:
# Store the merged scale grouping as JSON at the path held in all_groups.
with open(all_groups, 'w') as groups_out:
    json.dump(scale_groupings, groups_out)

In [None]:
# Column order of the output: the listed columns first, then all remaining columns in their
# existing order.
columns_reordered = ['survey', 'start_date', 'recorded_date', 'pid', 'discriminated', 'any_drug', 'any_substance']
columns = concatenated.columns.tolist()
for leading in list(columns_reordered):
    columns.remove(leading)  # raises ValueError when a leading column is absent from the data
columns_reordered += columns

In [None]:
# Apply the column order computed above.
concatenated = concatenated[columns_reordered]

In [None]:
# Store the concatenated surveys at the path held in all_, without the row index.
concatenated.to_csv(all_, index=False)
# tested that the columns of concatenated are the union of the columns of the surveys
# tested that the number of rows of concatenated is the sum of the rows of the surveys

In [None]:
# TO-DO incorporate substance use data from morning surveys of the next day with evening surveys.
#       these represent the substance use on the same day as the discrimination and are comparable with
#       substance use data in weekly surveys.
# TO-DO incorporate stress forecasting data from morning surveys of the next day with evening surveys.
#       these represent the status on the day after the discrimination and are comparable with stress
#       forecasting data in weekly surveys.