<a href="https://colab.research.google.com/github/mayureshpawashe/spark/blob/main/Sparkbasic_day4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Spark Day 4

# Tungsten Code

In [None]:
!pip install pyspark

In [6]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by this notebook.
spark = (
    SparkSession.builder
    .appName("PySparkAPI")
    .getOrCreate()
)

# Tiny demo table: one (Name, Age) tuple per person.
columns = ["Name", "Age"]
data = [
    ("Mayuresh", 25),
    ("Onkar", 30),
    ("Rohit", 35),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

+--------+---+
|    Name|Age|
+--------+---+
|Mayuresh| 25|
|   Onkar| 30|
|   Rohit| 35|
+--------+---+



In [7]:
# Build a larger synthetic dataset: IDs 1..99,999, a unique "User<ID>" name per
# row, and a Value equal to twice the ID.
columns = ["ID", "Name", "Value"]
big_data = [
    (row_id, f"User{row_id}", row_id * 2)
    for row_id in range(1, 100000)
]
df = spark.createDataFrame(big_data, schema=columns)

# Sum Value per Name and print the first rows of the result.
(
    df.groupBy("Name")
      .sum("Value")
      .show()
)

+--------+----------+
|    Name|sum(Value)|
+--------+----------+
| User285|       570|
| User509|      1018|
| User958|      1916|
|User1212|      2424|
|User1292|      2584|
|User1346|      2692|
|User1690|      3380|
|User2093|      4186|
|User2757|      5514|
|User2782|      5564|
|User2977|      5954|
|User3131|      6262|
|User3176|      6352|
|User3403|      6806|
|User3819|      7638|
|User3839|      7678|
|User3991|      7982|
|User4271|      8542|
|User4494|      8988|
|User4535|      9070|
+--------+----------+
only showing top 20 rows



## After Tungsten (With Optimization)

In [8]:
# Print the physical plan Spark generates for the per-Name sum of Value.
(
    df.groupBy("Name")
      .sum("Value")
      .explain()
)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Name#27], functions=[sum(Value#28L)])
   +- Exchange hashpartitioning(Name#27, 200), ENSURE_REQUIREMENTS, [plan_id=85]
      +- HashAggregate(keys=[Name#27], functions=[partial_sum(Value#28L)])
         +- Project [Name#27, Value#28L]
            +- Scan ExistingRDD[ID#26L,Name#27,Value#28L]




#### Tungsten optimizes Spark jobs by reducing execution overhead and improving memory usage.

## Catalyst Optimizer Code

In [10]:
from pyspark.sql.functions import col

# Small (Name, Age, Profession) table used to inspect the plan of a filter.
data = [("Alice", 25, "Engineer"), ("Bob", 30, "Doctor"), ("Charlie", 35, "Teacher")]
columns = ["Name", "Age", "Profession"]
df = spark.createDataFrame(data, columns)

# Keep only the rows whose Age is greater than 28.
filtered_df = df.filter(col("Age") > 28)

# Show the physical plan for the filtered DataFrame.
# Note: this DataFrame is built from local Python data, so the plan is a
# "Scan ExistingRDD" with the Filter applied on top of it - rows are scanned
# first and filtered afterwards (Catalyst also adds an isnotnull(Age) check).
# Catalyst can only push a filter into the scan itself for data sources that
# support predicate pushdown (e.g. Parquet).
filtered_df.explain()

== Physical Plan ==
*(1) Filter (isnotnull(Age#67L) AND (Age#67L > 28))
+- *(1) Scan ExistingRDD[Name#66,Age#67L,Profession#68]




## Set Up Spark Session

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    sum,  # NOTE: rebinds the name `sum`, shadowing Python's built-in sum()
    count,
    avg,
)
import time

# Start (or reuse) a Spark session for the Tungsten/Catalyst walkthrough.
spark = (
    SparkSession.builder
    .appName("Tungsten_Catalyst_Example")
    .getOrCreate()
)

# Synthetic transactions for IDs 1..999,999:
#   Username is "User<ID mod 1000>", Amount is 1.5 * ID,
#   Category is (ID mod 5) + 1, i.e. a value in 1..5.
columns = ["ID", "Username", "Amount", "Category"]
data = [
    (txn_id, f"User{txn_id % 1000}", txn_id * 1.5, (txn_id % 5) + 1)
    for txn_id in range(1, 1000000)
]
df = spark.createDataFrame(data, schema=columns)

# Preview the first 15 rows.
df.show(n=15)



+---+--------+------+--------+
| ID|Username|Amount|Category|
+---+--------+------+--------+
|  1|   User1|   1.5|       2|
|  2|   User2|   3.0|       3|
|  3|   User3|   4.5|       4|
|  4|   User4|   6.0|       5|
|  5|   User5|   7.5|       1|
|  6|   User6|   9.0|       2|
|  7|   User7|  10.5|       3|
|  8|   User8|  12.0|       4|
|  9|   User9|  13.5|       5|
| 10|  User10|  15.0|       1|
| 11|  User11|  16.5|       2|
| 12|  User12|  18.0|       3|
| 13|  User13|  19.5|       4|
| 14|  User14|  21.0|       5|
| 15|  User15|  22.5|       1|
+---+--------+------+--------+
only showing top 15 rows



## Catalyst Optimizer in Action

In [3]:
# Build the query: keep rows with Amount > 1000, aggregate per Category, and
# sort by the total amount in descending order. `sum`, `avg`, `count` and `col`
# are the pyspark.sql.functions versions imported in the setup cell.
optimized_df = (
    df.filter(col("Amount") > 1000)
      .groupBy("Category")
      .agg(
          sum("Amount").alias("Total_Amount"),
          avg("Amount").alias("Average_Amount"),
          count("ID").alias("Transaction_Count")
      )
      .orderBy(col("Total_Amount").desc())
)

# Print the formatted physical plan outside the timed region so that plan
# printing is not counted as query execution time.
optimized_df.explain(mode="formatted")

# Time only the action that actually runs the job. perf_counter() is a
# monotonic, high-resolution clock intended for measuring elapsed intervals,
# unlike time.time(), which follows the (adjustable) system wall clock.
start_time = time.perf_counter()
optimized_df.show()
end_time = time.perf_counter()

print(f"Execution Time: {round(end_time - start_time, 2)} seconds")


== Physical Plan ==
AdaptiveSparkPlan (9)
+- Sort (8)
   +- Exchange (7)
      +- HashAggregate (6)
         +- Exchange (5)
            +- HashAggregate (4)
               +- Project (3)
                  +- Filter (2)
                     +- Scan ExistingRDD (1)


(1) Scan ExistingRDD
Output [4]: [ID#25L, Username#26, Amount#27, Category#28L]
Arguments: [ID#25L, Username#26, Amount#27, Category#28L], MapPartitionsRDD[11] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter
Input [4]: [ID#25L, Username#26, Amount#27, Category#28L]
Condition : (isnotnull(Amount#27) AND (Amount#27 > 1000.0))

(3) Project
Output [3]: [ID#25L, Amount#27, Category#28L]
Input [4]: [ID#25L, Username#26, Amount#27, Category#28L]

(4) HashAggregate
Input [3]: [ID#25L, Amount#27, Category#28L]
Keys [1]: [Category#28L]
Functions [3]: [partial_sum(Amount#27), partial_avg(Amount#27), partial_count(ID#25L)]
Aggregate Attributes [4]: [sum#64, sum#65, count#66L

## Project Tungsten in Action

In [4]:
# "cost" mode prints the optimized logical plan annotated with size statistics,
# followed by the physical plan.
optimized_df.explain("cost")

== Optimized Logical Plan ==
Sort [Total_Amount#55 DESC NULLS LAST], true, Statistics(sizeInBytes=6.2 EiB)
+- Aggregate [Category#28L], [Category#28L, sum(Amount#27) AS Total_Amount#55, avg(Amount#27) AS Average_Amount#57, count(ID#25L) AS Transaction_Count#59L], Statistics(sizeInBytes=6.2 EiB)
   +- Project [ID#25L, Amount#27, Category#28L], Statistics(sizeInBytes=4.9 EiB)
      +- Filter (isnotnull(Amount#27) AND (Amount#27 > 1000.0)), Statistics(sizeInBytes=8.0 EiB)
         +- LogicalRDD [ID#25L, Username#26, Amount#27, Category#28L], false, Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Total_Amount#55 DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(Total_Amount#55 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=50]
      +- HashAggregate(keys=[Category#28L], functions=[sum(Amount#27), avg(Amount#27), count(ID#25L)], output=[Category#28L, Total_Amount#55, Average_Amount#57, Transaction_Count#59L])
         +- Excha