# Improving Data Aggregation Process

**Goal:** Write a more terse and comprehensible data aggregation script, based on pandas groupby.

**Development Process**:
- Load example data into df from ForkPoolWorker-1.txt. Note that this data formatting will mirror the formatting of the OCR data
- Aggregate df, retaining stats re. the old implementation
- Build new agg_func, verified by comparing stats to previous

In [4]:
os.getcwd()

'/Users/samjett/Documents/student_reviews'

In [4]:
# Data loader.py
try:
    os.chdir('/Users/samjett/Documents/student_reviews/backend')
except:
    pass
# global/PyPI
import pandas as pd
import json
import os
import hashlib
from pprint import pprint
from tqdm import tqdm
from copy import deepcopy
import numpy as np
# local
# from mongo import mongo_driver as db_conn
# aggregate_data.py contains the function to aggregate the data
from data_aggregation import aggregate_data

# Define the name of the database and the name of the collection. Insert each .csv record as a document within the collection
DB_NAME = "test-agg-db" # practice
OCR_DB_NAME = 'ocr_db_v4'
ocr_collections = ['CoA']#, 'CoAaS', 'CoA&GS', 'CoCE-DoA', 'MFPCoB', 'MCoEaE', 'JRCoE', 'GCoE', 'WFCoFA', 'HC', 'CoIS', 'GCoJaMC', 'CoPaCS', 'UC', 'CfIaDL', 'EWP', 'R-AF']

### DEBUG - force_update is always true - off in prod
def update_database(df):
    '''
    Get's data from OCR scraped collections (ocr_collections) in the MongoDB named OCR_DB_NAME, and runs aggregations on this data. Ensures that
    each of these datasets (native form and the aggregated form) exist within the DB_NAME Mongo database.
    :inputs:
    force_update: boolean denoting whether an update should be forced if the dataset and its aggregated form already exists in DB_NAME.
    :returns:
    connection: a connection to the mongo db named DB_NAME.
    '''

    # Establish DB connection
#     conn = db_conn()

    # Modify the ocr collections to achieve standard column naming form
    for ocr_coll in ocr_collections:
        print('Converting the scraped collection '+ocr_coll+ ' to pd dataframe.')
#         ocr_db = conn.get_db_collection(OCR_DB_NAME, ocr_coll)
#         df = pd.DataFrame(list(ocr_db.find()))

        ## Cleaning the data prior to aggregation
        df.drop(['_id'],axis=1, inplace=True).rename(columns ={'Individual Responses':'Responses'}, inplace=True)
        df['Instructor ID'] = (df['Instructor First Name']+df['Instructor Last Name']).apply(str).apply(hash).astype('int32').abs()
        df['Question Number'] = df['Question Number'].astype(int)
        df['Term Code'] = df['Term Code'].astype(int)
        # Make sure the First and Last names are in camelcase; i.e. no CHUNG-HAO LEE
        df['Instructor First Name'] = df['Instructor First Name'].apply(str.title)
        df['Instructor Last Name'] = df['Instructor Last Name'].apply(str.title)
        
        
        print('Loading ' + ocr_coll)
#         # If the original, native collection doesnt exist or if the update is forced
#         if conn.collection_existence_check(DB_NAME, ocr_coll)==False or force_update:
#             collection = conn.get_db_collection(DB_NAME, ocr_coll)

#             # Get the dataframe
#             df = db_dfs[ocr_coll]

#             # Delete all of the current contents from the collection
#             collection.delete_many({})
#             for i in tqdm(range(100)): # Splits df into 4 parts for uploading without AutoReconnect Error, especially for 
#                 # load the db for the given data file into a json format
#                 records = df[(i-1)*int(len(df)/4):i*int(len(df)/4)].to_dict('records')
#                 # try to update the database with the given data file 
#                 result = collection.insert_many(records)
#             else:
#                 records = df.to_dict('records')
#                 # try to update the database with the given data file 
#                 result = collection.insert_many(records)

#             # Update the user on what happened
#             print('A collection called ' + ocr_coll + ' was added to the database '+ DB_NAME + '.')

#         else:
#             print('A collection called ' + ocr_coll + ' already exists in the database '+ DB_NAME + ' and was unmodified.')
            
        # Check to see if the aggregated document already exists in the document in the database
        if True:  # conn.collection_existence_check(DB_NAME, 'aggregated_' + ocr_coll)==False or force_update:
