# Data Cleaning and Processing

## Extract, Transform, Load

This notebook documents the Extract, Transform, Load (ETL) process for the Nordeus Data Engineering Challenge. Each section walks through one of these steps.

A single JSON Lines file (`events.jsonl`) containing all events is read and flattened. The data is then split into three pandas DataFrames, which are cleaned and stored in a PostgreSQL database hosted on ElephantSQL.

In [2]:
import pandas as pd

In [3]:
%pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.9-cp39-cp39-macosx_10_9_x86_64.whl (2.8 MB)
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.9
Note: you may need to restart the kernel to use updated packages.


In [4]:
%pip install sqlalchemy

Note: you may need to restart the kernel to use updated packages.


In [7]:
data = pd.read_json('events.jsonl', lines=True)

In [8]:
data

Unnamed: 0,event_id,event_type,event_timestamp,event_data
0,14944,login,1274385529,{'user_id': '5e7a47f6-683b-11ee-aca7-8699b86be...
1,9218,login,1274139718,{'user_id': '60fc4c5e-683b-11ee-aca7-8699b86be...
2,40146,logout,1274486750,{'user_id': '6f956d90-683b-11ee-aca7-8699b86be...
3,8239,login,1273967219,{'user_id': '5d3fb182-683b-11ee-aca7-8699b86be...
4,21742,login,1274572691,{'user_id': '658a8b14-683b-11ee-aca7-8699b86be...
...,...,...,...,...
47770,7537,login,1273448795,{'user_id': '5cf4f458-683b-11ee-aca7-8699b86be...
47771,21510,login,1274572715,{'user_id': '7e6a063c-683b-11ee-aca7-8699b86be...
47772,26960,login,1274572784,{'user_id': '686ae1da-683b-11ee-aca7-8699b86be...
47773,41969,logout,1274191200,{'user_id': '61ec8638-683b-11ee-aca7-8699b86be...


The `event_data` column in the dataset contains nested JSON objects, which are complex structures not ideally suited for direct analysis in a tabular format like a pandas DataFrame. To make this data more accessible and easier to analyze, it's important to 'flatten' or normalize these nested JSON objects.

The code below does exactly this:

In [9]:
normalized_event_data = pd.json_normalize(data['event_data'])
data = pd.concat([data.drop('event_data', axis=1), normalized_event_data], axis=1)
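
To make the effect of the flattening step concrete, here is a minimal sketch on two made-up records shaped like the rows above. The `sample` frame and its values are illustrative assumptions, not data from `events.jsonl`.

```python
import pandas as pd

# Toy records shaped like the rows above (values are made up for illustration)
sample = pd.DataFrame({
    'event_id': [1, 2],
    'event_type': ['registration', 'login'],
    'event_data': [
        {'user_id': 'u1', 'country': 'DE', 'device_os': 'iOS'},
        {'user_id': 'u2'},
    ],
})

# Each dict key becomes a column; keys absent from a record become NaN
flat = pd.json_normalize(sample['event_data'].tolist())

# concat with axis=1 aligns rows on the index, so both frames need the same index
sample_flat = pd.concat([sample.drop(columns='event_data'), flat], axis=1)
print(sample_flat)
```

Because `pd.concat(..., axis=1)` aligns on the index, this pattern relies on the original frame having a default 0..n-1 index, which is the case right after `pd.read_json`.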

In [10]:
data

Unnamed: 0,event_id,event_type,event_timestamp,user_id,country,name,device_os,marketing_campaign,transaction_currency,transaction_amount
0,14944,login,1274385529,5e7a47f6-683b-11ee-aca7-8699b86be788,,,,,,
1,9218,login,1274139718,60fc4c5e-683b-11ee-aca7-8699b86be788,,,,,,
2,40146,logout,1274486750,6f956d90-683b-11ee-aca7-8699b86be788,,,,,,
3,8239,login,1273967219,5d3fb182-683b-11ee-aca7-8699b86be788,,,,,,
4,21742,login,1274572691,658a8b14-683b-11ee-aca7-8699b86be788,,,,,,
...,...,...,...,...,...,...,...,...,...,...
47770,7537,login,1273448795,5cf4f458-683b-11ee-aca7-8699b86be788,,,,,,
47771,21510,login,1274572715,7e6a063c-683b-11ee-aca7-8699b86be788,,,,,,
47772,26960,login,1274572784,686ae1da-683b-11ee-aca7-8699b86be788,,,,,,
47773,41969,logout,1274191200,61ec8638-683b-11ee-aca7-8699b86be788,,,,,,


The large DataFrame, `data`, is split into three DataFrames based on the `event_type` column: registrations, transactions, and user activity (login and logout events). This split follows the challenge requirements, which dictate the data model.

In [11]:
common_columns = ['event_id', 'event_timestamp', 'event_type', 'user_id']

# Define columns for each event type
columns = {
    'registration': common_columns + ['name', 'country', 'device_os', 'marketing_campaign'],
    'transaction': common_columns + ['transaction_amount', 'transaction_currency'],
    'user_activity': common_columns
}

# Filter and select columns for each DataFrame
df_registration = data[data['event_type'] == 'registration'][columns['registration']]
df_transaction = data[data['event_type'] == 'transaction'][columns['transaction']]
df_user_activity = data[data['event_type'].isin(['login', 'logout'])][columns['user_activity']]
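
A split like this silently drops any row whose `event_type` is not one of the four expected values. The sketch below shows one possible way to check for that on toy data; the `table_of` mapping and the `events` frame are assumptions made for illustration.

```python
import pandas as pd

# Toy events; in the notebook this would be the flattened `data` frame
events = pd.DataFrame({
    'event_id': [1, 2, 3, 4],
    'event_type': ['registration', 'login', 'transaction', 'logout'],
})

# Map each event type to the table it belongs in (hypothetical helper mapping)
table_of = {
    'registration': 'registration',
    'transaction': 'transaction',
    'login': 'user_activity',
    'logout': 'user_activity',
}

# Types missing from the mapping become NaN and would be lost in the split
target = events['event_type'].map(table_of)
unmapped = events.loc[target.isna(), 'event_type'].unique().tolist()
rows_per_table = target.value_counts().to_dict()

print('unmapped event types:', unmapped)
print('rows per table:', rows_per_table)
```

If `unmapped` is empty, the row counts of the three DataFrames should add up to the length of the original frame.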

In [12]:
df_registration

Unnamed: 0,event_id,event_timestamp,event_type,user_id,name,country,device_os,marketing_campaign
8,1175,1274141527,registration,63faa66c-683b-11ee-aca7-8699b86be788,Lope Cañellas,ES,Android,influencer_marketing
15,6563,1274534871,registration,846202ce-683b-11ee-aca7-8699b86be788,Dr. Ulrich Hauffer,DE,iOS,
18,1201,1274226584,registration,6422113e-683b-11ee-aca7-8699b86be788,Mr. Daniel Taylor,US,Android,
22,3127,1274341838,registration,6fc81560-683b-11ee-aca7-8699b86be788,Horst Meyer,DE,iOS,social_media_advertising
23,1392,1274281891,registration,65494f0a-683b-11ee-aca7-8699b86be788,Jorge Poza-Soto,ES,Android,social_media_advertising
...,...,...,...,...,...,...,...,...
47756,7210,1274523804,registration,88444f82-683b-11ee-aca7-8699b86be788,Luzie Neuschäfer,DE,iOS,
47757,6811,1274502836,registration,85dd2962-683b-11ee-aca7-8699b86be788,Renata Mascagni,IT,iOS,social_media_advertising
47764,6232,1274530387,registration,82619444-683b-11ee-aca7-8699b86be788,Carolyn Davis,US,iOS,email_marketing
47765,2635,1274313784,registration,6ccdbfe0-683b-11ee-aca7-8699b86be788,Taylor Allen,US,Android,social_media_advertising


In [13]:
df_transaction

Unnamed: 0,event_id,event_timestamp,event_type,user_id,transaction_amount,transaction_currency
137,47655,1274572519,transaction,81cebb24-683b-11ee-aca7-8699b86be788,1.99,USD
312,47610,1274570710,transaction,734ed624-683b-11ee-aca7-8699b86be788,0.99,USD
519,47638,1274570626,transaction,6017708e-683b-11ee-aca7-8699b86be788,2.99,USD
523,47530,1274486320,transaction,74b67418-683b-11ee-aca7-8699b86be788,0.99,USD
534,47223,1274080210,transaction,603a31fa-683b-11ee-aca7-8699b86be788,0.99,EUR
...,...,...,...,...,...,...
47638,47234,1274134119,transaction,5e2cfb18-683b-11ee-aca7-8699b86be788,1.99,EUR
47641,47593,1274572707,transaction,70d1a1d8-683b-11ee-aca7-8699b86be788,2.99,USD
47747,47365,1274379342,transaction,60bfe160-683b-11ee-aca7-8699b86be788,0.99,USD
47752,47368,1274376333,transaction,651f6a00-683b-11ee-aca7-8699b86be788,2.99,EUR


In [14]:
df_user_activity

Unnamed: 0,event_id,event_timestamp,event_type,user_id
0,14944,1274385529,login,5e7a47f6-683b-11ee-aca7-8699b86be788
1,9218,1274139718,login,60fc4c5e-683b-11ee-aca7-8699b86be788
2,40146,1274486750,logout,6f956d90-683b-11ee-aca7-8699b86be788
3,8239,1273967219,login,5d3fb182-683b-11ee-aca7-8699b86be788
4,21742,1274572691,login,658a8b14-683b-11ee-aca7-8699b86be788
...,...,...,...,...
47770,7537,1273448795,login,5cf4f458-683b-11ee-aca7-8699b86be788
47771,21510,1274572715,login,7e6a063c-683b-11ee-aca7-8699b86be788
47772,26960,1274572784,login,686ae1da-683b-11ee-aca7-8699b86be788
47773,41969,1274191200,logout,61ec8638-683b-11ee-aca7-8699b86be788


## Data Cleaning Process

In this section, we clean the data: rows without a `user_id` are dropped, duplicate events are removed, and transactions are kept only if their currency and amount are valid.

In [15]:
transaction_amount_valid = [0.99, 1.99, 2.99, 4.99, 9.99]

# Drop rows where 'user_id' is null
df_user_activity = df_user_activity.dropna(subset=['user_id'])
df_transaction = df_transaction.dropna(subset=['user_id'])
df_registration = df_registration.dropna(subset=['user_id'])

# Drop duplicate events
df_user_activity = df_user_activity.drop_duplicates(subset='event_id')
df_transaction = df_transaction.drop_duplicates(subset='event_id')
df_registration = df_registration.drop_duplicates(subset='event_id')

# Keep only rows where 'transaction_currency' is 'USD' or 'EUR'
df_transaction = df_transaction[df_transaction['transaction_currency'].isin(['USD', 'EUR'])]

# Keep only rows where 'transaction_amount' is in the list of valid amounts
df_transaction = df_transaction[df_transaction['transaction_amount'].isin(transaction_amount_valid)]
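
It can help to know how many rows each cleaning rule removes. The following is a rough sketch on made-up transactions, applying the same four rules in sequence; the `toy` frame and the `steps` dictionary are illustrative assumptions rather than part of the original pipeline.

```python
import pandas as pd

# Made-up transactions covering each failure case once
toy = pd.DataFrame({
    'event_id': [1, 1, 2, 3, 4],
    'user_id': ['a', 'a', None, 'b', 'c'],
    'transaction_amount': [0.99, 0.99, 1.99, 3.50, 2.99],
    'transaction_currency': ['USD', 'USD', 'EUR', 'USD', 'GBP'],
})

valid_amounts = [0.99, 1.99, 2.99, 4.99, 9.99]
valid_currencies = ['USD', 'EUR']

# Cleaning rules, applied in the same order as in the cell above
steps = {
    'missing user_id': lambda df: df.dropna(subset=['user_id']),
    'duplicate event_id': lambda df: df.drop_duplicates(subset='event_id'),
    'invalid currency': lambda df: df[df['transaction_currency'].isin(valid_currencies)],
    'invalid amount': lambda df: df[df['transaction_amount'].isin(valid_amounts)],
}

# Record how many rows each rule removes
removed = {}
cleaned = toy
for label, step in steps.items():
    before = len(cleaned)
    cleaned = step(cleaned)
    removed[label] = before - len(cleaned)

print(removed)
```

Each filter must be assigned back to the frame that the next step reads from; otherwise its result is discarded and the rule has no effect.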

In [17]:
# Data validation: check for unexpected or disallowed values in the marketing campaign and device OS columns
# All values match our criteria and no disallowed values are detected, so we keep the columns as they are

print(df_registration.marketing_campaign.unique())
print(df_registration.device_os.unique())

['influencer_marketing' '' None 'social_media_advertising'
 'email_marketing']
['Android' 'iOS' 'Web']
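
The visual check above could also be written as an explicit comparison against a set of allowed values. This is only a sketch: the allowed sets are an assumption taken from the printed output, not from the challenge specification, and `unexpected_values` is a hypothetical helper. Empty strings and `None` are treated as "no campaign" and ignored.

```python
import pandas as pd

# Allowed values are an assumption based on the output above, not on a spec
allowed_campaigns = {'influencer_marketing', 'social_media_advertising', 'email_marketing'}
allowed_os = {'Android', 'iOS', 'Web'}

# Made-up registrations with one disallowed value per column
toy = pd.DataFrame({
    'marketing_campaign': ['email_marketing', '', None, 'billboard'],
    'device_os': ['iOS', 'Web', 'Android', 'Symbian'],
})

def unexpected_values(series, allowed):
    """Return the distinct non-empty values that are not in `allowed`."""
    present = series.dropna()
    present = present[present != '']
    return sorted(set(present) - allowed)

bad_campaigns = unexpected_values(toy['marketing_campaign'], allowed_campaigns)
bad_os = unexpected_values(toy['device_os'], allowed_os)
print(bad_campaigns, bad_os)
```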


In [None]:
import psycopg2

# ElephantSQL details
# Replace '***' with real credentials

dbname = '***'
user = '***'
password = '***'
host = '***'
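
One common alternative to editing placeholders in the notebook is to read the credentials from environment variables. The sketch below assumes hypothetical variable names (`PG_DBNAME`, `PG_USER`, `PG_PASSWORD`, `PG_HOST`); adjust them to your own setup.

```python
import os

# Hypothetical environment variable names, keyed by psycopg2.connect argument
ENV_KEYS = {
    'dbname': 'PG_DBNAME',
    'user': 'PG_USER',
    'password': 'PG_PASSWORD',
    'host': 'PG_HOST',
}

def load_db_settings(env=os.environ):
    """Collect connection settings from `env`, failing early if any are missing."""
    missing = [var for var in ENV_KEYS.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[var] for name, var in ENV_KEYS.items()}

# Demonstration with a plain dict standing in for the real environment
demo_env = {'PG_DBNAME': 'db', 'PG_USER': 'me', 'PG_PASSWORD': 'secret', 'PG_HOST': 'localhost'}
settings = load_db_settings(demo_env)
print(sorted(settings))
```

The resulting dictionary uses the same keyword names as the variables above, so it could be passed on as `psycopg2.connect(**load_db_settings())`.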

In [None]:
# Create connection to postgres

pg_conn = psycopg2.connect(dbname=dbname, user=user,
                           password=password, host=host)

In [None]:
from sqlalchemy import create_engine

# Creating engine for database connection using SQLAlchemy's create_engine function
# Replace '***' with the real connection string and credentials

engine = create_engine('***', creator=lambda: pg_conn)

In [None]:
# Saving DataFrames to the SQL database

df_registration.to_sql('registration', engine, if_exists='replace', index=False)
df_transaction.to_sql('transaction', engine, if_exists='replace', index=False)
df_user_activity.to_sql('user_activity', engine, if_exists='replace', index=False)
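
The behaviour of `to_sql` with `if_exists='replace'` can be tried out locally before touching the hosted database. In this sketch an in-memory SQLite database stands in for PostgreSQL (an assumption made purely so the example runs without credentials), and the `toy` frame is made up.

```python
import sqlite3
import pandas as pd

# Made-up rows standing in for df_registration
toy = pd.DataFrame({'event_id': [1, 2], 'user_id': ['a', 'b']})

# In-memory SQLite stands in for the PostgreSQL database (assumption)
conn = sqlite3.connect(':memory:')
try:
    # Writing twice with if_exists='replace' drops and recreates the table,
    # so the second write does not append duplicate rows
    toy.to_sql('registration', conn, if_exists='replace', index=False)
    toy.to_sql('registration', conn, if_exists='replace', index=False)

    # Read the row count back to confirm what was stored
    stored = int(pd.read_sql_query('SELECT COUNT(*) AS n FROM registration', conn)['n'].iloc[0])
finally:
    conn.close()

print(stored == len(toy))
```

Since `replace` recreates the table on every run, re-running the notebook overwrites the stored tables instead of adding to them.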

In [None]:
# Disposing the engine and closing the connection

engine.dispose()
pg_conn.close()