## Mistplay Data Engineer Take Home Assessment

The produced code should be able to achieve the following:

1. remove duplicates over the columns id and created_at (considered simultaneously)
2. compute the rank of each user's user_score within each age group and output the rank in a new column called sub_group_rank
3. process the column widget_list by
    - flattening the list items i.e. each item in the list is put into its own row
    - extracting the values in the JSON elements into their own columns called widget_name and widget_amount
4. anonymize the column email and output the anonymized version in a new column email_anon. This column email_anon should have the following properties.
    - given an anonymized value the original value can be recovered
5. create a new table that is an inverted index that gives, for each country in location, which ids are located in that country
6. write the processed tables/data into separate parquet file(s). Exactly how the files/tables are organized is not as important as having all the data present.

Your code will be evaluated for correctness, scalability and maintainability.

##### Completed by: James Mesich
##### Completed on: February 25th 2021
##### Updated: February 28th 2021

# This notebook will be split into two parts
### 1. Functions that complete the tasks laid out
### 2. The script that brings everything together

# Part 1: Functions

#### Import Libraries

In [1]:
import pandas as pd
import base64
import warnings

warnings.filterwarnings("ignore")

#### Remove duplicates over the columns id and created_at (considered simultaneously)

In [2]:
"""
Removes all duplicates based on the columns provided in the Dataframe
parameters:
    df: Dataframe
    columns: List of Strings
    
returns: Dataframe
"""
def remove_duplicates(df, columns):
    #count rows that repeat an earlier row on all of the given columns
    dups = df.duplicated(subset=columns).sum()
    if dups != 0:
        print(str(dups)+" duplicates found.")
        df = df.drop_duplicates(subset=columns, keep='first')
        dups = df.duplicated(subset=columns).sum()
        if dups == 0:
            print("Success, all duplicates removed.")
    else:
        print("No duplicates found.")

    return df
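
To illustrate what "considered simultaneously" means, here is a minimal sketch on a hypothetical toy frame (the values are invented and are not taken from data.json). A row counts as a duplicate only when both id and created_at match an earlier row.

```python
import pandas as pd

# Hypothetical toy data: rows 0 and 1 share both id and created_at,
# while row 2 shares only the id and row 3 only the timestamp.
toy = pd.DataFrame({
    "id": ["a", "a", "a", "b"],
    "created_at": ["2020-01-01", "2020-01-01", "2020-01-02", "2020-01-01"],
    "user_score": [0.1, 0.2, 0.3, 0.4],
})

# Count and drop rows that repeat an earlier (id, created_at) pair
n_dups = int(toy.duplicated(subset=["id", "created_at"]).sum())
deduped = toy.drop_duplicates(subset=["id", "created_at"], keep="first")

print(n_dups)
print(deduped)
```

Only the second row is dropped here; rows that match on just one of the two columns are kept.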

#### Compute the rank of each user's user_score within each age group and output the rank in a new column called sub_group_rank

In [3]:
"""
Adds a new column to the dataframe, ranking a column by the designated group
parameters:
    df: Dataframe
    group: String, column header to group by
    ranked_value: String, column header to rank
    
returns: Dataframe
"""
def rank_by_attr(df,group,ranked_value):
    
    col_name = 'sub_group_rank_' + group
    #dense ranking: tied scores share a rank and no rank numbers are skipped
    df[col_name] = df.groupby(group)[ranked_value].rank(method="dense", ascending=False).astype(int)
    
    return df
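
As a quick check of how dense ranking within a group behaves, the sketch below ranks a hypothetical toy frame (invented values). The highest score in each age group gets rank 1, and tied scores share a rank.

```python
import pandas as pd

# Hypothetical toy data with a tie inside age group 1
toy = pd.DataFrame({
    "age_group": [1, 1, 1, 2, 2],
    "user_score": [0.9, 0.5, 0.9, 0.3, 0.7],
})

# Rank user_score within each age group, highest score first
toy["sub_group_rank"] = (
    toy.groupby("age_group")["user_score"]
    .rank(method="dense", ascending=False)
    .astype(int)
)

print(toy)
```

The two 0.9 scores in age group 1 both receive rank 1, and the 0.5 score receives rank 2 rather than 3.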

#### Flatten the list items i.e. each item in the list is put into its own row. Extract the values in the JSON elements into their own columns called widget_name and widget_amount

In [4]:
"""
Flattens a column containing a list of dictionaries. 
parameters:
    df: Dataframe
    col_name: String, column to be flattened
    prefix: String, prefix for the names of the extracted columns
    
returns: Dataframe
"""
def flatten_col(df, col_name,prefix):
    #building the new columns with json_normalize would be faster than .apply(),
    #but it drops the rows with empty lists, so the result no longer lines up with df
    #df2 = pd.concat([pd.DataFrame(pd.json_normalize(x)) for x in df[col_name]], ignore_index=True)
    df = df.explode(col_name)
    #apply is slow here 
    df = pd.concat([df.drop([col_name], axis=1), df[col_name].apply(pd.Series).add_prefix(str(prefix)+'_')], axis=1)
    #empty lists explode to NaN, which produces a stray '<prefix>_0' column;
    #errors='ignore' avoids a KeyError when no list is empty
    df = df.drop([str(prefix)+'_0'], axis=1, errors='ignore')
    return df
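
Since `.apply(pd.Series)` is slow on large frames, one possible alternative is sketched below. It is not the notebook's method. It assumes every non-empty list item is a flat dict, uses invented toy data, and builds the new columns in a single `pd.DataFrame` call after replacing the NaN left by empty lists with empty dicts.

```python
import pandas as pd

# Hypothetical toy data: user "b" has an empty widget list
toy = pd.DataFrame({
    "id": ["a", "b"],
    "widget_list": [
        [{"name": "x", "amount": 1}, {"name": "y", "amount": 2}],
        [],
    ],
})

# One row per list item; empty lists become a single NaN row
exploded = toy.explode("widget_list").reset_index(drop=True)

# Swap NaN for {} so that every record is a dict
records = [w if isinstance(w, dict) else {} for w in exploded["widget_list"]]
widgets = pd.DataFrame(records).add_prefix("widget_")

# Both frames share a fresh 0..n-1 index, so they align row by row
flat = pd.concat([exploded.drop(columns="widget_list"), widgets], axis=1)

print(flat)
```

Rows that come from empty lists are kept, with missing values in widget_name and widget_amount.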

#### Anonymize the column email and output the anonymized version in a new column email_anon, such that the original value can be recovered from the anonymized value

In [5]:
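"""
Encodes the string with Base64 so that the original value can be recovered.
parameters:
    x: String

returns: encoded String
"""
#Base64 can be reversed by anyone, so this is obfuscation rather than encryption.
#For a production level program, would have used something like Fernet.
def encode(x):
    #utf-8 also handles addresses that contain non-ASCII characters
    x_bytes = x.encode('utf-8')
    base64_bytes = base64.b64encode(x_bytes)
    base64_x = base64_bytes.decode('ascii')
    return base64_x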

"""
Anonymizes the column
parameters:
    df: Dataframe
    col_name: String, column header to be anonymised 
    
returns: Dataframe
"""
def encode_col(df,col_name):
    #use the col_name argument instead of a hard-coded column name
    #passing the function directly to apply avoids a needless lambda
    df['anon_' + col_name] = df[col_name].apply(encode)
    return df
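
The task requires that the original value can be recovered from the anonymized one. The sketch below shows that round trip with a `decode` helper, which is hypothetical and not part of the notebook. Note that Base64 needs no key, so anyone can reverse it.

```python
import base64

def encode(x):
    # Base64-encode the UTF-8 bytes of the string
    return base64.b64encode(x.encode("utf-8")).decode("ascii")

def decode(token):
    # Hypothetical counterpart to encode(): reverses the Base64 step
    return base64.b64decode(token.encode("ascii")).decode("utf-8")

original = "fguinness0@yolasite.com"
token = encode(original)
recovered = decode(token)

print(token)
print(recovered == original)
```

A keyed scheme such as Fernet, mentioned in the comment above, would keep the same round-trip property while restricting recovery to holders of the key.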

#### Create a new table that is an inverted index that gives, for each country in location, which ids are located in that country

In [6]:
"""
Creates a new table based on a key value 
parameters:
    df: Dataframe
    key: String, column name in the df
    value: String, column name in the df
    
returns: new Dataframe
"""
def invert_index(df, key, value):
    #group by the key, ie country, and collect the unique values, ie ids
    #taking keys and values from the same groupby keeps them aligned;
    #observed=True skips unused categories when the key is categorical
    values = df.groupby(key, observed=True)[value].unique()
    #make the table with one row per key
    return values.reset_index()
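
To make the shape of the inverted index concrete, here is a minimal sketch on hypothetical toy data (invented ids and countries). It groups by location and collects the unique ids per country, then turns the result into a plain dict for easy lookup.

```python
import pandas as pd

# Hypothetical toy data: id "a" appears twice, as it would after flattening
toy = pd.DataFrame({
    "id": ["a", "a", "b", "c"],
    "location": ["Greece", "Greece", "China", "Greece"],
})

# One row per country, holding the unique ids located there
index = toy.groupby("location")["id"].unique().reset_index()

# Convert to a dict of lists for a quick lookup by country
lookup = dict(zip(index["location"], index["id"].map(list)))

print(lookup)
```

Because the keys and the id lists come from the same grouped result, each country is guaranteed to be paired with its own ids.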
    

#### Write the processed tables/data into separate parquet file(s).

In [7]:
"""
Saves the dataframe in a gzip-compressed parquet file
parameters:
    df: Dataframe
    name: String, name of file

returns: None
"""
def save_file(df,name):
    df.to_parquet(name+'.parquet.gzip',compression='gzip')

#### Examine data

In [8]:
#need to know what I am working with
def examine_data(df):
    if df.empty:
        raise ValueError("Data file is empty.")
    
    if df.isnull().values.any():
        print("There is missing data")
        print(df.isnull().sum())
    
    #Get some stats of the data first
    print("Column Variable types")
    print(df.dtypes)
    print(" ")
    print("Memory Usage by Column")
    print(df.memory_usage(deep=True))
    
    print("Size of df: "+str(df.size))
    print("Shape of df:" + str(df.shape))

# Part 2: Full Script

In [9]:
#read in the data
df = pd.read_json('data.json', lines=True)
#check for completeness and basic stats
examine_data(df)

Column Variable types
id                          object
email                       object
age_group                    int64
user_score                 float64
revenue                    float64
widget_list                 object
location                    object
created_at     datetime64[ns, UTC]
dtype: object
 
Memory Usage by Column
Index            128
id             93000
email          78904
age_group       8000
user_score      8000
revenue         8000
widget_list    72416
location       64448
created_at      8000
dtype: int64
Size of df: 8000
Shape of df:(1000, 8)


Object columns take up far more memory than the int, float, and datetime columns. 
Let's check whether converting location to categorical data is worthwhile. 
Note: converting email, widget_list and id would not help, since almost every entry in those columns is unique, whereas location draws from a limited set of values.

In [10]:
print(len(df['location'].unique()))

115


With only 115 unique locations across 1000 rows, this column is a good candidate for conversion to categorical data.

In [11]:
df2 = df.copy()
df2['location'] = df2['location'].astype("category")
print(df2.memory_usage(deep=True))
print(" ")
print("Reduction in memory")
print(df['location'].memory_usage(deep=True) / df2['location'].memory_usage(deep=True))

Index            128
id             93000
email          78904
age_group       8000
user_score      8000
revenue         8000
widget_list    72416
location       13652
created_at      8000
dtype: int64
 
Reduction in memory
4.686211901306241


Converting location to a categorical cuts its memory usage by a factor of roughly 4.7, a substantial saving.
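
The saving comes from storing each distinct string once and keeping only a small integer code per row. A minimal sketch on hypothetical toy data (five invented country names repeated 200 times, not the real location column):

```python
import pandas as pd

# Hypothetical toy column: 1000 rows drawn from only 5 distinct values
countries = ["Indonesia", "Greece", "China", "Finland", "Armenia"]
as_object = pd.Series(countries * 200, name="location", dtype=object)
as_category = as_object.astype("category")

# Compare the deep memory footprint of the values only (index excluded)
object_bytes = as_object.memory_usage(deep=True, index=False)
category_bytes = as_category.memory_usage(deep=True, index=False)

print(object_bytes, category_bytes)
```

The exact byte counts depend on the pandas version and platform, but the categorical version should be markedly smaller whenever the number of distinct values is small relative to the number of rows.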

In [12]:
#let's use the modified version and take a look at the data
df = df2
df.head(5)

Unnamed: 0,id,email,age_group,user_score,revenue,widget_list,location,created_at
0,25abf8a8-9d11-4e55-a003-246e458e9fb3,fguinness0@yolasite.com,2,0.467348,351.08,"[{'name': 'Eumetopias jubatus', 'amount': 7052...",Indonesia,2020-01-31 14:50:26+00:00
1,6bf052c9-1307-4b26-a24d-9c47bf747c87,fdebenham1@si.edu,1,0.929755,801.0,"[{'name': 'Bassariscus astutus', 'amount': 441...",Greece,2020-07-16 18:32:48+00:00
2,c3dc4870-463e-49d3-aba7-383cace03cd5,lleband2@hao123.com,3,0.473031,637.77,[],China,2019-09-26 05:26:24+00:00
3,42c18d61-34bb-4b27-a2cf-f43728c690f7,sbartolommeo3@imgur.com,4,0.31544,588.28,"[{'name': 'Manouria emys', 'amount': 72401}, {...",Finland,2020-01-17 09:26:34+00:00
4,e4c7b0ea-466e-4ada-afef-2f1d475be165,ebraben4@xing.com,3,0.009215,191.75,"[{'name': 'Mazama americana', 'amount': 35949}...",Armenia,2020-01-29 21:55:14+00:00


In [13]:
#remove duplicates based on these columns
df = remove_duplicates(df, ['id','created_at'])

6 duplicates found.
Success, all duplicates removed.


In [14]:
#add the sub group rank
df = rank_by_attr(df,'age_group','user_score')

In [15]:
#let's flatten the widget list
df = flatten_col(df, 'widget_list','widget')
df.head(10)

Unnamed: 0,id,email,age_group,user_score,revenue,location,created_at,sub_group_rank_age_group,widget_amount,widget_name
0,25abf8a8-9d11-4e55-a003-246e458e9fb3,fguinness0@yolasite.com,2,0.467348,351.08,Indonesia,2020-01-31 14:50:26+00:00,138,70520.0,Eumetopias jubatus
0,25abf8a8-9d11-4e55-a003-246e458e9fb3,fguinness0@yolasite.com,2,0.467348,351.08,Indonesia,2020-01-31 14:50:26+00:00,138,35573.0,Ramphastos tucanus
1,6bf052c9-1307-4b26-a24d-9c47bf747c87,fdebenham1@si.edu,1,0.929755,801.0,Greece,2020-07-16 18:32:48+00:00,14,44123.0,Bassariscus astutus
1,6bf052c9-1307-4b26-a24d-9c47bf747c87,fdebenham1@si.edu,1,0.929755,801.0,Greece,2020-07-16 18:32:48+00:00,14,52666.0,Lutra canadensis
1,6bf052c9-1307-4b26-a24d-9c47bf747c87,fdebenham1@si.edu,1,0.929755,801.0,Greece,2020-07-16 18:32:48+00:00,14,6977.0,Oryx gazella
2,c3dc4870-463e-49d3-aba7-383cace03cd5,lleband2@hao123.com,3,0.473031,637.77,China,2019-09-26 05:26:24+00:00,130,,
3,42c18d61-34bb-4b27-a2cf-f43728c690f7,sbartolommeo3@imgur.com,4,0.31544,588.28,Finland,2020-01-17 09:26:34+00:00,167,72401.0,Manouria emys
3,42c18d61-34bb-4b27-a2cf-f43728c690f7,sbartolommeo3@imgur.com,4,0.31544,588.28,Finland,2020-01-17 09:26:34+00:00,167,28076.0,Sagittarius serpentarius
3,42c18d61-34bb-4b27-a2cf-f43728c690f7,sbartolommeo3@imgur.com,4,0.31544,588.28,Finland,2020-01-17 09:26:34+00:00,167,83195.0,Mirounga angustirostris
4,e4c7b0ea-466e-4ada-afef-2f1d475be165,ebraben4@xing.com,3,0.009215,191.75,Armenia,2020-01-29 21:55:14+00:00,242,35949.0,Mazama americana


In [16]:
#Encode the email column
df = encode_col(df,'email')

In [17]:
#Test to make sure the encoding worked.
email = df['email'].iloc[0]
base64_message = df['anon_email'].iloc[0]
base64_bytes = base64_message.encode('ascii')
message_bytes = base64.b64decode(base64_bytes)
message = message_bytes.decode('ascii')
if message == email:
    print("Success!")

Success!


In [18]:
#Create the inverted index
loc_by_id_table = invert_index(df, 'location', 'id')

In [19]:
#save the two separate tables
save_file(df,"user_data")
save_file(loc_by_id_table,"location_data")