In [23]:
import requests
import pandas as pd
import pandas.io.sql as sqlio

# Step 1 - Get the data

In [24]:
BASE_URL = "http://cf-data-engineer.glitch.me"

customers_df    = pd.DataFrame.from_records(requests.get(BASE_URL + "/customer").json())
lineitems_df    = pd.DataFrame.from_records(requests.get(BASE_URL + "/lineitem").json())
nations_df      = pd.DataFrame.from_records(requests.get(BASE_URL + "/nation").json())
orders_df       = pd.DataFrame.from_records(requests.get(BASE_URL + "/orders").json())
parts_df        = pd.DataFrame.from_records(requests.get(BASE_URL + "/part").json())
partsupp_df     = pd.DataFrame.from_records(requests.get(BASE_URL + "/partsupp").json())
regions_df      = pd.DataFrame.from_records(requests.get(BASE_URL + "/region").json())
suppliers_df    = pd.DataFrame.from_records(requests.get(BASE_URL + "/supplier").json())

# Step 2 - Schema design

A **Star Schema** is supposed to satisfy analysis goals, thus, before I design a star schema I need to identify what are the analysis goals.

Since analysis goals are not given (other than those in **Step 5 - Quiz Time**), I will make few assumptions before designing the star schema, first of all,lets assume that our main measure will be ```revenue``` and ```quantity``` and we want to analyze these measures with respect to ```customers```, ```suppliers```, ```time```, ```parts```, and ```order details```. 

Looking at the raw data that we've extracted from the API, we will find that our ```customer``` table & ```supplier``` table have **foreign key** that's referring to the ```nations``` table, since our goal is to design a star schema, not a snowflake schema, I will have to denormalize the customer and supplier tables. This means that I will include the details of the customer's nation & region in a single customer table and I will include the details of the supplier's nation & region in a single supplier table.

Now we have 5 dimension tables, and 1 fact table, each observation/row in the fact table ```lineitem_orders_fact``` will contain foreign keys that will refer to the 5 dimension ```customers, suppliers, date, parts, order details``` and this will enable us to conduct roll up and drill down analysis with respect to our dimensions, for example if we want to analyze revenue with respect to customer, we can achieve this, if we want to drill down and analyze with respect to customers at a specific region, we can do that and if we want to drill down further and analyze with respect to customers at a specific region and a specific nation, we can easily do that.

For the customer dimension table, I will **classify** the customers account balances into 3 groups by where customers who lie between the **0 - 0.25** quantiles are considered **"Low"** class, **0.25 - 0.75** are considered **"Medium"** class, **0.75 - 1** are considered **"High"** class

Finally, I ommitted few attributes that don't make sense for our analysis, for example, **the shipping_date**, **receipt_date**, **return_flag** in the ```lineitems``` data needs to be dropped since this will mean that we will have to wait for the shipment to arrive to our customer and record it's date before populating our star schema, which means that alot of the orders that were already placed will be delayed before being put in the data warehouse for analysis. Also the **comments** attribute in all tables don't make sense for our drill-down / roll-up analysis, thus they wont be considered. The **linestatus** in the ```lineitem``` data also doesn't make sense for our analysis as well, thus it will not be considered.

After all of the above considerations, the star schema is as following

