In [142]:
# Search Kaggle (via the kaggle CLI) for datasets matching "Supermarket sales".
!kaggle datasets list -s "Supermarket sales"

ref                                                                  title                                            size  lastUpdated          downloadCount  voteCount  usabilityRating  
-------------------------------------------------------------------  ----------------------------------------------  -----  -------------------  -------------  ---------  ---------------  
aungpyaeap/supermarket-sales                                         Supermarket sales                                36KB  2019-05-27 07:08:04         295758       2621  0.88235295       
yapwh1208/supermarket-sales-data                                     Supermarket Sales Data                            8MB  2023-09-22 01:06:21          14036        131  1.0              
surajjha101/stores-area-and-sales-data                               Supermarket store branches sales analysis        10KB  2022-04-29 11:10:16          44139        711  1.0              
lovishbansal123/sales-of-a-supermarket            

In [143]:
# Download the aungpyaeap/supermarket-sales archive into the local data/ directory.
!kaggle datasets download -p data -d "aungpyaeap/supermarket-sales"

Dataset URL: https://www.kaggle.com/datasets/aungpyaeap/supermarket-sales
License(s): other
supermarket-sales.zip: Skipping, found more recently modified local copy (use --force to force download)


In [None]:
!unzip data/supermarket-sales.zip

Archive:  data/supermarket-sales.zip
replace supermarket_sales - Sheet1.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

Create the Spark session and read the file


In [None]:
from pyspark.sql import SparkSession
import os

# Single source of truth for the SQLite JDBC driver jar. Previously spark.jars and
# spark.driver.extraClassPath pointed at two different driver versions.
# NOTE(review): assumes sqlite-jdbc-3.47.0.0.jar is the jar present in the working
# directory — confirm and adjust the file name if a different version is installed.
SQLITE_JDBC_JAR = os.path.join(os.getcwd(), "sqlite-jdbc-3.47.0.0.jar")

# Create a SparkSession with the SQLite JDBC driver on the classpath
spark = (
    SparkSession.builder
    .appName("SQLite JDBC")
    .config("spark.jars", SQLITE_JDBC_JAR)
    .config("spark.driver.extraClassPath", SQLITE_JDBC_JAR)
    .getOrCreate()
)

In [None]:
# Load the extracted CSV; the first row holds the column names and Spark infers
# each column's type from the data.
csv_path = "data/supermarket_sales - Sheet1.csv"
df = spark.read.csv(csv_path, header=True, inferSchema=True)

# Preview the first five rows
df.show(n=5)

Check for nulls and duplicates

In [None]:
# Show every row that has a null in at least one column
from functools import reduce
from pyspark.sql import functions as F

# One "is null" predicate per column, OR-ed together into a single filter.
null_checks = [F.col(name).isNull() for name in df.columns]
any_null = reduce(lambda acc, check: acc | check, null_checks[1:], null_checks[0])
df.filter(any_null).show()


In [None]:
# Check for duplicates: group on every column and keep groups seen more than once.
# The grouped result is kept in its own variable so `df` is NOT overwritten;
# reassigning `df` here would add a stray "count" column to all downstream tables.
row_counts = df.groupBy(df.columns).agg(F.count("*").alias("count"))
duplicates = row_counts.filter("count > 1")
duplicates.show()

Format column names

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col,
    concat,
    date_format,
    monotonically_increasing_id,
    to_date,
    to_timestamp,
)

# Normalise column names to snake_case: spaces become underscores, then lower-case.
# The comprehension variable is deliberately not called `col`, so the imported
# pyspark.sql.functions.col is not shadowed by a plain string.
df = df.select(
    [F.col(name).alias(name.replace(" ", "_").lower()) for name in df.columns]
)


In [None]:
# Inspect the column names after the renaming step.
df.columns

Check Schema and cast dates and timestamps correctly

In [None]:
# Inspect the inferred column types before casting the date/time columns.
df.printSchema()

