In [None]:
// Show the Scala version the notebook kernel runs on (useful when choosing a compatible Spark dependency below)
scala.util.Properties.versionString



In [None]:
import $ivy.`org.apache.spark::spark-sql:3.3.2`
import org.apache.spark.sql.SparkSession


In [None]:
import org.apache.spark.sql.SparkSession

// Obtain (or reuse, via getOrCreate) a Spark session running locally on all available cores
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Retail Data Analytics")
  .getOrCreate()

// Brings the $"column" syntax and implicit encoders into scope for the cells below
import spark.implicits._

In [None]:
// Load the retail transactions CSV (path is relative to the kernel's working directory):
// the first row supplies column names, column types are inferred by scanning the data,
// and the file is decoded as ISO-8859-1 so non-ASCII characters are read correctly.
val df = spark.read
  .options(Map(
    "header"      -> "true",
    "inferSchema" -> "true",
    "encoding"    -> "ISO-8859-1"
  ))
  .csv("online_retail_ii.csv")

// Preview the first 10 rows
df.show(numRows = 10)


In [None]:
// Overview of the raw data: a 10-row preview, then the inferred column names/types
df.show(numRows = 10)
df.printSchema()

// Total number of raw rows (the cell's displayed result)
df.count()


In [None]:
// Discard every row that has a null in at least one column ("any" is also what drop() uses by default)
val df_cleaned = df.na.drop("any")

// Preview 5 of the remaining rows
df_cleaned.show(numRows = 5)


In [None]:
// Rebuild the null-free DataFrame: a row is dropped if any of its columns is null
val df_cleaned = df.na.drop(how = "any")

// Preview 5 of the surviving rows
df_cleaned.show(numRows = 5)


In [None]:
// Count the rows left after dropping null-containing rows
// (compare with df.count() above to see how many rows were removed)
df_cleaned.count()

In [None]:
// count / mean / stddev / min / max for every numeric AND string column
// (this is exactly the statistic set Dataset.describe() computes)
df_cleaned.summary("count", "mean", "stddev", "min", "max").show()

// Column names and types of the cleaned DataFrame
df_cleaned.printSchema()


In [None]:
// Final cleaned dataset, built in three steps:
//   1. drop rows with a null in any column
//   2. keep only rows with a strictly positive Quantity (non-positive ones are returns or errors)
//   3. add TotalPrice = Quantity * Price as the line revenue
val df_cleaned = df.na
  .drop()
  .where($"Quantity" > 0)
  .withColumn("TotalPrice", $"Quantity" * $"Price")

// Spot-check the derived column against its inputs
df_cleaned.select($"Quantity", $"Price", $"TotalPrice").show(5)


In [None]:
import org.apache.spark.sql.functions._

// Revenue per country: sum of line revenue (TotalPrice) within each Country
val sales_by_country = df_cleaned
  .groupBy($"Country")
  .agg(sum($"TotalPrice").alias("TotalSales"))

// Ten highest-revenue countries, largest first
sales_by_country.orderBy($"TotalSales".desc).show(10)


In [None]:
import org.apache.spark.sql.functions._

// Revenue per product, keyed by its Description text
val top_products = df_cleaned
  .groupBy(col("Description"))
  .agg(sum(col("TotalPrice")).alias("TotalSales"))

// Ten products that brought in the most revenue, largest first
top_products.sort(col("TotalSales").desc).show(10)


In [None]:
import org.apache.spark.sql.functions._

// Units sold per product (sum of Quantity), keyed by Description
val top_quantity_products = df_cleaned
  .groupBy(col("Description"))
  .agg(sum(col("Quantity")).alias("TotalQuantity"))

// Ten products with the highest unit volume, largest first
top_quantity_products.sort(col("TotalQuantity").desc).show(10)


In [None]:
import org.apache.spark.sql.functions._

// Tag each invoice line with its calendar Year and Month. month() alone returns 1-12,
// so grouping on it would merge the same month of different years (e.g. every December)
// into one bucket whenever the data covers more than one year.
// NOTE(review): assumes inferSchema parsed InvoiceDate as a timestamp (or an ISO-formatted
// string); otherwise year()/month() yield null -- confirm with df_cleaned.printSchema().
val df_with_month = df_cleaned
  .withColumn("Year", year(col("InvoiceDate")))
  .withColumn("Month", month(col("InvoiceDate")))

// Total revenue for each (Year, Month) pair
val monthly_sales = df_with_month
  .groupBy("Year", "Month")
  .agg(sum("TotalPrice").as("MonthlySales"))

// Chronological order needs Year before Month; print every month rather than
// show()'s default of 20 rows, which would cut off a multi-year series.
monthly_sales
  .orderBy("Year", "Month")
  .show(monthly_sales.count().toInt)