[![Untitled-1.png](https://i.postimg.cc/sXHNs4hR/Untitled-1.png)](https://postimg.cc/ph8ZB8C0)

### Functions to create the star schema tables

In [25]:
# create date dimension table
def create_date_dim_table():
    def convert_date(x):
        splitted_x = x.split('-')
        d_datekey = x
        d_year = splitted_x[0]
        d_month = splitted_x[1]
        d_day = splitted_x[2]

        return pd.Series([d_datekey, d_year, d_month, d_day])

    date_df = pd.DataFrame(columns=['d_datekey', 'd_year', 'd_month', 'd_day'])

    date_df[["d_datekey", "d_year", "d_month", "d_day"]
            ] = orders_df["o_orderdate"].apply(convert_date).drop_duplicates()

    return date_df


# creates customer dimension table
def create_customer_dim_table():

    customer_dim_table = customers_df.join(nations_df, on="c_nationkey").join(regions_df, on="n_regionkey"). \
        drop(columns=["n_nationkey", "c_nationkey", "n_regionkey",
             "r_regionkey", "n_comment",  "r_comment", "c_comment"])

    customer_dim_table.rename(
        columns={"n_name": "c_nation", "r_name": "c_region"}, inplace=True)

    # classifying customers using quantiles based on their account balance
    customer_dim_table["c_accbal_class"] = pd.qcut(customers_df["c_acctbal"], q=[
                                                   "0", "0.25", "0.75", "1"], labels=["Low", "Medium", "High"])

    return customer_dim_table


# creates supplier dimension table
def create_supp_dim_table():
    supplier_dim_table = suppliers_df. \
        join(nations_df, on="s_nationkey"). \
        join(regions_df, on="n_regionkey"). \
        drop(columns=["n_nationkey", "s_nationkey", "n_regionkey",
             "r_regionkey", "n_comment",  "r_comment", "s_comment"])

    supplier_dim_table.rename(
        columns={"n_name": "s_nation", "r_name": "s_region"}, inplace=True)

    return supplier_dim_table


# creates part dimension table
def create_part_dim_table():
    return parts_df.drop(columns=["p_comment"])


# creates order dimension table
def create_orders_dim_table():
    return orders_df.drop(columns=["o_comment"])


# creates lineitem orders fact table
def create_lineorders_fact_table():
    lineorders_fact_table = lineitems_df. \
        join(orders_df.set_index("o_orderkey"), on="l_orderkey"). \
        drop(columns=[
            "l_comment",
            "l_shipdate",
            "l_commitdate",
            "l_receiptdate",
            "l_returnflag",
            "o_comment",
            "o_orderpriority",
            "o_shippriority",
            "o_clerk",
            "o_orderstatus",
            "l_linestatus",
            "o_totalprice"])

    lineorders_fact_table.rename(
        columns={"o_custkey": "l_custkey", "o_orderdate": "l_datekey"}, inplace=True)

    # computing the revenue
    lineorders_fact_table["l_revenue"] = lineorders_fact_table["l_extendedprice"] * (
        1 - lineorders_fact_table["l_discount"])

    return lineorders_fact_table


**NOTE:** In order to run the rest of the notebook, you'll have to ```docker-compose up``` the ```docker-compose.yaml``` to spin up the postgres database

### Populating the Star Schema (In POSTGRES Database)

In [26]:
from sqlalchemy import create_engine

# configuring the connection to postgres
engine = create_engine('postgresql://postgres:postgres@localhost:5432/datawarehouse')

# creating the star schema tables
lineitem_orders_fact = create_lineorders_fact_table()
dim_customer = create_customer_dim_table()
dim_supplier = create_supp_dim_table()
dim_order = create_orders_dim_table()
dim_part = create_part_dim_table()
dim_date = create_date_dim_table()

# loading the data in postgres
lineitem_orders_fact.to_sql("lineitem_orders_fact", con = engine, index = False)
dim_customer.to_sql("dim_customer", con = engine, index = False)
dim_supplier.to_sql("dim_supplier", con = engine, index = False)
dim_order.to_sql("dim_order", con = engine, index = False)
dim_date.to_sql("dim_date", con = engine, index = False)

# Step 3 - Orchestration

### Describe how you'd schedule this process to run multiple times per day

**Answer:** Using *CRONJOB* I can schedule the script to run multiple times per day where the data will be ingested from the API and loaded into the database, for example, lets assume that we want the process to run every 2 hours. using the **crontab configuration** shown below, we are able to achieve this.

[![crontab.png](https://i.postimg.cc/V5P86MkG/crontab.png)](https://postimg.cc/JHxgYsJZ)

### What about if the data comes from a stream, and arrives at random times?

**Answer:** In this case it makes more sense to use more advanced orchestration tools, Airflow is a good choice here, it's open source and very well developed. with Airflow we can create DAGS where we ensure that a specific transformation task doesn't run unless all the tasks that the transformation task depends on run first, with this we can ensure that even if data is coming in different times, a transformation is not triggered unless all the data have arrived successfully 

# Step 4 - Production

### Describe how you would deploy your code to production and allow for future maitenance

**Answer:** Deployment can take two paths, on-premise and in-cloud, let's assume that we want to deploy our code in cloud, AWS in this case. What we can do is first create an EC2 instance that will host our ETL script and a we can configure Airflow / CRONTAB to schedule our ETL. Also for maintenability, we will have to include the **logging** module in our ETL script and save our logs in a file, this will help us conduct root cause analysis on failures that have occured. Finally if there's any changes that we want to do to the ETL script, authorized users can SSH into the EC2 instance and apply those changes. 

So now we talked about the EC2 instance, but where will the data be loaded and how will it be accessible ?! For this, our ETL script will write all the output data (Star schema tables) into an S3 bucket (Data Lake), following that we will use **AWS Glue crawlers** to learn the schema's of the data stored in our data lake and save the schema's in an **AWS Glue Catalog**, now since we have all our schema's in **AWS Glue Catalog** we can create a redshift cluster and connect it to the AWS Glue Catalog using an external schema, by this our redshift cluster is able to query the data directly from the S3 Bucket (Data Lake), without the need to load the data in Redshift. This is cost effective since loading and keeping the data in S3 is way cost effective that loading and keeping the data in Redshift.

For maintanenance, we have access to AWS cloudwatch logs to analyze any abnormalities / issues.

Finally, here's the proposed architecture.

[![Blank-diagram-2.jpg](https://i.postimg.cc/Jhw99hcr/Blank-diagram-2.jpg)](https://postimg.cc/VJRGtmVp)

### Bonus point: What changes would you need to make to run this code in a containerized environment (e.g. Docker)?
**Answer:** First of all, I will change my ETL code from a *notebook* style to a *standalone app* style with modules and packages, Second, I will create a package called **workflow** that will contain the following 3 modules: **Extractor.py**, **Transformer.py** and **Loader.py**, the extractor module will contain code that performs the data ingestion/extraction from the API, the **Transformer** module will contain code that's responsible for transforming the data (creating the schema tables) and the **Loader** module will contain code related to the loading of the data in the OLAP database. Then I will have a **main.py** script that will be the entry point to the ETL process. 

Once I have set up the app directory as stated above, I will create a **requirements.txt** file where I will include all the dependencies required to run the ETL script and finally I will create a dockerfile to create a docker image as shown below

```
FROM python:3
	
COPY requirements.txt /tmp/
	
RUN pip install --no-cache-dir --upgrade pip &&\
	        pip install --requirement /tmp/requirements.txt
	
COPY . /app

CMD [ "python", "/app/main.py" ]
```

And then whenever we want to run the image, the etl script **(main.py)** will run automatically.

# Step 5 - Quiz time!



### Top 5 Nations By Revenue

In [27]:
top_5_nations_query = """
SELECT c.c_nation, SUM(l.l_revenue) AS "revenue"
FROM dim_customer AS c JOIN lineitem_orders_fact AS l ON c.c_custkey = l.l_custkey
GROUP BY c.c_nation
ORDER BY SUM(l.l_revenue) DESC
LIMIT 5;
"""

top_5_nations = sqlio.read_sql_query(top_5_nations_query, engine)
top_5_nations

Unnamed: 0,c_nation,revenue
0,CANADA,105337600.0
1,EGYPT,102254400.0
2,IRAN,100283500.0
3,BRAZIL,94333200.0
4,ALGERIA,93680680.0


### Most Common Shipmode Among Top 5 Nations

In [28]:
most_common_shipmode_query = """
with cte as (
    SELECT c.c_nation, l.l_shipmode, count(l.l_shipmode) AS "shipmode_count"
    FROM dim_customer as c JOIN lineitem_orders_fact as l ON c.c_custkey = l.l_custkey
    WHERE c.c_nation IN ('CANADA', 'EGYPT', 'IRAN', 'BRAZIL', 'ALGERIA')
    GROUP BY c.c_nation, l.l_shipmode
    ORDER BY c.c_nation DESC)

SELECT l_shipmode, SUM("shipmode_count") AS count
FROM cte
GROUP BY l_shipmode
ORDER BY SUM("shipmode_count") DESC
LIMIT 1;
"""

most_common_shipmode = sqlio.read_sql_query(most_common_shipmode_query, engine)
most_common_shipmode

Unnamed: 0,l_shipmode,count
0,FOB,2118.0


### Top 3 Months By Revenue

In [29]:
top_3_selling_months_query = """
SELECT d.d_month, SUM(l.l_revenue) AS "revenue"
FROM lineitem_orders_fact as l JOIN dim_date as d ON l.l_datekey = d.d_datekey
GROUP BY d.d_month
ORDER BY SUM(l.l_revenue) DESC
LIMIT 3;
"""

top_3_months = sqlio.read_sql_query(top_3_selling_months_query, engine)
top_3_months

Unnamed: 0,d_month,revenue
0,5,186911000.0
1,3,183781000.0
2,1,180816100.0


### Top Customers By Revenue

In [30]:
top_customers_query = """
SELECT c.c_custkey, SUM(l.l_revenue) AS revenue
FROM lineitem_orders_fact as l JOIN dim_customer as c ON l.l_custkey = c.c_custkey
GROUP BY c.c_custkey
ORDER BY SUM(l.l_revenue) DESC;
"""

top_customers = sqlio.read_sql_query(top_customers_query, engine)
top_customers

Unnamed: 0,c_custkey,revenue
0,1489,5.203674e+06
1,214,4.503704e+06
2,73,4.466381e+06
3,1246,4.465336e+06
4,1396,4.455382e+06
...,...,...
995,185,2.794881e+05
996,716,2.701052e+05
997,365,2.288314e+05
998,1091,1.627706e+05


### Financial Year to Year Revenue Analysis

In [31]:
financial_yty_query = """
WITH cte AS (
SELECT CASE WHEN 
	d.d_month::int BETWEEN 01 AND 06 THEN d.d_year::int ELSE d.d_year::int + 1 END as "FinancialYear", 
	l.l_revenue AS "revenue"
FROM dim_date as d JOIN lineitem_orders_fact as l on d.d_datekey = l.l_datekey
)

SELECT 
	"FinancialYear", 
	SUM("revenue") AS "Total_Year_Revenue", 
	LAG(SUM("revenue")) OVER (ORDER BY "FinancialYear") AS "Previous_Year_Revenue",
	(SUM("revenue") - LAG(SUM("revenue")) OVER (ORDER BY "FinancialYear")) / SUM("revenue") * 100 AS percent_change
FROM cte
GROUP BY "FinancialYear"
ORDER BY "FinancialYear";
"""

financial_year_to_year = sqlio.read_sql_query(financial_yty_query, engine)
financial_year_to_year

Unnamed: 0,FinancialYear,Total_Year_Revenue,Previous_Year_Revenue,percent_change
0,1992,158375800.0,,
1,1993,300374700.0,158375800.0,47.273927
2,1994,325919000.0,300374700.0,7.837616
3,1995,302219100.0,325919000.0,-7.841936
4,1996,309554200.0,302219100.0,2.369572
5,1997,318534400.0,309554200.0,2.819204
6,1998,302358900.0,318534400.0,-5.349764
7,1999,27798920.0,302358900.0,-987.664073


### BONUS

In [32]:
# api endpoint for money convert rate
BASE_URL = "https://cdn.moneyconvert.net/api/latest.json"

# api request to get the money rates (US Based)
MONEY_RATES = requests.get(BASE_URL).json()["rates"]

# Getting the AUD to EUR rate
AUD_TO_EUR = MONEY_RATES["EUR"] / MONEY_RATES["AUD"]

In [33]:
def transform_to_eur_base(df, *columns):
    """Takes a dataframe and transforms AUD based columns to EUR based

    Args:
        df (pd.DataFrame): The dataframe to be transformed
        *columns: list of columns to be transformed

    Returns:
        pd.DataFrame: pandas dataframe with transformed columns
    """    
    table = df.copy()  # to avoid modifying the passed df since it's mutable

    for col in columns:
        table[col] = table[col] * AUD_TO_EUR
    return table

In [34]:
top_5_nations_EUR = transform_to_eur_base(top_5_nations, "revenue")
top_5_nations_EUR

Unnamed: 0,c_nation,revenue
0,CANADA,71795220.0
1,EGYPT,69693810.0
2,IRAN,68350470.0
3,BRAZIL,64294940.0
4,ALGERIA,63850200.0


In [35]:
top_3_months_EUR = transform_to_eur_base(top_3_months, "revenue")
top_3_months_EUR

Unnamed: 0,d_month,revenue
0,5,127393400.0
1,3,125260100.0
2,1,123239400.0


In [36]:
top_customers_EUR = transform_to_eur_base(top_customers, "revenue")
top_customers_EUR

Unnamed: 0,c_custkey,revenue
0,1489,3.546683e+06
1,214,3.069602e+06
2,73,3.044164e+06
3,1246,3.043451e+06
4,1396,3.036667e+06
...,...,...
995,185,1.904915e+05
996,716,1.840963e+05
997,365,1.559653e+05
998,1091,1.109400e+05


In [37]:
financial_yty_eur_based = transform_to_eur_base(financial_year_to_year, "Total_Year_Revenue", "Previous_Year_Revenue")
financial_yty_eur_based

Unnamed: 0,FinancialYear,Total_Year_Revenue,Previous_Year_Revenue,percent_change
0,1992,107944600.0,,
1,1993,204727200.0,107944600.0,47.273927
2,1994,222137500.0,204727200.0,7.837616
3,1995,205984300.0,222137500.0,-7.841936
4,1996,210983700.0,205984300.0,2.369572
5,1997,217104400.0,210983700.0,2.819204
6,1998,206079600.0,217104400.0,-5.349764
7,1999,18946990.0,206079600.0,-987.664073
