# Inserting Datas Into Database Tables

In [1]:
# Import libraries
from pyspark.sql import SparkSession
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
from pyspark.sql.functions import when

In [2]:
spark = SparkSession.builder.appName("InsertDataIntoPostgreSQL").getOrCreate()

In [90]:
df=spark.read.csv("export (1).csv",header=True, inferSchema=True, multiLine=True, escape='"')

In [91]:
# PostgreSQL database connection details
db_name = "retail_store_db"
db_host = "localhost"
db_username = "retail_db_user"
db_password = "retail_db_password"
db_port = "5432"

In [117]:
# Create SQLAlchemy engine for PostgreSQL and establish connection to database
engine = create_engine(f'postgresql+psycopg2://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}')
conn = engine.connect()

### Data Insertion Overview

The following tables need to be populated as part of the data pipeline:

- **Sales**: Contains transactional sales data, including product details, customer information, and order metadata.
- **Orders**: Stores information about each order placed by customers.
- **Customers**: Maintains customer details.
- **Geography**: Holds postal code mappings to geographical locations.
- **Products**: Contains product-related details.


 **Customer Dimension Table**
<!-- CREATE TABLE Customers (
    CustomerID VARCHAR PRIMARY KEY,
    CustomerName VARCHAR NOT NULL,
    Segment VARCHAR CHECK (Segment IN ('Consumer', 'Corporate', 'Home Office'))
); -->

In [10]:
## Customers Table
customer_df = df.select("Customer ID", "Customer Name", "Segment").distinct()
customer_pd = customer_df.toPandas()
customer_pd.columns = ["customerid", "customername", "segment"]
customer_pd.to_sql("customers", con=conn, if_exists='append', index=False)

793

In [11]:
customer_pd

Unnamed: 0,customerid,customername,segment
0,LS-17200,Luke Schmidt,Corporate
1,PN-18775,Parhena Norris,Home Office
2,DP-13105,Dave Poirier,Corporate
3,GH-14665,Greg Hansen,Consumer
4,MG-17875,Michael Grace,Home Office
...,...,...,...
788,AB-10165,Alan Barnes,Consumer
789,EN-13780,Edward Nazzal,Consumer
790,NR-18550,Nick Radford,Consumer
791,LM-17065,Liz MacKendrick,Consumer


**Geography Dimension Table**
<!-- CREATE TABLE Geography (
    PostalCode VARCHAR PRIMARY KEY,
    City VARCHAR NOT NULL,
    State VARCHAR NOT NULL,
    Region VARCHAR CHECK (Region IN ('West', 'East', 'Central', 'South'))
);
 -->

In [93]:
# Check 'N/A' rows in df
df.select("Postal Code", "City", "State", "Region").filter(col("Postal Code") == "N/A").show()

+-----------+----------+-------+------+
|Postal Code|      City|  State|Region|
+-----------+----------+-------+------+
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
|        N/A|Burlington|Vermont|  East|
+-----------+----------+-------+------+



In [94]:
df = df.withColumn("Postal Code", when(col("Postal Code") == "N/A", "5401").otherwise(col("Postal Code")))
df.select("Postal Code", "City", "State", "Region").filter(col("Postal Code") == "N/A").show()


+-----------+----+-----+------+
|Postal Code|City|State|Region|
+-----------+----+-----+------+
+-----------+----+-----+------+



Since all missing postal codes are from **Burlington, Vermont (East region)**, they are being replaced with the correct postal code **"05401"** .

In [98]:
## Geography Table
geography_df = df.select("Postal Code", "City", "State", "Region").distinct()
geography_pd = geography_df.toPandas()
geography_pd.columns = ["postalcode", "city", "state", "region"]

In [99]:
# Identify duplicate postal codes
duplicate_postal_codes = geography_pd[geography_pd.duplicated(subset=['postalcode'], keep=False)]
# Print the updated duplicate rows
duplicate_postal_codes = geography_pd[geography_pd.duplicated(subset=['postalcode'], keep=False)]
print(duplicate_postal_codes)


    postalcode       city       state region
52       92024  San Diego  California   West
625      92024  Encinitas  California   West


In [100]:
df = df.withColumn("Postal Code", when(df["City"] == "San Diego", 92101).otherwise(df["Postal Code"]))

The dataset contains duplicate postal codes for different cities. Specifically, **92024** is assigned to both **San Diego** and **Encinitas** in California. To resolve this inconsistency, replaced the **Postal Code** for **San Diego** with **92101**.

In [105]:
## Geography Table
geography_df = df.select("Postal Code", "City", "State", "Region").distinct()
geography_pd = geography_df.toPandas()
geography_pd.columns = ["postalcode", "city", "state", "region"]
geography_pd.to_sql("geography", con=conn, if_exists='append', index=False)


626

 **Product Dimension Table**
<!-- CREATE TABLE Products (
    ProductID VARCHAR NOT NULL,
    ProductName VARCHAR NOT NULL,
    SubCategory VARCHAR NOT NULL,
    Category VARCHAR CHECK (Category IN ('Furniture', 'Office Supplies', 'Technology')),
    PRIMARY KEY (ProductID,ProductName)
); -->

