<a href="https://colab.research.google.com/github/sirishaallarapu/AdvancedPySpark-/blob/main/Day4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab-only: opens a browser file picker and saves the chosen file(s) into the
# current working directory (the next cell moves the CSV from there).
# `uploaded` presumably maps file names to their raw contents — confirm against
# the google.colab docs; it is not referenced by the visible cells.
from google.colab import files
uploaded = files.upload()



Saving Restaurant customer data.csv to Restaurant customer data.csv


In [None]:
import os
import shutil

from pyspark.sql import SparkSession

# Colab does not predefine `spark`; create (or reuse, via getOrCreate) a session
# here so this cell works on a fresh kernel instead of depending on a later cell.
spark = SparkSession.builder.appName("OptimizationExample").getOrCreate()

UPLOADED_FILE = "Restaurant customer data.csv"
DATA_PATH = "/content/Restaurant_customer_data.csv"

# Only move while the uploaded file is still present, so re-running this cell
# after the move does not fail with FileNotFoundError.
if os.path.exists(UPLOADED_FILE):
    shutil.move(UPLOADED_FILE, DATA_PATH)

df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
df.show(5)


+------+---------+-----------+------+--------------+----------------+--------+---------+--------------+-----------+----------+----------+-------------------+--------+------------+-----+------+------+------+
|userID| latitude|  longitude|smoker|   drink_level|dress_preference|ambience|transport|marital_status|      hijos|birth_year|  interest|        personality|religion|    activity|color|weight|budget|height|
+------+---------+-----------+------+--------------+----------------+--------+---------+--------------+-----------+----------+----------+-------------------+--------+------------+-----+------+------+------+
| U1001|22.139997|-100.978803| false|    abstemious|        informal|  family|  on foot|        single|independent|      1989|   variety|  thrifty-protector|    none|     student|black|    69|medium|  1.77|
| U1002|22.150087|-100.983325| false|    abstemious|        informal|  family|   public|        single|independent|      1990|technology|hunter-ostentatious|Catholic|     s

In [None]:
# Row count per ambience preference (the "?" group is the dataset's placeholder
# for a missing value).
ambience_counts = df.groupBy(df.ambience).count()
ambience_counts.show()


+--------+-----+
|ambience|count|
+--------+-----+
| friends|   46|
|  family|   70|
|       ?|    6|
|solitary|   16|
+--------+-----+



In [None]:
from pyspark.sql.functions import avg

# Average height and weight per religion. `avg` is a pyspark.sql.functions
# helper, not a builtin, so it must be imported or this cell raises NameError
# on a fresh kernel.
df.groupBy("religion").agg(
    avg("height").alias("Average Height"),
    avg("weight").alias("Average Weight"),
).show()


+---------+------------------+------------------+
| religion|    Average Height|    Average Weight|
+---------+------------------+------------------+
|   Mormon|               1.6|              68.0|
|Christian|1.6457142857142857|57.714285714285715|
| Catholic|1.6754545454545446| 65.28282828282828|
|     none|1.6410000000000002|63.233333333333334|
|   Jewish|               1.9|             120.0|
+---------+------------------+------------------+



In [None]:
from pyspark.sql import SparkSession
import time

# Initialize (or reuse) the Spark session; getOrCreate returns the active one.
spark = SparkSession.builder.appName("OptimizationExample").getOrCreate()

# Re-read the dataset so the benchmark starts from a fresh DataFrame.
df = spark.read.csv("/content/Restaurant_customer_data.csv", header=True, inferSchema=True)

# Column to group on; any column from the dataset works.
valid_column = "smoker"  # Change to any suitable column from your dataset

# Spark matches cached data by query plan, so a freshly re-read DataFrame can
# still hit a cache left over from a previous run of this cell. Drop it so the
# first measurement is a genuine uncached run.
df.unpersist()

# Measure execution time without caching. perf_counter is a monotonic clock
# intended for measuring elapsed intervals (time.time() is wall-clock time).
start_time = time.perf_counter()
df.groupBy(valid_column).count().show()
end_time = time.perf_counter()
print("Execution Time (Without Cache):", end_time - start_time)

# cache() is lazy: it only marks the DataFrame. Run an action so the cache is
# actually populated; otherwise the timed run below would include the cost of
# filling the cache rather than reading from it.
df.cache()
df.count()

# Measure execution time after caching.
start_time = time.perf_counter()
df.groupBy(valid_column).count().show()
end_time = time.perf_counter()
print("Execution Time (With Cache):", end_time - start_time)



+------+-----+
|smoker|count|
+------+-----+
| false|  109|
|     ?|    3|
|  true|   26|
+------+-----+

Execution Time (Without Cache): 0.5151190757751465
+------+-----+
|smoker|count|
+------+-----+
| false|  109|
|     ?|    3|
|  true|   26|
+------+-----+

