In [None]:
# Turn off Adaptive Query Execution (AQE) so the shuffle plans below show the
# plain, un-optimised Exchange (200 hash partitions) instead of runtime
# rewrites such as "AQEShuffleRead coalesced".
# `spark` is not defined yet on a fresh kernel (the builder cell comes later),
# so get-or-create the session here first; the later builder cell's
# getOrCreate() returns this same session.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark DataFrames").getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Build (or reuse) the notebook's SparkSession; the bare `spark` on the last
# line renders the session summary as the cell output.
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)
spark

In [None]:
# Ten sample people as (name, age, city) tuples matching the schema below.
data = [
    ("Alice", 10, "New York"),
    ("Bob", 20, "India"),
    ("Charlie", 30, "London"),
    ("David", 40, "New York"),
    ("Eve", 50, "India"),
    ("Frank", 60, "London"),
    ("Grace", 70, "New York"),
    ("Hannah", 80, "India"),
    ("Ivan", 90, "London"),
    ("Jack", 100, "New York"),
]

# Explicit schema (every column nullable) rather than letting Spark infer types.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("name", StringType()),
            ("age", IntegerType()),
            ("city", StringType()),
        )
    ]
)
df = spark.createDataFrame(data, schema)

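**Narrow Transformation**

`filter` is a narrow transformation: each output partition is computed from a single input partition, so no shuffle is needed. The physical plan below contains no `Exchange`.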

In [None]:
# Keep only the London rows (`where` is an alias of `filter`).
df = df.where(col("city") == "London")

In [None]:
# Outside Databricks, IPython's display() on a Spark DataFrame prints only its
# repr ("DataFrame[name: string, ...]"), not the rows. Collect a bounded sample
# (20 rows, matching show()'s default) to pandas so the notebook renders a table.
# Requires pandas on the driver.
display(df.limit(20).toPandas())

DataFrame[name: string, age: int, city: string]

In [None]:
# Print parsed, analyzed and optimized logical plans plus the physical plan.
df.explain(mode="extended")

== Parsed Logical Plan ==
'Filter ('city = London)
+- LogicalRDD [name#0, age#1, city#2], false

== Analyzed Logical Plan ==
name: string, age: int, city: string
Filter (city#2 = London)
+- LogicalRDD [name#0, age#1, city#2], false

== Optimized Logical Plan ==
Filter (isnotnull(city#2) AND (city#2 = London))
+- LogicalRDD [name#0, age#1, city#2], false

== Physical Plan ==
*(1) Filter (isnotnull(city#2) AND (city#2 = London))
+- *(1) Scan ExistingRDD[name#0,age#1,city#2]



In [None]:
# Print the filtered rows (show() defaults spelled out: 20 rows, truncated cells).
df.show(n=20, truncate=True)

+-------+---+------+
|   name|age|  city|
+-------+---+------+
|Charlie| 30|London|
|  Frank| 60|London|
|   Ivan| 90|London|
+-------+---+------+




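**Wide Transformation**
---

`groupBy` is a wide transformation: rows sharing a key may sit in different partitions, so Spark must shuffle them together. In the physical plan this appears as an `Exchange hashpartitioning(...)` between a partial and a final `HashAggregate`.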



In [None]:
# Per-city maximum age; the dict form yields the same "max(age)" column name.
df = df.groupBy("city").agg({"age": "max"})

In [None]:
# Print the aggregated result (show() defaults spelled out).
df.show(n=20, truncate=True)

+--------+--------+
|    city|max(age)|
+--------+--------+
|  London|      90|
|   India|      80|
|New York|     100|
+--------+--------+



In [None]:
# Physical plan only; look for the Exchange introduced by groupBy.
df.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#40], functions=[max(age#39)])
   +- Exchange hashpartitioning(city#40, 200), ENSURE_REQUIREMENTS, [plan_id=129]
      +- HashAggregate(keys=[city#40], functions=[partial_max(age#39)])
         +- Project [age#39, city#40]
            +- Scan ExistingRDD[name#38,age#39,city#40]




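### **Repartition vs. Coalesce**

`repartition(n)` always performs a full shuffle (an `Exchange RoundRobinPartitioning(n)` in the plan) and can raise or lower the partition count. `coalesce(n)` only merges existing partitions (a `Coalesce` node with no new `Exchange`), which makes it the cheaper way to reduce the count. The cells below operate on the aggregated `df` from the previous section.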

In [None]:
# Partition count of the *aggregated* DataFrame from the wide-transformation
# section (df was reassigned there), not of the original 10-row DataFrame.
# The plan above shuffles into 200 hash partitions. The recorded output of 1
# appears to reflect AQE coalescing them ("AQEShuffleRead coalesced"); expect a
# different value when adaptive execution is actually disabled — TODO confirm.
df.rdd.getNumPartitions()

1

In [None]:
# Repartition: full shuffle into exactly 10 partitions
# (round-robin, since no partitioning columns are given).
df = df.repartition(numPartitions=10)

In [None]:
# Physical plan after repartition; note the extra RoundRobinPartitioning Exchange.
df.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ShuffleQueryStage 1
   +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=204]
      +- *(2) HashAggregate(keys=[city#40], functions=[max(age#39)])
         +- AQEShuffleRead coalesced
            +- ShuffleQueryStage 0
               +- Exchange hashpartitioning(city#40, 200), ENSURE_REQUIREMENTS, [plan_id=184]
                  +- *(1) HashAggregate(keys=[city#40], functions=[partial_max(age#39)])
                     +- *(1) Project [age#39, city#40]
                        +- *(1) Scan ExistingRDD[name#38,age#39,city#40]
+- == Initial Plan ==
   Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=172]
   +- HashAggregate(keys=[city#40], functions=[max(age#39)])
      +- Exchange hashpartitioning(city#40, 200), ENSURE_REQUIREMENTS, [plan_id=170]
         +- HashAggregate(keys=[city#40], functions=[partial_max(age#39)])
            +- Project [age#39, city#40]
               

In [None]:
# Coalesce: merge the existing partitions down to 1 without adding a shuffle.
df = df.coalesce(numPartitions=1)

In [None]:
# Physical plan after coalesce; a Coalesce node sits on top with no new Exchange.
df.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   Coalesce 1
   +- ShuffleQueryStage 1
      +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=358]
         +- *(2) HashAggregate(keys=[city#40], functions=[max(age#39)])
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 0
                  +- Exchange hashpartitioning(city#40, 200), ENSURE_REQUIREMENTS, [plan_id=334]
                     +- *(1) HashAggregate(keys=[city#40], functions=[partial_max(age#39)])
                        +- *(1) Project [age#39, city#40]
                           +- *(1) Scan ExistingRDD[name#38,age#39,city#40]
+- == Initial Plan ==
   Coalesce 1
   +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=321]
      +- HashAggregate(keys=[city#40], functions=[max(age#39)])
         +- Exchange hashpartitioning(city#40, 200), ENSURE_REQUIREMENTS, [plan_id=319]
            +- HashAggregate(keys=[city#40], functions=[partial_max

# **Data Reading**

In [None]:
# Load the MegaMart orders CSV: first line is the header, and column types are
# inferred from the data (inferSchema costs an extra pass over the file).
# NOTE(review): absolute path, presumably a Colab upload location — adjust per environment.
df = spark.read.csv(
    "/content/MegaMart.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Re-displays the SparkSession repr already shown by the session-creation cell;
# looks like a leftover and can likely be removed — TODO confirm.
spark

In [None]:
# Preview the raw orders (show() defaults spelled out: 20 rows, truncated cells).
df.show(n=20, truncate=True)

+--------+-------+----------+----------+----------------+--------------------+--------+--------------+--------------+------------+
|order_id|user_id|order_date|product_id|product_category|        product_name|quantity|price_per_unit|payment_method|order_status|
+--------+-------+----------+----------+----------------+--------------------+--------+--------------+--------------+------------+
|    1001|   U188|2025-04-20|      P940|         Fashion|            Sneakers|       2|         58.53|        PayPal|   Cancelled|
|    1002|   U062|2025-04-16|      P794|         Fashion|             T-Shirt|       3|         83.76|           UPI|    Returned|
|    1003|   U058|2025-04-18|      P326|         Fashion|          Sunglasses|       2|         78.85|        PayPal|  Processing|
|    1004|   U011|2025-04-10|      P574|         Fashion|          Sunglasses|       5|         46.49|        PayPal|   Delivered|
|    1005|   U003|2025-04-19|      P988|      Home Decor|         Photo Frame|     

In [None]:
# Keep only Sneakers orders (`where` is an alias of `filter`).
df = df.where(col("product_name") == "Sneakers")

In [None]:
# Project down to the two columns the aggregation needs.
df = df.select(["order_id", "product_name"])

In [None]:
# Count non-null order_ids per product; the dict form yields the same
# "count(order_id)" column name.
df = df.groupBy("product_name").agg({"order_id": "count"})

In [None]:
# Print the per-product order count (show() defaults spelled out).
df.show(n=20, truncate=True)

+------------+---------------+
|product_name|count(order_id)|
+------------+---------------+
|    Sneakers|             41|
+------------+---------------+

