
**Scenario:** Daily Processing of Past Day's Orders

**1. Extract:**

* **Tools:** Python, `psycopg2`

```python
import os

import psycopg2

# Connect to the source database. Credentials are read from environment
# variables instead of being hard-coded in the notebook.
conn = psycopg2.connect(
    dbname="pacbook",
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ["PGPASSWORD"],
    host=os.environ["PGHOST"],
)

# Create a cursor object
cur = conn.cursor()

# Extract data from the cust_order table
cur.execute("SELECT * FROM cust_order")
cust_orders = cur.fetchall()

# Extract data from the order_line table
cur.execute("SELECT * FROM order_line")
order_lines = cur.fetchall()

# Extract data from the country table
cur.execute("SELECT * FROM country")
countries = cur.fetchall()

# Extract data from the author table
cur.execute("SELECT * FROM author")
authors = cur.fetchall()

# Extract data from the book_author table
cur.execute("SELECT * FROM book_author")
book_authors = cur.fetchall()

# Extract data from the book table
cur.execute("SELECT * FROM book")
books = cur.fetchall()

# Extract data from the customer_address table
cur.execute("SELECT * FROM customer_address")
customer_addresses = cur.fetchall()

# Extract data from the address table (kept separate so it does not overwrite `books`)
cur.execute("SELECT * FROM address")
addresses = cur.fetchall()

# Close the cursor and connection
cur.close()
conn.close()
```
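The queries above copy every row of every table, whereas the scenario only needs the previous day's orders. A minimal sketch of restricting `cust_order` to yesterday with a half-open time window; it assumes the order timestamp column is named `order_date`, and the `%s` placeholders follow psycopg2's parameter style so the driver, not string formatting, inserts the values:

```python
import datetime as dt


def yesterday_window(today: dt.date) -> tuple[dt.datetime, dt.datetime]:
    """Return the half-open [start, end) interval covering the day before `today`."""
    end = dt.datetime.combine(today, dt.time.min)  # midnight at the start of `today`
    start = end - dt.timedelta(days=1)             # midnight at the start of yesterday
    return start, end


# Assumption: cust_order has an `order_date` timestamp column.
# psycopg2 substitutes the %s placeholders safely at execute time.
DAILY_ORDERS_SQL = (
    "SELECT * FROM cust_order "
    "WHERE order_date >= %s AND order_date < %s"
)

start, end = yesterday_window(dt.date(2022, 8, 25))
print(start, end)  # 2022-08-24 00:00:00 2022-08-25 00:00:00
# With an open psycopg2 cursor: cur.execute(DAILY_ORDERS_SQL, (start, end))
```

Using `>= start` and `< end` means an order stamped exactly at midnight belongs to one day only, so consecutive daily runs neither skip nor double-count it.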


**2. Load to Staging:**

* **Tools:** Python, `pandas` (optional)

```python
import os

import pandas as pd
from sqlalchemy import create_engine

# DataFrame.to_sql needs a SQLAlchemy engine/connection (or a sqlite3 connection),
# not a plain string. DW_DATABASE_URL is a placeholder environment variable holding
# a URL such as "postgresql+psycopg2://user:password@host:5432/dbname".
dw_engine = create_engine(os.environ["DW_DATABASE_URL"])

# Wrap the extracted rows in DataFrames (pandas is optional here)
cust_orders_df = pd.DataFrame(cust_orders)
order_lines_df = pd.DataFrame(order_lines)
countries_df = pd.DataFrame(countries)
authors_df = pd.DataFrame(authors)
book_authors_df = pd.DataFrame(book_authors)
books_df = pd.DataFrame(books)
customer_addresses_df = pd.DataFrame(customer_addresses)

# Load each DataFrame into the staging area of the Data Warehouse,
# replacing the previous run's copy of the table
staging_tables = {
    "staging_cust_orders": cust_orders_df,
    "staging_order_lines": order_lines_df,
    "staging_countries": countries_df,
    "staging_authors": authors_df,
    "staging_book_authors": book_authors_df,
    "staging_books": books_df,
    "staging_customer_addresses": customer_addresses_df,
}
for table_name, df in staging_tables.items():
    df.to_sql(table_name, con=dw_engine, index=False, if_exists="replace")
```

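The first rows of `cust_orders_df` (the columns are numbered because no column names were passed to `pd.DataFrame`):

```
   0                          1  2  3    4
0  1 2022-06-23 02:18:54.998111  1  3  138
1  2 2022-08-24 23:10:36.998111  2  4  755
2  3 2021-06-06 23:38:26.998111  3  3  735
3  4 2021-06-23 09:35:51.998111  4  1  746
4  5 2022-07-28 06:53:05.998111  5  2  662
```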
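Because `pd.DataFrame` receives bare tuples, the staged columns are named `0`, `1`, and so on, and those numbers become the column names of the staging tables. A sketch of keeping the real names by reading the DB-API `cursor.description` attribute, which both psycopg2 and `sqlite3` provide; the helper `fetch_with_columns` is hypothetical, and `sqlite3` stands in for PostgreSQL so the example runs on its own:

```python
import sqlite3


def fetch_with_columns(cur, sql, params=None):
    """Execute `sql` and return (column_names, rows) via the DB-API cursor.description."""
    if params is None:
        cur.execute(sql)
    else:
        cur.execute(sql, params)
    # cursor.description holds one 7-item sequence per column; item 0 is the name
    columns = [col[0] for col in cur.description]
    return columns, cur.fetchall()


# sqlite3 stands in for the PostgreSQL source so the sketch runs standalone
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE cust_order (order_id INTEGER, order_date TEXT, customer_id INTEGER)")
cur.execute("INSERT INTO cust_order VALUES (1, '2022-06-23 02:18:54', 1)")

columns, rows = fetch_with_columns(cur, "SELECT * FROM cust_order")
print(columns)  # ['order_id', 'order_date', 'customer_id']
conn.close()
```

The result can then be staged with `pd.DataFrame(rows, columns=columns)`, so the staging tables carry the source column names that the dbt model expects.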


**3. Transform:**

* **Tools:** dbt

```sql
-- dbt model: transform_order_data.sql
-- dbt generates the CREATE VIEW statement itself, so the model is a plain SELECT.
-- The alias keeps the resulting view named fct_order_country.
{{ config(materialized='view', alias='fct_order_country') }}

SELECT DISTINCT
    ol.order_id,
    ol.book_id,
    ol.price,
    c.first_name,
    c.last_name,
    b.title,
    a.author_name,
    b.isbn13,
    c2.country_name
FROM
    order_line ol
    JOIN cust_order co ON ol.order_id = co.order_id
    JOIN book b ON ol.book_id = b.book_id
    JOIN customer c ON co.customer_id = c.customer_id
    JOIN book_author ba ON b.book_id = ba.book_id
    JOIN author a ON ba.author_id = a.author_id
    -- "addr" avoids reusing the alias "a", which already refers to author
    LEFT JOIN address addr ON addr.address_id = co.dest_address_id
    LEFT JOIN country c2 ON addr.country_id = c2.country_id
```

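This dbt model joins the order lines with their orders, books, customers, authors, and destination address and country, removes duplicate rows, and materializes the result as a view (`fct_order_country`). It does not aggregate prices, so order totals have to be computed on top of the view.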
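Because each order line is joined to every author of its book, a line for a co-authored book appears once per author, and summing `price` directly would overstate the total. A minimal sketch of deriving the total order value per order and destination country from rows shaped like the view's output, counting each `(order_id, book_id)` pair once; the function name and the five-field row layout are illustrative assumptions:

```python
from collections import defaultdict
from decimal import Decimal


def total_order_value(rows):
    """Sum prices per (order_id, country_name), counting each (order_id, book_id) once.

    Each row is assumed to be (order_id, book_id, price, author_name, country_name),
    a subset of the fct_order_country columns.
    """
    seen = set()
    totals = defaultdict(Decimal)  # Decimal() starts each total at 0
    for order_id, book_id, price, _author, country in rows:
        if (order_id, book_id) in seen:
            continue  # same order line repeated for a co-author
        seen.add((order_id, book_id))
        totals[(order_id, country)] += price
    return dict(totals)


rows = [
    (1, 10, Decimal("12.50"), "Author A", "Portugal"),
    (1, 10, Decimal("12.50"), "Author B", "Portugal"),  # co-authored book
    (1, 11, Decimal("7.25"), "Author C", "Portugal"),
]
print(total_order_value(rows))  # {(1, 'Portugal'): Decimal('19.75')}
```

`Decimal` is used because psycopg2 returns PostgreSQL `numeric` values as `Decimal`, which avoids floating-point rounding in money sums.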

**4. Load to Data Warehouse:**

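dbt builds `fct_order_country` in the Data Warehouse every time `dbt run` executes the model. To store the result as a physical table rather than a view, change the model's materialization to `table` (or `incremental` for large daily loads).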

**Running the Pipeline:**

The Python extract and staging code, followed by `dbt run`, can be wrapped into a Luigi task and scheduled to run daily using cron jobs or cloud-based schedulers.
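A minimal sketch of the run order using only the standard library rather than Luigi: the extract and staging steps run first, then dbt builds the model. `extract` and `load_staging` are hypothetical callables standing in for the Extract and Load to Staging steps, and the command runner is injectable so the sequence can be checked without a live database or a dbt installation:

```python
import subprocess
from typing import Callable, Sequence


def run_dbt(cmd: Sequence[str]) -> None:
    # check=True raises CalledProcessError if dbt exits with a non-zero status
    subprocess.run(list(cmd), check=True)


def run_daily_pipeline(
    extract: Callable[[], None],
    load_staging: Callable[[], None],
    run_command: Callable[[Sequence[str]], None] = run_dbt,
    model: str = "transform_order_data",
) -> None:
    """Run extract, then staging, then dbt; an exception in any step stops the run."""
    extract()
    load_staging()
    # `dbt run --select <model>` builds only the named model
    run_command(["dbt", "run", "--select", model])


calls = []
run_daily_pipeline(
    extract=lambda: calls.append("extract"),
    load_staging=lambda: calls.append("staging"),
    run_command=lambda cmd: calls.append(" ".join(cmd)),
)
print(calls)  # ['extract', 'staging', 'dbt run --select transform_order_data']
```

Scheduled with cron, an entry such as `0 2 * * * python3 /path/to/pipeline.py` (path hypothetical) would run the script every day at 02:00.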

**Testing Scenarios:**

**Scenario 1: Initial Load (First Day's Run)**

* **Testing Approach:** Run the pipeline manually for the first time.
* **Expected Outcome:**
    - The Data Warehouse table `fct_order_country` should be populated with data for yesterday's orders.
    - Verify data presence by querying the table using tools like pgAdmin or SQL commands.
    - Use dbt tests, such as the built-in `not_null` test, to check basic data integrity, and compare row counts against the source tables.
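Alongside dbt tests, the same checks can be scripted against any DB-API connection. A minimal sketch, with `sqlite3` standing in for the warehouse so it runs standalone; the helper name and the choice of checks (row count, `NULL` prices, `NULL` order IDs) are illustrative assumptions:

```python
import sqlite3


def basic_checks(conn, table="fct_order_country"):
    """Return the row count and NULL counts for key columns of `table`.

    `table` is interpolated into the SQL, so only pass trusted, fixed names.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT COUNT(*), "
        "SUM(CASE WHEN price IS NULL THEN 1 ELSE 0 END), "
        "SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) "
        f"FROM {table}"
    )
    row_count, null_prices, null_order_ids = cur.fetchone()
    # SUM over zero rows returns NULL, so normalise those to 0
    return {
        "row_count": row_count,
        "null_prices": null_prices or 0,
        "null_order_ids": null_order_ids or 0,
    }


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fct_order_country (order_id INTEGER, book_id INTEGER, price REAL)")
conn.executemany(
    "INSERT INTO fct_order_country VALUES (?, ?, ?)",
    [(1, 10, 12.5), (2, 11, None)],
)
print(basic_checks(conn))  # {'row_count': 2, 'null_prices': 1, 'null_order_ids': 0}
```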

**Scenario 2: New Data (Processing Additional Orders from Today)**

* **Testing Approach:** Introduce new orders into the source system for today's date.
* **Expected Outcome:**
    - Run the pipeline, and it should process the new orders along with yesterday's data.
    - Verify that the `fct_order_country` table reflects the new orders alongside existing data.
    - dbt tests can help ensure data integrity remains consistent.
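One way to make the "reflects the new orders" check concrete is to compare order IDs on both sides. A minimal sketch; how the two ID lists are fetched (for example `SELECT order_id FROM order_line` on the source and `SELECT order_id FROM fct_order_country` on the warehouse) is an assumption:

```python
def missing_order_ids(source_ids, warehouse_ids):
    """Return source order IDs that have not reached the warehouse, sorted."""
    return sorted(set(source_ids) - set(warehouse_ids))


source = [1, 2, 3, 4, 5, 6]   # includes the newly inserted orders 5 and 6
warehouse = [1, 2, 3, 4, 4]   # one row per order line, so IDs can repeat
print(missing_order_ids(source, warehouse))  # [5, 6]
```

Because `fct_order_country` inner-joins `order_line`, an order with no lines never appears in the view, which is why the source IDs are best taken from `order_line` rather than `cust_order`.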

**Scenario 3: Error Handling (Missing Price in Source Data)**

* **Testing Approach:** Modify a record in the source system to have a missing price value.
* **Expected Outcome:**
    - The pipeline should handle the missing price gracefully (e.g., by logging a warning or using a default value).
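A minimal sketch of the "log a warning and use a default value" behaviour for order lines before they are staged. The default price of `0`, the choice to keep rather than drop the row, the hypothetical helper `clean_prices`, and the `(line_id, order_id, book_id, price)` row layout are all assumptions to adapt:

```python
import logging
from decimal import Decimal

logger = logging.getLogger("order_pipeline")


def clean_prices(order_lines, default_price=Decimal("0")):
    """Replace missing prices with `default_price`, logging one warning per patched row.

    Rows are assumed to be (line_id, order_id, book_id, price) tuples.
    Returns the cleaned rows and the number of rows that were patched.
    """
    cleaned, patched = [], 0
    for line_id, order_id, book_id, price in order_lines:
        if price is None:
            logger.warning("order_line %s (order %s) has no price; using %s",
                           line_id, order_id, default_price)
            price = default_price
            patched += 1
        cleaned.append((line_id, order_id, book_id, price))
    return cleaned, patched


logging.basicConfig(level=logging.WARNING)
rows = [(1, 100, 10, Decimal("12.50")), (2, 100, 11, None)]
cleaned, patched = clean_prices(rows)
print(patched, cleaned[1])  # 1 (2, 100, 11, Decimal('0'))
```

Patching with a default keeps row counts stable but understates revenue; dropping the row or routing it to a quarantine table are alternatives, and a dbt `not_null` test on `price` would flag any missing values that reach the warehouse.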