In [None]:
# Build a full purchase timestamp and a purchase date from the raw columns, then
# drop the raw "date"/"time" columns they were derived from.
time_text = date_format("time", "HH:mm:ss")  # time-of-day rendered as HH:mm:ss text
df = (
    df.withColumn(
        "time_of_purchase",
        # date text and time text are concatenated with no separator, hence the pattern
        to_timestamp(concat(df["date"], time_text), "M/d/yyyyHH:mm:ss"),
    )
    .withColumn("date_of_purchase", to_date("date", "M/d/yyyy"))
    .drop("time", "date")
)


Extract branch and city for the branch dimension table

In [None]:
# Distinct (branch, city) pairs, ordered by branch: the rows of the branch dimension.
# NOTE(review): the columns were lower-cased earlier, so "Branch"/"City" presumably
# resolve only because Spark column resolution is case-insensitive by default — confirm.
branch_city_df = df.select("Branch", "City").dropDuplicates().orderBy("Branch")

In [None]:
# Preview the distinct branch/city pairs.
branch_city_df.show()

In [None]:
from pyspark.sql import Window

# Assign a dense surrogate key 1..N ordered by branch.
# row_number() over a window is used instead of monotonically_increasing_id(),
# whose values are unique but NOT consecutive (they encode the partition id), so
# ids could jump to very large numbers when rows sit in different partitions.
# An un-partitioned window moves all rows to one partition, which is acceptable
# for a dimension this small.
branch_city_df = branch_city_df.withColumn(
    "branch_id", F.row_number().over(Window.orderBy("Branch"))
)

In [None]:
# Preview the branch dimension together with its generated branch_id.
branch_city_df.show()

In [None]:
# List df's columns before joining in the branch dimension.
df.columns

Left-join the branch/city dimension onto the main dataframe so that branch and city can be replaced with an id; this prepares the sales data for the sales fact table and the customer dimension.

In [None]:
# Attach branch_id to every sales row by left-joining on the branch/city pair.
branch_join_keys = ["Branch", "City"]
df_with_branch_city = df.join(other=branch_city_df, on=branch_join_keys, how="left")

In [None]:
# Preview the first 5 joined rows.
df_with_branch_city.show(5)

In [None]:
# Drop the descriptive branch/city columns; the joined branch_id now stands in for them.
df_with_branch_city = df_with_branch_city.drop("Branch","City")

In [None]:
# Preview the first 5 rows after dropping branch/city.
df_with_branch_city.show(5)

In [None]:
from pyspark.sql import Window

# Distinct (customer_type, gender, payment) combinations: the customer dimension rows.
customer_sort_keys = ["customer_type", "payment", "gender"]
customer_unique_df = (
    df.select("customer_type", "gender", "payment")
    .dropDuplicates()
    .orderBy(*customer_sort_keys)
)

# Assign a dense surrogate key 1..N in the same sort order.
# row_number() over a window is used instead of monotonically_increasing_id(),
# whose values are unique but not consecutive across partitions.
customer_unique_df_id = customer_unique_df.withColumn(
    "cust_info_id", F.row_number().over(Window.orderBy(*customer_sort_keys))
)


In [None]:
# Preview the distinct customer_type/gender/payment combinations.
customer_unique_df.show()

In [None]:
# Preview the same combinations together with their cust_info_id.
customer_unique_df_id.show()

In [None]:
# Attach cust_info_id to every sales row by left-joining on the three customer attributes.
customer_join_keys = ["customer_type", "gender", "payment"]
df_with_customer_info = df_with_branch_city.join(
    other=customer_unique_df_id,
    on=customer_join_keys,
    how="left",
)


In [None]:
# Preview the rows after joining in the customer dimension key.
df_with_customer_info.show()

In [None]:
# Drop the descriptive customer columns; the joined cust_info_id now stands in for them.
df_with_customer_info = df_with_customer_info.drop("customer_type","gender","payment")