#             collection = conn.get_db_collection(DB_NAME, 'aggregated_' + ocr_coll)
            # Get the dataframe
            df = db_dfs[ocr_coll]

            # Create the aggregated database 
            print('Aggregating the ' + ocr_coll )
            ag_df = aggregate_data(df)

            # load the db for the given data file into a json format
            ag_records = json.loads(ag_df.T.to_json()).values()

            # Delete all of the current contents from the collection
            collection.delete_many({})

            # Try to update the aggregated dataframe
#             ag_result = collection.insert_many(ag_records)

            # Update the user on what happened
            print('A collection called aggregated_'+ ocr_coll + ' was added to the database '+ DB_NAME + '.')

        else:
            print('A collection called aggregated_'+ ocr_coll + ' already exists in the database '+ DB_NAME + ' and was unmodified.')
            
    # Return the connection to the collection
#     return conn

def load_parsed_data(file):
    with open(file, "r") as f:
        lines = f.readlines()
        print(len(lines))
    objs = []
    for line in tqdm(lines):
        obj = eval(line)
        objs.append(obj)
    return objs

# Get the df for ForkPoolWorker-1.txt
def convert_parsed_files_to_df():
    files = ["ForkPoolWorker-1.txt"] # , "ForkPoolWorker-2.txt", "ForkPoolWorker-3.txt", "ForkPoolWorker-4.txt"
    parsed_data = []
    for file in tqdm(files):
        parsed_data += load_parsed_data(f'../ocr/{file}')
    return parsed_data

