<a href="https://colab.research.google.com/github/regalleo/regalleo/blob/main/Untitled11.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("CSV Reader").getOrCreate()
df1=spark.read.csv("customers.csv",inferSchema=True,header=True,sep=",").alias("c")
df2=spark.read.csv("products.csv",inferSchema=True,header=True,sep=",").alias("p")
df3=spark.read.csv("orders.csv",inferSchema=True,header=True,sep=",").alias("o")
df4=spark.read.csv("order_items.csv",inferSchema=True,header=True,sep=",").alias("oi")
df5=spark.read.csv("deliveries.csv",inferSchema=True,header=True,sep=",").alias("d")
df1.show()
df2.show()
df3.show()
df4.show()
df5.show()
j1=df1.join(df3,on="customer_id",how="inner").join(df4,on="order_id",how="inner").join(df2,on="product_id",how="inner")
j1.show()

+-----------+-----------------+--------------------+----------+---------+-----------+-------+-----------+
|customer_id|             name|               email|     phone|     city|      state|country|signup_date|
+-----------+-----------------+--------------------+----------+---------+-----------+-------+-----------+
|    CUST001|Raj Shekhar Singh| raj.singh@gmail.com|9876543210|Bangalore|  Karnataka|  India| 2022-05-10|
|    CUST002|    Ananya Sharma|ananya.sharma@yah...|9898989898|    Delhi|      Delhi|  India| 2021-08-14|
|    CUST003|      Rahul Verma|rahul.verma@gmail...|9812345678|   Mumbai|Maharashtra|  India| 2023-01-12|
|    CUST004|       Priya Nair|priya.nair@gmail.com|9123456780|  Chennai| Tamil Nadu|  India| 2020-12-05|
|    CUST005|     Sneha Kapoor|sneha.kapoor@outl...|9911223344|Hyderabad|  Telangana|  India| 2023-03-21|
+-----------+-----------------+--------------------+----------+---------+-----------+-------+-----------+

+----------+---------------+---------------+-

In [25]:
# 📝 20 DataFrame API Questions

# (Use PySpark/Scala DataFrame functions like groupBy, agg, withColumn, filter, etc.)

# Calculate total revenue per customer.
total_revenue_per_cust = df1.join(df3, "customer_id").join(df4, "order_id").join(df2, "product_id").withColumn("revenue", col("quantity") * col("price")).groupBy("customer_id", "c.name").agg(sum("revenue").alias("total_revenue")).show()

# Find top 10 products by sales quantity.
top_10=df2.join(df4,on="product_id").groupBy("product_id","name").agg(sum(col("quantity")).alias("total_quantity")).orderBy(col("total_quantity").desc()).limit(10).show()

# Get customers with an average order value > 20,000.
cust_with_avg_ord=df1.join(df3,on="customer_id").join(df4,on="order_id").join(df2,on="product_id").groupBy("customer_id","c.name").agg(avg(col("price")*col("quantity")).alias("avg_order_value")).show()
# Find customers who only bought items from "Electronics".
cust_only_ele=df1.join(df3,on="customer_id").join(df4,on="order_id").join(df2,on="product_id").groupBy("customer_id","c.name").agg(countDistinct("p.category").alias("num_categories"),first("category").alias("category")).filter((col("num_categories") == 1) & (col("category") == "Electronics")).show()

# Find customers with no delivered orders.
cust_no_orders=df1.join(df3,on="customer_id").join(df5,on="order_id",how="left").groupBy("c.name").agg(sum(when(col("delivery_status")=="Delivered",1).otherwise(0)).alias("count_of_delivered")).filter(col("count_of_delivered")==0).select(col("c.name")).show()

# Find the average number of items per order.
avy_num_items=df2.join(df4,on="product_id").groupBy("order_id").agg(avg(col("quantity")).alias("avg_quantity_per_order")).orderBy(col("order_id").asc()).show()

