## Building ETL Pipelines

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Generate sample sales data
np.random.seed(42)
n_records = 1000

sample_data = pd.DataFrame({
    'order_id': range(1, n_records + 1),
    'customer_id': np.random.randint(1000, 2000, n_records),
    'product_id': np.random.randint(100, 200, n_records),
    'product_name': np.random.choice(['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'], n_records),
    'quantity': np.random.randint(1, 10, n_records),
    'price': np.round(np.random.uniform(10, 1000, n_records), 2),
    'order_date': [datetime(2024, 1, 1) + timedelta(days=int(x)) for x in np.random.randint(0, 365, n_records)],
    'region': np.random.choice(['North', 'South', 'East', 'West'], n_records),
    'status': np.random.choice(['Completed', 'Pending', 'Cancelled'], n_records)
})

# Calculate total amount
sample_data['total_amount'] = sample_data['quantity'] * sample_data['price']

# Save to a Parquet file (to_parquet does not create directories, so ../data must already exist)
sample_data.to_parquet("../data/sales_data.parquet", engine="fastparquet", index=False)

print("Sample sales_data.parquet created successfully!")

In [5]:
import pandas as pd

# Read the sales data into a DataFrame
sales_data = pd.read_parquet("../data/sales_data.parquet", engine="fastparquet")

# Check the data types of the DataFrame's columns
print(sales_data.dtypes)

# Print the shape of the DataFrame, as well as the head
print(sales_data.shape)
print(sales_data.head())


order_id                 int64
customer_id              int64
product_id               int64
product_name            object
quantity                 int64
price                  float64
order_date      datetime64[ns]
region                  object
status                  object
total_amount           float64
dtype: object
(1000, 10)
   order_id  customer_id  product_id product_name  quantity   price  \
0         1         1102         104       Laptop         3  773.91   
1         2         1435         132   Headphones         4  503.91   
2         3         1860         164     Keyboard         7   21.98   
3         4         1270         117   Headphones         1   18.95   
4         5         1106         195       Laptop         7  363.46   

  order_date region     status  total_amount  
0 2024-07-09  South    Pending       2321.73  
1 2024-06-30  North    Pending       2015.64  
2 2024-11-30   West  Completed        153.86  
3 2024-07-27  South  Cancelled         18.95  
4 2

### Pulling data from SQL databases
SQL databases are among the most widely used data storage tools. Many companies have dedicated teams that create and maintain these databases, which typically store data crucial to day-to-day operations. Because of this, SQL databases are common source systems for a wide range of data pipelines.

In [None]:
import pandas as pd
import sqlalchemy

# Create a connection to the sales database
db_engine = sqlalchemy.create_engine("postgresql+psycopg2://repl:password@localhost:5432/sales")

# Query the sales table
raw_sales_data = pd.read_sql("SELECT * FROM sales;", con=db_engine)
print(raw_sales_data)
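
The query above needs a running PostgreSQL server. To try the same `pd.read_sql()` pattern without one, the sketch below swaps in an in-memory SQLite database from Python's standard library. The table contents and the column names in `demo_rows` are made up for illustration and are not taken from the real `sales` table.

```python
import sqlite3

import pandas as pd

# Assumption: an in-memory SQLite database stands in for the PostgreSQL "sales" database
conn = sqlite3.connect(":memory:")

# Hypothetical rows, for illustration only
demo_rows = pd.DataFrame({
    "order_id": [1, 2, 3],
    "quantity_ordered": [1, 4, 1],
    "price": [773.91, 503.91, 21.98],
})

# Write the rows to a table named "sales" so there is something to query
demo_rows.to_sql("sales", conn, index=False)

# Same call shape as above: a SQL string plus a connection
raw_sales_data = pd.read_sql("SELECT * FROM sales;", con=conn)
print(raw_sales_data.shape)

conn.close()
```

Only the connection object differs from the PostgreSQL version; the `pd.read_sql()` call itself is unchanged.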


### Building functions to extract data
It's important to modularize code when building a data pipeline. Modular code makes pipelines more readable and reusable, and it speeds up troubleshooting because each step can be run and inspected on its own. Writing a function for each distinct operation in a pipeline also gives a new project a framework to build on.

pandas has been imported as pd, and sqlalchemy is ready to be used.

In [None]:
def extract():
    connection_uri = "postgresql+psycopg2://repl:password@localhost:5432/sales"
    db_engine = sqlalchemy.create_engine(connection_uri)
    raw_data = pd.read_sql("SELECT * FROM sales WHERE quantity_ordered = 1", db_engine)

    # Print the head of the DataFrame
    print(raw_data.head())

    # Return the extracted DataFrame
    return raw_data

# Call the extract() function
raw_sales_data = extract()
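
One way to make `extract()` more reusable is to pass the query and the connection in as arguments instead of hard-coding them. The version below is a minimal sketch of that idea, not the function used in this notebook: the `query`, `connection`, and `params` arguments are hypothetical, and an in-memory SQLite database with made-up rows stands in for the PostgreSQL `sales` database so the example runs on its own.

```python
import sqlite3

import pandas as pd


def extract(query, connection, params=None):
    """Run a SQL query against the given connection and return a DataFrame."""
    # params fills the placeholders in the query (SQLite uses "?" placeholders)
    return pd.read_sql(query, connection, params=params)


# Assumption: in-memory SQLite stands in for the real database
conn = sqlite3.connect(":memory:")

# Hypothetical rows, for illustration only
pd.DataFrame({
    "order_id": [1, 2, 3],
    "quantity_ordered": [1, 4, 1],
}).to_sql("sales", conn, index=False)

# Extract only the orders with a quantity of one
single_item_orders = extract(
    "SELECT * FROM sales WHERE quantity_ordered = ?",
    conn,
    params=(1,),
)
print(single_item_orders.head())

conn.close()
```

Because the connection is an argument, the same function can be pointed at a test database during development and at the production source later. Note that the placeholder syntax depends on the database driver, so the `?` used here is specific to SQLite.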