In [None]:
## Products Table
product_df = df.select("Product ID", "Product Name", "Sub-Category", "Category").distinct()
product_pd = product_df.toPandas()
product_pd.columns = ["productid", "productname", "subcategory", "category"]

In [58]:
# Check for duplicate values based on "productid"
duplicate_rows = product_pd[product_pd.duplicated(subset=["productid"], keep=False)]
# Print duplicate values
print(duplicate_rows)


            productid                                        productname  \
0     OFF-PA-10000659  TOPS Carbonless Receipt Book, Four 2-3/4 x 7-1...   
4     OFF-ST-10001228       Fellowes Personal Hanging Folder Files, Navy   
31    FUR-FU-10004091            Eldon 200 Class Desk Accessories, Black   
50    FUR-FU-10004270     Executive Impressions 13" Clairmont Wall Clock   
64    TEC-PH-10001530                     Plantronics Voyager Pro Legend   
...               ...                                                ...   
1795  OFF-ST-10004950           Acco Perma 3000 Stacking Storage Drawers   
1805  FUR-FU-10001473  Eldon Executive Woodline II Desk Accessories, ...   
1808  FUR-CH-10001146                           Global Task Chair, Black   
1841  OFF-PA-10000659  Adams Phone Message Book, Professional, 400 Me...   
1843  OFF-BI-10004654  Avery Binding System Hidden Tab Executive Styl...   

      subcategory         category  
0           Paper  Office Supplies  
4         Sto

In the Products table, since ProductID alone is not unique, a composite primary key (ProductID, ProductName) is used.

In [74]:
product_pd.to_sql("products", con=conn, if_exists='append', index=False)

893

**Order Dimension Table**
<!-- CREATE TABLE Orders (
    OrderID VARCHAR PRIMARY KEY,
    OrderDate DATE NOT NULL,
    ShipDate DATE NOT NULL,
    ShipMode VARCHAR CHECK (ShipMode IN ('First Class', 'Second Class', 'Standard Class', 'Same Day'))
); -->

In [56]:
## Orders Table
order_df = df.select("Order ID", "Order Date", "Ship Date", "Ship Mode").distinct()
order_pd = order_df.toPandas()
order_pd.columns = ["orderid", "orderdate", "shipdate", "shipmode"]
order_pd.to_sql("orders", con=conn, if_exists='append', index=False)

922

**Fact Table: Sales**
<!-- CREATE TABLE Retail_Sales (
    RowID SERIAL PRIMARY KEY,
    OrderID VARCHAR NOT NULL,
    CustomerID VARCHAR NOT NULL,
    PostalCode VARCHAR NOT NULL,
    ProductID VARCHAR NOT NULL,
    ProductName VARCHAR NOT NULL,
    Sales DECIMAL(10,2) NOT NULL CHECK (Sales >= 0), 
    OrderYear INT CHECK (OrderYear BETWEEN 2000 AND 2100),
    OrderMonth INT CHECK (OrderMonth BETWEEN 1 AND 12),
    FOREIGN KEY (OrderID) REFERENCES Orders(OrderID) ON DELETE CASCADE,
    FOREIGN KEY (CustomerID) REFERENCES Customers(CustomerID) ON DELETE CASCADE,
    FOREIGN KEY (PostalCode) REFERENCES Geography(PostalCode) ON DELETE CASCADE,
    FOREIGN KEY (ProductID, ProductName) REFERENCES Products(ProductID, ProductName) ON DELETE CASCADE
); -->

In [118]:

retail_sales_df = df.select(
    col("Row ID").alias("rowid"),
    col("Order ID").alias("orderid"),
    col("Customer ID").alias("customerid"),
    col("Postal Code").alias("postalcode"),
    col("Product ID").alias("productid"),
    col("Product Name").alias("productname"),
    col("Sales").alias("sales"),
    col("Order Year").alias("orderyear"),
    col("Order Month").alias("ordermonth")
)

# Convert the PySpark DataFrame to Pandas DataFrame for database insertion
retail_sales_pd = retail_sales_df.toPandas()

# Insert data into the Retail_Sales table
retail_sales_pd.to_sql("retail_sales", con=engine, if_exists="append", index=False)

print("Data successfully inserted into Retail_Sales table!")


Data successfully inserted into Retail_Sales table!


In [120]:
retail_sales_pd.count()

rowid          9800
orderid        9800
customerid     9800
postalcode     9800
productid      9800
productname    9800
sales          9800
orderyear      9800
ordermonth     9800
dtype: int64

In [14]:
# # Insert data into PostgreSQL table
# df_pd = df.toPandas
# table_name = "main123"
# df_pd.to_sql(table_name, con=conn, if_exists='append', index=False)
# print(f"Data inserted into {table_name} successfully")

Data inserted into main123 successfully


## Conclusion
Designed efficient PostgreSQL schema for retail analytics and corresponding data are then populated into the schema.