<a href="https://colab.research.google.com/github/sirishaallarapu/PySpark/blob/main/Tungsten_and_catalyst_optimizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

# Build the Spark session for this notebook (an already-running one is reused).
spark = (
    SparkSession.builder
    .appName("PySpark")
    .getOrCreate()
)

# Three (Name, Age) rows; the column labels are supplied separately.
columns = ["Name", "Age"]
data = [
    ("Siri", 25),
    ("Glory", 30),
    ("Nani", 35),
]
df = spark.createDataFrame(data, schema=columns)

# Print the full table to the cell output.
df.show()


+-----+---+
| Name|Age|
+-----+---+
| Siri| 25|
|Glory| 30|
| Nani| 35|
+-----+---+



In [None]:
from pyspark.sql.functions import col

# Single "id" column starting at 1; the end bound is exclusive.
large_df = spark.range(start=1, end=1000000)

# Project one derived column: each id multiplied by two.
result_df = large_df.select(
    (large_df["id"] * 2).alias("double_id")
)

# Print only the first five rows.
result_df.show(n=5)


+---------+
|double_id|
+---------+
|        2|
|        4|
|        6|
|        8|
|       10|
+---------+
only showing top 5 rows



In [None]:
# Make df queryable from Spark SQL under the name "people".
df.createOrReplaceTempView("people")

# Names of the people whose Age is above 28.
optimized_df = spark.sql(
    "SELECT Name "
    "FROM people "
    "WHERE Age > 28"
)

optimized_df.show()


+-----+
| Name|
+-----+
|Glory|
| Nani|
+-----+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = (
    SparkSession.builder
    .appName("PySparkPractice")
    .getOrCreate()
)

# Employee rows laid out as (Name, Department, Salary).
columns = ["Name", "Department", "Salary"]
data = [
    ("John", "Engineering", 60000),
    ("Sarah", "Marketing", 75000),
    ("Michael", "Finance", 80000),
]
df = spark.createDataFrame(data, schema=columns)
df.show()

# Row filter: keep employees earning more than 65000.
df_filtered = df.where(df["Salary"] > 65000)
df_filtered.show()

# Derived column: Bonus is 10% of Salary (multiplying by 0.1 yields a double).
df_with_bonus = df.withColumn("Bonus", df["Salary"] * 0.1)
df_with_bonus.show()

# Constant column: the same literal value on every row.
df_with_new_col = df.withColumn("Location", lit("New York"))
df_with_new_col.show()



+-------+-----------+------+
|   Name| Department|Salary|
+-------+-----------+------+
|   John|Engineering| 60000|
|  Sarah|  Marketing| 75000|
|Michael|    Finance| 80000|
+-------+-----------+------+

+-------+----------+------+
|   Name|Department|Salary|
+-------+----------+------+
|  Sarah| Marketing| 75000|
|Michael|   Finance| 80000|
+-------+----------+------+

+-------+-----------+------+------+
|   Name| Department|Salary| Bonus|
+-------+-----------+------+------+
|   John|Engineering| 60000|6000.0|
|  Sarah|  Marketing| 75000|7500.0|
|Michael|    Finance| 80000|8000.0|
+-------+-----------+------+------+

+-------+-----------+------+--------+
|   Name| Department|Salary|Location|
+-------+-----------+------+--------+
|   John|Engineering| 60000|New York|
|  Sarah|  Marketing| 75000|New York|
|Michael|    Finance| 80000|New York|
+-------+-----------+------+--------+



In [None]:
# Make the employee DataFrame queryable from Spark SQL as "employees".
df.createOrReplaceTempView("employees")

# Name and Salary of the employees earning more than 65000.
query_result = spark.sql(
    "SELECT Name, Salary "
    "FROM employees "
    "WHERE Salary > 65000"
)

query_result.show()


+-------+------+
|   Name|Salary|
+-------+------+
|  Sarah| 75000|
|Michael| 80000|
+-------+------+



In [None]:
# Single "id" column starting at 1; the end bound is exclusive.
large_df = spark.range(start=1, end=10000000)

# One derived column computed as id * 3 + 10.
optimized_result = large_df.select(
    (large_df["id"] * 3 + 10).alias("calculated_value")
)

