# Parts of data project

Most data engineering tool falls into one of the parts shown below: (as explained in this [post](https://www.startdataengineering.com/post/parts-of-dataengineering/))

![Data tools](./assets/images/data-tools.png)

In this notebook, we will go over the parts of a data project and choose tools to build a data pipeline. While we chose bitcoin data for this project, one can choose any data set they find interesting and follow the below steps to quickly build their data pipeline.

# Requirements

When it comes to requirements work with the end user to define them. The set of requirements are usually the following:

## Input dataset

Let's assume that we are working with a car part seller database (tpch). The data is available in a duckdb database. The data model is shown below:

![TPCH data model](./assets/images/tpch_erd.png)

In [1]:
import os

# Remove the file if it exists
if os.path.exists("tpch.db"):
    os.remove("tpch.db")
else:
    print("The file does not exist.")


In [2]:
# Create TPCH dataset 
import duckdb
con = duckdb.connect("tpch.db") # Define a .db file to persist the generated tpch data 
con.sql("INSTALL tpch;LOAD tpch;CALL dbgen(sf = 0.01);") # generate a 100MB TPCH dataset

┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ 0 rows  │
└─────────┘

In [3]:
# check presence of TPCH tables
con.sql("select * from customer limit 2") 

┌───────────┬────────────────────┬────────────────────────────────┬─────────────┬─────────────────┬───────────────┬──────────────┬─────────────────────────────────────────────────────────────────┐
│ c_custkey │       c_name       │           c_address            │ c_nationkey │     c_phone     │   c_acctbal   │ c_mktsegment │                            c_comment                            │
│   int64   │      varchar       │            varchar             │    int32    │     varchar     │ decimal(15,2) │   varchar    │                             varchar                             │
├───────────┼────────────────────┼────────────────────────────────┼─────────────┼─────────────────┼───────────────┼──────────────┼─────────────────────────────────────────────────────────────────┤
│         1 │ Customer#000000001 │ j5JsirBM9PsCy0O1m              │          15 │ 25-989-741-2988 │        711.56 │ BUILDING     │ y final requests wake slyly quickly special accounts. blithely  │
│         2 │ C

## Columns and metics needed in the dataset produced

Assume that we have to create metrics for the customer outreach team. The dataset requires the following columns:

For each customer (i.e. one row per customer)

1. **customer_key**: The unique identifier for the customer 
2. **customer_name**: The customer name
3. **min_order_value**: The value of the order with the lowest value placed by this customer
4. **max_order_value**: The value of the order with the highest value placed by this customer
5. **avg_order_value**: The average value of all the orders placed by this customer  
6. **avg_num_items_per_order**: The average number of items per order placed by this customer

In [4]:
# simple query to get the output dataset
import duckdb
con = duckdb.connect("tpch.db")
con.sql("""
WITH order_items AS (
    SELECT
        l_orderkey,
        COUNT(*) AS item_count
    FROM
        lineitem
    GROUP BY
        l_orderkey
),
customer_orders AS (
    SELECT
        o.o_custkey,
        o.o_orderkey,
        o.o_totalprice,
        oi.item_count
    FROM
        orders o
    JOIN
        order_items oi ON o.o_orderkey = oi.l_orderkey
)
SELECT
    c.c_custkey AS customer_key,
    c.c_name AS customer_name,
    MIN(co.o_totalprice) AS min_order_value,
    MAX(co.o_totalprice) AS max_order_value,
    AVG(co.o_totalprice) AS avg_order_value,
    AVG(co.item_count) AS avg_num_items_per_order
FROM
    customer c
JOIN
    customer_orders co ON c.c_custkey = co.o_custkey
GROUP BY
    c.c_custkey, c.c_name;
""")

┌──────────────┬────────────────────┬─────────────────┬─────────────────┬────────────────────┬─────────────────────────┐
│ customer_key │   customer_name    │ min_order_value │ max_order_value │  avg_order_value   │ avg_num_items_per_order │
│    int64     │      varchar       │  decimal(15,2)  │  decimal(15,2)  │       double       │         double          │
├──────────────┼────────────────────┼─────────────────┼─────────────────┼────────────────────┼─────────────────────────┤
│         1234 │ Customer#000001234 │         9003.19 │       278195.78 │        139255.1685 │                     4.3 │
│         1249 │ Customer#000001249 │        24930.42 │       277810.17 │ 134356.99653846154 │       4.153846153846154 │
│          619 │ Customer#000000619 │         9264.42 │       358646.80 │ 154229.86541666667 │       4.208333333333333 │
│          100 │ Customer#000000100 │        30287.40 │       328307.40 │      168177.359375 │                    4.25 │
│          442 │ Customer#000000

In [5]:
con.commit()
con.close() # close connection, since duckdb only allows one connection to tpch.db

## SLA: Data freshness

Assume that our stakeholders require the data to be no older than 12h. This means that your pipeline should run completely atleast once every 12h. If we assume that the pipeline runs in 2h, we need to ensure that the pipeline is run atleast every 10h, so that the data is not older than 12h at any given time.

* **SLA**: Max 12h latency
* **Pipeline frequency**: Run pipeline every 1h

## Table constraints and data quality

Since our dataset is at a customer level we can infer that 

1. **customer_key**: Has to be unique and not null

In order to avoid any surprises we can do a variance check on our metrics between each runs:

1. **avg_*** columns should not differ by more than 5% compared to prior runs (across all customers)

Read **[this article that covers all the different types of data quality checks for a detailed list of all the checks one may consider doing](https://www.startdataengineering.com/post/types-of-dq-checks/)**.

# Data flow architecture

Most tools have their version of the 3-hop architecture. For example dbt has its own version (stage, intermediate, mart) and spark has medallion (bronze, silver, gold).

Read **[this article that goes over the standard multi hop architecture in data systems](https://www.startdataengineering.com/post/de_best_practices/#31-use-standard-patterns-that-progressively-transform-your-data)**.

You may be wondering why we need this data flow architecture when we have **[the results easily with a simple query shown here](./setup-data-project.ipynb#Columns-and-metics-needed-in-the-dataset-produced)**. 

While this is a simple example, in most real-world projects you want to have a standard, cleaned and modelled dataset(bronze) that can be use to create specialized dataset for end-users(gold).

## Bronze

Since our dataset has data from customer, order and lineitem input datasets, we will bring those data into bronze tables. We will keep their names the same as the input datasets.

We will use `polars` as our data processor.

In [6]:
import duckdb
import polars as pl

con = duckdb.connect("tpch.db")
con.sql("show tables")

┌──────────┐
│   name   │
│ varchar  │
├──────────┤
│ customer │
│ lineitem │
│ nation   │
│ orders   │
│ part     │
│ partsupp │
│ region   │
│ supplier │
└──────────┘

In [7]:
# read customer, order, lineitem dataset from duckdb into polars dataframe
import duckdb
import polars as pl

con = duckdb.connect("tpch.db")
customer_df = con.sql("select * from customer").pl()
orders_df = con.sql("select * from orders").pl()
lineitem_df = con.sql("select * from lineitem").pl()

In [8]:
con.close() #close DuckDB connection

In [9]:
customer_df.schema

Schema([('c_custkey', Int64),
        ('c_name', String),
        ('c_address', String),
        ('c_nationkey', Int32),
        ('c_phone', String),
        ('c_acctbal', Decimal(precision=15, scale=2)),
        ('c_mktsegment', String),
        ('c_comment', String)])

In [10]:
# rename columns to confine to warehouse standards
cleaned_customer_df = customer_df.rename(lambda col_name: col_name[2:]).rename({"custkey": "customer_key"})
# remove c_ and then rename custkey to customer_key

In [11]:
cleaned_customer_df.schema

Schema([('customer_key', Int64),
        ('name', String),
        ('address', String),
        ('nationkey', Int32),
        ('phone', String),
        ('acctbal', Decimal(precision=15, scale=2)),
        ('mktsegment', String),
        ('comment', String)])

In [12]:
orders_df.schema, lineitem_df.schema
cleaned_orders_df = orders_df.rename(lambda col_name: col_name[2:]).rename({"custkey": "customer_key", "orderkey": "order_key"})
cleaned_lineitem_df = lineitem_df.rename(lambda col_name: col_name[2:]).rename({"orderkey": "order_key"})

In [13]:
cleaned_orders_df.schema, cleaned_lineitem_df.schema

(Schema([('order_key', Int64),
         ('customer_key', Int64),
         ('orderstatus', String),
         ('totalprice', Decimal(precision=15, scale=2)),
         ('orderdate', Date),
         ('orderpriority', String),
         ('clerk', String),
         ('shippriority', Int32),
         ('comment', String)]),
 Schema([('order_key', Int64),
         ('partkey', Int64),
         ('suppkey', Int64),
         ('linenumber', Int64),
         ('quantity', Decimal(precision=15, scale=2)),
         ('extendedprice', Decimal(precision=15, scale=2)),
         ('discount', Decimal(precision=15, scale=2)),
         ('tax', Decimal(precision=15, scale=2)),
         ('returnflag', String),
         ('linestatus', String),
         ('shipdate', Date),
         ('commitdate', Date),
         ('receiptdate', Date),
         ('shipinstruct', String),
         ('shipmode', String),
         ('comment', String)]))

## Silver

In the silver layer the datasets are modelled based on one of the popular styles (e.g. Kimball, Data vault, etc). We will use Kimball's dimensional model as it is the most commonly used one and can account for a lot of use cases.

We will create the following datasets

1. **dim_customer**: A customer level table with all the necessary attributes of a customer. We will join nation and region data to the `cleaned_customer_df` to get all the attributes associated with a customer.
2. **fct_orders**: An order level fact(an event that happened) table. This will be the same as `cleaned_orders_df` since the `orders` table has all the necessary details about the order and how it associates with dimension tables like `customer_key`.
3. **fct_lineitem**: A lineitem (items that are part of an order) fact table. This table will be the same as `cleaned_lineitem_df` since the `lineitem` table has all the lineitem level details and keys to associate with dimension tables like `partkey` and `suppkey`.

In [14]:
# pull nation and region datasets into bronze layer
import duckdb
import polars as pl

con = duckdb.connect("tpch.db")
nation_df = con.sql("select * from nation").pl()
region_df = con.sql("select * from region").pl()

cleaned_nation_df = nation_df.rename(lambda col_name: col_name[2:])
cleaned_region_df = region_df.rename(lambda col_name: col_name[2:])
con.close() # close duckdb connection

In [15]:
cleaned_region_df.schema, cleaned_nation_df.schema, cleaned_customer_df.schema

(Schema([('regionkey', Int32), ('name', String), ('comment', String)]),
 Schema([('nationkey', Int32),
         ('name', String),
         ('regionkey', Int32),
         ('comment', String)]),
 Schema([('customer_key', Int64),
         ('name', String),
         ('address', String),
         ('nationkey', Int32),
         ('phone', String),
         ('acctbal', Decimal(precision=15, scale=2)),
         ('mktsegment', String),
         ('comment', String)]))

In [16]:
dim_customer = cleaned_customer_df\
.join(cleaned_nation_df, on="nationkey", how="left", suffix="_nation")\
.join(cleaned_region_df, on="regionkey", how="left", suffix="_region")\
.rename({
    "name_nation": "nation_name",
    "name_region": "region_name",
    "comment_nation": "nation_comment",
    "comment_region": "region_comment"
})

In [17]:
dim_customer.schema

Schema([('customer_key', Int64),
        ('name', String),
        ('address', String),
        ('nationkey', Int32),
        ('phone', String),
        ('acctbal', Decimal(precision=15, scale=2)),
        ('mktsegment', String),
        ('comment', String),
        ('nation_name', String),
        ('regionkey', Int32),
        ('nation_comment', String),
        ('region_name', String),
        ('region_comment', String)])

In [18]:
fct_orders = cleaned_orders_df
fct_lineitem = cleaned_lineitem_df

## Gold

In the gold layer datasets required for the end user are generated. The user required datasets are usually an aggregate of fact tables joined with dimension tables. In real-world projects you'd have multiple teams/users asking for differently grouped dataset from the same underlying fact and dimension tables. While you can join the necessary tables and aggregate them individually for each ask, it leads to repeated code and joins.

To avoid this issue, companies typically do the following:

1. **OBT**: This is usually a fact table with multiple dimension tables left joined with it.
2. **pre-aggregated table**; The OBT table is aggregated to the grain required by the end user/team. This will be the dataset that the end user access. By providing the end user with the exact columns they need, we can ensure that all the metrics are defined in one place and issues due to incorrect metric calculations by end-users are greatly reduced. These tables act as the SOT (source of truth) for our end-users. 

### OBT

In our example we have 2 fact tables, `fct_orders` and `fct_lineitem`. Since we only have one dimension `dim_customer` we can join `fct_orders` and `dim_customer` to create `wide_orders`. For our use case we can keep `fct_lineitem` as `wide_lineitem`.

Having said that we can easily see a case where we might need to join `parts` and `supplier` data with `fct_lineitem` to get `wide_lineitem`. But since our use case doesn't require this we can skip it!

In [19]:
fct_orders.schema, dim_customer.schema

(Schema([('order_key', Int64),
         ('customer_key', Int64),
         ('orderstatus', String),
         ('totalprice', Decimal(precision=15, scale=2)),
         ('orderdate', Date),
         ('orderpriority', String),
         ('clerk', String),
         ('shippriority', Int32),
         ('comment', String)]),
 Schema([('customer_key', Int64),
         ('name', String),
         ('address', String),
         ('nationkey', Int32),
         ('phone', String),
         ('acctbal', Decimal(precision=15, scale=2)),
         ('mktsegment', String),
         ('comment', String),
         ('nation_name', String),
         ('regionkey', Int32),
         ('nation_comment', String),
         ('region_name', String),
         ('region_comment', String)]))

In [20]:
# create wide_orders table
wide_orders = fct_orders.join(dim_customer, on="customer_key", how="left")

In [21]:
wide_orders.shape

(15000, 21)

In [22]:
fct_orders.shape

(15000, 9)

In [23]:
wide_orders.schema

Schema([('order_key', Int64),
        ('customer_key', Int64),
        ('orderstatus', String),
        ('totalprice', Decimal(precision=15, scale=2)),
        ('orderdate', Date),
        ('orderpriority', String),
        ('clerk', String),
        ('shippriority', Int32),
        ('comment', String),
        ('name', String),
        ('address', String),
        ('nationkey', Int32),
        ('phone', String),
        ('acctbal', Decimal(precision=15, scale=2)),
        ('mktsegment', String),
        ('comment_right', String),
        ('nation_name', String),
        ('regionkey', Int32),
        ('nation_comment', String),
        ('region_name', String),
        ('region_comment', String)])

In [24]:
wide_lineitem = fct_lineitem

### Pre-aggregated data

According to our **[data requirements](./setup-data-project.ipynb#Columns-and-metics-needed-in-the-dataset-produced)** we can see that we need data from customer, orders and lineitem. Since we already have customer and order data in `wide_orders`, we can join that with `wide_lineitem` to get the necessary data.

We can call the final dataset as `customer_outreach_metrics` (read **[this article that goes over importance of naming](https://docs.getdbt.com/blog/on-the-importance-of-naming)**).

In [25]:
wide_lineitem.schema

Schema([('order_key', Int64),
        ('partkey', Int64),
        ('suppkey', Int64),
        ('linenumber', Int64),
        ('quantity', Decimal(precision=15, scale=2)),
        ('extendedprice', Decimal(precision=15, scale=2)),
        ('discount', Decimal(precision=15, scale=2)),
        ('tax', Decimal(precision=15, scale=2)),
        ('returnflag', String),
        ('linestatus', String),
        ('shipdate', Date),
        ('commitdate', Date),
        ('receiptdate', Date),
        ('shipinstruct', String),
        ('shipmode', String),
        ('comment', String)])

In [26]:
wide_orders.schema

Schema([('order_key', Int64),
        ('customer_key', Int64),
        ('orderstatus', String),
        ('totalprice', Decimal(precision=15, scale=2)),
        ('orderdate', Date),
        ('orderpriority', String),
        ('clerk', String),
        ('shippriority', Int32),
        ('comment', String),
        ('name', String),
        ('address', String),
        ('nationkey', Int32),
        ('phone', String),
        ('acctbal', Decimal(precision=15, scale=2)),
        ('mktsegment', String),
        ('comment_right', String),
        ('nation_name', String),
        ('regionkey', Int32),
        ('nation_comment', String),
        ('region_name', String),
        ('region_comment', String)])

In [27]:
# create customer_outreach_metrics

# get number of lineitems per order
order_lineitem_metrics = wide_lineitem.group_by(pl.col("order_key")).agg(pl.col("linenumber").count().alias("num_lineitems"))
# join the above df with wide_orders and group by customer key in wide orders to get avg, min, max order value & avg num items per order
customer_outreach_metrics = wide_orders\
.join(order_lineitem_metrics, on="order_key", how="left")\
.group_by(
    pl.col("customer_key"), 
    pl.col("name").alias("customer_name"))\
.agg(
    pl.min("totalprice").alias("min_order_value"),
    pl.max("totalprice").alias("max_order_value"),
    pl.mean("totalprice").alias("avg_order_value"),
    pl.mean("num_lineitems").alias("avg_num_items_per_order"),
)

In [28]:
customer_outreach_metrics.schema

Schema([('customer_key', Int64),
        ('customer_name', String),
        ('min_order_value', Decimal(precision=15, scale=2)),
        ('max_order_value', Decimal(precision=15, scale=2)),
        ('avg_order_value', Decimal(precision=15, scale=2)),
        ('avg_num_items_per_order', Float64)])

In [29]:
customer_outreach_metrics.limit(2)

customer_key,customer_name,min_order_value,max_order_value,avg_order_value,avg_num_items_per_order
i64,str,"decimal[15,2]","decimal[15,2]","decimal[15,2]",f64
958,"""Customer#000000958""",31973.21,314926.5,,4.388889
905,"""Customer#000000905""",17681.24,284167.18,,3.8


# Data model

## Facts and Dimensions

# Data processor and tools

## Data processor


## Data quality

As part of our requirements we saw that the output dataset needs to have 
1. Unique and distinct `customer_key`
2. Variance in `avg_*` columns between run should not be more than 5% (across all customers)

While the first test is a simple check, the second one requires that we use the previous runs data and compare it with the current run's data or we store the sum(avg_*) of each run. Let's store the run level metrics in a run_metadata table (in sqlite3)


In [30]:
import os

# Remove the file if it exists
if os.path.exists("metadata.db"):
    os.remove("metadata.db")
else:
    print("The file does not exist.")


In [31]:
# create sqlite3 table
import sqlite3

# Connect to SQLite database (or create it)
conn = sqlite3.connect('metadata.db')

# Create a cursor object
cursor = conn.cursor()

# Create the run_metadata table
cursor.execute('''
    CREATE TABLE run_metadata (
        run_id TEXT PRIMARY KEY,
        metadata TEXT
    )
''')

# Commit the changes and close the connection
conn.commit()
conn.close()


In [32]:
# store run metdata in table
import sqlite3

# Connect to SQLite database
conn = sqlite3.connect('metadata.db')

# Create a cursor object
cursor = conn.cursor()

# Insert data into the run_metadata table
cursor.execute('''
    INSERT INTO run_metadata (run_id, metadata)
    VALUES (?, ?)
''', ('2024-09-15-10-00', '{"sum_avg_num_items_per_order": 40045.28069559008, "sum_avg_order_value": 0.00}'))

# Commit the changes and close the connection
conn.commit()
conn.close()


In [33]:
# compare current run with the most recent run
customer_outreach_metrics.schema

Schema([('customer_key', Int64),
        ('customer_name', String),
        ('min_order_value', Decimal(precision=15, scale=2)),
        ('max_order_value', Decimal(precision=15, scale=2)),
        ('avg_order_value', Decimal(precision=15, scale=2)),
        ('avg_num_items_per_order', Float64)])

In [34]:
import json

json.loads(
    customer_outreach_metrics\
    .select(
        pl.col("avg_num_items_per_order").alias("sum_avg_num_items_per_order"),
        pl.col("avg_order_value").alias("sum_avg_order_value")
    )\
    .sum()\
    .write_json())[0]

{'sum_avg_num_items_per_order': 4008.885007123707,
 'sum_avg_order_value': '0.00'}

In [35]:
# Get most recent data from run_metadata table
import sqlite3

# Connect to SQLite database
conn = sqlite3.connect('metadata.db')

# Create a cursor object
cursor = conn.cursor()

# Fetch the most recent row based on run_id
cursor.execute('''
    SELECT * FROM run_metadata
    ORDER BY run_id DESC
    LIMIT 1
''')

# Get the result
most_recent_row = cursor.fetchone()

# Close the connection
conn.close()

# Print the result
print(most_recent_row)


('2024-09-15-10-00', '{"sum_avg_num_items_per_order": 40045.28069559008, "sum_avg_order_value": 0.00}')


In [36]:
prev_metric = json.loads(most_recent_row[1])

In [37]:
# get current metric
curr_metric = json.loads(
    customer_outreach_metrics\
    .select(
        pl.col("avg_num_items_per_order").alias("sum_avg_num_items_per_order"),
        pl.col("avg_order_value").cast(int).alias("sum_avg_order_value")
    )\
    .sum()\
    .write_json())[0]

In [38]:
curr_metric

{'sum_avg_num_items_per_order': 4008.885007123707, 'sum_avg_order_value': 0}

In [39]:
# Compare with current data for variance percentage
def percentage_difference(val1, val2):
    if val1 == 0 and val2 == 0:
        return 0.0
    elif val1 == 0 or val2 == 0:
        return 100.0
    return abs((val1 - val2) / ((val1 + val2) / 2)) * 100

comparison = {}
for key in curr_metric:
    if key in prev_metric:
        comparison[key] = percentage_difference(curr_metric[key], prev_metric[key])

print(comparison)


{'sum_avg_num_items_per_order': 163.60040016032573, 'sum_avg_order_value': 0.0}


In [40]:
comparison['sample'] = 10

In [41]:
for k, v in comparison.items():
    if v >= 5:
        raise Exception(f"Difference for {k} is greater than 5%: {v}%")

Exception: Difference for sum_avg_num_items_per_order is greater than 5%: 163.60040016032573%

In [42]:
del(comparison['sample'])

In [43]:
# Insert current run data into run_metadata table
# store run metdata in table
import sqlite3

# Connect to SQLite database
conn = sqlite3.connect('metadata.db')

# Create a cursor object
cursor = conn.cursor()
comparison_json = json.dumps(comparison)

# Insert data into the run_metadata table
cursor.execute('''
    INSERT INTO run_metadata (run_id, metadata)
    VALUES (?, ?)
''', ('2024-09-15-12-00', comparison_json))

# Commit the changes and close the connection
conn.commit()
conn.close()


In [44]:
import sqlite3

# Connect to SQLite database
conn = sqlite3.connect('metadata.db')

# Create a cursor object
cursor = conn.cursor()

# Select all data from run_metadata table
cursor.execute('SELECT * FROM run_metadata')

# Fetch all rows
rows = cursor.fetchall()

# Close the connection
conn.close()

# Print the result
for row in rows:
    print(row)


('2024-09-15-10-00', '{"sum_avg_num_items_per_order": 40045.28069559008, "sum_avg_order_value": 0.00}')
('2024-09-15-12-00', '{"sum_avg_num_items_per_order": 163.60040016032573, "sum_avg_order_value": 0.0}')


## Code testing

We will use `pytest` to test our code. Read **[this article for detailed explanation on how to use pytest](https://www.startdataengineering.com/post/code-patterns/#4-testing-with-pytest)**.
                                                                                                           
In order to test our code, we need to create modular functions. Let's create a function to create the silver `dim_customer` table.                                                                                                           

In [47]:
test_create_dim_customer(cleaned_customer_df, cleaned_nation_df, cleaned_region_df)

AttributeError: 'function' object has no attribute 'join'

# Code organization

Deciding how to organize your code can be overwhelming. Typically companies use one of the following options to organize code:

1. Based on multi hop architecture. E.g. **[see this dbt folder structure](https://github.com/dbt-labs/jaffle_shop_duckdb/tree/duckdb/models)**
2. Based on existing company standards.

For our use case(and most real life projects) we can use the following folder structure.

![Folder structure](./assets/images/folder.png)

## Code modularity

We have code in this notebook to create our `customer_outreach_metrics` table. We will use the functional principles explained **[in this post](https://www.startdataengineering.com/post/code-patterns/#1-functional-design)** to turn them into modular functions.

In [45]:
# this will be in python script named dim_customer.py, so calling this function would be indicative of the dataset it will produce
# dim_customer.create_dataset
def create_dataset(cleaned_customer_df, cleaned_nation_df, cleaned_region_df):
    return cleaned_customer_df\
        .join(cleaned_nation_df, on="nationkey", how="left", suffix="_nation")\
        .join(cleaned_region_df, on="regionkey", how="left", suffix="_region")\
        .rename({
            "name_nation": "nation_name",
            "name_region": "region_name",
            "comment_nation": "nation_comment",
            "comment_region": "region_comment"
        })

Let's go ahead an go over the code available.

In [46]:
import pytest
import polars as pl

# Sample data for testing
@pytest.fixture
def cleaned_customer_df():
    return pl.DataFrame({
        'custkey': [1, 2],
        'name': ['Customer1', 'Customer2'],
        'nationkey': [101, 102],
        'regionkey': [201, 202]
    })

@pytest.fixture
def cleaned_nation_df():
    return pl.DataFrame({
        'nationkey': [101, 102],
        'name_nation': ['Nation1', 'Nation2'],
        'comment_nation': ['Comment1', 'Comment2']
    })

@pytest.fixture
def cleaned_region_df():
    return pl.DataFrame({
        'regionkey': [201, 202],
        'name_region': ['Region1', 'Region2'],
        'comment_region': ['Comment3', 'Comment4']
    })

# The function to test
def test_create_dim_customer(cleaned_customer_df, cleaned_nation_df, cleaned_region_df):
    result_df = create_dim_customer(cleaned_customer_df, cleaned_nation_df, cleaned_region_df)

    expected_df = pl.DataFrame({
        'custkey': [1, 2, 3],
        'name': ['Customer1', 'Customer2'],
        'nationkey': [101, 102],
        'regionkey': [201, 202],
        'nation_name': ['Nation1', 'Nation2'],
        'region_name': ['Region1', 'Region2'],
        'nation_comment': ['Comment1', 'Comment2'],
        'region_comment': ['Comment3', 'Comment4']
    })

    assert result_df.frame_equal(expected_df)