In [None]:
# Preview the rows after dropping the customer attribute columns.
df_with_customer_info.show()

The sales data is ready

In [None]:
# Name the fully keyed dataframe as the sales fact dataframe (same object, new name).
sales_df = df_with_customer_info

Generated 3 dataframes:

1. `branch_city_df` - Branch & City dimension dataframe
2. `customer_unique_df_id` - Customer information dimension dataframe
3. `sales_df` - Sales facts dataframe

Check schema, correct as needed and write to database.

In [None]:
# Schema of the branch dimension.
branch_city_df.printSchema()

In [None]:
# Schema of the customer dimension.
customer_unique_df_id.printSchema()

In [None]:
# Schema of the sales fact dataframe.
sales_df.printSchema()

In [None]:
# Preview the sales fact rows.
sales_df.show()

In [None]:
# NOTE(review): disabled Spark JDBC write path. The tables are written through
# pandas + sqlite3 in the following cells instead. This snippet targets
# 66degrees.db (not the 66sales.sqlite file used below) and still carries the
# placeholder table name "your_table_name" — fix both before re-enabling, or delete.
# sales_df.write.format("jdbc") \
#     .option("url", "jdbc:sqlite:66degrees.db") \
#     .option("dbtable", "your_table_name") \
#     .mode("overwrite") \
#     .save()

In [None]:
import sqlite3
import pandas as pd
# Open a connection to the SQLite database file that the tables are written to below.
sqliteConnection = sqlite3.connect('66sales.sqlite')


In [None]:
# Sanity-check the connection by asking SQLite for its library version.
cursor = sqliteConnection.cursor()

query = "select sqlite_version();"
cursor.execute(query)
# The query yields exactly one row with one column; unpack it so the message
# shows the bare version string rather than a list of tuples.
(result,) = cursor.fetchone()
cursor.close()  # release the cursor; it is not used again
print('SQLite Version is {}'.format(result))

In [None]:
# Convert the Spark sales dataframe to pandas so it can be written with DataFrame.to_sql.
sales_df_pandas = sales_df.toPandas()

In [None]:
# Inspect the column names of the pandas copy.
sales_df_pandas.columns

In [None]:
# Confirm the conversion produced a pandas object.
type(sales_df_pandas)

In [None]:

# Convert date_of_purchase to a pandas datetime column before writing to SQLite.
sales_df_pandas['date_of_purchase'] = pd.to_datetime(sales_df_pandas['date_of_purchase'])

In [None]:
# Check the pandas dtypes that will be mapped to SQLite column types.
sales_df_pandas.dtypes

In [None]:
# Write the sales facts to the 'sales' table, replacing any existing table and
# leaving out the pandas index.
sales_df_pandas.to_sql(name='sales',con=sqliteConnection,if_exists='replace', index=False)

In [None]:
# Preview the customer dimension before writing it.
customer_unique_df_id.show()

In [None]:
# Convert the customer dimension to pandas for the SQLite write.
customer_pd_df = customer_unique_df_id.toPandas()

In [None]:
# Write the customer dimension to the 'customer' table.
# if_exists='replace' keeps the cell re-runnable (the default 'fail' raises once the
# table exists); index=False avoids storing the pandas index as an extra column,
# matching how the 'sales' table is written.
customer_pd_df.to_sql(name='customer', con=sqliteConnection, if_exists='replace', index=False)

In [None]:
# Preview the branch dimension before writing it.
branch_city_df.show()

In [None]:
# Convert the branch dimension to pandas for the SQLite write.
df_branch_city_pd = branch_city_df.toPandas()

In [None]:
# Write the branch dimension to the 'branch' table.
# if_exists='replace' keeps the cell re-runnable (the default 'fail' raises once the
# table exists); index=False avoids storing the pandas index as an extra column,
# matching how the 'sales' table is written.
df_branch_city_pd.to_sql(name='branch', con=sqliteConnection, if_exists='replace', index=False)