# Print only the first five rows.
optimized_result.show(n=5)



+----------------+
|calculated_value|
+----------------+
|              13|
|              16|
|              19|
|              22|
|              25|
+----------------+
only showing top 5 rows



In [None]:
# Salary records as (Department, Salary); departments repeat on purpose so the
# aggregations below have something to group.
columns = ["Department", "Salary"]
data = [
    ("Finance", 80000),
    ("Marketing", 75000),
    ("Finance", 90000),
    ("Engineering", 70000),
    ("Marketing", 72000),
]
df = spark.createDataFrame(data, schema=columns)

# Average salary per department.
df_grouped = (
    df.groupBy("Department")
    .avg("Salary")
)
df_grouped.show()

# Number of rows per department.
df_count = (
    df.groupBy("Department")
    .count()
)
df_count.show()


+-----------+-----------+
| Department|avg(Salary)|
+-----------+-----------+
|    Finance|    85000.0|
|  Marketing|    73500.0|
|Engineering|    70000.0|
+-----------+-----------+

+-----------+-----+
| Department|count|
+-----------+-----+
|    Finance|    2|
|  Marketing|    2|
|Engineering|    1|
+-----------+-----+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (
    SparkSession.builder
    .appName("TungstenOptimization")
    .getOrCreate()
)

# Food rows laid out as (Name, Type, Calories).
columns = ["Name", "Type", "Calories"]
data = [
    ("Apple", "Fruit", 52),
    ("Banana", "Fruit", 89),
    ("Carrot", "Vegetable", 41),
    ("Tomato", "Vegetable", 18),
    ("Mango", "Fruit", 60),
    ("Broccoli", "Vegetable", 55),
]
df = spark.createDataFrame(data, schema=columns)

# Add a column holding twice the Calories value of each row.
optimized_df = df.withColumn("Calories_Doubled", df["Calories"] * 2)

optimized_df.show()





+--------+---------+--------+----------------+
|    Name|     Type|Calories|Calories_Doubled|
+--------+---------+--------+----------------+
|   Apple|    Fruit|      52|             104|
|  Banana|    Fruit|      89|             178|
|  Carrot|Vegetable|      41|              82|
|  Tomato|Vegetable|      18|              36|
|   Mango|    Fruit|      60|             120|
|Broccoli|Vegetable|      55|             110|
+--------+---------+--------+----------------+



In [None]:
# Make the foods DataFrame queryable from Spark SQL as "foods".
df.createOrReplaceTempView("foods")

# Name and Calories of the foods with more than 50 calories.
optimized_query = spark.sql(
    "SELECT Name, Calories "
    "FROM foods "
    "WHERE Calories > 50"
)

optimized_query.show()

# Print the physical plan in the "formatted" layout: an operator tree followed
# by per-operator details.
optimized_query.explain(mode="formatted")


+--------+--------+
|    Name|Calories|
+--------+--------+
|   Apple|      52|
|  Banana|      89|
|   Mango|      60|
|Broccoli|      55|
+--------+--------+

== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [Name#229, Type#230, Calories#231L]
Arguments: [Name#229, Type#230, Calories#231L], MapPartitionsRDD[73] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [3]: [Name#229, Type#230, Calories#231L]
Condition : (isnotnull(Calories#231L) AND (Calories#231L > 50))

(3) Project [codegen id : 1]
Output [2]: [Name#229, Calories#231L]
Input [3]: [Name#229, Type#230, Calories#231L]




In [None]:
# 999,999 synthetic rows; Calories runs from 53 (52 + 1) to 1,000,051.
large_data = [("Apple", "Fruit", 52 + i) for i in range(1, 1000000)]

# Spell the column names out here instead of reading the notebook-global
# `columns`: other cells rebind that name to different schemas, so running the
# cells out of order would label these rows with the wrong column names and
# the "Calories" filter below would no longer resolve.
large_df = spark.createDataFrame(large_data, schema=["Name", "Type", "Calories"])

# Keep only the rows whose Calories value is above 1000.
filtered_df = large_df.filter(col("Calories") > 1000)

# Only the physical plan is printed; this cell never shows or collects the
# filtered rows.
filtered_df.explain(mode="formatted")


== Physical Plan ==
* Filter (2)
+- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [Name#268, Type#269, Calories#270L]
Arguments: [Name#268, Type#269, Calories#270L], MapPartitionsRDD[82] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [3]: [Name#268, Type#269, Calories#270L]
Condition : (isnotnull(Calories#270L) AND (Calories#270L > 1000))




In [None]:
# Add a column holding twice the Calories value of each row.
df_transformed = df.withColumn(
    "Calories_Doubled",
    df["Calories"] * 2,
)

# Print the formatted physical plan first, then the resulting rows.
df_transformed.explain(mode="formatted")
df_transformed.show()


== Physical Plan ==
* Project (2)
+- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [Name#229, Type#230, Calories#231L]
Arguments: [Name#229, Type#230, Calories#231L], MapPartitionsRDD[73] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Project [codegen id : 1]
Output [4]: [Name#229, Type#230, Calories#231L, (Calories#231L * 2) AS Calories_Doubled#274L]
Input [3]: [Name#229, Type#230, Calories#231L]


+--------+---------+--------+----------------+
|    Name|     Type|Calories|Calories_Doubled|
+--------+---------+--------+----------------+
|   Apple|    Fruit|      52|             104|
|  Banana|    Fruit|      89|             178|
|  Carrot|Vegetable|      41|              82|
|  Tomato|Vegetable|      18|              36|
|   Mango|    Fruit|      60|             120|
|Broccoli|Vegetable|      55|             110|
+--------+---------+--------+----------------+



In [None]:
# (Re-)register the foods DataFrame so this cell can be run on its own.
df.createOrReplaceTempView("foods")

# Only the Name column is selected; Calories is used solely in the predicate.
optimized_query = spark.sql(
    "SELECT Name "
    "FROM foods "
    "WHERE Calories > 50"
)

# Print the formatted physical plan first, then the resulting rows.
optimized_query.explain(mode="formatted")
optimized_query.show()


== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [Name#229, Type#230, Calories#231L]
Arguments: [Name#229, Type#230, Calories#231L], MapPartitionsRDD[73] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [3]: [Name#229, Type#230, Calories#231L]
Condition : (isnotnull(Calories#231L) AND (Calories#231L > 50))

(3) Project [codegen id : 1]
Output [1]: [Name#229]
Input [3]: [Name#229, Type#230, Calories#231L]


+--------+
|    Name|
+--------+
|   Apple|
|  Banana|
|   Mango|
|Broccoli|
+--------+



In [None]:
from pyspark.sql.functions import broadcast

# Price lookup table: one (Name, Price_per_kg) row per food.
price_columns = ["Name", "Price_per_kg"]
price_data = [
    ("Apple", 1.2),
    ("Banana", 0.5),
    ("Carrot", 0.8),
    ("Tomato", 1.5),
    ("Mango", 2.0),
    ("Broccoli", 1.8),
]
df_price = spark.createDataFrame(price_data, schema=price_columns)

# Inner join on Name, with the price table wrapped in broadcast() as a hint
# that it should be shipped to the executors rather than shuffled.
joined_df = df.join(broadcast(df_price), on="Name", how="inner")

# Print the formatted physical plan first, then the joined rows.
joined_df.explain(mode="formatted")
joined_df.show()


== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
   +- BroadcastHashJoin Inner BuildRight (6)
      :- Filter (2)
      :  +- Scan ExistingRDD (1)
      +- BroadcastExchange (5)
         +- Filter (4)
            +- Scan ExistingRDD (3)


(1) Scan ExistingRDD
Output [3]: [Name#229, Type#230, Calories#231L]
Arguments: [Name#229, Type#230, Calories#231L], MapPartitionsRDD[73] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter
Input [3]: [Name#229, Type#230, Calories#231L]
Condition : isnotnull(Name#229)

(3) Scan ExistingRDD
Output [2]: [Name#302, Price_per_kg#303]
Arguments: [Name#302, Price_per_kg#303], MapPartitionsRDD[91] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(4) Filter
Input [2]: [Name#302, Price_per_kg#303]
Condition : isnotnull(Name#302)

(5) BroadcastExchange
Input [2]: [Name#302, Price_per_kg#303]
Arguments: HashedRelationBroadcastMode(List(input[0, str