Execution Time (With Cache): 0.27075624465942383


In [None]:
import time

from pyspark import StorageLevel

# `df` is still cached by the previous cell, and asking Spark to persist data
# that is already cached is a no-op (it keeps the existing cache). Drop the old
# cache first so this cell really measures MEMORY_AND_DISK persistence.
df.unpersist()
df.persist(StorageLevel.MEMORY_AND_DISK)
# persist() is lazy; run an action so the data is stored before timing.
df.count()

start_time = time.perf_counter()
df.groupBy(valid_column).count().show()
end_time = time.perf_counter()
print("Execution Time (With Persist):", end_time - start_time)

# Release the persisted blocks; the trailing `;` keeps Jupyter from printing the
# DataFrame repr that unpersist() returns.
df.unpersist();



+------+-----+
|smoker|count|
+------+-----+
| false|  109|
|     ?|    3|
|  true|   26|
+------+-----+

Execution Time (With Persist): 0.3011033535003662


DataFrame[userID: string, latitude: double, longitude: double, smoker: string, drink_level: string, dress_preference: string, ambience: string, transport: string, marital_status: string, hijos: string, birth_year: int, interest: string, personality: string, religion: string, activity: string, color: string, weight: int, budget: string, height: double]

In [None]:

# repartition() redistributes rows with a full shuffle into exactly 10 partitions.
df_repartitioned = df.repartition(10)
repartitioned_count = df_repartitioned.rdd.getNumPartitions()
print("Number of partitions after repartitioning:", repartitioned_count)


Number of partitions after repartitioning: 10


In [None]:

# Shuffle into 10 partitions first, then coalesce() down to 5. coalesce merges
# existing partitions instead of doing another full shuffle.
df_coalesced = df.repartition(10)
df_coalesced = df_coalesced.coalesce(5)
print("Number of partitions after coalescing:", df_coalesced.rdd.getNumPartitions())


Number of partitions after coalescing: 5


In [None]:
# Column-pruning example: keep only the columns of interest.
# NOTE(review): `df_selected` is reassigned by a later cell before it is used.
df_selected = df.select(df.smoker, df.color, df.height)



In [None]:
# Write the DataFrame as Parquet. The default save mode ("errorifexists") makes
# this cell fail on every re-run once the output directory exists; "overwrite"
# keeps the cell idempotent under Restart & Run All.
# NOTE(review): other cells keep data under /content/; confirm /mnt/data is the
# intended output location on this runtime.
df.write.mode("overwrite").parquet("/mnt/data/optimized.parquet")


In [None]:
# Project down to three columns and preview the first rows.
selected_columns = ["smoker", "ambience", "budget"]
df_selected = df.select(*selected_columns)
df_selected.show(n=5)


+------+--------+------+
|smoker|ambience|budget|
+------+--------+------+
| false|  family|medium|
| false|  family|   low|
| false|  family|   low|
| false|  family|medium|
| false|  family|medium|
+------+--------+------+
only showing top 5 rows



In [None]:
# Count distinct user IDs (the variable name is kept for compatibility; it holds
# the number of unique users, not the total row count).
unique_users = df.select(df.userID).distinct()
row_count = unique_users.count()
print("Number of unique users:", row_count)


Number of unique users: 138


In [None]:
# Enable Adaptive Query Execution, which re-optimizes query plans at runtime.
# AQE is already on by default in recent Spark releases (3.2+), so this mainly
# makes the setting explicit for the cells that follow.
spark.conf.set(key="spark.sql.adaptive.enabled", value="true")



In [None]:
import time


def non_smoker_ambience_counts(frame):
    """Build (lazily) the per-ambience row counts for non-smokers.

    `smoker` is read as a string column, so the filter compares against the
    literal "false" rather than a boolean. Nothing executes until an action
    such as show() is called on the result.
    """
    return frame.filter(frame.smoker == "false").groupBy("ambience").count()


# Make sure the first measurement is a genuine uncached run, even when this
# cell is re-executed after the df.cache() below.
df.unpersist()

start_time = time.perf_counter()
df_filtered = non_smoker_ambience_counts(df)
df_filtered.show()
end_time = time.perf_counter()
print("Execution Time (Without Cache):", end_time - start_time)

# cache() is lazy: force materialization with an action so the timed run reads
# from the cache. Without this, the "cached" run also pays for populating the
# cache and can come out slower than the uncached run.
df.cache()
df.count()

start_time = time.perf_counter()
df_filtered = non_smoker_ambience_counts(df)
df_filtered.show()
end_time = time.perf_counter()
print("Execution Time (With Cache):", end_time - start_time)


+--------+-----+
|ambience|count|
+--------+-----+
| friends|   34|
|  family|   59|
|       ?|    1|
|solitary|   15|
+--------+-----+

