# Data Engineer Assessment

## ETL

In [1]:
import pandas as pd
import warnings
import sqlalchemy
from sqlalchemy import create_engine
from termcolor import colored
warnings.filterwarnings('ignore')

In [2]:
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

## <font color="#34495E">Booking</font>

### Booking file extraction

In [3]:
booking_csv = pd.read_csv(r"booking.csv")

### Booking Analysis and Transformation

In [4]:
booking_csv.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8481 entries, 0 to 8480
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   user_id             8481 non-null   int64 
 1   booking_id          8481 non-null   int64 
 2   created_at          8463 non-null   object
 3   status              8402 non-null   object
 4   checkin_status      8320 non-null   object
 5   booking_start_time  8452 non-null   object
 6   booking_end_time    8452 non-null   object
 7   is_demo             8471 non-null   object
dtypes: int64(2), object(6)
memory usage: 530.2+ KB


In [5]:
booking_csv.head(5)

Unnamed: 0,user_id,booking_id,created_at,status,checkin_status,booking_start_time,booking_end_time,is_demo
0,146,1,2022-10-19 05:39:41.448694 UTC,completed,checkedIn,2022-11-01 05:00:00.000000 UTC,2022-11-01 17:30:00.000000 UTC,False
1,146,2,2022-10-26 10:00:43.220322 UTC,completed,checkedIn,2022-11-02 05:00:00.000000 UTC,2022-11-02 17:30:00.000000 UTC,False
2,66,3,2022-11-07 06:11:54.407982 UTC,completed,checkedIn,2022-11-07 06:30:00.000000 UTC,2022-11-07 17:30:00.000000 UTC,False
3,286,4,2022-11-08 10:40:59.593669 UTC,editedByUser,checkInNotAvailable,2022-11-09 08:30:00.000000 UTC,2022-11-09 17:00:00.000000 UTC,False
4,153,5,2022-11-14 10:52:21.897090 UTC,cancelled,notCheckedIn,2022-11-14 05:00:00.000000 UTC,2022-11-14 17:30:00.000000 UTC,False


In [6]:
booking_csv.dtypes

user_id                int64
booking_id             int64
created_at            object
status                object
checkin_status        object
booking_start_time    object
booking_end_time      object
is_demo               object
dtype: object

Checking categorical variables 'status, 'checkin_status' and 'is_demo'

In [7]:
def list_unique_values(df, column_name):
    print(colored(pd.unique(df[[column_name]].values.ravel('K')), 'green'))

In [8]:
list_column = ['status','checkin_status','is_demo']

In [9]:
for column in list_column:
    list_unique_values(booking_csv, column)

