# Setup

Make sure to follow the **[setup instructions here before starting](https://github.com/josephmachado/de_project?tab=readme-ov-file#option-1-github-codespaces-recommended)**.

# Note: The recommended readings mentioned in this workshop (and more) will be covered in detail as part of my Data Engineering Hands-on Workshop, **[sign up here](https://astounding-architect-5764.ck.page/684e1f422f)**

# Live Workshop Link:

[![Live workshop](https://img.youtube.com/vi/bfiOLwp1aWM/0.jpg)](https://www.youtube.com/live/bfiOLwp1aWM)


# Introduction

There are a lot of data projects available on the web (e.g., **[my list of data eng projects](https://www.startdataengineering.com/post/data-engineering-projects/)**). While these projects are great, starting from scratch to build your data project can be challenging. If you are 

> Wondering how to go from an idea to a production-ready data pipeline

> Feeling overwhelmed by how all the parts of a data system fit together

> Unsure that the pipelines you build are up to industry-standard

If so, this post is for you! In it, we will go over how to build a data project step-by-step from scratch.

By the end of this post, you will be able to quickly create data projects for any use case and see how the different parts of data systems work together. 


# Parts of a data project

Most data engineering tools fall into one of the parts shown below (as explained in this [post](https://www.startdataengineering.com/post/parts-of-dataengineering/)).

![Data tools](./assets/images/data-tools.png)

In this post, we will review the parts of a data project and select tools to build a data pipeline. While we chose TPCH data for this project, you can choose any dataset you find interesting and follow the steps below to quickly build your own data pipeline.

### **Recommended reading**: **[What are the key parts of data engineering](https://www.startdataengineering.com/post/parts-of-dataengineering/)**

# Requirements

The first step before you start should be defining precise requirements. Please work with the end users to define them (or define them yourself for side projects).

We will go over a few key requirements below. 

### **Recommended reading**: **[this post that goes over how to gather requirements for data projects in detail!](https://www.startdataengineering.com/post/n-questions-data-pipeline-req/)**.



## Understand input datasets available

Let's assume we are working with a car parts seller's database (TPCH). The data is available in a DuckDB database. See the data model below:

![TPCH data model](./assets/images/tpch_erd.png)

We can create fake input data using the [create_input_data.py](https://github.com/josephmachado/de_project/blob/main/setup/create_input_data.py) script, as shown below:

In [3]:
! python ./setup/create_input_data.py

Cleaning up tpch and metadata db files
Creating TPCH input data
Creating metadata table


TPCH data is well modeled, which makes it easy to work with. However, this is not always the case. In most real-life projects, you'd need to:

1. Identify the grain (i.e., what one row in a table corresponds to) of the input data. At times, a single table may contain rows at multiple grains.
2. Identify the business process that generates the data. This will dictate how you can actually extract input datasets. For this you'll need to create **[conceptual & logical data models](https://www.thoughtspot.com/data-trends/data-modeling/conceptual-vs-logical-vs-physical-data-models)**.
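To confirm a table's grain, check whether the columns you believe identify a row are actually unique. The sketch below uses only the standard library. `is_unique_grain` is a hypothetical helper, and the rows are made-up stand-ins for the TPCH `orders` and `lineitem` tables. In TPCH, a `lineitem` row is identified by `l_orderkey` together with `l_linenumber`, not by `l_orderkey` alone.

```python
from collections import Counter


def is_unique_grain(rows, key_columns):
    """Return True if the key columns identify exactly one row each.

    rows: a list of dicts (one dict per table row).
    key_columns: the column names that should define the grain.
    """
    key_counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    # The grain holds only if no key combination appears more than once
    return all(count == 1 for count in key_counts.values())


# Made-up sample rows (not actual TPCH data)
orders = [
    {"o_orderkey": 1, "o_custkey": 10},
    {"o_orderkey": 2, "o_custkey": 10},
    {"o_orderkey": 3, "o_custkey": 20},
]
lineitem = [
    {"l_orderkey": 1, "l_linenumber": 1},
    {"l_orderkey": 1, "l_linenumber": 2},
    {"l_orderkey": 2, "l_linenumber": 1},
]

print(is_unique_grain(orders, ["o_orderkey"]))                    # True
print(is_unique_grain(lineitem, ["l_orderkey"]))                  # False
print(is_unique_grain(lineitem, ["l_orderkey", "l_linenumber"]))  # True
```

On the real data, you can run the same check in DuckDB as a `GROUP BY <key columns> HAVING COUNT(*) > 1` query. If it returns any rows, the key does not define the grain.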

**Exercise**: What is the relationship between the `orders` table and the `customer` table?

a. 1 to many

b. many to 1

c. many to many

d. 1 to 1

In [2]:
# TODO: Write your choice (a, b, c, d) here
answer = 'b'

**Exercise**: What is the relationship between the `customer` table and the `orders` table?

a. 1 to many

b. many to 1

c. many to many

d. 1 to 1


In [4]:
# TODO: Write your choice (a, b, c, d) here
answer = 'a'
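You can also check relationships like these against the data instead of the ERD. Count how many orders each customer has, and how many customers each order points to. If the first count can exceed 1 while the second is always 1, then `orders` to `customer` is many-to-one (and `customer` to `orders` is one-to-many). This minimal sketch uses an in-memory SQLite database. The column names follow TPCH, but the rows are made up for illustration.

```python
import sqlite3

rel_con = sqlite3.connect(":memory:")
rel_con.executescript(
    """
    CREATE TABLE customer (c_custkey INTEGER PRIMARY KEY, c_name TEXT);
    CREATE TABLE orders (o_orderkey INTEGER PRIMARY KEY, o_custkey INTEGER);
    INSERT INTO customer VALUES (1, 'Customer#1'), (2, 'Customer#2');
    INSERT INTO orders VALUES (100, 1), (101, 1), (102, 2);
    """
)

# Most orders placed by any single customer (> 1 means "many" on the orders side)
max_orders_per_customer = rel_con.execute(
    "SELECT MAX(cnt) FROM (SELECT COUNT(*) AS cnt FROM orders GROUP BY o_custkey)"
).fetchone()[0]

# Most customers linked to any single order (1 means "one" on the customer side)
max_customers_per_order = rel_con.execute(
    "SELECT MAX(cnt) FROM "
    "(SELECT COUNT(DISTINCT o_custkey) AS cnt FROM orders GROUP BY o_orderkey)"
).fetchone()[0]
rel_con.close()

print(max_orders_per_customer, max_customers_per_order)  # 2 1
```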

## Define what the output dataset will look like

Let's assume that the `customer` team has asked us to create a dataset that they will use for outreach (think cold emails, calls, etc.). 

Upon discussion with the `customer` team, you discover that the output dataset requires the following columns:

For each customer (i.e., one row per customer)

1. **customer_key**: The unique identifier for the customer 
2. **customer_name**: The customer name
3. **min_order_value**: The value of the order with the lowest value placed by this customer
4. **max_order_value**: The value of the order with the highest value placed by this customer
5. **avg_order_value**: The average value of all the orders placed by this customer  
6. **avg_num_items_per_order**: The average number of items per order placed by this customer

Let's write a simple query to see how we can get this (note that this process will take much longer with badly modeled input data).

In [5]:
# simple query to get the output dataset
import duckdb
con = duckdb.connect("tpch.db")
con.sql("""
WITH order_items AS (
    SELECT
        l_orderkey,
        COUNT(*) AS item_count
    FROM
        lineitem
    GROUP BY
        l_orderkey
),
customer_orders AS (
    SELECT
        o.o_custkey,
        o.o_orderkey,
        o.o_totalprice,
        oi.item_count
    FROM
        orders o
    JOIN
        order_items oi ON o.o_orderkey = oi.l_orderkey
)
SELECT
    c.c_custkey AS customer_key,
    c.c_name AS customer_name,
    MIN(co.o_totalprice) AS min_order_value,
    MAX(co.o_totalprice) AS max_order_value,
    AVG(co.o_totalprice) AS avg_order_value,
    AVG(co.item_count) AS avg_num_items_per_order
FROM
    customer c
JOIN
    customer_orders co ON c.c_custkey = co.o_custkey
GROUP BY
    c.c_custkey, c.c_name;
""")

┌──────────────┬────────────────────┬─────────────────┬─────────────────┬────────────────────┬─────────────────────────┐
│ customer_key │   customer_name    │ min_order_value │ max_order_value │  avg_order_value   │ avg_num_items_per_order │
│    int64     │      varchar       │  decimal(15,2)  │  decimal(15,2)  │       double       │         double          │
├──────────────┼────────────────────┼─────────────────┼─────────────────┼────────────────────┼─────────────────────────┤
│          445 │ Customer#000000445 │        59180.35 │       280273.39 │ 144249.92470588235 │      4.0588235294117645 │
│          557 │ Customer#000000557 │         4010.45 │       302534.45 │ 123190.81181818181 │       3.272727272727273 │
│          611 │ Customer#000000611 │        37857.53 │       226132.40 │  95871.07363636364 │       3.272727272727273 │
│          818 │ Customer#000000818 │        23233.65 │       326565.37 │ 176473.52384615384 │       4.230769230769231 │
│          322 │ Customer#000000

In [14]:
con.commit()
con.close() 

## Define SLAs so stakeholders know what to expect

SLA stands for service level agreement. An SLA defines what end users can expect from your service (a data pipeline, in our case). While there are multiple ways to define SLAs, the common ones for data systems are:

1. `Data freshness`
2. `Data accuracy`

Let's assume that our stakeholders require the data to be no older than 12 hours. This means that your pipeline should run completely at least once every 12 hours. If we assume that the pipeline runs in 2 hours, we need to ensure that it is run at least every 10 hours so that the data is not older than 12 hours at any given time.
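Here is the arithmetic behind the 10-hour schedule. A run that starts at hour 0 publishes data as of hour 0, but only when it finishes at hour 2. The next run must therefore start by hour 10, so that it finishes by hour 12, the moment the previously published data turns 12 hours old. The sketch below encodes that calculation. `max_schedule_interval` is a hypothetical helper, and it assumes the data is as fresh as the time its run starts.

```python
from datetime import timedelta


def max_schedule_interval(freshness_sla, pipeline_runtime):
    """Longest gap between pipeline start times that still meets the freshness SLA.

    Assumes a run's data is as of the run's start time and only becomes
    visible to end users when the run finishes.
    """
    if pipeline_runtime >= freshness_sla:
        raise ValueError("The pipeline takes longer than the freshness SLA allows")
    return freshness_sla - pipeline_runtime


interval = max_schedule_interval(timedelta(hours=12), timedelta(hours=2))
print(interval)  # 10:00:00
```

In practice, you would leave some buffer below this maximum to absorb slow or retried runs.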

For data accuracy, we should define what accurate data is. Let's define accuracy in the following section.

## Define checks to ensure the output dataset is usable

We need to ensure that the data we produce is good enough for end-users to use. Typically, the data team works with end users to identify the critical metrics to check. 

Let's assume we have the following checks to ensure that the output dataset is accurate:

1. **customer_key**: Has to be unique and not null.
2. **`avg_*` columns**: Should not differ by more than 5% compared to prior runs (summed across all customers).
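Here is a minimal, library-free sketch of the first check. It assumes the output rows are available as a list of dicts, and `check_customer_key` is a hypothetical name. In the pipeline itself, you would run the equivalent check on the Polars DataFrame.

```python
def check_customer_key(rows):
    """Raise ValueError if customer_key is null or duplicated.

    rows: a list of dicts, one per output row.
    """
    keys = [row.get("customer_key") for row in rows]
    if any(key is None for key in keys):
        raise ValueError("Null customer_key found")
    if len(keys) != len(set(keys)):
        raise ValueError("Duplicate customer_key found")


check_customer_key([{"customer_key": 1}, {"customer_key": 2}])  # passes silently
```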

### **Recommended reading**: **[Types of data quality checks](https://www.startdataengineering.com/post/types-of-dq-checks/)** & **[Implementing data quality checks with Great Expectations](https://www.startdataengineering.com/post/implement_data_quality_with_great_expectations/)**



**Exercise**: What other main check can you think of for the output dataset?

# Identify what tool to use to process data

We have a plethora of tools to process data, including Apache Spark, Snowflake, Python, Polars, and DuckDB. Since our data is small, we will use Polars to process it. The Polars library is easy to install and use.

### **Recommended reading**: **[Choosing tools for your data project](https://www.startdataengineering.com/post/choose-tools-dp/#41-requirement-x-component-framework)**

# Data flow architecture

Most data teams have their own version of the 3-hop architecture. For example, dbt has its own version (staging, intermediate, marts), and Databricks has the medallion (bronze, silver, gold) architecture.

You may be wondering why we need this data flow architecture when we can get the results easily with **[the simple query shown here](./setup-data-project.ipynb#Columns-and-metics-needed-in-the-dataset-produced)**.

While this is a simple example, most real-world projects need standardized and cleaned data (bronze) and modeled data (silver) that can be used to create specialized datasets for end users (gold). See below for how our data will flow:

![Data Flow](./assets/images/dep-arch.png)

### **Recommended reading**: **[Multi-hop architecture](https://www.startdataengineering.com/post/de_best_practices/#31-use-standard-patterns-that-progressively-transform-your-data)** 

## Bronze: Extract raw data and conform it to standard names and data types

Since our output dataset needs data from the customer, nation, region, orders, and lineitem input datasets, we will bring that data into bronze tables. These tables keep the same names as the input datasets.

Let's explore the input datasets and create our bronze datasets.

In [15]:
# read customer, order, and lineitem dataset from duckdb into Polars dataframe
import duckdb
import polars as pl

con = duckdb.connect("tpch.db")
customer_df = con.sql("select * from customer").pl()
orders_df = con.sql("select * from orders").pl()
lineitem_df = con.sql("select * from lineitem").pl()
nation_df = con.sql("select * from nation").pl()
region_df = con.sql("select * from region").pl()

con.close()  # close the DuckDB connection

# remove c_ and then rename custkey to customer_key
cleaned_customer_df = customer_df.rename(lambda col_name: col_name[2:]).rename({"custkey": "customer_key"})

# remove the n_ and r_ from the nation and region table's column names
cleaned_nation_df = nation_df.rename(lambda col_name: col_name[2:])
cleaned_region_df = region_df.rename(lambda col_name: col_name[2:])


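**Exercise:**
Write code (similar to the above) to:
1. Remove the `o_` and `l_` prefixes from the orders and lineitem tables' column names.
2. Rename the customer key and order key columns to `customer_key` and `order_key`, respectively.

Name the results `cleaned_orders_df` and `cleaned_lineitem_df`, since the cells below use those names.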

In [16]:
# your answer

## Silver: Model data for analytics

In the silver layer, the datasets are modeled using one of the popular styles (e.g., Kimball, Data Vault, etc.). We will use Kimball's dimensional model, as it is the most commonly used one and can account for many use cases.

### Data modeling

We will create the following datasets:

1. **dim_customer**: A customer level table with all the necessary attributes of a customer. We will join nation and region data to the `cleaned_customer_df` to get all the attributes associated with a customer.
2. **fct_orders**: An order-level fact (an event that happened) table. This will be the same as `cleaned_orders_df`, since the `orders` table has all the necessary details about the order and the keys (such as `customer_key`) that associate it with dimension tables.
3. **fct_lineitem**: A lineitem (items that are part of an order) fact table. This table will be the same as `cleaned_lineitem_df`, since the `lineitem` table has all the lineitem-level details and the keys (such as `partkey` and `suppkey`) that associate it with dimension tables.

### **Recommended reading**: **[Data warehouse overview](https://www.startdataengineering.com/post/what-is-a-data-warehouse/#3-what-is-a-data-warehouse)**

In [18]:
# create customer dimension by left-joining all the necessary data
dim_customer = cleaned_customer_df\
.join(cleaned_nation_df, on="nationkey", how="left", suffix="_nation")\
.join(cleaned_region_df, on="regionkey", how="left", suffix="_region")\
.rename({
    "name_nation": "nation_name",
    "name_region": "region_name",
    "comment_nation": "nation_comment",
    "comment_region": "region_comment"
})

# Most fact tables are direct data from the app
fct_orders = cleaned_orders_df
fct_lineitem = cleaned_lineitem_df
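The Polars chain above is a pair of left joins. For readers who think in SQL, here is a minimal equivalent sketch against an in-memory SQLite database. The table and column names mirror the cleaned (prefix-free) names, and the rows are made up. Polars' `how="left"` and SQL's `LEFT JOIN` both keep every customer row. A customer whose `nationkey` has no match ends up with `NULL` nation and region columns rather than being dropped.

```python
import sqlite3

dim_con = sqlite3.connect(":memory:")
dim_con.executescript(
    """
    CREATE TABLE customer (customer_key INTEGER, name TEXT, nationkey INTEGER);
    CREATE TABLE nation (nationkey INTEGER, name TEXT, regionkey INTEGER);
    CREATE TABLE region (regionkey INTEGER, name TEXT);
    -- Customer 2 points at a nationkey (99) that has no matching nation row
    INSERT INTO customer VALUES (1, 'Customer#1', 10), (2, 'Customer#2', 99);
    INSERT INTO nation VALUES (10, 'NATION_A', 3);
    INSERT INTO region VALUES (3, 'REGION_A');
    """
)

dim_customer_rows = dim_con.execute(
    """
    SELECT c.customer_key,
           c.name AS customer_name,
           n.name AS nation_name,
           r.name AS region_name
    FROM customer c
    LEFT JOIN nation n ON c.nationkey = n.nationkey
    LEFT JOIN region r ON n.regionkey = r.regionkey
    ORDER BY c.customer_key
    """
).fetchall()
dim_con.close()

print(dim_customer_rows)
# [(1, 'Customer#1', 'NATION_A', 'REGION_A'), (2, 'Customer#2', None, None)]
```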


## Gold: Create tables for end-users

The gold layer contains datasets required for the end user. The user-required datasets are fact tables joined with dimension tables aggregated to the necessary grain. In real-world projects, multiple teams/users ask for datasets with differing grains from the same underlying fact and dimension tables. While you can join the necessary tables and aggregate them individually for each ask, it leads to repeated code and joins.

To avoid this issue, companies typically do the following:

1. **OBT** (one big table): A fact table with multiple dimension tables left joined to it.
2. **pre-aggregated table**: The OBT rolled up to the grain requested by the end user/team. The pre-aggregated dataset is the dataset that the end user accesses. By providing end users with the exact columns they need, we keep all the metrics in one place and significantly reduce issues caused by end users calculating metrics incorrectly. These tables act as our end users' SOT (source of truth).
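The two-step pattern can be sketched in plain SQL. This minimal example uses an in-memory SQLite database with made-up rows. `wide_orders` is the OBT (created here as a view), and `customer_order_summary` is a hypothetical pre-aggregated table for a team that wants order counts and average order value per customer. Another team would aggregate the same `wide_orders` to its own grain instead of re-joining the fact and dimension tables.

```python
import sqlite3

obt_con = sqlite3.connect(":memory:")
obt_con.executescript(
    """
    CREATE TABLE dim_customer (customer_key INTEGER, name TEXT);
    CREATE TABLE fct_orders (order_key INTEGER, customer_key INTEGER, totalprice REAL);
    INSERT INTO dim_customer VALUES (1, 'Customer#1'), (2, 'Customer#2');
    INSERT INTO fct_orders VALUES (100, 1, 50.0), (101, 1, 150.0), (102, 2, 80.0);

    -- OBT: the fact table with its dimension(s) left joined once
    CREATE VIEW wide_orders AS
    SELECT o.*, c.name
    FROM fct_orders o
    LEFT JOIN dim_customer c ON o.customer_key = c.customer_key;

    -- Pre-aggregated table: the OBT rolled up to the grain a team asked for
    CREATE TABLE customer_order_summary AS
    SELECT customer_key,
           name AS customer_name,
           COUNT(*) AS num_orders,
           AVG(totalprice) AS avg_order_value
    FROM wide_orders
    GROUP BY customer_key, name;
    """
)

summary_rows = obt_con.execute(
    "SELECT * FROM customer_order_summary ORDER BY customer_key"
).fetchall()
obt_con.close()

print(summary_rows)  # [(1, 'Customer#1', 2, 100.0), (2, 'Customer#2', 1, 80.0)]
```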

### OBT: Join the fact table with all its dimensions

In our example, we have two fact tables, `fct_orders` and `fct_lineitem`. Since we only have one dimension, `dim_customer`, we can join `fct_orders` and `dim_customer` to create `wide_orders`. For our use case, we can keep `fct_lineitem` as `wide_lineitem`.

That said, we can easily see a case where we might need to join `parts` and `supplier` data with `fct_lineitem` to get `wide_lineitem`. But since our use case doesn't require this, we can skip it!

Let's create our OBT tables

In [19]:
# create wide_orders table
wide_orders = fct_orders.join(dim_customer, on="customer_key", how="left")

# For our use case, we don't need more information at a lineitem level
wide_lineitem = fct_lineitem

**Exercise**: Assume that you have to create a `wide_lineitem` table with all the dimensions at its grain. What other tables would you left join with the `fct_lineitem` table?


In [20]:
# your answer here, all lower case, use table names from the data model
tables_to_left_join = []

### Pre-aggregated tables: Aggregate OBTs to stakeholder-specific grain

According to our **[data requirements](./setup-data-project.ipynb#Columns-and-metics-needed-in-the-dataset-produced)**, we need data from customer, orders, and lineitem. Since we already have customer and order data in `wide_orders`, we can join that with `wide_lineitem` to get the necessary data.

We can call the final dataset `customer_outreach_metrics` (read **[this article that discusses the importance of naming](https://docs.getdbt.com/blog/on-the-importance-of-naming)**).

Let's create our final dataset in Python 

In [21]:
# create customer_outreach_metrics

# get the number of lineitems per order
order_lineitem_metrics = wide_lineitem.group_by(pl.col("order_key")).agg(pl.col("linenumber").count().alias("num_lineitems"))
# join the above df with wide_orders and group by customer key in wide orders to get avg, min, max order value & avg num items per order
customer_outreach_metrics = wide_orders\
.join(order_lineitem_metrics, on="order_key", how="left")\
.group_by(
    pl.col("customer_key"), 
    pl.col("name").alias("customer_name"))\
.agg(
    pl.min("totalprice").alias("min_order_value"),
    pl.max("totalprice").alias("max_order_value"),
    pl.mean("totalprice").alias("avg_order_value"),
    pl.mean("num_lineitems").alias("avg_num_items_per_order"),
)

In [22]:
customer_outreach_metrics.limit(2)

| customer_key | customer_name | min_order_value | max_order_value | avg_order_value | avg_num_items_per_order |
|---|---|---|---|---|---|
| i64 | str | decimal[15,2] | decimal[15,2] | decimal[15,2] | f64 |
| 1478 | "Customer#000001478" | 45943.93 | 320623.44 | null | 4.6 |
| 416 | "Customer#000000416" | 53224.01 | 279221.38 | null | 4.714286 |


**Question**: Why is `avg_order_value` all `null`?

In [23]:
# your answer
answer = None

# Data quality and code testing

## Data quality implementation

As part of our requirements, we saw that the output dataset needs to have:
1. A unique, non-null `customer_key`
2. No more than 5% variance in the `avg_*` columns (summed across all customers) between runs

The first test is a simple check. The second one needs data from previous runs: we must either compare the current run's output with the previous run's data or store the sum of each `avg_*` column for every run. We will store these run-level metrics in a `run_metadata` table in SQLite (via Python's `sqlite3` module).

Our pipelines should run data quality checks before making the data available to end users. This ensures that you catch issues before they can cause damage.

![WAP pattern](./assets/images/wap.png)
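Here is a minimal sketch of write-audit-publish using SQLite from the standard library. It assumes the output rows arrive as `(customer_key, avg_order_value)` tuples. The function and table names are illustrative, not the project's code. The data is first written to a staging table. It is then audited with the not-null and uniqueness check on `customer_key`. Only if the audit passes is it renamed into the table that end users query.

```python
import sqlite3


def write_audit_publish(con, rows):
    """Write rows to staging, audit them, and publish only if the audit passes.

    rows: iterable of (customer_key, avg_order_value) tuples.
    """
    # Write: load the new data into a staging table that end users do not query
    con.execute("DROP TABLE IF EXISTS customer_outreach_metrics_staging")
    con.execute(
        "CREATE TABLE customer_outreach_metrics_staging "
        "(customer_key INTEGER, avg_order_value REAL)"
    )
    con.executemany(
        "INSERT INTO customer_outreach_metrics_staging VALUES (?, ?)", rows
    )

    # Audit: customer_key must be non-null and unique in the staged data
    nulls, dupes = con.execute(
        """
        SELECT SUM(customer_key IS NULL),
               COUNT(customer_key) - COUNT(DISTINCT customer_key)
        FROM customer_outreach_metrics_staging
        """
    ).fetchone()
    if nulls or dupes:
        # The published table is left untouched, so consumers keep the last good data
        raise ValueError(
            f"Audit failed: {nulls} null and {dupes} duplicate customer_key values"
        )

    # Publish: replace the table end users query with the audited staging table
    con.execute("DROP TABLE IF EXISTS customer_outreach_metrics")
    con.execute(
        "ALTER TABLE customer_outreach_metrics_staging "
        "RENAME TO customer_outreach_metrics"
    )
    con.commit()


demo_con = sqlite3.connect(":memory:")
write_audit_publish(demo_con, [(1, 120.5), (2, 99.0)])
print(demo_con.execute("SELECT COUNT(*) FROM customer_outreach_metrics").fetchone())  # (2,)
demo_con.close()
```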

### **Recommended reading**: **[Types of data quality checks](https://www.startdataengineering.com/post/types-of-dq-checks/)**, **[Implementing data quality checks with Great Expectations](https://www.startdataengineering.com/post/implement_data_quality_with_great_expectations/)**, & **[Write-Audit-Publish pattern](https://www.startdataengineering.com/post/de_best_practices/#32-ensure-data-is-valid-before-exposing-it-to-its-consumers-aka-data-quality-checks)**

Let's see how we can implement DQ checks in Python.

In [25]:
import json

# get current run's metrics
curr_metrics = json.loads(
    customer_outreach_metrics\
    .select(
        pl.col("avg_num_items_per_order").alias("sum_avg_num_items_per_order"),
        pl.col("avg_order_value").alias("sum_avg_order_value")
    )\
    .sum()\
    .write_json())[0]



In [26]:
# Store run metadata in a table
import sqlite3

# Connect to SQLite database
conn = sqlite3.connect('metadata.db')

# Create a cursor object
cursor = conn.cursor()

# Insert data into the run_metadata table
cursor.execute('''
    INSERT INTO run_metadata (run_id, metadata)
    VALUES (?, ?)
''', ('2024-09-15-10-00', json.dumps(curr_metrics)))

# Commit the changes and close the connection
conn.commit()
conn.close()

In [27]:
# Assume that another run of the data pipeline has been completed

# Get the most recent data from the run_metadata table
import sqlite3

# Connect to SQLite database
conn = sqlite3.connect('metadata.db')

# Create a cursor object
cursor = conn.cursor()

# Fetch the most recent row based on run_id
cursor.execute('''
    SELECT * FROM run_metadata
    ORDER BY run_id DESC
    LIMIT 1
''')

# Get the result
most_recent_row = cursor.fetchone()

# Close the connection
conn.close()

In [28]:
# get the most recent metric
prev_metric = json.loads(most_recent_row[1])

# get current metric
# This assumes the pipeline is rerun
curr_metric = json.loads(
    customer_outreach_metrics\
    .select(
        pl.col("avg_num_items_per_order").alias("sum_avg_num_items_per_order"),
        pl.col("avg_order_value").cast(int).alias("sum_avg_order_value")
    )\
    .sum()\
    .write_json())[0]

# Compare with current data for variance percentage
def percentage_difference(val1, val2):
    if val1 == 0 and val2 == 0:
        return 0.0
    elif val1 == 0 or val2 == 0:
        return 100.0
    return abs((val1 - val2) / ((val1 + val2) / 2)) * 100

prev_metric['sum_avg_order_value'] = int(float(prev_metric['sum_avg_order_value']))

comparison = {}
for key in curr_metric:
    if key in prev_metric:
        comparison[key] = percentage_difference(curr_metric[key], prev_metric[key])

if prev_metric is None:
    print('No prev metric')
    
# code to check if variance < 5
for k, v in comparison.items():
    if v >= 5:
        raise Exception(f"Difference for {k} is greater than 5%: {v}%")
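The `percentage_difference` helper in the cell above measures the change relative to the mean of the two values (a symmetric percentage difference), not relative to the previous run alone. With $a$ as one run's metric and $b$ as the other's, it computes:

$$
d(a, b) = \left| \frac{a - b}{(a + b)/2} \right| \times 100
$$

For example, with $b = 100$ and $a = 104$, $d = \frac{4}{102} \times 100 \approx 3.92\%$, which passes the 5% check. Relative to the previous value alone, the change would be 4%. The helper also special-cases zeros: it returns 0 when both values are 0, and 100 when exactly one of them is.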

In [29]:
# Insert current run data into the run_metadata table
# Store run metadata in a table
import sqlite3
from datetime import datetime

# Get current timestamp and format it
current_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M')

# Connect to SQLite database
conn = sqlite3.connect('metadata.db')

# Create a cursor object
cursor = conn.cursor()
comparison_json = json.dumps(comparison)

# Insert data into the run_metadata table
cursor.execute('''
    INSERT INTO run_metadata (run_id, metadata)
    VALUES (?, ?)
''', (current_timestamp, comparison_json))

# Commit the changes and close the connection
conn.commit()
conn.close()

**Exercise:** There is a deliberate mistake in the cell above. What is it?

**Hint** Is the code actually doing what it is supposed to do?

Your answer here

# Code organization

Deciding how to organize your code can be overwhelming. Typically, companies use one of the following options to organize code:

1. Based on multi-hop architecture. E.g. **[see this dbt folder structure](https://github.com/dbt-labs/jaffle_shop_duckdb/tree/duckdb/models)**
2. Based on existing company standards.

### Folder structure

We can use the following folder structure for our use case (and most real-life projects).

![Folder structure](./assets/images/folder.png)

Each file under the etl folder will have the code necessary to generate that dataset. The above folder structure lets anyone new to the project quickly find where the code that creates a given dataset lives.

Now compare this with dbt's recommended project structure:

![dbt folder structure](./assets/images/dbtps.png)

*Note*: dbt recommends using its `semantic layer` over a pre-aggregated layer. The `semantic layer` computes aggregations for (approximately) every query, which can lead to skyrocketing costs.

### Code modularity

We have the code to create the necessary tables; now, we have to put them into functions that are easy to use and maintain. 

#### **Recommended reading**: **[How to write modular python code](https://www.startdataengineering.com/post/code-patterns/#1-functional-design)**

We will define a function named `create_dataset` in each table's Python script. Having a commonly named function enables:

1. Consistent naming. For example: `dim_customer.create_dataset`, `customer_outreach_metrics.create_dataset`
2. Pulling code commonalities out into a base class. Moving code into a common base class will be covered in a future post.
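As a rough idea of where this leads, here is a minimal sketch using Python's `abc` module. The `Dataset` base class, its `run` method, and the toy `CustomerOrderCounts` subclass are hypothetical names chosen for illustration, not the project's code. Every dataset exposes the same `create_dataset`/`validate_dataset` pair, so the shared create-then-validate flow lives in one place.

```python
from abc import ABC, abstractmethod
from collections import Counter


class Dataset(ABC):
    """Hypothetical base class sharing the create -> validate flow across datasets."""

    @abstractmethod
    def create_dataset(self, *inputs):
        """Build and return the dataset from its upstream inputs."""

    def validate_dataset(self, dataset):
        """Run data quality checks; does nothing by default, subclasses override it."""

    def run(self, *inputs):
        # Common flow: create, validate (which raises on failure), then return the result
        dataset = self.create_dataset(*inputs)
        self.validate_dataset(dataset)
        return dataset


class CustomerOrderCounts(Dataset):
    """Toy dataset: number of orders per customer, built from a list of order dicts."""

    def create_dataset(self, orders):
        return dict(Counter(order["customer_key"] for order in orders))

    def validate_dataset(self, dataset):
        if None in dataset:
            raise ValueError("Null customer_key found")


orders = [{"customer_key": 1}, {"customer_key": 1}, {"customer_key": 2}]
print(CustomerOrderCounts().run(orders))  # {1: 2, 2: 1}
```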

Let's see what functions we would want to include in `de_project/etl/gold/pre-aggregated/customer_outreach_metrics.py`.

**Note** We have moved code that involves reading/writing to metadata into **[de_project/utils/metadata.py](https://github.com/josephmachado/de_project/blob/main/de_project/utils/metadata.py)**.


In [30]:
# de_project/utils/metadata.py
import json

import sqlite3
from datetime import datetime

def get_latest_run_metrics():
    # Connect to SQLite database
    conn = sqlite3.connect("metadata.db")

    # Create a cursor object
    cursor = conn.cursor()

    # Fetch the most recent row based on run_id
    cursor.execute(
        """
        SELECT * FROM run_metadata
        ORDER BY run_id DESC
        LIMIT 1
    """
    )

    # Get the result
    most_recent_row = cursor.fetchone()

    # Close the connection
    conn.close()
    return (
        json.loads(most_recent_row[1])
        if most_recent_row and len(most_recent_row) > 0
        else None
    )


def insert_run_metrics(curr_metrics):
    # Connect to SQLite database
    conn = sqlite3.connect("metadata.db")

    # Create a cursor object
    cursor = conn.cursor()
    curr_metrics_json = json.dumps(curr_metrics)

    current_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M')
    # Insert data into the run_metadata table
    cursor.execute(
        """
        INSERT INTO run_metadata (run_id, metadata)
        VALUES (?, ?)
    """,
        (current_timestamp, curr_metrics_json),
    )

    # Commit the changes and close the connection
    conn.commit()
    conn.close()

In [31]:
# de_project/etl/gold/pre-aggregated/customer_outreach_metrics.py
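import json
import polars as pl

# get_latest_run_metrics is defined in de_project/utils/metadata.py (see the cell above)
from de_project.utils.metadata import get_latest_run_metrics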

def create_dataset(wide_lineitem, wide_orders):
    order_lineitem_metrics = wide_lineitem.group_by(pl.col("order_key")).agg(
        pl.col("linenumber").count().alias("num_lineitems")
    )
    return (
        wide_orders.join(order_lineitem_metrics, on="order_key", how="left")
        .group_by(pl.col("customer_key"), pl.col("name").alias("customer_name"))
        .agg(
            pl.min("totalprice").alias("min_order_value"),
            pl.max("totalprice").alias("max_order_value"),
            pl.mean("totalprice").alias("avg_order_value"),
            pl.mean("num_lineitems").alias("avg_num_items_per_order"),
        )
    )


def percentage_difference(val1, val2):
    if val1 == 0 and val2 == 0:
        return 0.0
    elif val1 == 0 or val2 == 0:
        return 100.0
    return abs((val1 - val2) / ((val1 + val2) / 2)) * 100


def check_no_duplicates(customer_outreach_metrics_df):
    # check uniqueness
    if (
        customer_outreach_metrics_df.filter(
            customer_outreach_metrics_df.select(pl.col("customer_key")).is_duplicated()
        ).shape[0]
        > 0
    ):
        raise Exception("Duplicate customer_keys")


def check_variance(customer_outreach_metrics_df, perc_threshold=5):
    prev_metric = get_latest_run_metrics()
    if prev_metric is None:
        return
    prev_metric['sum_avg_order_value'] = int(float(prev_metric['sum_avg_order_value']))
    curr_metric = json.loads(
        customer_outreach_metrics_df.select(
            pl.col("avg_num_items_per_order").alias("sum_avg_num_items_per_order"),
            pl.col("avg_order_value").cast(int).alias("sum_avg_order_value"),
        )
        .sum()
        .write_json()
    )[0]
    comparison = {}
    for key in curr_metric:
        if key in prev_metric:
            comparison[key] = percentage_difference(curr_metric[key], prev_metric[key])

    for k, v in comparison.items():
        if v >= perc_threshold:
            raise Exception(f"Difference for {k} is greater than 5%: {v}%")


def validate_dataset(customer_outreach_metrics_df):
    # data quality checks
    check_no_duplicates(customer_outreach_metrics_df)
    check_variance(customer_outreach_metrics_df)

Notice how each function performs a single task and is named in `verb_noun` form.

**Exercise**: How would you improve the `check_variance` method?


Your answer here

## Code testing

We will use `pytest` to test our code. Let's write a test case for the `create_dataset` function of the `dim_customer` dataset. The test code is at **[de_project/tests/unit/test_dim_customer.py](https://github.com/josephmachado/de_project/blob/main/de_project/tests/unit/test_dim_customer.py)**.

### **Recommended reading**: **[How to use pytest to test your code](https://www.startdataengineering.com/post/code-patterns/#4-testing-with-pytest)**

We can run the tests from the terminal as follows:


In [32]:
! python -m pytest de_project/tests/unit/test_dim_customer.py

platform linux -- Python 3.12.4, pytest-8.3.3, pluggy-1.5.0
rootdir: /home/josephkevinmachado/code/de_project
plugins: anyio-4.4.0
collected 1 item

de_project/tests/unit/test_dim_customer.py .                             [100%]



As the project grows, you can add more test cases (for example, for the data quality helpers) and rerun pytest the same way.
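Here is a minimal sketch of pytest-style tests for the `percentage_difference` helper from `customer_outreach_metrics.py`. So that the example runs on its own, the function is copied in rather than imported. In the project, you would import it from the module instead. The test names are illustrative. If you save this in a file whose name starts with `test_` (for example, a hypothetical `de_project/tests/unit/test_customer_outreach_metrics.py`), `python -m pytest` collects and runs the `test_*` functions.

```python
import math


# Copied from customer_outreach_metrics.py so this example runs standalone;
# in the project, import it from that module instead.
def percentage_difference(val1, val2):
    if val1 == 0 and val2 == 0:
        return 0.0
    elif val1 == 0 or val2 == 0:
        return 100.0
    return abs((val1 - val2) / ((val1 + val2) / 2)) * 100


def test_identical_values_have_no_difference():
    assert percentage_difference(100, 100) == 0.0


def test_both_zero_returns_zero():
    assert percentage_difference(0, 0) == 0.0


def test_one_zero_returns_hundred():
    assert percentage_difference(0, 50) == 100.0


def test_difference_is_relative_to_the_mean():
    # |104 - 100| / ((104 + 100) / 2) * 100 = 4 / 102 * 100
    assert math.isclose(percentage_difference(104, 100), 400 / 102)


def test_difference_is_symmetric():
    assert math.isclose(
        percentage_difference(104, 100), percentage_difference(100, 104)
    )
```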

## Orchestration and scheduling

We will use `Apache Airflow` to schedule and orchestrate our pipeline. We will set the schedule to run every minute (so that we can check runs quickly). Each run only has to call **[this function](https://github.com/josephmachado/de_project/blob/b9287d8e3a78f91626da71e8ed886875095f59dc/de_project/run_pipeline.py#L7)**, which runs the ETL and outputs the `customer_outreach_metrics` data.

### **Recommended reading**: **[Why use Airflow](https://www.startdataengineering.com/post/why-to-use-orchestrators/)** & **[Docker for data engineers](https://www.startdataengineering.com/post/docker-for-de/)**.

In [33]:
! python ./setup/create_input_data.py
# recreate data to ensure data from above sections are cleaned out

Cleaning up tpch and metadata db files
Creating TPCH input data
Creating metadata table


In [None]:
# Run this in your terminal if the docker compose up command fails for you
#! sudo mkdir -p logs plugins temp dags tests migrations data visualization de_project && sudo chmod -R u=rwx,g=rwx,o=rwx logs plugins temp dags tests migrations data visualization de_project tpch.db metadata.db

If you are using VS Code, run the command above in its terminal.

Here's how you can run it via the terminal (when using Jupyter Notebook):

<video src="./assets/videos/perms.mp4" controls></video>

The step below takes a while (around 8 minutes) the first time.

In [37]:
%%capture
! docker compose up airflow-init && docker compose up --build -d

In [38]:
! sleep 30
# this is to allow all airflow containers to fully start

In [39]:
! docker ps

CONTAINER ID   IMAGE                          COMMAND                  CREATED          STATUS                    PORTS                                       NAMES
2765ed630066   de_project-airflow-scheduler   "/usr/bin/dumb-init …"   2 minutes ago    Up 2 minutes (healthy)    8080/tcp                                    scheduler
408090f4413e   de_project-airflow-webserver   "/usr/bin/dumb-init …"   2 minutes ago    Up 2 minutes (healthy)    0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   webserver
7f022043adf0   postgres:16                    "docker-entrypoint.s…"   11 minutes ago   Up 11 minutes (healthy)   0.0.0.0:5432->5432/tcp, :::5432->5432/tcp   postgres


Go to [http://localhost:8080](http://localhost:8080), or, if you are running on GitHub Codespaces, go to the `ports` tab and click the link for port 8080 (as shown below).

**username**: airflow

**password**: airflow

![Open Airflow on GitHub Codespaces](./assets/images/cs3.png)

Your DAG will look like this:

![TPCH ETL DAG](./assets/images/dagdep.png)

In [40]:
%%capture
! docker compose down --volumes --rmi all
# shut down docker containers

# What data project to build

Check out this post, **[that goes over how to decide on a data project for your portfolio](https://www.startdataengineering.com/post/what-data-project-to-build/)**.

# Note: If you liked this, you'll enjoy my 3-weekend intensive data engineering hands-on workshop, **[sign up here](https://astounding-architect-5764.ck.page/684e1f422f)**

# Please provide feedback(anonymously) at https://form.typeform.com/to/AyUYk4RZ

# Next steps

In the next post, we will cover the following:

1. Setting up a proper DQ system
2. Creating a visualization dashboard
3. Moving common parts of code to the base class
4. Persisting tables in a cloud storage system (e.g. S3)
5. Deploying the pipeline to the cloud with Terraform
6. Setting up monitoring and alerting infrastructure
7. Setting up a dashboard to visualize the output