# Get all orders that contain both "Laptop" and "Mouse".
had_both=df2.join(df4,on="product_id").filter(col("p.name").isin("Laptop","Mouse")).groupBy("order_id").agg(countDistinct("name").alias("cnt")).filter(col("cnt")==2).show()

# Extract unique product categories purchased per customer.

# Find orders where total quantity of items >= 5.
total_quan=df4.groupBy("order_id").agg(sum(col("quantity")).alias("total_quantity")).filter(col("total_quantity")>=5).show()

# Get all customers who used more than 2 different payment methods.
diff_pay=df1.join(df3,on="customer_id").groupBy("customer_id","name").agg(countDistinct("payment_method").alias("distinct_methods")).filter(col("distinct_methods")>2).show()


# 🟢 Concat / String Functions
# Concatenate customer_id and name into a new column called customer_tag (e.g., "CUST001 - Raj Shekhar Singh").
con1=df1.select(concat_ws(" - ","customer_id","name").alias("details")).show()

# Create a customer_email_id by concatenating lower(c.name) + "_" + city + "@shop.com".
con2=df1.select(concat_ws("",lower(col("name")),lit("_"),col("city"),lit("@shop.com")).alias("details")).show()

# Extract only the domain from customers.email (e.g., gmail.com, yahoo.com).
con3=df1.select(concat_ws("",split("email","@")[1])).show()

# Mask customer phone so only last 4 digits are visible (e.g., ******3210).
con4=df1.select(concat(lit("******"),substring("phone",-4,4)).alias("masked_phone")).show()

# 🟢 Regex / String
# Find all customers whose emails end with "@gmail.com".
reg1=df1.filter(col("email").endswith("@gmail.com")).select("name","email").show()

# Find all customers whose names contain only two words (using regex split).
reg2=df1.filter(size(split("name"," "))==2).select(col("name")).show()

# 🟢 Date / Time
# Get all orders placed in the last 30 days.
date1=df3.filter(col("order_date")>=date_sub(current_date(),30)).select(col("order_id"),col("order_date")).show()

# Find the average delivery time (days) for each courier (delivery_date – order_date).
date2=df5.join(df3,on="order_id").filter(col("delivery_date").isNotNull()).groupBy(col("courier")).agg(avg(datediff(col("delivery_date"),col("order_date"))).alias("avg_delivery_days")).show()

# Extract the month name from orders.order_date and group orders by month.
date3=df3.withColumn("month_name",date_format(col("order_date"),"MMMM")).groupBy(col("month_name")).agg(count("*").alias("total_orders")).show()

# 🟢 Mixed
# Create a new column order_summary → concatenate order_id, customer name, and status (e.g., "ORD1001 - Raj Shekhar Singh - Completed").
mixed=df1.join(df3,on="customer_id").withColumn("order_summary",concat_ws("",col("order_id"),lit("-"),col("name"),lit("-"),col("status"))).select(col("order_summary")).show()

+-----------+-----------------+-------------+
|customer_id|             name|total_revenue|
+-----------+-----------------+-------------+
|    CUST003|      Rahul Verma|        62000|
|    CUST004|       Priya Nair|        32400|
|    CUST001|Raj Shekhar Singh|        77400|
|    CUST005|     Sneha Kapoor|         8500|
|    CUST002|    Ananya Sharma|        48800|
+-----------+-----------------+-------------+

+----------+---------------+--------------+
|product_id|           name|total_quantity|
+----------+---------------+--------------+
|      P301|   Water Bottle|             4|
|      P401| Detergent Pack|             2|
|      P101|          Mouse|             2|
|      P500|          Shoes|             2|
|      P100|         Laptop|             1|
|      P400|Washing Machine|             1|
|      P202|     Headphones|             1|
|      P300|   Refrigerator|             1|
|      P201|     Phone Case|             1|
|      P200|     Smartphone|             1|
+----------+-