df = pd.DataFrame(convert_parsed_files_to_df())

  0%|          | 0/1 [00:00<?, ?it/s]
  0%|          | 0/79555 [00:00<?, ?it/s][A
  2%|▏         | 1589/79555 [00:00<00:04, 15885.09it/s][A

79555



  4%|▍         | 3226/79555 [00:00<00:04, 16026.77it/s][A
  6%|▌         | 4526/79555 [00:00<00:05, 14979.17it/s][A
  7%|▋         | 5744/79555 [00:00<00:05, 14012.23it/s][A
  9%|▉         | 7397/79555 [00:00<00:04, 14683.00it/s][A
 11%|█▏        | 9051/79555 [00:00<00:04, 15194.42it/s][A
 14%|█▎        | 10740/79555 [00:00<00:04, 15664.66it/s][A
 16%|█▌        | 12436/79555 [00:00<00:04, 16031.78it/s][A
 18%|█▊        | 14200/79555 [00:00<00:03, 16482.45it/s][A
 20%|█▉        | 15798/79555 [00:01<00:03, 16174.92it/s][A
 22%|██▏       | 17548/79555 [00:01<00:03, 16549.33it/s][A
 24%|██▍       | 19303/79555 [00:01<00:03, 16836.32it/s][A
 27%|██▋       | 21214/79555 [00:01<00:03, 17458.27it/s][A
 29%|██▉       | 23230/79555 [00:01<00:03, 18187.60it/s][A
 32%|███▏      | 25125/79555 [00:01<00:02, 18408.77it/s][A
 34%|███▍      | 26971/79555 [00:01<00:02, 17566.58it/s][A
 36%|███▋      | 28974/79555 [00:01<00:02, 18238.45it/s][A
 39%|███▊      | 30823/79555 [00:01<00:02, 1

In [23]:
%%capture
agg_df_orig = aggregate_data(df)

In [30]:
sorted(list(agg_df_orig.columns))

['Avg Course Rating',
 'Avg Department Rating',
 'Avg Instructor Rating In Section',
 'College Code',
 'Course Enrollment',
 'Course Number',
 'Course Rank in Department in Semester',
 'Course Title',
 'Instructor Enrollment',
 'Instructor First Name',
 'Instructor ID',
 'Instructor Last Name',
 'SD Course Rating',
 'SD Department Rating',
 'SD Instructor Rating In Section',
 'Subject Code',
 'Term Code',
 'course_uuid']

In [65]:
# Condition the df
## Cleaning the data prior to aggregation
df = df.drop(['_id'],axis=1, errors = 'ignore').rename(columns ={'Individual Responses':'Responses'})
df['Instructor ID'] = (df['Instructor First Name']+df['Instructor Last Name']).apply(str).apply(hash).astype('int32').abs()
df['course_uuid'] = (df['Subject Code']+df['Course Number'].apply(str)+df['Section Title'].apply(lambda x: x[:-4])).apply(str).apply(hash).astype('int32').abs().apply(str) 
df['Question Number'] = df['Question Number'].astype(int)
df['Term Code'] = df['Term Code'].astype(int)
# Make sure the First and Last names are in camelcase; i.e. no CHUNG-HAO LEE
df['Instructor First Name'] = df['Instructor First Name'].apply(str.title)
df['Instructor Last Name'] = df['Instructor Last Name'].apply(str.title)

In [66]:
def combine_standard_deviations(sd_list, mean_list,pop_list):
    '''
    This function will take lists of standard deviations, means, population sizes, and weights for each list unit. The function
    will combine the lists to produce a standard deviation for the group, based on the input parameters. Formula for combining the SD taken from the below link:
    
    https://www.researchgate.net/post/How_to_combine_standard_deviations_for_three_groups
    
    * All lists must be same length
    '''
    # Check for equal sized lists
    if not len(sd_list) == len(mean_list)==len(pop_list):
        print('All input lists to the function -- combine_standard_deviations -- must be of the same length.')
    # Compute the weighted mean by populations
    pop_mean = np.sum(mean_list*pop_list)/(np.sum(pop_list))
    # Compute the deviance
    deviance = np.sum((pop_list)*(sd_list**2) + (pop_list)*(mean_list - pop_mean)**2)
    # Compute the standard deviation
    sd = np.sqrt(deviance/(np.sum(pop_list)))
    return sd

def aggregate_df_groupby(df):
    """
    Aggregates a pandas dataframe of student reviews data. See the First, Second, and Third Operations below for more descriptions of functions.
    Note the similar form across operations.
    :Inputs:
    - df: A pandas dataframe, slightly modified from db results (see conditioning in data_loader.py)
    :Returns:
    - ag_df: An aggregated version of the same dataframe
    """
    ag_df = deepcopy(df)
    
    # Remove the repeat rows that will occur because we are taking 1-10 question responses down to 1
    ag_df = ag_df.drop_duplicates(subset = ['Term Code', 'course_uuid', 'Instructor ID'])
    
    # First Operation: Combine sections of the same course taught by the same instructor in the same semester
    # we'll use these local functions
    weighted_mean_instr = lambda x: np.average(x.values, weights=df.loc[x.index, 'Responses'].values)
    weighted_stdevs_instr = lambda x: combine_standard_deviations(x.values, \
                                                                    mean_list = df.loc[x.index, 'Mean'].values, \
                                                                    pop_list = df.loc[x.index, 'Responses'].values)
    ag_df['Avg Instructor Rating In Section'] = df.groupby(['Term Code', 'course_uuid', 'Instructor ID'])['Mean'].transform(weighted_mean_instr)
    ag_df['SD Instructor Rating In Section'] = df.groupby(['Term Code', 'course_uuid', 'Instructor ID'])['Standard Deviation'].transform(weighted_stdevs_instr)
    ag_df['Instructor Enrollment'] = ag_df.groupby(['Term Code', 'course_uuid', 'Instructor ID'])['Responses'].transform('sum') # Instr enrollment is sum of section enrollment
    
    
    # Second Operation: Combine Instructors in Courses to get the Average Course metrics
    # we'll use these local functions, which tell the groupby which columns to analyze
    weighted_mean_course = lambda x: np.average(x.values, weights=ag_df.loc[x.index, 'Instructor Enrollment'].values)
    weighted_stdevs_course = lambda x: combine_standard_deviations(x.values, \
                                                                    mean_list = ag_df.loc[x.index, 'Avg Instructor Rating In Section'].values, \
                                                                    pop_list = ag_df.loc[x.index, 'Instructor Enrollment'].values)
    ag_df['Avg Course Rating'] = ag_df.groupby(['Term Code', 'course_uuid'])['Avg Instructor Rating In Section'].transform(weighted_mean_course)
    ag_df['SD Course Rating'] = ag_df.groupby(['Term Code', 'course_uuid'])['SD Instructor Rating In Section'].transform(weighted_stdevs_course)
    ag_df['Course Enrollment'] = ag_df.groupby(['Term Code', 'course_uuid'])['Instructor Enrollment'].transform('sum') # Course enrollment sum of constituent instr enrollments
    ag_df['Course Rank in Department in Semester']= ag_df.groupby(['Term Code', 'College Code', 'Subject Code'])['Avg Course Rating'].rank(method ='dense',na_option='top', ascending=False).apply(int)

    
    # Third Operation: Combine Courses inside of a department to get average department metrics
    weighted_mean_dept = lambda x: np.average(x.values, weights=ag_df.loc[x.index, 'Course Enrollment'].values)
    weighted_stdevs_dept = lambda x: combine_standard_deviations(x.values, \
                                                                    mean_list = ag_df.loc[x.index, 'Avg Instructor Rating In Section'].values, \
                                                                    pop_list = ag_df.loc[x.index, 'Course Enrollment'].values)
    ag_df['Avg Department Rating'] = ag_df.groupby(['Term Code', 'College Code', 'Subject Code'])['Avg Course Rating'].transform(weighted_mean_dept)
    ag_df['SD Department Rating'] = ag_df.groupby(['Term Code', 'College Code', 'Subject Code'])['SD Course Rating'].transform(weighted_stdevs_dept)
    
    # Rename the necessary columns
    ag_df = ag_df.rename(columns = {'Section Title':'Course Title'})
    # Drop unnecessary columns, mainly leftovers from df
    ag_df = ag_df.drop(columns = ['Question', 'Question Number', 'Responses','Mean', 'Standard Deviation'], errors = 'ignore')
    return ag_df

ag_df_new = aggregate_df_groupby(df)

In [67]:
len(ag_df_new)
len(agg_df_orig)
cols = []
def compare_aggs():
    for dd in [agg_df_orig, ag_df_new.drop(columns = ['Course ID'], errors='ignore')]:
#         display(dd.loc[:20, :].reindex(sorted(dd.columns), axis=1))
        display(dd[(dd['Term Code']==201720) & (dd['Subject Code']=='MATH') & (dd['Course Number'] == 2924)].reindex(sorted(dd.columns), axis=1))
compare_aggs()
    

Unnamed: 0,Avg Course Rating,Avg Department Rating,Avg Instructor Rating In Section,College Code,Course Enrollment,Course Number,Course Rank in Department in Semester,Course Title,Instructor Enrollment,Instructor First Name,Instructor ID,Instructor Last Name,SD Course Rating,SD Department Rating,SD Instructor Rating In Section,Subject Code,Term Code,course_uuid
0,3.950457,3.704753,3.3,CoAaS,169,2924,7,Disc-MATH 2924-030,17,Ling,1012622236,Jin,1.095908,1.205724,1.097473,MATH,201720,math2924
8860,3.950457,3.704753,3.706521,CoAaS,169,2924,7,Diff & Integral Calculus II,51,Leslie,2029323307,Davidson-Rossier,1.095908,1.205724,1.272648,MATH,201720,math2924
29735,3.950457,3.704753,4.19512,CoAaS,169,2924,7,Diff & Integral Calculus II,86,Roi,811226286,Docampo Alvarez,1.095908,1.205724,0.831184,MATH,201720,math2924
54115,3.950457,3.704753,4.114284,CoAaS,169,2924,7,Disc-MATH 2924-010,15,Jonathan,956339769,Merlini,1.095908,1.205724,1.283281,MATH,201720,math2924


Unnamed: 0,Avg Course Rating,Avg Department Rating,Avg Instructor Rating In Section,College Code,Course Enrollment,Course Number,Course Rank in Department in Semester,Course Title,Instructor Enrollment,Instructor First Name,Instructor ID,Instructor Last Name,SD Course Rating,SD Department Rating,SD Instructor Rating In Section,Subject Code,Term Code,course_uuid
0,3.571285,3.708398,3.3,CoAaS,50,2924,16,Disc-MATH 2924-030,17,Ling,1012622236,Jin,1.333018,1.28821,1.097473,MATH,201720,605307480
8860,4.130806,3.708398,3.823529,CoAaS,104,2924,7,Diff & Integral Calculus II,18,Leslie,2029323307,Davidson-Rossier,0.913587,1.28821,1.18633,MATH,201720,394716444
12430,3.571285,3.708398,3.375,CoAaS,50,2924,16,Disc-MATH 2924-050,18,Leslie,2029323307,Davidson-Rossier,1.333018,1.28821,1.439557,MATH,201720,605307480
29735,4.130806,3.708398,4.19512,CoAaS,104,2924,7,Diff & Integral Calculus II,86,Roi,811226286,Docampo Alvarez,0.913587,1.28821,0.831184,MATH,201720,394716444
54115,3.571285,3.708398,4.114284,CoAaS,50,2924,16,Disc-MATH 2924-010,15,Jonathan,956339769,Merlini,1.333018,1.28821,1.283281,MATH,201720,605307480


In [None]:
# MATH	201720	math2924