# Spark Network Services GmbH
### Data Warehouse Challenge 2021: Junior Data Engineer

##### The content of this document is confidential and shall not be shared with anyone other than the candidate and Spark employees

We are super excited to get to know you. To assess your skillset, we have an ELT/ETL challenge for
you. The assessment of Python (HTTP Request, JSON handler, database integration), SQL (DML/DDL)
and git is compulsory.

You are required to deliver your git project with instructions on how to reproduce it.
Background context:
One of our Product Owners came to us asking to create a new pipeline, and his team is looking
forward to understanding this data. They don’t have any documentation apart from these 2 endpoints:
https://619ca0ea68ebaa001753c9b0.mockapi.io/evaluation/dataengineer/jr/v1/users
https://619ca0ea68ebaa001753c9b0.mockapi.io/evaluation/dataengineer/jr/v1/messages

##### The requirement for you is:

1) Collect all the data just once from these endpoints and create/populate the tables (user,
subscription and message).

2) Product Owners do intend to produce metrics based on date, age, city, country, email
domain, gender, smoking condition, income, subscriptions and messages. It is your
responsibility to propose how to model the tables, columns and relationships. PII
handling should be considered, so that no sensitive data can be accessed by the final
users.

3) The Product Owner asked you to provide the queries for some scenarios, please
add a file sql_test.sql in your project with the queries that solve the below
questions:

- How many total messages are being sent every day?

- Are there any users that did not receive any message?

- How many active subscriptions do we have today?

- What is the average ticket price (sum of subscription amounts / count of subscriptions), broken down by year/month (format YYYY-MM)?

##### Privacy Requirements:
We need to be GDPR compliant, so we are very concerned about data privacy. It is important that
no sensitive user information is exposed.
Imported fields must be privacy-protected in the following way:

- Remove all PII.

- When anonymization is required, the type of anonymization is up to you.

- Emails: it is mandatory to discard the username part on import and to keep only the
domain (e.g. mickey.mouse@disney.com => disney.com).

- Other fields: it is up to you to decide whether they are relevant to the task.

- Do not import the chat messages; this is extremely sensitive information.
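
A minimal sketch of how the rules above could be applied to one raw user record before loading. The field names are the ones dropped later in this notebook; the helper names `email_to_domain` and `strip_pii` and the sample record are hypothetical. Derived fields such as the age would have to be computed before `birthDate` is removed.

```python
# Fields treated as PII in this sketch (an assumption; adjust to the actual payload).
PII_FIELDS = {"firstName", "lastName", "address", "zipCode", "birthDate", "email"}


def email_to_domain(email):
    """Return only the domain part of an email address, or None if there is no '@'."""
    if not email or "@" not in email:
        return None
    # rpartition splits on the last '@', so the username part is discarded entirely.
    return email.rpartition("@")[2].lower()


def strip_pii(record):
    """Return a copy of a raw user record without PII, keeping only the email domain."""
    cleaned = {key: value for key, value in record.items() if key not in PII_FIELDS}
    cleaned["domain"] = email_to_domain(record.get("email"))
    return cleaned


# Hypothetical sample record, not real data.
raw = {"id": "1", "firstName": "Mickey", "email": "mickey.mouse@disney.com", "city": "Orlando"}
print(strip_pii(raw))  # {'id': '1', 'city': 'Orlando', 'domain': 'disney.com'}
```

The chat text is not handled here because it should never be read into the record in the first place.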

##### Important to know:
For this test, any data shown here doesn’t represent the official data model from Spark Networks
or expose any real data from our customers. All the data are fake and generated by
https://mockapi.io/.

#### Thank you very much for investing your time to solve this challenge!

Starting the Project

## Data Extraction and Visualization

In [1]:
import requests

try:
    responseUser = requests.get('https://619ca0ea68ebaa001753c9b0.mockapi.io/evaluation/dataengineer/jr/v1/users', timeout=30)
    responseUser.raise_for_status()  # raise HTTPError on 4xx/5xx responses
    jsonRespUser = responseUser.json()

except requests.exceptions.HTTPError as httpError:
    print(f'HTTP error occurred: {httpError}.')
except Exception as err:
    print(f'Other error occurred: {err}.')

In [2]:
try:
    responseMessages = requests.get('https://619ca0ea68ebaa001753c9b0.mockapi.io/evaluation/dataengineer/jr/v1/messages', timeout=30)
    responseMessages.raise_for_status()  # raise HTTPError on 4xx/5xx responses
    jsonRespMessages = responseMessages.json()

except requests.exceptions.HTTPError as httpError:
    print(f'HTTP error occurred: {httpError}.')
except Exception as err:
    print(f'Other error occurred: {err}.')

In [3]:
# pandas makes the JSON easier to inspect
import pandas as pd
# first test: flatten the nested user records into a DataFrame
df_users = pd.json_normalize(jsonRespUser)

In [4]:
# derive the age from the birth date
df_users['birthDate'] = pd.to_datetime(df_users['birthDate'])
# get today's date
today = pd.to_datetime('today')
# subtract one year when this year's birthday has not happened yet (compare month and day)
birth_month = df_users['birthDate'].dt.month
birth_day = df_users['birthDate'].dt.day
birthday_pending = (birth_month > today.month) | ((birth_month == today.month) & (birth_day > today.day))
df_users['age'] = today.year - df_users['birthDate'].dt.year - birthday_pending.astype(int)

# extract the email domain
df_users['domain'] = df_users['email'].str.split('@').str[1]

# drop PII and columns that are no longer needed
df_users = df_users.drop(columns=['firstName', 'lastName', 'address', 'zipCode', 'subscription', 'birthDate', 'email'])
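
Age in full years depends on the day as well as the month: someone born on 15 June is still one year younger on 14 June. A scalar sketch of that comparison, using a hypothetical helper name `age_on` and made-up dates:

```python
from datetime import date


def age_on(birth, today):
    """Full years elapsed between birth and today (both datetime.date)."""
    # Tuple comparison checks the month first and the day only on a tie.
    had_birthday = (today.month, today.day) >= (birth.month, birth.day)
    # Subtract one year if this year's birthday has not been reached yet.
    return today.year - birth.year - (0 if had_birthday else 1)


print(age_on(date(1990, 6, 15), date(2021, 6, 14)))  # 30
print(age_on(date(1990, 6, 15), date(2021, 6, 15)))  # 31
```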

In [5]:
# make sure the fields have the right types: int, datetime, boolean, string
# casting
df_users['createdAt'] = pd.to_datetime(df_users['createdAt'])
df_users['updatedAt'] = pd.to_datetime(df_users['updatedAt'])
# note: astype('bool') turns missing values (NaN) into True
df_users['profile.isSmoking'] = df_users['profile.isSmoking'].astype('bool')
df_users['profile.income'] = df_users['profile.income'].astype(float)
df_users['id'] = df_users['id'].astype(int)
# rename the columns in one call; the dotted names are replaced with underscores
df_users = df_users.rename(columns={
    'id': 'user_id',
    'profile.income': 'profile_income',
    'profile.isSmoking': 'profile_isSmoking',
    'profile.gender': 'profile_gender',
    'profile.profession': 'profile_profession'})

# reorder the fields
df_users = df_users[['user_id','createdAt','updatedAt', 'city', 'country','domain','age','profile_gender','profile_isSmoking','profile_profession','profile_income']]
df_users

| | user_id | createdAt | updatedAt | city | country | domain | age | profile_gender | profile_isSmoking | profile_profession | profile_income |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2021-11-23 16:10:33.614000+00:00 | 2021-11-24 13:34:15.404000+00:00 | Pembroke Pines | United States | hotmail.com | 1 | male | True | Central Configuration Planner | 3709.61 |
| 1 | 2 | 2021-11-23 02:40:10.964000+00:00 | 2021-11-24 16:04:42.393000+00:00 | Meganemouth | Namibia | gmail.com | 0 | male | True | Corporate Tactics Strategist | 1504.25 |
| 2 | 3 | 2021-11-23 06:26:53.843000+00:00 | 2021-11-24 06:37:01.117000+00:00 | Port Cary | Bulgaria | gmail.com | 0 | female | False | Senior Quality Manager | 3256.41 |
| 3 | 4 | 2021-11-23 03:27:56.458000+00:00 | 2021-11-24 17:00:21.524000+00:00 | South Christophe | United States | gmail.com | 0 | male | True | Internal Division Agent | 758.89 |
| 4 | 5 | 2021-11-23 14:57:27.793000+00:00 | 2021-11-24 05:23:38.587000+00:00 | East Darrionhaven | Iceland | gmail.com | 0 | female | False | Central Paradigm Agent | 2658.19 |
| 5 | 6 | 2021-11-22 19:14:23.721000+00:00 | 2021-11-24 02:56:07.833000+00:00 | | United States | yahoo.com | 0 | | True | Customer Security Producer | |


In [11]:
# one row per subscription; meta=['id'] copies the parent user's id onto each row
df_subscriptions = pd.json_normalize(jsonRespUser, meta=['id'], record_path=['subscription'])
df_subscriptions['createdAt'] = pd.to_datetime(df_subscriptions['createdAt'])
df_subscriptions['startDate'] = pd.to_datetime(df_subscriptions['startDate'])
df_subscriptions['endDate'] = pd.to_datetime(df_subscriptions['endDate'])
df_subscriptions['amount'] = df_subscriptions['amount'].astype(float)
df_subscriptions['id'] = df_subscriptions['id'].astype(int)
# 'id' here is the user's id, so rename it accordingly
df_subscriptions = df_subscriptions.rename(columns={'id': 'user_id'})
df_subscriptions

| | createdAt | startDate | endDate | status | amount | user_id |
|---|---|---|---|---|---|---|
| 0 | 2021-11-24 16:58:46.581000+00:00 | 2021-11-24 05:12:49.301000+00:00 | 2022-09-15 06:05:59.630000+00:00 | Active | 43.18 | 1 |
| 1 | 2021-11-24 14:36:18.895000+00:00 | 2021-11-24 12:57:48.724000+00:00 | 2022-07-13 09:14:04.001000+00:00 | Active | 23.78 | 2 |
| 2 | 2021-11-22 23:41:32.927000+00:00 | 2021-11-23 14:42:04.416000+00:00 | 2022-07-26 17:06:45.413000+00:00 | Rejected | 64.75 | 3 |
| 3 | 2021-11-23 18:57:20.540000+00:00 | 2021-11-24 18:04:41.908000+00:00 | 2022-03-03 09:47:26.916000+00:00 | Active | 88.6 | 3 |
| 4 | 2021-11-23 05:23:29.452000+00:00 | 2021-11-23 09:24:30.685000+00:00 | 2022-03-25 10:14:15.548000+00:00 | Rejected | 15.98 | 4 |
| 5 | 2021-11-24 02:07:08.482000+00:00 | 2021-11-24 12:47:33.246000+00:00 | 2021-12-10 20:22:36.132000+00:00 | Inactive | 3.51 | 4 |
| 6 | 2021-11-24 14:40:24.257000+00:00 | 2021-11-24 11:22:33.265000+00:00 | 2022-11-23 12:41:28.319000+00:00 | Active | 89.71 | 5 |
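
What `record_path` and `meta` do in the subscriptions cell can be seen in plain Python: each element of the nested `subscription` list becomes one row, and the parent's `id` is copied onto it. This is only a sketch; the helper name `flatten_subscriptions` is hypothetical and the payload below is made up to mimic the shape of the `/users` response.

```python
def flatten_subscriptions(users):
    """One row per subscription, carrying the parent user's id as user_id."""
    rows = []
    for user in users:
        for sub in user.get("subscription", []):
            row = dict(sub)                   # copy the subscription's own fields
            row["user_id"] = int(user["id"])  # attach the parent key
            rows.append(row)
    return rows


# Hypothetical payload shaped like the /users response.
sample = [
    {"id": "3", "subscription": [{"status": "Rejected", "amount": "64.75"},
                                 {"status": "Active", "amount": "88.6"}]},
    {"id": "6", "subscription": []},
]
rows = flatten_subscriptions(sample)
print(rows)
```

A user with an empty `subscription` list contributes no rows, which is why a user can appear in the users table without appearing in the subscriptions table.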


In [7]:
df_messages = pd.json_normalize(jsonRespMessages)
df_messages['createdAt'] = pd.to_datetime(df_messages['createdAt'])
df_messages['id'] = df_messages['id'].astype(int)
df_messages['receiverId'] = df_messages['receiverId'].astype(int)
df_messages['senderId'] = df_messages['senderId'].astype(int)
df_messages = df_messages.rename(columns={'id': 'message_id'})
# the chat text is sensitive and must not be imported
df_messages = df_messages.drop(columns=['message'])
df_messages = df_messages[['message_id', 'createdAt', 'receiverId','senderId']]
df_messages

| | message_id | createdAt | receiverId | senderId |
|---|---|---|---|---|
| 0 | 1 | 2021-11-25 12:18:57.208000+00:00 | 2 | 1 |
| 1 | 2 | 2021-11-25 15:26:33.436000+00:00 | 3 | 2 |
| 2 | 3 | 2021-11-25 21:55:29.995000+00:00 | 1 | 3 |
| 3 | 4 | 2021-11-26 03:09:45.900000+00:00 | 1 | 4 |
| 4 | 5 | 2021-11-26 09:15:42.912000+00:00 | 1 | 5 |
| 5 | 6 | 2021-11-27 06:42:02.172000+00:00 | 2 | 6 |
| 6 | 7 | 2021-11-27 06:38:39.424000+00:00 | 5 | 1 |
| 7 | 8 | 2021-11-27 07:16:37.817000+00:00 | 6 | 2 |
| 8 | 9 | 2021-11-27 12:38:30.049000+00:00 | 1 | 3 |
| 9 | 10 | 2021-11-27 21:11:57.106000+00:00 | 3 | 1 |


## Preparing the database for loading

In [8]:
import sqlalchemy
from sqlalchemy import (Boolean, Column, DateTime, Float, Integer, MetaData,
                        String, Table, text)
from credentials import mysql_db_config

# connect to the server (no database selected yet)
engine = sqlalchemy.create_engine(f'mysql+pymysql://{mysql_db_config["user"]}:{mysql_db_config["password"]}@{mysql_db_config["host"]}')

# create the database if it does not exist yet
with engine.begin() as conn:
    conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {mysql_db_config['database']};"))

In [9]:
#adding the database to the engine call
engine = sqlalchemy.create_engine(f'mysql+pymysql://{mysql_db_config["user"]}:{mysql_db_config["password"]}@{mysql_db_config["host"]}/{mysql_db_config["database"]}')
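
Building the URL with an f-string breaks when the password contains characters such as `@` or `/`. One way around this, sketched here with the standard library only, is to URL-encode the credentials first. The helper name `build_mysql_url` and the sample values are hypothetical; the dictionary keys are the ones used with `mysql_db_config` above.

```python
from urllib.parse import quote_plus


def build_mysql_url(cfg, with_database=True):
    """Assemble a mysql+pymysql URL with URL-encoded user and password."""
    user = quote_plus(cfg["user"])
    password = quote_plus(cfg["password"])
    url = f"mysql+pymysql://{user}:{password}@{cfg['host']}"
    if with_database:
        url += f"/{cfg['database']}"
    return url


# Hypothetical credentials, for illustration only.
cfg = {"user": "etl", "password": "p@ss/word", "host": "localhost", "database": "spark"}
print(build_mysql_url(cfg))  # mysql+pymysql://etl:p%40ss%2Fword@localhost/spark
```

The resulting string could then be passed to `sqlalchemy.create_engine` in place of the hand-built f-string.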

In [10]:
# create the table if it does not exist yet
# this explicit definition is not strictly required, but is part of the research
def make_table_users():
    meta = MetaData()

    users = Table(
        'users', meta,
        Column('user_id', Integer),
        Column('createdAt', DateTime),
        Column('updatedAt', DateTime),
        Column('city', String(32)),
        Column('country', String(32)),
        Column('domain', String(32)),
        Column('age', Integer),
        Column('profile_gender', String(32)),
        Column('profile_isSmoking', Boolean),
        Column('profile_profession', String(100)),
        Column('profile_income', Float))

    users.create(engine, checkfirst=True)  # checkfirst skips creation if the table already exists

In [11]:
# create the table if it does not exist yet
# this explicit definition is not strictly required, but is part of the research
def make_table_subscriptions():
    meta = MetaData()

    # table name matches the one used when loading with to_sql
    subscriptions = Table(
        'subscriptions', meta,
        Column('subscription_id', Integer, primary_key=True),
        Column('createdAt', DateTime),
        Column('startDate', DateTime),
        Column('endDate', DateTime),
        Column('status', String(32)),
        Column('amount', Float),
        Column('user_id', Integer))

    subscriptions.create(engine, checkfirst=True)

In [12]:
# create the table if it does not exist yet
# this explicit definition is not strictly required, but is part of the research
def make_table_messages():
    meta = MetaData()

    # receiverId and senderId are user ids, cast to int in the DataFrame
    messages = Table(
        'messages', meta,
        Column('message_id', Integer),
        Column('createdAt', DateTime),
        Column('receiverId', Integer),
        Column('senderId', Integer))

    messages.create(engine, checkfirst=True)
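
The three definitions above do not declare how the tables relate. Below is a minimal sketch of one possible set of relationships (one user has many subscriptions; each message references a sender and a receiver), written against an in-memory SQLite database so it runs without a server. The constraint layout is an assumption about the model, not part of the original solution, and MySQL DDL differs in detail.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when enabled
conn.executescript("""
CREATE TABLE users (
    user_id INTEGER PRIMARY KEY
);
CREATE TABLE subscriptions (
    subscription_id INTEGER PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(user_id),
    status TEXT,
    amount REAL
);
CREATE TABLE messages (
    message_id INTEGER PRIMARY KEY,
    senderId INTEGER NOT NULL REFERENCES users(user_id),
    receiverId INTEGER NOT NULL REFERENCES users(user_id)
);
""")

conn.execute("INSERT INTO users (user_id) VALUES (1)")
conn.execute("INSERT INTO subscriptions (user_id, status, amount) VALUES (1, 'Active', 43.18)")

try:
    # User 99 does not exist, so the foreign key rejects this row.
    conn.execute("INSERT INTO subscriptions (user_id, status, amount) VALUES (99, 'Active', 1.0)")
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

print(orphan_rejected)  # True
```

With constraints like these, parent rows (users) have to be loaded before subscriptions and messages.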

## Loading

In [13]:
df_users.to_sql('users', engine, if_exists='replace', dtype={
    'user_id': Integer, 
    'createdAt': DateTime, 
    'updatedAt': DateTime, 
    'city': String(32),
    'country': String(32),
    'domain': String(32),
    'age': Integer,
    'profile_gender': String(32),
    'profile_isSmoking': Boolean,
    'profile_profession': String(100),
    'profile_income': Float},index=False)


In [14]:
df_subscriptions.to_sql('subscriptions', engine, if_exists='replace', dtype={
    'createdAt': DateTime,
    'startDate': DateTime,
    'endDate': DateTime,
    'status': String(32),
    'amount': Float,
    'user_id': Integer}, index=True, index_label='subscription_id')
# the DataFrame index is written as the subscription_id column



In [15]:
# receiverId and senderId were cast to int above, so they are stored as integers
df_messages.to_sql('messages', engine, if_exists='replace', dtype={
    'message_id': Integer,
    'createdAt': DateTime,
    'receiverId': Integer,
    'senderId': Integer},index=False)
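
The challenge also asks for four queries. Below is a hedged sketch of how they might look, run against an in-memory SQLite copy of a few sample rows so that it is self-contained. The schema mirrors the column names used in this notebook, the sample rows are abridged from the outputs shown earlier (timestamps cut to seconds), and `subscription_id` is assumed to be the DataFrame index. The date functions are SQLite's; on MySQL the equivalents would be `DATE()`, `DATE_FORMAT()` and `CURDATE()`. The brief does not say which date the average ticket should be grouped by, so grouping by `createdAt` is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (user_id INTEGER PRIMARY KEY);
CREATE TABLE messages (message_id INTEGER PRIMARY KEY, createdAt TEXT,
                       receiverId INTEGER, senderId INTEGER);
CREATE TABLE subscriptions (subscription_id INTEGER PRIMARY KEY, createdAt TEXT,
                            startDate TEXT, endDate TEXT, status TEXT,
                            amount REAL, user_id INTEGER);
""")
conn.executemany("INSERT INTO users VALUES (?)", [(i,) for i in range(1, 7)])
conn.executemany("INSERT INTO messages VALUES (?, ?, ?, ?)", [
    (1, "2021-11-25 12:18:57", 2, 1), (2, "2021-11-25 15:26:33", 3, 2),
    (3, "2021-11-25 21:55:29", 1, 3), (4, "2021-11-26 03:09:45", 1, 4),
    (5, "2021-11-26 09:15:42", 1, 5), (6, "2021-11-27 06:42:02", 2, 6),
    (7, "2021-11-27 06:38:39", 5, 1), (8, "2021-11-27 07:16:37", 6, 2),
    (9, "2021-11-27 12:38:30", 1, 3), (10, "2021-11-27 21:11:57", 3, 1),
])
conn.executemany("INSERT INTO subscriptions VALUES (?, ?, ?, ?, ?, ?, ?)", [
    (0, "2021-11-24 16:58:46", "2021-11-24 05:12:49", "2022-09-15 06:05:59", "Active", 43.18, 1),
    (1, "2021-11-24 14:36:18", "2021-11-24 12:57:48", "2022-07-13 09:14:04", "Active", 23.78, 2),
    (2, "2021-11-22 23:41:32", "2021-11-23 14:42:04", "2022-07-26 17:06:45", "Rejected", 64.75, 3),
    (3, "2021-11-23 18:57:20", "2021-11-24 18:04:41", "2022-03-03 09:47:26", "Active", 88.6, 3),
    (4, "2021-11-23 05:23:29", "2021-11-23 09:24:30", "2022-03-25 10:14:15", "Rejected", 15.98, 4),
    (5, "2021-11-24 02:07:08", "2021-11-24 12:47:33", "2021-12-10 20:22:36", "Inactive", 3.51, 4),
    (6, "2021-11-24 14:40:24", "2021-11-24 11:22:33", "2022-11-23 12:41:28", "Active", 89.71, 5),
])

# 1) Total messages sent per day.
messages_per_day = conn.execute("""
    SELECT date(createdAt) AS day, COUNT(*) AS total
    FROM messages GROUP BY day ORDER BY day
""").fetchall()

# 2) Users that did not receive any message.
users_without_messages = conn.execute("""
    SELECT u.user_id FROM users u
    LEFT JOIN messages m ON m.receiverId = u.user_id
    WHERE m.message_id IS NULL
""").fetchall()

# 3) Active subscriptions on a given day (passed in so the result is reproducible).
active_subscriptions = conn.execute("""
    SELECT COUNT(*) FROM subscriptions
    WHERE status = 'Active'
      AND date(startDate) <= :today AND date(endDate) >= :today
""", {"today": "2021-12-01"}).fetchone()[0]

# 4) Average ticket (sum of amounts / count) per YYYY-MM of subscription creation.
average_ticket = conn.execute("""
    SELECT strftime('%Y-%m', createdAt) AS period,
           SUM(amount) / COUNT(*) AS average_ticket
    FROM subscriptions GROUP BY period ORDER BY period
""").fetchall()

print(messages_per_day)        # [('2021-11-25', 3), ('2021-11-26', 2), ('2021-11-27', 5)]
print(users_without_messages)  # [(4,)]
print(active_subscriptions)    # 4
print(average_ticket)
```

Whether "active" should rely on the `status` column alone or also on the start and end dates is a modelling decision; the sketch applies both conditions.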
