In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, sum, to_date
from pyspark.sql.types import DateType, DoubleType, IntegerType, StringType

# Create (or reuse) the Spark session for this analysis.
# The time parser policy is set to LEGACY for date parsing; the remaining
# options set the shuffle partition count and executor/driver memory.
spark = (
    SparkSession.builder
    .appName("Sales Analysis")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Location of the input CSV (raw string so the Windows backslashes are taken literally)
file_path = r"C:\Users\CompUser\OneDrive\Documents\TigsitPracticaltest\ecommerce\data.csv"

# Read the CSV file: the first line is treated as the header and column types
# are inferred by Spark. The first five rows and the inferred schema are
# printed as a quick sanity check.
try:
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
    df.show(5, truncate=False)
    df.printSchema()
except Exception as e:
    print(f"Error reading CSV file: {e}")
    spark.stop()
    # Nothing below can run without `df`, so abort here with a non-zero status.
    # SystemExit is raised directly instead of calling exit(): exit() is an
    # interactive convenience helper that is not guaranteed to exist or to
    # halt execution immediately in every environment, and it reports success.
    raise SystemExit(1)

# Correct data types, then handle missing values.
#
# The casts deliberately run BEFORE the fill. A string default such as
# 'Unknown' is not applied to a column that is still numeric, so when
# inferSchema types CustomerID as a number (the preview shows values like
# 17850) filling first would leave its nulls in place. Casting to the final
# type first guarantees every default below matches its column's type.
# InvoiceDate is left untouched here.
df_cleaned = (
    df.withColumn("InvoiceNo", col("InvoiceNo").cast(StringType()))
    .withColumn("StockCode", col("StockCode").cast(StringType()))
    .withColumn("Description", col("Description").cast(StringType()))
    .withColumn("Quantity", col("Quantity").cast(IntegerType()))
    .withColumn("UnitPrice", col("UnitPrice").cast(DoubleType()))
    .withColumn("CustomerID", col("CustomerID").cast(StringType()))
    .withColumn("Country", col("Country").cast(StringType()))
    .fillna({
        'InvoiceNo': 'Unknown',
        'StockCode': 'Unknown',
        'Description': 'No description',
        'Quantity': 0,
        'UnitPrice': 0.0,
        'CustomerID': 'Unknown',
        'Country': 'Unknown',
    })
)

# Convert InvoiceDate to a date.
#
# The raw values carry a time component and unpadded month/day fields
# (e.g. "12/1/2010 8:26"), so the pattern spells out the full layout instead
# of relying on a lenient parser to ignore the trailing time. to_date() keeps
# only the calendar date, which is what the per-day aggregation groups on.
try:
    df_cleaned = df_cleaned.withColumn(
        "InvoiceDate", to_date(col("InvoiceDate"), "M/d/yyyy H:mm")
    )
    # Display any rows whose InvoiceDate is null after conversion
    # (missing in the source or not matching the pattern).
    df_cleaned.filter(col("InvoiceDate").isNull()).show()
except Exception as e:
    print(f"Error converting InvoiceDate: {e}")

# Guard the sales calculation: replace any null Quantity or UnitPrice with
# zero so the product of the two columns is never null.
df_cleaned = df_cleaned.fillna({"Quantity": 0, "UnitPrice": 0.0})

# Calculate Total Sales and Number of Transactions per Day.
#
# Each row of the data is one line item, and a single invoice spans several
# rows (the preview shows invoice 536365 on five rows). The number of
# transactions is therefore the number of DISTINCT invoice numbers per day,
# not the row count. TotalSales is the sum of Quantity * UnitPrice over all
# line items of the day.
try:
    sales_per_day = (
        df_cleaned.withColumn("TotalSales", col("Quantity") * col("UnitPrice"))
        .groupBy("InvoiceDate")
        .agg(
            sum("TotalSales").alias("TotalSales"),
            countDistinct("InvoiceNo").alias("NumberOfTransactions"),
        )
        # Sort chronologically so the displayed rows are deterministic.
        .orderBy("InvoiceDate")
    )

    # Show results
    sales_per_day.show(truncate=False)
except Exception as e:
    print(f"Error during aggregation: {e}")

# Stop the Spark session; this is the final statement of the script.
spark.stop()


+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
+---------+---------+-----------------------------------