### Implementing an ETL Pipeline in Python

This walkthrough builds an ETL (Extract, Transform, Load) pipeline in Python with pandas and SQLAlchemy, using a sales dataset from Kaggle. The pipeline has three stages:

- **Extract:**  
  We’ll read the raw sales records from a CSV file downloaded from Kaggle (`sales_data_sample.csv`) into a pandas DataFrame.

- **Transform:**  
  We’ll clean the data (removing duplicates, filling missing values, parsing dates) and derive new columns for analysis.

- **Load:**  
  We’ll load the cleaned and transformed data into a database table (MySQL in this example; PostgreSQL works the same way through SQLAlchemy) so it can be queried as part of a data warehouse.
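To see how the three stages fit together before working with the full dataset, here is a minimal, self-contained sketch. It assumes only pandas and Python's built-in `sqlite3` module: an in-memory SQLite database stands in for MySQL/PostgreSQL, and a tiny CSV built from two rows of the Kaggle file (plus an artificial duplicate) stands in for the download. The function names `extract`, `transform`, and `load` are illustrative only.

```python
import io
import sqlite3

import pandas as pd

# two rows from the Kaggle file (trimmed to three columns) plus an artificial duplicate
RAW_CSV = (
    b"ORDERNUMBER,SALES,DEALSIZE\n"
    b"10107,2871.0,Small\n"
    b"10107,2871.0,Small\n"
    b"10121,2765.9,Small\n"
)

def extract(source):
    # Extract: read raw CSV bytes into a DataFrame
    return pd.read_csv(source)

def transform(df):
    # Transform: drop exact duplicates and normalize DEALSIZE to lowercase
    return df.drop_duplicates().assign(DEALSIZE=lambda d: d['DEALSIZE'].str.lower())

def load(df, conn, table_name):
    # Load: write the DataFrame to a table, replacing any previous version
    df.to_sql(table_name, conn, if_exists='replace', index=False)

conn = sqlite3.connect(':memory:')  # in-memory SQLite stands in for MySQL/PostgreSQL
load(transform(extract(io.BytesIO(RAW_CSV))), conn, 'sales_data')
rows = conn.execute('SELECT ORDERNUMBER, DEALSIZE FROM sales_data ORDER BY ORDERNUMBER').fetchall()
print(rows)  # [(10107, 'small'), (10121, 'small')]
```

Keeping each stage as a plain function that takes and returns data makes every stage testable on its own, which is the same structure the full pipeline below follows.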


#### Step-by-Step ETL Implementation

- **Extract (Download CSV and Load Data):**  
  This example uses the publicly available "Sample Sales Data" dataset from Kaggle, which you can find [here](https://www.kaggle.com/datasets/kyanyoga/sample-sales-data). Download `sales_data_sample.csv` and save it under `./data/`, the path the code below reads from.

  Here’s how to extract and load the raw data into a pandas DataFrame:

In [33]:
# import necessary libraries
import pandas as pd
from sqlalchemy import create_engine

In [34]:
# Step 1: Extract (Loading data from CSV)
data_path = './data/sales_data_sample.csv'

def extract_data(file_path):
    # read the raw CSV into a DataFrame; the file contains characters that are
    # not valid UTF-8, so it is read as ISO-8859-1 (Latin-1)
    raw_data = pd.read_csv(file_path, encoding='ISO-8859-1')
    return raw_data

In [35]:
# print the first few rows of the DataFrame
raw_data = extract_data(data_path)
raw_data.head()

| | ORDERNUMBER | QUANTITYORDERED | PRICEEACH | ORDERLINENUMBER | SALES | ORDERDATE | STATUS | QTR_ID | MONTH_ID | YEAR_ID | ... | ADDRESSLINE1 | ADDRESSLINE2 | CITY | STATE | POSTALCODE | COUNTRY | TERRITORY | CONTACTLASTNAME | CONTACTFIRSTNAME | DEALSIZE |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10107 | 30 | 95.7 | 2 | 2871.0 | 2/24/2003 0:00 | Shipped | 1 | 2 | 2003 | ... | 897 Long Airport Avenue | | NYC | NY | 10022.0 | USA | | Yu | Kwai | Small |
| 1 | 10121 | 34 | 81.35 | 5 | 2765.9 | 5/7/2003 0:00 | Shipped | 2 | 5 | 2003 | ... | 59 rue de l'Abbaye | | Reims | | 51100.0 | France | EMEA | Henriot | Paul | Small |
| 2 | 10134 | 41 | 94.74 | 2 | 3884.34 | 7/1/2003 0:00 | Shipped | 3 | 7 | 2003 | ... | 27 rue du Colonel Pierre Avia | | Paris | | 75508.0 | France | EMEA | Da Cunha | Daniel | Medium |
| 3 | 10145 | 45 | 83.26 | 6 | 3746.7 | 8/25/2003 0:00 | Shipped | 3 | 8 | 2003 | ... | 78934 Hillside Dr. | | Pasadena | CA | 90003.0 | USA | | Young | Julie | Medium |
| 4 | 10159 | 49 | 100.0 | 14 | 5205.27 | 10/10/2003 0:00 | Shipped | 4 | 10 | 2003 | ... | 7734 Strong St. | | San Francisco | CA | | USA | | Brown | Julie | Medium |
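The preview renders `POSTALCODE` as `10022.0` and shows `ORDERDATE` as text such as `2/24/2003 0:00`. If postal codes are inferred as numbers, leading zeros are lost, so one option is to pin column types at read time. The sketch below assumes postal codes should be kept as text and that every date follows the `month/day/year hour:minute` pattern seen in the preview; `extract_typed` is a hypothetical helper, and the second sample row (with a leading-zero postal code) is made up.

```python
import io

import pandas as pd

def extract_typed(source, encoding='ISO-8859-1'):
    """Hypothetical variant of extract_data that pins column types at read time."""
    # read postal codes as text so values like '00501' keep their leading zeros
    df = pd.read_csv(source, encoding=encoding, dtype={'POSTALCODE': 'string'})
    # parse dates like '2/24/2003 0:00' with an explicit format instead of inferring it
    df['ORDERDATE'] = pd.to_datetime(df['ORDERDATE'], format='%m/%d/%Y %H:%M')
    return df

# first row from the preview plus one made-up row with a leading-zero postal code
sample_csv = (
    "ORDERNUMBER,ORDERDATE,POSTALCODE\n"
    "10107,2/24/2003 0:00,10022\n"
    "99999,1/5/2004 0:00,00501\n"
).encode('latin-1')

typed = extract_typed(io.BytesIO(sample_csv))
print(typed['POSTALCODE'].tolist())  # ['10022', '00501']
```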


In [36]:
raw_data.columns

Index(['ORDERNUMBER', 'QUANTITYORDERED', 'PRICEEACH', 'ORDERLINENUMBER',
       'SALES', 'ORDERDATE', 'STATUS', 'QTR_ID', 'MONTH_ID', 'YEAR_ID',
       'PRODUCTLINE', 'MSRP', 'PRODUCTCODE', 'CUSTOMERNAME', 'PHONE',
       'ADDRESSLINE1', 'ADDRESSLINE2', 'CITY', 'STATE', 'POSTALCODE',
       'COUNTRY', 'TERRITORY', 'CONTACTLASTNAME', 'CONTACTFIRSTNAME',
       'DEALSIZE'],
      dtype='object')

In [37]:
raw_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2823 entries, 0 to 2822
Data columns (total 25 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   ORDERNUMBER       2823 non-null   int64  
 1   QUANTITYORDERED   2823 non-null   int64  
 2   PRICEEACH         2823 non-null   float64
 3   ORDERLINENUMBER   2823 non-null   int64  
 4   SALES             2823 non-null   float64
 5   ORDERDATE         2823 non-null   object 
 6   STATUS            2823 non-null   object 
 7   QTR_ID            2823 non-null   int64  
 8   MONTH_ID          2823 non-null   int64  
 9   YEAR_ID           2823 non-null   int64  
 10  PRODUCTLINE       2823 non-null   object 
 11  MSRP              2823 non-null   int64  
 12  PRODUCTCODE       2823 non-null   object 
 13  CUSTOMERNAME      2823 non-null   object 
 14  PHONE             2823 non-null   object 
 15  ADDRESSLINE1      2823 non-null   object 
 16  ADDRESSLINE2      302 non-null    object 


In [38]:
raw_data['STATUS'].value_counts()

STATUS
Shipped       2617
Cancelled       60
Resolved        47
On Hold         44
In Process      41
Disputed        14
Name: count, dtype: int64

In [39]:
raw_data.describe()

| | ORDERNUMBER | QUANTITYORDERED | PRICEEACH | ORDERLINENUMBER | SALES | QTR_ID | MONTH_ID | YEAR_ID | MSRP |
|---|---|---|---|---|---|---|---|---|---|
| count | 2823.0 | 2823.0 | 2823.0 | 2823.0 | 2823.0 | 2823.0 | 2823.0 | 2823.0 | 2823.0 |
| mean | 10258.725115 | 35.092809 | 83.658544 | 6.466171 | 3553.889072 | 2.717676 | 7.092455 | 2003.81509 | 100.715551 |
| std | 92.085478 | 9.741443 | 20.174277 | 4.225841 | 1841.865106 | 1.203878 | 3.656633 | 0.69967 | 40.187912 |
| min | 10100.0 | 6.0 | 26.88 | 1.0 | 482.13 | 1.0 | 1.0 | 2003.0 | 33.0 |
| 25% | 10180.0 | 27.0 | 68.86 | 3.0 | 2203.43 | 2.0 | 4.0 | 2003.0 | 68.0 |
| 50% | 10262.0 | 35.0 | 95.7 | 6.0 | 3184.8 | 3.0 | 8.0 | 2004.0 | 99.0 |
| 75% | 10333.5 | 43.0 | 100.0 | 9.0 | 4508.0 | 4.0 | 11.0 | 2004.0 | 124.0 |
| max | 10425.0 | 97.0 | 100.0 | 18.0 | 14082.8 | 4.0 | 12.0 | 2005.0 | 214.0 |
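The summary statistics hint at a data-quality quirk: `PRICEEACH` has both its 75th percentile and its maximum at exactly 100.0, which suggests unit prices above 100 were capped. The preview agrees: order 10159 has 49 units at 100.0 each (4,900) but `SALES` of 5205.27, whereas order 10107's 30 × 95.7 = 2871.0 matches. A quick consistency check, sketched below with a hypothetical `sales_mismatches` helper, flags rows where `SALES` differs from `QUANTITYORDERED * PRICEEACH`.

```python
import numpy as np
import pandas as pd

def sales_mismatches(df, rtol=1e-6):
    """Hypothetical check: return rows whose SALES differs from QUANTITYORDERED * PRICEEACH."""
    checked = df.assign(EXPECTED_SALES=df['QUANTITYORDERED'] * df['PRICEEACH'])
    # np.isclose tolerates floating-point rounding in the multiplication
    mismatch = ~np.isclose(checked['SALES'], checked['EXPECTED_SALES'], rtol=rtol)
    return checked.loc[mismatch, ['ORDERNUMBER', 'QUANTITYORDERED', 'PRICEEACH',
                                  'SALES', 'EXPECTED_SALES']]

# two rows copied from the raw preview
preview_rows = pd.DataFrame({
    'ORDERNUMBER': [10107, 10159],
    'QUANTITYORDERED': [30, 49],
    'PRICEEACH': [95.7, 100.0],
    'SALES': [2871.0, 5205.27],
})
print(sales_mismatches(preview_rows))
```

If `PRICEEACH` is indeed capped, `SALES` is the more trustworthy revenue figure, and any derived column built from `PRICEEACH` inherits the cap.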


In [40]:
# count fully duplicated rows (rows identical in every column)
duplicated = raw_data.duplicated().sum()
print(f'There are {duplicated} duplicated rows in the dataset')

There are 0 duplicated rows in the dataset
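`duplicated()` only flags rows that match in every column, so two versions of the same order line with, say, different `SALES` values would slip through. A stricter check compares rows on a business key. Treating `ORDERNUMBER` + `ORDERLINENUMBER` as the key for an order line is an assumption about this dataset, and `count_duplicates` is a hypothetical helper.

```python
import pandas as pd

def count_duplicates(df, key=('ORDERNUMBER', 'ORDERLINENUMBER')):
    """Hypothetical helper: return (full-row duplicates, duplicates on the assumed key)."""
    full_rows = int(df.duplicated().sum())
    by_key = int(df.duplicated(subset=list(key)).sum())
    return full_rows, by_key

# made-up example: the same order line appears twice with different SALES values
orders = pd.DataFrame({
    'ORDERNUMBER': [10107, 10107, 10121],
    'ORDERLINENUMBER': [2, 2, 5],
    'SALES': [2871.0, 2870.0, 2765.9],
})
print(count_duplicates(orders))  # (0, 1)
```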


In [41]:
# check for missing values
missing_values = raw_data.isnull().sum()
missing_values

ORDERNUMBER            0
QUANTITYORDERED        0
PRICEEACH              0
ORDERLINENUMBER        0
SALES                  0
ORDERDATE              0
STATUS                 0
QTR_ID                 0
MONTH_ID               0
YEAR_ID                0
PRODUCTLINE            0
MSRP                   0
PRODUCTCODE            0
CUSTOMERNAME           0
PHONE                  0
ADDRESSLINE1           0
ADDRESSLINE2        2521
CITY                   0
STATE               1486
POSTALCODE            76
COUNTRY                0
TERRITORY           1074
CONTACTLASTNAME        0
CONTACTFIRSTNAME       0
DEALSIZE               0
dtype: int64
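Raw counts are easier to judge as percentages: `ADDRESSLINE2` is missing in about 89% of rows, `STATE` in about 53%, `TERRITORY` in about 38%, and `POSTALCODE` in under 3%. It is also worth checking why a value is missing before filling it. By default `read_csv` treats the literal string `NA` as missing, and this dataset appears to use `NA` for the North America territory (the 1074 missing `TERRITORY` values include the USA orders in the preview). The sketch below uses a hypothetical `missing_report` helper and a made-up two-row CSV that assumes North America is stored as `NA`; it shows the percentage report and how `keep_default_na=False, na_values=['']` keeps `NA` as a real value while still treating empty fields as missing.

```python
import io

import pandas as pd

def missing_report(df):
    """Hypothetical helper: count and percentage of missing values per column."""
    is_missing = df.isna()
    report = pd.DataFrame({
        'missing': is_missing.sum(),
        'pct_missing': is_missing.mean().mul(100).round(1),
    })
    # keep only columns that actually have gaps, worst first
    return report[report['missing'] > 0].sort_values('pct_missing', ascending=False)

# made-up two-row CSV: assumes North America is stored as the literal string NA
csv_bytes = b"ORDERNUMBER,STATE,TERRITORY\n10107,NY,NA\n10121,,EMEA\n"

# default parsing: both the empty STATE and the literal 'NA' become NaN
default = pd.read_csv(io.BytesIO(csv_bytes))
# only empty fields count as missing; 'NA' stays a real territory code
literal_na = pd.read_csv(io.BytesIO(csv_bytes), keep_default_na=False, na_values=[''])

print(missing_report(default))
print(missing_report(literal_na))
```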

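- **Transform (Clean and Transform the Data):**  
  In this step, `transform_data` applies the following transformations:

  - Drop duplicate rows
  - Fill missing values in `ADDRESSLINE2`, `STATE`, `POSTALCODE`, and `TERRITORY` with placeholder values
  - Parse `ORDERDATE` into a datetime column
  - Keep only rows with positive `SALES`
  - Derive the `PROFIT_MARGIN`, `ORDER_YEAR`, and `ORDER_MONTH` columns
  - Lowercase `DEALSIZE`, drop the contact columns (`PHONE`, `CONTACTLASTNAME`, `CONTACTFIRSTNAME`), and rename a few columns for readability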


In [42]:
# Step 2: Transform (Cleaning and transforming the data)
def transform_data(df):
    # drop exact duplicate rows (returns a new DataFrame, so the caller's raw_data is untouched)
    df = df.drop_duplicates()

    # handle missing values by filling them with placeholders
    df = df.fillna({
        'ADDRESSLINE2': 'Not Provided',
        'STATE': 'Unknown',
        'POSTALCODE': '00000',
        'TERRITORY': 'Unknown',
    })

    # convert ORDERDATE to datetime; unparseable values become NaT
    df['ORDERDATE'] = pd.to_datetime(df['ORDERDATE'], errors='coerce')

    # keep only rows where SALES is greater than 0; .copy() makes the filtered frame
    # independent, avoiding SettingWithCopyWarning when columns are added below
    df = df[df['SALES'] > 0].copy()

    # PROFIT_MARGIN = (MSRP - PRICEEACH) * QUANTITYORDERED. The dataset has no cost column,
    # so this is really the total discount from list price (MSRP), not a true profit margin;
    # negative values mean the unit price was above MSRP
    df['PROFIT_MARGIN'] = (df['MSRP'] - df['PRICEEACH']) * df['QUANTITYORDERED']

    # standardize DEALSIZE values to lowercase
    df['DEALSIZE'] = df['DEALSIZE'].str.lower()

    # extract year and month from ORDERDATE for analysis
    df['ORDER_YEAR'] = df['ORDERDATE'].dt.year
    df['ORDER_MONTH'] = df['ORDERDATE'].dt.month

    # drop contact columns that are not needed for analysis
    df = df.drop(columns=['PHONE', 'CONTACTLASTNAME', 'CONTACTFIRSTNAME'], errors='ignore')

    # rename columns for better readability
    df = df.rename(columns={
        'QUANTITYORDERED': 'QUANTITY_ORDERED',
        'PRICEEACH': 'PRICE_EACH',
        'ORDERLINENUMBER': 'ORDER_LINE_NUMBER',
    })

    return df

In [43]:
# Transform the data
transformed_data = transform_data(raw_data)
print("Transformed Data:")
transformed_data.head()

Transformed Data:


| | ORDERNUMBER | QUANTITY_ORDERED | PRICE_EACH | ORDER_LINE_NUMBER | SALES | ORDERDATE | STATUS | QTR_ID | MONTH_ID | YEAR_ID | ... | ADDRESSLINE2 | CITY | STATE | POSTALCODE | COUNTRY | TERRITORY | DEALSIZE | PROFIT_MARGIN | ORDER_YEAR | ORDER_MONTH |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10107 | 30 | 95.7 | 2 | 2871.0 | 2003-02-24 | Shipped | 1 | 2 | 2003 | ... | Not Provided | NYC | NY | 10022 | USA | Unknown | small | -21.0 | 2003 | 2 |
| 1 | 10121 | 34 | 81.35 | 5 | 2765.9 | 2003-05-07 | Shipped | 2 | 5 | 2003 | ... | Not Provided | Reims | Unknown | 51100 | France | EMEA | small | 464.1 | 2003 | 5 |
| 2 | 10134 | 41 | 94.74 | 2 | 3884.34 | 2003-07-01 | Shipped | 3 | 7 | 2003 | ... | Not Provided | Paris | Unknown | 75508 | France | EMEA | medium | 10.66 | 2003 | 7 |
| 3 | 10145 | 45 | 83.26 | 6 | 3746.7 | 2003-08-25 | Shipped | 3 | 8 | 2003 | ... | Not Provided | Pasadena | CA | 90003 | USA | Unknown | medium | 528.3 | 2003 | 8 |
| 4 | 10159 | 49 | 100.0 | 14 | 5205.27 | 2003-10-10 | Shipped | 4 | 10 | 2003 | ... | Not Provided | San Francisco | CA | 0 | USA | Unknown | medium | -245.0 | 2003 | 10 |
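The step-by-step assignments in `transform_data` can also be written as a single method chain, where each step returns a new DataFrame and pandas never has to guess whether a slice is a view or a copy. The sketch below is a hypothetical restatement of the same steps, not the original function; it also passes an explicit date format, an assumption based on the `2/24/2003 0:00` values in the raw preview. It is exercised on a small made-up DataFrame built from two preview rows, a duplicate, and a zero-sales row.

```python
import pandas as pd

def transform_chained(df):
    """Hypothetical method-chained version of transform_data (same steps, no in-place edits)."""
    return (
        df.drop_duplicates()
        # fill missing values with the same placeholders as transform_data
        .fillna({'ADDRESSLINE2': 'Not Provided', 'STATE': 'Unknown',
                 'POSTALCODE': '00000', 'TERRITORY': 'Unknown'})
        # parse dates such as '2/24/2003 0:00'; the explicit format is an assumption
        .assign(ORDERDATE=lambda d: pd.to_datetime(d['ORDERDATE'], format='%m/%d/%Y %H:%M',
                                                   errors='coerce'))
        # keep rows with positive sales (a callable inside .loc receives the current frame)
        .loc[lambda d: d['SALES'] > 0]
        # derived columns; each lambda receives the DataFrame as it stands at this point
        .assign(
            PROFIT_MARGIN=lambda d: (d['MSRP'] - d['PRICEEACH']) * d['QUANTITYORDERED'],
            DEALSIZE=lambda d: d['DEALSIZE'].str.lower(),
            ORDER_YEAR=lambda d: d['ORDERDATE'].dt.year,
            ORDER_MONTH=lambda d: d['ORDERDATE'].dt.month,
        )
        .drop(columns=['PHONE', 'CONTACTLASTNAME', 'CONTACTFIRSTNAME'], errors='ignore')
        .rename(columns={'QUANTITYORDERED': 'QUANTITY_ORDERED', 'PRICEEACH': 'PRICE_EACH',
                         'ORDERLINENUMBER': 'ORDER_LINE_NUMBER'})
    )

# made-up sample: rows 0 and 1 are duplicates, the last row has zero sales
sample = pd.DataFrame({
    'ORDERNUMBER': [10107, 10107, 10121, 99999],
    'QUANTITYORDERED': [30, 30, 34, 5],
    'PRICEEACH': [95.7, 95.7, 81.35, 50.0],
    'ORDERLINENUMBER': [2, 2, 5, 1],
    'SALES': [2871.0, 2871.0, 2765.9, 0.0],
    'ORDERDATE': ['2/24/2003 0:00', '2/24/2003 0:00', '5/7/2003 0:00', '7/1/2003 0:00'],
    'MSRP': [95, 95, 95, 60],
    'ADDRESSLINE2': [None] * 4,
    'STATE': ['NY', 'NY', None, 'CA'],
    'POSTALCODE': ['10022', '10022', '51100', None],
    'TERRITORY': [None, None, 'EMEA', None],
    'DEALSIZE': ['Small', 'Small', 'Small', 'Small'],
    'PHONE': ['555-0100'] * 4,
})
result = transform_chained(sample)
print(result[['ORDERNUMBER', 'STATE', 'PROFIT_MARGIN', 'ORDER_MONTH']])
```

The chained form trades a little debuggability (there are no intermediate variables to inspect) for the guarantee that the input DataFrame is never modified.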


- **Load (Load the Data into a Data Warehouse):**  
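  Finally, we’ll load the transformed data into a relational database (MySQL in this example; PostgreSQL works the same way with a different connection URL). SQLAlchemy manages the database connection, and pandas' `to_sql` writes the DataFrame into a table. The `mysql+pymysql` URL used below also requires the PyMySQL driver package to be installed.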

In [44]:
# Step 3: Load (Load the cleaned data into a database)

def load_data(df, database_url, table_name='sales_data'):
    # create a connection to the database using SQLAlchemy
    engine = create_engine(database_url)

    # write the DataFrame to `table_name`, replacing the table if it already exists
    df.to_sql(table_name, con=engine, if_exists='replace', index=False)

    print("Data has been successfully loaded into the database!")

In [45]:
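# example database URL for MySQL (format: dialect+driver://user:password@host/database);
# replace the credentials with your own and avoid hard-coding real passwords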
table_name = 'sales_data'
database_url = 'mysql+pymysql://root:123456@localhost/sales_db'

# load the data into the database
load_data(transformed_data, database_url, table_name)

Data has been successfully loaded into the database!
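`to_sql` signals success only by not raising, so the printed message does not prove that every row arrived. A slightly stricter load step, sketched below under the assumption of pandas 2.x and SQLAlchemy 2.x, writes the table and counts its rows inside one transaction, so a mismatch raises and rolls back the write. `load_and_verify` is a hypothetical helper, and an in-memory SQLite engine stands in for the MySQL URL used above. Transactional DDL varies by database: MySQL commits `DROP TABLE`/`CREATE TABLE` implicitly, so there the rollback cannot restore the previous table.

```python
import pandas as pd
from sqlalchemy import create_engine, func, select, table

def load_and_verify(df, engine, table_name='sales_data'):
    """Hypothetical load step: write df, then check the row count in the same transaction."""
    with engine.begin() as conn:  # commits on success, rolls back if an exception escapes
        df.to_sql(table_name, con=conn, if_exists='replace', index=False, chunksize=1000)
        # table() builds a lightweight table reference, so the name is quoted per dialect
        loaded = conn.execute(select(func.count()).select_from(table(table_name))).scalar_one()
        if loaded != len(df):
            raise RuntimeError(f'expected {len(df)} rows in {table_name}, found {loaded}')
    return loaded

engine = create_engine('sqlite://')  # in-memory SQLite stands in for MySQL here
demo = pd.DataFrame({'ORDERNUMBER': [10107, 10121], 'SALES': [2871.0, 2765.9]})
print(load_and_verify(demo, engine))  # 2
```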


In [46]:
# query the database to verify the data load
def query_data(database_url, table_name):
    # create a connection to the database
    engine = create_engine(database_url)
    
    # read the data from the database into a DataFrame
    query = f'SELECT * FROM {table_name}'
    df = pd.read_sql(query, con=engine)
    
    return df
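`query_data` reads the whole table back, which is fine for 2,823 rows but wasteful for larger loads, and because it builds the SQL with an f-string, `table_name` must come from trusted code. For verification queries that depend on supplied values, bind those values as parameters instead. The sketch below assumes pandas 2.x and SQLAlchemy 2.x; `query_by_status` is a hypothetical helper, and an in-memory SQLite table with two made-up rows stands in for the MySQL table. Table names cannot be bound parameters, so the table name stays fixed in the SQL text.

```python
import pandas as pd
from sqlalchemy import create_engine, text

def query_by_status(engine, status):
    """Hypothetical verification query: fetch only rows with a given STATUS."""
    # :status is a bound parameter, so the value is never pasted into the SQL string
    query = text('SELECT ORDERNUMBER, STATUS, ORDERDATE FROM sales_data WHERE STATUS = :status')
    # parse_dates converts ORDERDATE back to datetime (SQLite stores it as text)
    return pd.read_sql(query, con=engine, params={'status': status}, parse_dates=['ORDERDATE'])

engine = create_engine('sqlite://')  # in-memory SQLite stands in for MySQL here
pd.DataFrame({
    'ORDERNUMBER': [10107, 99999],
    'STATUS': ['Shipped', 'Cancelled'],
    'ORDERDATE': pd.to_datetime(['2003-02-24', '2004-01-05']),
}).to_sql('sales_data', con=engine, index=False)

shipped = query_by_status(engine, 'Shipped')
print(shipped)
```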


In [47]:
# query the database and print the first few rows
queried_data = query_data(database_url, table_name)
print("Queried Data:")
queried_data.head()

Queried Data:


| | ORDERNUMBER | QUANTITY_ORDERED | PRICE_EACH | ORDER_LINE_NUMBER | SALES | ORDERDATE | STATUS | QTR_ID | MONTH_ID | YEAR_ID | ... | ADDRESSLINE2 | CITY | STATE | POSTALCODE | COUNTRY | TERRITORY | DEALSIZE | PROFIT_MARGIN | ORDER_YEAR | ORDER_MONTH |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10107 | 30 | 95.7 | 2 | 2871.0 | 2003-02-24 | Shipped | 1 | 2 | 2003 | ... | Not Provided | NYC | NY | 10022 | USA | Unknown | small | -21.0 | 2003 | 2 |
| 1 | 10121 | 34 | 81.35 | 5 | 2765.9 | 2003-05-07 | Shipped | 2 | 5 | 2003 | ... | Not Provided | Reims | Unknown | 51100 | France | EMEA | small | 464.1 | 2003 | 5 |
| 2 | 10134 | 41 | 94.74 | 2 | 3884.34 | 2003-07-01 | Shipped | 3 | 7 | 2003 | ... | Not Provided | Paris | Unknown | 75508 | France | EMEA | medium | 10.66 | 2003 | 7 |
| 3 | 10145 | 45 | 83.26 | 6 | 3746.7 | 2003-08-25 | Shipped | 3 | 8 | 2003 | ... | Not Provided | Pasadena | CA | 90003 | USA | Unknown | medium | 528.3 | 2003 | 8 |
| 4 | 10159 | 49 | 100.0 | 14 | 5205.27 | 2003-10-10 | Shipped | 4 | 10 | 2003 | ... | Not Provided | San Francisco | CA | 0 | USA | Unknown | medium | -245.0 | 2003 | 10 |


- **Running the Full Pipeline (Extract, Transform, Load):**  

In [48]:
# function to run the pipeline
def run_pipeline(file_path, database_url, table_name):
    # Step 1: Extract
    raw_data = extract_data(file_path)
    
    # Step 2: Transform
    transformed_data = transform_data(raw_data)
    
    # Step 3: Load
    load_data(transformed_data, database_url, table_name)
    
    # Query the database to verify the load
    queried_data = query_data(database_url, table_name)
    
    return queried_data
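When the pipeline runs unattended (for example, from a scheduler), it helps to know which stage failed and how long each one took. One option, sketched below with the standard `logging` and `time` modules, wraps each stage in a hypothetical `run_stage` helper that logs the elapsed time and, for DataFrame results, the row count. The stages in the usage example are stand-ins, not the `extract_data` and `transform_data` defined above.

```python
import logging
import time

import pandas as pd

logger = logging.getLogger('etl')
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s %(message)s')

def run_stage(name, func, *args, **kwargs):
    """Hypothetical helper: run one pipeline stage, logging its duration and output row count."""
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
    except Exception:
        # logger.exception records the traceback; re-raise so the pipeline stops
        logger.exception('stage %s failed', name)
        raise
    elapsed = time.perf_counter() - start
    rows = len(result) if isinstance(result, pd.DataFrame) else 'n/a'
    logger.info('stage %s finished in %.3fs (rows: %s)', name, elapsed, rows)
    return result

# stand-in stages: a tiny DataFrame instead of the CSV, a simple filter as the transform
raw = run_stage('extract', pd.DataFrame, {'SALES': [2871.0, 0.0, 2765.9]})
clean = run_stage('transform', lambda df: df[df['SALES'] > 0], raw)
```

Inside `run_pipeline`, each call would then look like `run_stage('extract', extract_data, file_path)`.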

In [49]:
# run the pipeline
table_name = 'sales_data'
data_path = './data/sales_data_sample.csv'  
database_url = 'mysql+pymysql://root:123456@localhost/sales_db'

pipeline_data = run_pipeline(data_path, database_url, table_name)
print("Pipeline Data:")
pipeline_data.head()

Data has been successfully loaded into the database!
Pipeline Data:


| | ORDERNUMBER | QUANTITY_ORDERED | PRICE_EACH | ORDER_LINE_NUMBER | SALES | ORDERDATE | STATUS | QTR_ID | MONTH_ID | YEAR_ID | ... | ADDRESSLINE2 | CITY | STATE | POSTALCODE | COUNTRY | TERRITORY | DEALSIZE | PROFIT_MARGIN | ORDER_YEAR | ORDER_MONTH |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10107 | 30 | 95.7 | 2 | 2871.0 | 2003-02-24 | Shipped | 1 | 2 | 2003 | ... | Not Provided | NYC | NY | 10022 | USA | Unknown | small | -21.0 | 2003 | 2 |
| 1 | 10121 | 34 | 81.35 | 5 | 2765.9 | 2003-05-07 | Shipped | 2 | 5 | 2003 | ... | Not Provided | Reims | Unknown | 51100 | France | EMEA | small | 464.1 | 2003 | 5 |
| 2 | 10134 | 41 | 94.74 | 2 | 3884.34 | 2003-07-01 | Shipped | 3 | 7 | 2003 | ... | Not Provided | Paris | Unknown | 75508 | France | EMEA | medium | 10.66 | 2003 | 7 |
| 3 | 10145 | 45 | 83.26 | 6 | 3746.7 | 2003-08-25 | Shipped | 3 | 8 | 2003 | ... | Not Provided | Pasadena | CA | 90003 | USA | Unknown | medium | 528.3 | 2003 | 8 |
| 4 | 10159 | 49 | 100.0 | 14 | 5205.27 | 2003-10-10 | Shipped | 4 | 10 | 2003 | ... | Not Provided | San Francisco | CA | 0 | USA | Unknown | medium | -245.0 | 2003 | 10 |