[32m['completed' 'editedByUser' 'cancelled' 'accepted' 'running' nan][0m
[32m['checkedIn' 'checkInNotAvailable' 'notCheckedIn' 'readyForCheckIn'
 'readyForNonQRCheckIn' nan][0m
[32m['FALSE' '"FALSE"' '"TRUE"' nan][0m


<font color='#C70039'>'"FALSE"', '"TRUE" with double quotes is not good, right? Let's fix it later</font>

Checking NaN values

In [10]:
booking_csv.isna().sum()

user_id                 0
booking_id              0
created_at             18
status                 79
checkin_status        161
booking_start_time     29
booking_end_time       29
is_demo                10
dtype: int64

Drop NaN values

In [11]:
booking_csv = booking_csv.dropna()

Changing timestamp format and set null for wrong formats

In [12]:
def transform_date_UTC(df, column_name):
    df[column_name] = pd.to_datetime(df[column_name].str.replace(' UTC',''), errors='coerce')

In [13]:
list_column = ['created_at','booking_start_time','booking_end_time']

In [14]:
for column in list_column:
    transform_date_UTC(booking_csv, column)

Fixing True and False double quotes and seting to bool

In [15]:
def replace_quotes_and_set_boolean(df, column_name):
    df[column_name] = df[column_name].str.replace('"','')
    d = {'TRUE': True, 'FALSE': False}
    df[column_name] = df[column_name].map(d)

In [16]:
replace_quotes_and_set_boolean(booking_csv,'is_demo')

Columns data type adjustments 

In [17]:
def set_column_type(df, dict_):
    for key, value in dict_.items():
        df[key] = df[key].astype(value)

In [18]:
data_type_dict = {
    "user_id": "int64",
    "booking_id": "int64",
    "status": "str",
    "checkin_status": "str",
    "is_demo": "bool"
}

In [19]:
set_column_type(booking_csv, data_type_dict)

In [20]:
booking_csv.dtypes

user_id                        int64
booking_id                     int64
created_at            datetime64[ns]
status                        object
checkin_status                object
booking_start_time    datetime64[ns]
booking_end_time      datetime64[ns]
is_demo                         bool
dtype: object

Checking if exists duplicates ID

In [21]:
booking_csv['booking_id'].duplicated().any()

False

All good

In [22]:
list_column = ['status','checkin_status','is_demo']

In [23]:
for column in list_column:
    list_unique_values(booking_csv, column)

[32m['completed' 'editedByUser' 'cancelled' 'accepted' 'running'][0m
[32m['checkedIn' 'checkInNotAvailable' 'notCheckedIn' 'readyForCheckIn'
 'readyForNonQRCheckIn'][0m
[32m[False  True][0m


In [24]:
booking_csv.head(5)

Unnamed: 0,user_id,booking_id,created_at,status,checkin_status,booking_start_time,booking_end_time,is_demo
0,146,1,2022-10-19 05:39:41.448694,completed,checkedIn,2022-11-01 05:00:00,2022-11-01 17:30:00,False
1,146,2,2022-10-26 10:00:43.220322,completed,checkedIn,2022-11-02 05:00:00,2022-11-02 17:30:00,False
2,66,3,2022-11-07 06:11:54.407982,completed,checkedIn,2022-11-07 06:30:00,2022-11-07 17:30:00,False
3,286,4,2022-11-08 10:40:59.593669,editedByUser,checkInNotAvailable,2022-11-09 08:30:00,2022-11-09 17:00:00,False
4,153,5,2022-11-14 10:52:21.897090,cancelled,notCheckedIn,2022-11-14 05:00:00,2022-11-14 17:30:00,False


### Booking Load

In [25]:
%load_ext sql

In [26]:
%sql postgresql://postgres:1234@localhost:5432/etl_exercise

'Connected: postgres@etl_exercice'

In [27]:
engine = sqlalchemy.create_engine('postgresql://postgres:1234@localhost:5432/etl_exercise')

Incremental ou Full load ? (Business decision)

In [28]:
booking_csv.to_sql("booking", engine, if_exists="replace", index=False)

157

In [29]:
#booking_csv.to_sql("public.booking", engine, if_exists="append", index=False)

In [30]:
%sql SELECT user_id, booking_id, created_at, status, checkin_status, booking_start_time, booking_end_time, is_demo FROM booking Limit 5

 * postgresql://postgres:***@localhost:5432/etl_exercice
5 rows affected.


user_id,booking_id,created_at,status,checkin_status,booking_start_time,booking_end_time,is_demo
146,1,2022-10-19 05:39:41.448694,completed,checkedIn,2022-11-01 05:00:00,2022-11-01 17:30:00,False
146,2,2022-10-26 10:00:43.220322,completed,checkedIn,2022-11-02 05:00:00,2022-11-02 17:30:00,False
66,3,2022-11-07 06:11:54.407982,completed,checkedIn,2022-11-07 06:30:00,2022-11-07 17:30:00,False
286,4,2022-11-08 10:40:59.593669,editedByUser,checkInNotAvailable,2022-11-09 08:30:00,2022-11-09 17:00:00,False
153,5,2022-11-14 10:52:21.897090,cancelled,notCheckedIn,2022-11-14 05:00:00,2022-11-14 17:30:00,False


## <font color="#34495E">Company</font>

### Company Extraction

In [31]:
company_csv = pd.read_csv(r"company.csv")

### Company Analysis and Transformation

In [32]:
company_csv.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   company_id    7 non-null      int64 
 1   status        7 non-null      object
 2   created_at    7 non-null      object
 3   company_name  7 non-null      object
dtypes: int64(1), object(3)
memory usage: 352.0+ bytes


In [33]:
company_csv.head(5)

Unnamed: 0,company_id,status,created_at,company_name
0,4,trial,2022-07-08 19:39:00.188065 UTC,We love Deskbooking
1,6,trial,2023-01-05 15:40:48.551596 UTC,Deskbooking Studios
2,2,active,2022-07-08 19:38:56.327903 UTC,Flexbooking Partners
3,10,active,2022-07-08 19:38:54.245846 UTC,Prime Deskbooker
4,5,active,2022-11-17 08:19:28.838521 UTC,Deskbooking GmBH


In [34]:
company_csv.columns

Index(['company_id', 'status', 'created_at', 'company_name'], dtype='object')

In [35]:
list_column = ['status','company_name']

In [36]:
for column in list_column:
    list_unique_values(company_csv, column)

[32m['trial' 'active'][0m
[32m['We love Deskbooking' 'Deskbooking Studios' 'Flexbooking Partners'
 'Prime Deskbooker' 'Deskbooking GmBH' 'Bookery' 'BookerCentrics'][0m


Changing timestamp format and set null for wrong formats

In [37]:
transform_date_UTC(company_csv, 'created_at')

Checking NaN values

In [38]:
company_csv.isna().sum()

company_id      0
status          0
created_at      0
company_name    0
dtype: int64

Drop NaN values

In [39]:
company_csv = company_csv.dropna()

In [40]:
company_csv.dtypes

company_id               int64
status                  object
created_at      datetime64[ns]
company_name            object
dtype: object

In [41]:
company_csv['company_id'].duplicated().any()

False

In [42]:
company_csv.head(5)

Unnamed: 0,company_id,status,created_at,company_name
0,4,trial,2022-07-08 19:39:00.188065,We love Deskbooking
1,6,trial,2023-01-05 15:40:48.551596,Deskbooking Studios
2,2,active,2022-07-08 19:38:56.327903,Flexbooking Partners
3,10,active,2022-07-08 19:38:54.245846,Prime Deskbooker
4,5,active,2022-11-17 08:19:28.838521,Deskbooking GmBH


### Company Load

In [43]:
company_csv.to_sql("company", engine, if_exists="replace", index=False)

7

## <font color="#34495E">Users</font>

### Users Extraction

In [44]:
users_csv = pd.read_csv(r"users.csv")

### Users Analysis and Transformation

In [45]:
users_csv.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1189 entries, 0 to 1188
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   rn          1189 non-null   int64 
 1   created_at  1165 non-null   object
 2   company_id  1189 non-null   int64 
 3   status      1180 non-null   object
 4   demo_user   1179 non-null   object
dtypes: int64(2), object(3)
memory usage: 46.6+ KB


In [46]:
users_csv.columns

Index(['rn', 'created_at', 'company_id', 'status', 'demo_user'], dtype='object')

In [47]:
users_csv.head(5)

Unnamed: 0,rn,created_at,company_id,status,demo_user
0,103,,2,active,False
1,129,,2,active,False
2,148,,2,active,False
3,258,,3,active,False
4,271,,4,active,False


In [48]:
list_column = ['status','demo_user']

In [49]:
for column in list_column:
    list_unique_values(users_csv, column)

[32m['active' 'inactive' 'deleted' nan][0m
[32m['FALSE' nan 'TRUE' '"FALSE"' '"TRUE"'][0m


Changing timestamp format and set null for wrong formats

In [50]:
transform_date_UTC(users_csv, 'created_at')

Checking NaN values

In [51]:
users_csv.isna().sum()

rn             0
created_at    24
company_id     0
status         9
demo_user     10
dtype: int64

Drop NaN values

In [52]:
users_csv = users_csv.dropna()

Fixing True and False double quotes and seting to bool

In [53]:
replace_quotes_and_set_boolean(users_csv,'demo_user')

In [54]:
for column in list_column:
    list_unique_values(users_csv, column)

[32m['inactive' 'active' 'deleted'][0m
[32m[False  True][0m


Checking duplicates ID

In [55]:
users_csv['rn'].duplicated().any()

True

<font color='red'>oh boy ! Duplicate IDs can be a problem, we will identify and choose with the latest date created_at</font>

In [56]:
duplicates_df = pd.concat(g for _, g in users_csv.groupby("rn") if len(g) > 1)

In [57]:
duplicates_df

Unnamed: 0,rn,created_at,company_id,status,demo_user
377,310,2022-11-01 15:48:00.794563,10,inactive,False
378,310,2022-11-01 15:48:00.794563,10,active,True
379,310,2023-03-09 10:54:54.885050,10,active,False
380,310,2022-11-01 15:48:00.794563,10,deleted,False


In [58]:
to_keep = duplicates_df.sort_values('created_at').drop_duplicates('rn',keep='last')

In [59]:
to_keep

Unnamed: 0,rn,created_at,company_id,status,demo_user
379,310,2023-03-09 10:54:54.885050,10,active,False


In [60]:
to_remove = duplicates_df[~duplicates_df.isin(to_keep)].dropna()

In [61]:
to_remove

Unnamed: 0,rn,created_at,company_id,status,demo_user
377,310.0,2022-11-01 15:48:00.794563,10.0,inactive,False
378,310.0,2022-11-01 15:48:00.794563,10.0,active,True
380,310.0,2022-11-01 15:48:00.794563,10.0,deleted,False


In [62]:
users_csv = users_csv[~users_csv.isin(to_remove)].dropna()

In [63]:
users_csv['rn'].duplicated().any()

False

all good now !

In [64]:
users_csv.dtypes

rn                   float64
created_at    datetime64[ns]
company_id           float64
status                object
demo_user             object
dtype: object

changing types

In [65]:
data_type_dict = {
    "rn": "int64",
    "company_id": "int64",
    "demo_user": "bool"
}

In [66]:
set_column_type(users_csv, data_type_dict)

In [67]:
users_csv.dtypes

rn                     int64
created_at    datetime64[ns]
company_id             int64
status                object
demo_user               bool
dtype: object

In [68]:
users_csv.head(5)

Unnamed: 0,rn,created_at,company_id,status,demo_user
6,466,2022-12-07 13:37:03.970821,5,inactive,False
7,496,2022-12-07 13:37:06.472852,5,inactive,False
8,559,2022-12-07 13:39:23.152891,5,inactive,False
9,578,2022-12-07 13:39:24.922548,5,active,False
10,629,2022-12-07 13:39:28.860572,5,inactive,False


### User Load

In [69]:
users_csv.to_sql("users", engine, if_exists="replace", index=False)

143

### Once the data model is build, please answer the following questions using your data model:

<font color='#C70039'>1. What is the monthly count of unique Users (headcount) who have made a booking for the last 6 months?</font>

In [70]:
pd.read_sql("SELECT DATE_TRUNC('month', created_at) AS month, COUNT(DISTINCT user_id) AS headcount \
            FROM public.booking \
            WHERE created_at >= NOW() - INTERVAL '6 months' \
            GROUP BY DATE_TRUNC('month', created_at) \
            ORDER BY month",engine)

Unnamed: 0,month,headcount
0,2023-01-01,92
1,2023-02-01,180
2,2023-03-01,199
3,2023-04-01,197
4,2023-05-01,136
5,2024-02-01,1


<font color='#C70039'>2. How many users need more than 30 days to make their first booking, and from which company are those users ?</font>

In [71]:
pd.read_sql("SELECT c.company_name, COUNT(*) AS user_count FROM users u \
            JOIN booking b ON u.rn = b.user_id \
            JOIN company c ON u.rn = c.company_id \
            WHERE b.created_at > (u.created_at + INTERVAL '30 days') \
            GROUP BY c.company_name",engine)

Unnamed: 0,company_name,user_count
0,Deskbooking GmBH,1
1,We love Deskbooking,48
2,Prime Deskbooker,49
3,Bookery,64
4,Deskbooking Studios,30
5,Flexbooking Partners,1


<font color='#C70039'>3. What is the daily 7 day rolling total booking amount for March 2023?</font>

In [72]:
pd.read_sql("SELECT booking_date, sum(qtd) OVER ( \
                ORDER BY date_trunc('day', booking_date) \
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS rolling_total_booking_count \
            FROM (SELECT \
                    date_trunc('day', created_at) AS booking_date, \
                    count(*) as qtd \
                  FROM PUBLIC.booking \
                  WHERE created_at >= '2023-03-01'::date AND created_at < '2023-04-01'::date    \
                  GROUP BY date_trunc('day', created_at) \
                  ORDER BY date_trunc('day', created_at)) as tmp",engine)

Unnamed: 0,booking_date,rolling_total_booking_count
0,2023-03-01,45.0
1,2023-03-02,97.0
2,2023-03-03,150.0
3,2023-03-04,156.0
4,2023-03-05,183.0
5,2023-03-06,256.0
6,2023-03-07,333.0
7,2023-03-08,347.0
8,2023-03-09,344.0
9,2023-03-10,357.0


### Lastly, also answer the following questions:
- Which principles should you follow when building pipelines, and are there any
anti-patterns you would avoid? How would you improve your current solution further, if
you had more time?

#### Anti-Patterns to Avoid:

* Non-monitoring of pipeline quality and performance
* Not doing good documentation
* Neglecting proper error handling
* Not making a testing strategy
* Not follow security business rules
* Limited Scalability plan
* Not break the pipeline down into smaller, independent components to maintain, test, and debug

#### Improvments

* Dynamically select files
* Moving files to transformation and staging areas
* Use parallelism for better performance Ex. Hadoop Spark or cloud solutions and Linux :)
* Create test and production pipeline
* Error Handling (send e-mail or log to dashboard)
* Version Control
* Put job in scheduler like crontab, oozie, airflow etc