Execution Time (Without Cache): 0.47017598152160645
+--------+-----+
|ambience|count|
+--------+-----+
| friends|   34|
|  family|   59|
|       ?|    1|
|solitary|   15|
+--------+-----+

Execution Time (With Cache): 1.1911425590515137


In [None]:
# Partition count of the source DataFrame as read from the CSV.
partition_count = df.rdd.getNumPartitions()
print("Number of partitions:", partition_count)


Number of partitions: 1


In [None]:
# Redistribute rows into 8 partitions with a full shuffle.
target_partitions = 8
df_repartitioned = df.repartition(target_partitions)
print("Partitions after repartition:", df_repartitioned.rdd.getNumPartitions())


Partitions after repartition: 8


In [None]:
# Go to 8 partitions, then merge them down to 4 with coalesce (no full shuffle).
df_coalesced = df.repartition(8)
df_coalesced = df_coalesced.coalesce(4)
print("Partitions after coalesce:", df_coalesced.rdd.getNumPartitions())


Partitions after coalesce: 4


In [None]:
# Keep one row per distinct (smoker, ambience) pair. Spark does not guarantee
# which of the duplicate rows survives, so the non-key columns are arbitrary.
df_unique = df.dropDuplicates(subset=["smoker", "ambience"])
df_unique.show()


+------+---------+-----------+------+--------------+----------------+--------+---------+--------------+-----------+----------+----------+-----------------+--------+------------+------+------+------+------+
|userID| latitude|  longitude|smoker|   drink_level|dress_preference|ambience|transport|marital_status|      hijos|birth_year|  interest|      personality|religion|    activity| color|weight|budget|height|
+------+---------+-----------+------+--------------+----------------+--------+---------+--------------+-----------+----------+----------+-----------------+--------+------------+------+------+------+------+
| U1024|22.154021|-100.976028|     ?|    abstemious|               ?|       ?|        ?|             ?|          ?|      1930|      none|      hard-worker|    none|           ?|yellow|    40|     ?|   1.2|
| U1083| 22.13392|-101.028373| false|    abstemious|               ?|       ?|        ?|             ?|          ?|      1981|      none|      hard-worker|    none|           ?

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def check_budget(budget):
    """Map a raw budget value to a coarse category.

    Returns "High" only for the exact string "high"; every other value
    (e.g. "medium", "low", "?", or None) maps to "Low".
    """
    if budget == "high":
        return "High"
    return "Low"

# Wrap the plain-Python function as a Spark UDF with a string result. Python
# UDFs run row by row in Python workers; the next cell shows the native
# column-expression alternative.
udf_check_budget = udf(check_budget, returnType=StringType())

budget_category_col = udf_check_budget(df["budget"])
df = df.withColumn("budget_category", budget_category_col)
df.show(n=20)


+------+---------+-----------+------+--------------+----------------+--------+---------+--------------+-----------+----------+------------+-------------------+---------+------------+------+------+------+------+---------------+
|userID| latitude|  longitude|smoker|   drink_level|dress_preference|ambience|transport|marital_status|      hijos|birth_year|    interest|        personality| religion|    activity| color|weight|budget|height|budget_category|
+------+---------+-----------+------+--------------+----------------+--------+---------+--------------+-----------+----------+------------+-------------------+---------+------------+------+------+------+------+---------------+
| U1001|22.139997|-100.978803| false|    abstemious|        informal|  family|  on foot|        single|independent|      1989|     variety|  thrifty-protector|     none|     student| black|    69|medium|  1.77|            Low|
| U1002|22.150087|-100.983325| false|    abstemious|        informal|  family|   public|    

In [None]:
from pyspark.sql.functions import when, col

# Same categorisation as the UDF version, expressed as a native Spark column
# expression so no Python round trip is needed. A null budget makes the
# condition null, which falls through to "Low".
is_high_budget = col("budget") == "high"
df = df.withColumn("budget_category", when(is_high_budget, "High").otherwise("Low"))
df.show()


+------+---------+-----------+------+--------------+----------------+--------+---------+--------------+-----------+----------+------------+-------------------+---------+------------+------+------+------+------+---------------+
|userID| latitude|  longitude|smoker|   drink_level|dress_preference|ambience|transport|marital_status|      hijos|birth_year|    interest|        personality| religion|    activity| color|weight|budget|height|budget_category|
+------+---------+-----------+------+--------------+----------------+--------+---------+--------------+-----------+----------+------------+-------------------+---------+------------+------+------+------+------+---------------+
| U1001|22.139997|-100.978803| false|    abstemious|        informal|  family|  on foot|        single|independent|      1989|     variety|  thrifty-protector|     none|     student| black|    69|medium|  1.77|            Low|
| U1002|22.150087|-100.983325| false|    abstemious|        informal|  family|   public|    