# PROG8245_Lab2 - Data Collection & Pre-Processing

This code snippet is part of a data processing pipeline for e-commerce transactions. It includes loading, cleaning, transforming, and calculating features from transaction data.

## Hello, Data!

In [1]:
import pandas as pd
from dataclasses import dataclass
from datetime import datetime

# 1. Load raw CSV, display first 3 rows
data_ecommerce = pd.read_csv("./data/US_Regional_Sales_Data.csv")
data_ecommerce.head(3)

Unnamed: 0,OrderNumber,Sales Channel,WarehouseCode,ProcuredDate,OrderDate,ShipDate,DeliveryDate,CurrencyCode,_SalesTeamID,_CustomerID,_StoreID,_ProductID,Order Quantity,Discount Applied,Unit Cost,Unit Price
0,SO - 000101,In-Store,WARE-UHY1004,31/12/17,31/5/18,14/6/18,19/6/18,USD,6,15,259,12,5,0.075,1001.18,1963.1
1,SO - 000102,Online,WARE-NMK1003,31/12/17,31/5/18,22/6/18,2/7/18,USD,14,20,196,27,3,0.075,3348.66,3939.6
2,SO - 000103,Distributor,WARE-UHY1004,31/12/17,31/5/18,21/6/18,1/7/18,USD,21,16,213,16,1,0.05,781.22,1775.5


## Pick the Right Container

A dict is the more appropriate container for a single row, because I can access each value by its column name. A dict is also flexible and needs very little code.
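
To make the dict option concrete, here is a minimal sketch using the standard library's `csv.DictReader` on an inline two-row sample copied from the preview above. The `sample_csv` string is an illustration, not the full file. Each row comes back as a dict keyed by column name:

```python
import csv
import io

# Inline sample: a few columns from the first two rows shown above.
sample_csv = """OrderNumber,Sales Channel,Order Quantity,Unit Price
SO - 000101,In-Store,5,1963.1
SO - 000102,Online,3,3939.6
"""

rows = list(csv.DictReader(io.StringIO(sample_csv)))

# Values are looked up by column name; csv yields every value as a string.
first = rows[0]
print(first["OrderNumber"], first["Sales Channel"], float(first["Unit Price"]))
```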

## Transaction Class and OO data structure
The `Transaction` class is created with the `@dataclass` decorator. It includes methods for cleaning and transforming the data, as well as properties for calculating total and profit.

In [2]:
# Micro-class representing a row of transaction data
@dataclass
class Transaction:
    OrderNumber: str
    OrderDate: str
    CustomerID: int
    ProductID: int
    StoreID: int
    UnitPrice: float
    UnitsSold: int
    UnitCost: float
    DiscountApplied: float
    SalesChannel: str
    
# *******CLEANING DATA********
    def clean(self):
        # Convert numeric fields to floats (removing thousands separators)
        self.UnitPrice = float(str(self.UnitPrice).replace(",", ""))
        self.UnitCost = float(str(self.UnitCost).replace(",", ""))

        # Set minimum valid price to 0.01 for zero or negative prices
        if self.UnitPrice <= 0:
            self.UnitPrice = 0.01

        # Validate OrderDate: use a default date if missing
        if not self.OrderDate or self.OrderDate.strip() == "":
            self.OrderDate = "1/1/90"

        # Clamp discount to the range [0, 1]
        if self.DiscountApplied < 0:
            self.DiscountApplied = 0
        elif self.DiscountApplied > 1:
            self.DiscountApplied = 1

# *******TRANSFORMING DATA********
    def transformed(self):
        # Parse OrderDate from d/m/yy to YYYY-MM-DD
        try:
            dt = datetime.strptime(self.OrderDate.strip(), "%d/%m/%y")
            self.OrderDate = dt.strftime("%Y-%m-%d")
        except (ValueError, AttributeError):
            self.OrderDate = "MISSING"

        #Convert SalesChannel to a numeric ID
        channels = ['Distributor', 'Wholesale', 'In-Store', 'Online', 'Direct Sales']
        try:
            clean_channel = self.SalesChannel.strip().title()  # e.g. "  online " → "Online"
            self.channel_id = channels.index(clean_channel) + 1
        except (ValueError, AttributeError):
            self.channel_id = 0

# *******FEATURES*******
    # Calculating total 
    @property
    def total(self) -> float:
        return self.UnitPrice * (1 - self.DiscountApplied) * self.UnitsSold

    # Calculating profit
    @property
    def profit(self) -> float:
        net_price = self.UnitPrice * (1 - self.DiscountApplied)
        return (net_price - self.UnitCost) * self.UnitsSold
    

## Bulk Loader
This function loads the CSV file, and returns a list of Transaction objects.


In [3]:
# 5. Bulk loader Function
def load_transactions(path: str) -> list[Transaction]:
    df = pd.read_csv(path)
    transactions = [
        Transaction(
            OrderNumber=row["OrderNumber"],
            OrderDate=row["OrderDate"],
            CustomerID=row["_CustomerID"],
            ProductID=row["_ProductID"],
            StoreID=row["_StoreID"],
            UnitPrice=row["Unit Price"],
            UnitsSold=row["Order Quantity"],
            UnitCost=row["Unit Cost"],
            DiscountApplied=row["Discount Applied"],
            SalesChannel=row["Sales Channel"]
        )
        for _, row in df.iterrows()
    ]

    return transactions

In [4]:
#load transactions from CSV file using the bulk loader function
transactions = load_transactions("./data/US_Regional_Sales_Data.csv")
print(f"Total rows: {len(transactions)}")

Total rows: 7991
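
`iterrows()` is fine at this size, but it builds a pandas Series for every row. As an alternative sketch, not the loader used in this lab, the same row-to-object mapping can be done with `csv.DictReader`. The `MiniTransaction` class, the `load_mini` function, and the inline sample (including its quoted prices with thousands separators) are hypothetical stand-ins so the example runs on its own:

```python
import csv
import io
from dataclasses import dataclass


@dataclass
class MiniTransaction:
    # Hypothetical cut-down version of Transaction, for illustration only.
    OrderNumber: str
    UnitPrice: float
    UnitsSold: int


def load_mini(handle) -> list[MiniTransaction]:
    """Map each CSV row (a dict keyed by header name) to one object."""
    return [
        MiniTransaction(
            OrderNumber=row["OrderNumber"],
            # Strip thousands separators before converting, as clean() does.
            UnitPrice=float(row["Unit Price"].replace(",", "")),
            UnitsSold=int(row["Order Quantity"]),
        )
        for row in csv.DictReader(handle)
    ]


# Assumed raw format: prices quoted and written with thousands separators.
sample = io.StringIO(
    'OrderNumber,Order Quantity,Unit Price\n'
    'SO - 000101,5,"1,963.10"\n'
    'SO - 000102,3,"3,939.60"\n'
)
mini = load_mini(sample)
print(len(mini), mini[0])
```

Converting types while loading means later steps no longer need the `str(...).replace(",", "")` workaround.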


## Quick Profiling
This block of code gives a quick overview of the price distribution in the dataset.


In [5]:
# Quick profiling of Prices
# I noticed that the prices are stored as strings, so we need to convert them to floats for analysis.
prices = [float(str(t.UnitPrice).replace(",", "")) for t in transactions]

# Printing basic statistics like min, mean, and max prices
print("Min price:", min(prices))
print("Mean price:", sum(prices)/len(prices))
print("Max price:", max(prices))

# Finding unique shipping stores
shipping_store = set(t.StoreID for t in transactions) 
print("Unique shipping stores:", len(shipping_store))

Min price: 167.5
Mean price: 2284.5365035665122
Max price: 6566.0
Unique shipping stores: 367
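
The same kind of summary can be sketched with the standard library's `statistics` module. The three prices below are the `Unit Price` values from the preview rows, not the full dataset, so the numbers differ from the output above:

```python
import statistics

# Unit prices from the three preview rows (illustrative subset only).
sample_prices = [1963.1, 3939.6, 1775.5]

summary = {
    "min": min(sample_prices),
    "mean": statistics.mean(sample_prices),
    "median": statistics.median(sample_prices),
    "max": max(sample_prices),
}
for name, value in summary.items():
    print(f"{name}: {value:.2f}")
```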


## Spot the Grime
I changed a few values to simulate dirty data for testing purposes, then counted each kind of problem before cleaning.

In [6]:
# Spotting the Grime
transactions[395].UnitPrice = -10  # negative price
transactions[10].OrderDate = ''  # no date
for idx in [1, 52]:
    transactions[idx].DiscountApplied = 3  # discount greater than 1  

print("BEFORE CLEANING:")
# Finding strings in numeric fields
non_numeric_prices = [t for t in transactions if not isinstance(t.UnitPrice, (int, float))]
print(f"Transactions with non-numeric prices: {len(non_numeric_prices)}")
# Finding null or negative prices
dirty_prices = [t for t in transactions if t.UnitPrice is None or float(str(t.UnitPrice).replace(",", ""))  < 0]
print(f"Transactions with dirty prices: {len(dirty_prices)}")
# Transactions with missing or wrong dates
dirty_dates = [t for t in transactions if not t.OrderDate or t.OrderDate.strip() == ""]
print(f"Transactions with missing and wrong dates: {len(dirty_dates)}")
# Discount values outside [0, 1] range (if DiscountApplied is a proportion)
bad_discounts = [t for t in transactions if not 0 <= t.DiscountApplied <= 1]
print(f"Transactions with invalid discount values: {len(bad_discounts)}")



BEFORE CLEANING:
Transactions with non-numeric prices: 7990
Transactions with dirty prices: 1
Transactions with missing and wrong dates: 1
Transactions with invalid discount values: 2


## Cleaning Rules
Applying the .clean() method created earlier.

In [7]:
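# Showing the counts after running the cleaning function
# clean() modifies each object in place and returns None, so "or t" keeps the object
transactions_cleaned = [t.clean() or t for t in transactions]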

print("AFTER CLEANING:")
# Finding strings in numeric fields
non_numeric_prices = [t for t in transactions_cleaned if not isinstance(t.UnitPrice, (int, float))]
print(f"Transactions with non-numeric prices: {len(non_numeric_prices)}")
# Finding null or negative prices
dirty_prices = [t for t in transactions_cleaned if t.UnitPrice is None or float(str(t.UnitPrice).replace(",", ""))  < 0]
print(f"Transactions with dirty prices: {len(dirty_prices)}")
# Transactions with missing or wrong dates
dirty_dates = [t for t in transactions_cleaned if not t.OrderDate or t.OrderDate.strip() == ""]
print(f"Transactions with missing and wrong dates: {len(dirty_dates)}")
# Discount values outside [0, 1] range (if DiscountApplied is a proportion)
bad_discounts = [t for t in transactions_cleaned if not 0 <= t.DiscountApplied <= 1]
print(f"Transactions with invalid discount values: {len(bad_discounts)}")

AFTER CLEANING:
Transactions with non-numeric prices: 0
Transactions with dirty prices: 0
Transactions with missing and wrong dates: 0
Transactions with invalid discount values: 0
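
The cleaning rules are small enough to check in isolation. A minimal sketch, using hypothetical helper names (`clamp_discount`, `floor_price`) that are not part of the `Transaction` class:

```python
def clamp_discount(discount: float) -> float:
    """Force a discount into the [0, 1] range."""
    return max(0.0, min(1.0, discount))


def floor_price(price: float, minimum: float = 0.01) -> float:
    """Replace zero or negative prices with the minimum valid price."""
    return price if price > 0 else minimum


# The dirty values injected in "Spot the Grime".
print(clamp_discount(3))   # discount greater than 1
print(floor_price(-10))    # negative price
```

Keeping each rule as a pure function makes it easy to test edge cases without loading the CSV.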


## Transformations
Applying the transformation method defined in the data class to change the date format and convert the sales channel into a numeric ID.

In [8]:
# Running the transformation function in the cleaned transactions
transactions_transformed = [t.transformed() or t for t in transactions_cleaned]


print("Sample new date format :", transactions_transformed[1].OrderDate)
print("Transaction with channel id:", transactions_transformed[5].channel_id)


Sample new date format : 2018-05-31
Transaction with channel id: 4
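
Both transformations can be tried on their own. This sketch mirrors the logic of `.transformed()` with hypothetical standalone functions (`to_iso_date`, `to_channel_id`). The channel lookup uses a dict instead of `list.index`, which is a swapped-in variant and not the method's own code:

```python
from datetime import datetime

# Same channel order as in the Transaction class, so IDs match (1-based).
CHANNEL_IDS = {
    name: idx
    for idx, name in enumerate(
        ["Distributor", "Wholesale", "In-Store", "Online", "Direct Sales"], start=1
    )
}


def to_iso_date(raw: str) -> str:
    """Convert d/m/yy to YYYY-MM-DD, or return "MISSING" if parsing fails."""
    try:
        return datetime.strptime(raw.strip(), "%d/%m/%y").strftime("%Y-%m-%d")
    except (ValueError, AttributeError):
        return "MISSING"


def to_channel_id(raw: str) -> int:
    """Map a channel name to its ID; unknown names map to 0."""
    return CHANNEL_IDS.get(raw.strip().title(), 0)


print(to_iso_date("31/5/18"))      # 2018-05-31
print(to_channel_id("  online "))  # 4
print(to_iso_date(""))             # MISSING
```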


## Feature Engineering
I decided to calculate the total value and the profit, using discount, quantity, unit price and unit cost.

In [9]:
#using the properties to calculate total and profit
print("Total sales for first transaction:", transactions_transformed[0].total)
print("Profit for first transaction:", transactions_transformed[0].profit)

Total sales for first transaction: 9079.3375
Profit for first transaction: 4073.4375000000005
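
These two figures can be reproduced by hand from the first preview row (unit price 1963.1, discount 0.075, quantity 5, unit cost 1001.18). The variable names below are only for this worked example:

```python
unit_price, discount, units, unit_cost = 1963.1, 0.075, 5, 1001.18

net_price = unit_price * (1 - discount)   # price per unit after discount
total = net_price * units                  # revenue for the order line
profit = (net_price - unit_cost) * units   # margin per unit times quantity

print(round(total, 4), round(profit, 4))
```

Rounding only at print time avoids the floating-point tail visible in the raw profit output.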


## Mini-Aggregation
I want to demonstrate a summary of revenue and profit by channel, using two dictionaries.

In [10]:
# dict to save revenue and profit by channel
revenue_by_channel = {}
profit_by_channel = {}

# Calculating revenue and profit by channel
for t in transactions_transformed:
    revenue_by_channel[t.channel_id] = revenue_by_channel.get(t.channel_id, 0) + t.total
    profit_by_channel[t.channel_id] = profit_by_channel.get(t.channel_id, 0) + t.profit

# Displaying revenue and profit by channel
print("Results by channel:")
for channel_id, revenue in sorted(revenue_by_channel.items()):
    profit = profit_by_channel.get(channel_id, 0)
    print(f"Channel {channel_id}: ${revenue:.2f} - Profit ${profit:.2f}")

Results by channel:
Channel 1: $13169147.65 - Profit $3887897.39
Channel 2: $8172768.54 - Profit $2472744.54
Channel 3: $30092517.07 - Profit $8787465.61
Channel 4: $21668977.02 - Profit $6137233.86
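
The `dict.get(key, 0)` accumulation pattern can also be written with `collections.defaultdict`. A sketch on made-up `(channel_id, total)` pairs, not the real results:

```python
from collections import defaultdict

# Made-up (channel_id, total) pairs purely for illustration.
sample = [(1, 100.0), (4, 250.0), (1, 50.0), (3, 75.0)]

revenue = defaultdict(float)  # missing keys start at 0.0
for channel_id, total in sample:
    revenue[channel_id] += total

for channel_id in sorted(revenue):
    print(f"Channel {channel_id}: ${revenue[channel_id]:.2f}")
```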


## Serialization Checkpoint
Saving the clean DataFrame to a JSON file and a Parquet file.

In [11]:
# saving the cleaned and transformed data in a df using vars() to convert dataclass to dict
df_clean = pd.DataFrame([vars(t) for t in transactions_transformed])
# Saving the cleaned and transformed data to JSON and Parquet formats
df_clean.to_json("data/transactions_cleaned.json", orient="records", indent=2)
df_clean.to_parquet("data/transactions_cleaned.parquet", index=False)
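
One detail worth noting: `channel_id` is attached in `.transformed()` rather than declared as a dataclass field, so `vars()` picks it up while `dataclasses.asdict()` would not. A small sketch with a hypothetical `Row` class shows the difference, plus a JSON round trip using only the standard library:

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Row:
    # Hypothetical stand-in for Transaction with a single declared field.
    OrderNumber: str


row = Row("SO - 000101")
row.channel_id = 3  # added after construction, like in .transformed()

print(asdict(row))  # declared fields only
print(vars(row))    # every instance attribute, including channel_id

# Round trip through JSON to confirm nothing is lost.
restored = json.loads(json.dumps([vars(row)]))
print(restored == [vars(row)])
```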


## Soft Interview Reflection

At first, because I have a background in database modeling, I thought using a dictionary would be more flexible. I could access data using keys and complete each task without needing much structure.

However, using OOP made the data cleaning and transformation process more organized and easier to manage. By putting logic like .clean(), .transformed(), and calculated features such as .total and .profit inside the *Transaction* class, I avoided repeating code and made everything clearer and more reusable. The code also looks cleaner, and if I work on another dataset in the future, I can reuse this class with just small changes. OOP takes more coding effort at first, but it gave me more control and helped me think of each row as an object with behavior, not just raw data.



# Data-Dictionary Section
In this section, I will merge my second source, which contains additional information about stores such as city, province, and estimated population.

Merged data with store metadata:

In [12]:
# Merging the cleaned data with my store metadata
df_meta = pd.read_csv("data/store_metadata_canada.csv")

# Left merge on StoreID keeps every transaction and adds the store columns
df_store_cities = df_clean.merge(df_meta, on="StoreID", how="left")

# Printing a label for the merged data
print("Merged data with store metadata:")


Merged data with store metadata:
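
With `how="left"`, every transaction is kept, and stores missing from the metadata file end up with empty metadata values (NaN in pandas). The plain-Python sketch below illustrates that behaviour on invented rows. The store IDs and the city name are placeholders, not values from `store_metadata_canada.csv`:

```python
# Invented rows for illustration only.
txns = [
    {"OrderNumber": "A", "StoreID": 1},
    {"OrderNumber": "B", "StoreID": 2},
]
store_meta = {1: {"city": "Exampleville"}}  # no entry for StoreID 2

# Left join: keep every transaction, fill unmatched metadata with None.
merged = [
    {**txn, **store_meta.get(txn["StoreID"], {"city": None})}
    for txn in txns
]
unmatched = [row["OrderNumber"] for row in merged if row["city"] is None]
print(merged)
print("Orders without store metadata:", unmatched)
```

In pandas itself, passing `indicator=True` to `merge` adds a `_merge` column that flags such unmatched rows.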


In [13]:

# Define the sources for each column
txn_cols = [
    "OrderNumber", "OrderDate", "CustomerID", "ProductID", "StoreID",
    "UnitPrice", "UnitsSold", "UnitCost", "DiscountApplied", "SalesChannel",
    "channel_id", "total", "profit"
]
meta_cols = [col for col in df_store_cities.columns if col not in txn_cols]

# Build the table
summary_table = []
for col in df_store_cities.columns:
    dtype = str(df_store_cities[col].dtype)
    if col in txn_cols:
        source = "US_Regional_Sales_Data.csv"
    elif col in meta_cols:
        source = "store_metadata_canada.csv"
    else:
        source = "Unknown"
    summary_table.append([col, dtype, source])

df_summary = pd.DataFrame(summary_table, columns=["Column", "Type", "Source"])
print(df_summary.to_markdown(index=False))

| Column               | Type    | Source                     |
|:---------------------|:--------|:---------------------------|
| OrderNumber          | object  | US_Regional_Sales_Data.csv |
| OrderDate            | object  | US_Regional_Sales_Data.csv |
| CustomerID           | int64   | US_Regional_Sales_Data.csv |
| ProductID            | int64   | US_Regional_Sales_Data.csv |
| StoreID              | int64   | US_Regional_Sales_Data.csv |
| UnitPrice            | float64 | US_Regional_Sales_Data.csv |
| UnitsSold            | int64   | US_Regional_Sales_Data.csv |
| UnitCost             | float64 | US_Regional_Sales_Data.csv |
| DiscountApplied      | float64 | US_Regional_Sales_Data.csv |
| SalesChannel         | object  | US_Regional_Sales_Data.csv |
| channel_id           | int64   | US_Regional_Sales_Data.csv |
| city                 | object  | store_metadata_canada.csv  |
| province             | object  | store_metadata_canada.csv  |
| estimated_population | int64   | store

## References

Sample Data:
https://www.kaggle.com/datasets/talhabu/us-regional-sales-data?select=US_Regional_Sales_Data.csv
Using Markdown:
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_markdown.html