<a href="https://colab.research.google.com/github/rani-sikdar/PySpark/blob/main/pyspark_intermediate.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

# Create the notebook's Spark session (or pick up one that already exists), then display it.
sess = (
    SparkSession.builder
    .appName("demo1")
    .getOrCreate()
)
sess

In [None]:
# Sample rows, one tuple per record: (id, name, marks)
d = [(1, 'aaa', 25), (2, 'bbb', 10), (3, 'ccc', 15), (4, 'ddd', 20)]

# Column names, in the same order as the tuple fields above
col = ['id', 'name', 'marks']

In [None]:
# Build a DataFrame from the sample rows and column names, then preview it.
df = sess.createDataFrame(data=d, schema=col)
df.show()

+---+----+-----+
| id|name|marks|
+---+----+-----+
|  1| aaa|   25|
|  2| bbb|   10|
|  3| ccc|   15|
|  4| ddd|   20|
+---+----+-----+



In [None]:
# Sort rows by the 'marks' column; the output below comes back in ascending order.
df.sort('marks').show()

+---+----+-----+
| id|name|marks|
+---+----+-----+
|  2| bbb|   10|
|  3| ccc|   15|
|  4| ddd|   20|
|  1| aaa|   25|
+---+----+-----+



In [None]:
df.orderBy('marks').show()  # ascending by default; same ordering as sort('marks') above

+---+----+-----+
| id|name|marks|
+---+----+-----+
|  2| bbb|   10|
|  3| ccc|   15|
|  4| ddd|   20|
|  1| aaa|   25|
+---+----+-----+



In [None]:
# Import the functions module under an alias rather than `from ... import col`:
# a bare `col` would overwrite the `col` list of column names defined earlier in
# this notebook, so re-running the createDataFrame(d, col) cell would then fail.
from pyspark.sql import functions as F

# Sort by 'marks' in descending order.
df.orderBy(F.col('marks').desc()).show()

+---+----+-----+
| id|name|marks|
+---+----+-----+
|  1| aaa|   25|
|  4| ddd|   20|
|  3| ccc|   15|
|  2| bbb|   10|
+---+----+-----+



In [None]:
# Descending sort again, this time referencing the column as an attribute of df.
df.sort(df.marks.desc()).show()

+---+----+-----+
| id|name|marks|
+---+----+-----+
|  1| aaa|   25|
|  4| ddd|   20|
|  3| ccc|   15|
|  2| bbb|   10|
+---+----+-----+



In [None]:
# Sort on two columns: 'marks' first, then 'id'.
# All 'marks' values in this sample are distinct, so 'id' never decides the order here.
df.orderBy(['marks','id']).show()

+---+----+-----+
| id|name|marks|
+---+----+-----+
|  2| bbb|   10|
|  3| ccc|   15|
|  4| ddd|   20|
|  1| aaa|   25|
+---+----+-----+



#### UDFs (user-defined functions)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

In [None]:
def to_upper(name):
    """Return `name` upper-cased, or None when `name` is None.

    Only None is treated as a missing value. An empty string is returned
    as "" (the previous truthiness check turned "" into None, which shows
    up as a NULL when this function is used as a UDF).
    """
    if name is None:
        return None
    return name.upper()


In [None]:
# Wrap the plain Python function as a Spark UDF whose declared return type is string.
to_upper_udf = udf(f=to_upper, returnType=StringType())

In [None]:
# Single-column sample data for the UDF examples: one 1-tuple per name.
d = [(name,) for name in ('aaa', 'bbb', 'ccc', 'ddd')]

df = sess.createDataFrame(data=d, schema=['name'])
df.show()

+----+
|name|
+----+
| aaa|
| bbb|
| ccc|
| ddd|
+----+



In [None]:
# Add an 'upper_name' column computed row by row with the Python UDF.
(
    df
    .withColumn("upper_name", to_upper_udf(df["name"]))
    .show()
)

+----+----------+
|name|upper_name|
+----+----------+
| aaa|       AAA|
| bbb|       BBB|
| ccc|       CCC|
| ddd|       DDD|
+----+----------+



### Using UDF in Spark SQL


In [None]:
# Make to_upper callable from SQL text under the name "to_upper_udf".
sess.udf.register(name="to_upper_udf", f=to_upper, returnType=StringType())

In [None]:
# Expose df under the view name "temp" so the SQL queries below can select from it.
df.createOrReplaceTempView("temp")

In [None]:
# Query the temp view through Spark SQL.
sess.sql("select * from temp").show()

+----+
|name|
+----+
| aaa|
| bbb|
| ccc|
| ddd|
+----+



In [None]:
# Call the UDF registered as "to_upper_udf" from inside a SQL query.
sess.sql("select name, to_upper_udf(name) as upper_name from temp").show()

+----+----------+
|name|upper_name|
+----+----------+
| aaa|       AAA|
| bbb|       BBB|
| ccc|       CCC|
| ddd|       DDD|
+----+----------+



### Pandas UDFs

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType


In [None]:
# pandas UDF (vectorized)

@pandas_udf(StringType())
def to_upper_pandas(name: pd.Series) -> pd.Series:
    """Upper-case a batch of names.

    The Series -> Series type hints declare this as a scalar pandas UDF;
    on Spark 3.x, omitting them falls back to the deprecated untyped form.

    Args:
        name: a pandas Series holding one batch of values from the input column.

    Returns:
        A Series of the same length with each string upper-cased.
    """
    return name.str.upper()

In [None]:
# Apply the pandas UDF to the 'name' column (the column is passed by name).
df.withColumn("upper_name", to_upper_pandas("name")).show()

+----+----------+
|name|upper_name|
+----+----------+
| aaa|       AAA|
| bbb|       BBB|
| ccc|       CCC|
| ddd|       DDD|
+----+----------+



In [None]:
# Number of partitions currently backing df.
df.rdd.getNumPartitions()

2

In [None]:
# Increase the partition count: repartition() returns a new DataFrame, which replaces `df`.
df = df.repartition(numPartitions=10)

In [None]:
# Partition count after repartition(10).
df.rdd.getNumPartitions()

10

In [None]:
# Reduce the partition count to 2; the result is kept separately in df3.
df3 = df.coalesce(numPartitions=2)

In [None]:
# Partition count after coalesce(2).
df3.rdd.getNumPartitions()

2

In [None]:
# Partition by column when writing: the output is split by the distinct values of the column.
#
# - "column name" was a placeholder and is not a column of df; df's only column is "name".
# - Spark refuses to partition by every column of a DataFrame, so a data column
#   ("upper_name") is added first so that something is left to store in the files.
# - mode("overwrite") keeps the cell re-runnable once output/path/ already exists.
(
    df
    .withColumn("upper_name", to_upper_udf(df["name"]))
    .write
    .mode("overwrite")
    .partitionBy("name")
    .parquet("output/path/")
)

In [None]:
# Rows held by each partition: glom() turns every partition into a list and len counts its elements.
df.rdd.glom().map(len).collect()

[1, 0, 0, 0, 1, 1, 0, 0, 0, 1]

In [None]:
# Spread the same rows over 20 partitions and list how many rows each one holds.
df2 = df.repartition(numPartitions=20)
df2.rdd.glom().map(len).collect()

[0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [None]:
import time

df_large = sess.range(10000000)  # 10M rows


def _time_group_count(frame):
    """Run groupBy("id").count() on `frame` to completion; return the elapsed seconds.

    The aggregated result is written to Spark's "noop" sink, which executes the
    whole plan and then discards the rows. The earlier version used .collect(),
    which pulled all 10M result rows into the driver, so the measurement was
    dominated by result transfer (and risked exhausting driver memory) instead
    of the shuffle that is being compared.
    """
    started = time.perf_counter()  # monotonic clock, intended for measuring intervals
    frame.groupBy("id").count().write.format("noop").mode("overwrite").save()
    return time.perf_counter() - started


# Default partitions
print("Default partition time:", _time_group_count(df_large))

# Repartition to 100 partitions (the repartition itself is lazy, so its cost is part of the measured time)
df_large2 = df_large.repartition(100)
print("100 partitions time:", _time_group_count(df_large2))


Default partition time: 118.18139243125916
100 partitions time: 146.58254146575928


### transformations vs actions

In [None]:
from pyspark.sql import SparkSession

# Obtain a session handle for this section and display it.
ss = (
    SparkSession.builder
    .appName('demo2')
    .getOrCreate()
)
ss

In [None]:
# Sample (name, age) rows for the transformation / action examples
d = [
    ('tom', 30),
    ('harry', 25),
    ('charlie', 27),
    ('david', 33),
]

df = sess.createDataFrame(data=d, schema=['name', 'age'])
df.show()

+-------+---+
|   name|age|
+-------+---+
|    tom| 30|
|  harry| 25|
|charlie| 27|
|  david| 33|
+-------+---+



In [None]:
# Transformations: each line only defines a new DataFrame; the results are
# printed by the actions in the next cell.
filtered_df = df.filter(df["age"] >= 25)           # narrow (every sample age is >= 25, so all rows pass)
mapped_df = df.select(df["name"], df["age"] * 2)   # narrow

grouped_df = df.groupBy(df["age"]).count()         # wide

In [None]:
# Actions: each call below triggers execution.
print("filtered_df :",filtered_df.collect())
print("count :",df.count())
df.show()
# mode("overwrite") makes this cell re-runnable: without it the write uses the
# default error-if-exists behaviour and fails once "output3.csv" has been created.
df.write.mode("overwrite").csv("output3.csv")


filtered_df : [Row(name='tom', age=30), Row(name='harry', age=25), Row(name='charlie', age=27), Row(name='david', age=33)]
count : 4
+-------+---+
|   name|age|
+-------+---+
|    tom| 30|
|  harry| 25|
|charlie| 27|
|  david| 33|
+-------+---+



In [None]:
# (name, subject, score) records: two students, two subjects each
data = [
    ("Alice", "Math", 85),
    ("Bob", "Math", 90),
    ("Alice", "Physics", 95),
    ("Bob", "Physics", 80),
]

df = ss.createDataFrame(data, schema=["name", "subject", "score"])

# Wide transformation: average score per student
df_grouped = df.groupBy("name").agg({"score": "avg"})
df_grouped.show()


+-----+----------+
| name|avg(score)|
+-----+----------+
|  Bob|      85.0|
|Alice|      90.0|
+-----+----------+



### actions

In [None]:
# collect(): fetch every row of df as a Python list of Row objects, then print them one per line
rows = df.collect()
for row in rows:
    print(row)


Row(name='Alice', subject='Math', score=85)
Row(name='Bob', subject='Math', score=90)
Row(name='Alice', subject='Physics', score=95)
Row(name='Bob', subject='Physics', score=80)


In [None]:
# count(): number of rows in df
print(df.count())

4


In [None]:
# show(): print the rows of df as a text table
df.show()

+-----+-------+-----+
| name|subject|score|
+-----+-------+-----+
|Alice|   Math|   85|
|  Bob|   Math|   90|
|Alice|Physics|   95|
|  Bob|Physics|   80|
+-----+-------+-----+



In [None]:
# take(3): the first three rows, returned as a list of Row objects
df.take(3)

[Row(name='Alice', subject='Math', score=85),
 Row(name='Bob', subject='Math', score=90),
 Row(name='Alice', subject='Physics', score=95)]

In [None]:
# first(): the first row, returned as a single Row
df.first()

Row(name='Alice', subject='Math', score=85)

In [None]:
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

# Session handle used by the cache / persist examples; display it.
ses = (
    SparkSession.builder
    .appName('CacheVsPersist')
    .getOrCreate()
)
ses

In [None]:
# Display the session object again.
ses

In [None]:
# Load the employee CSV: the first line supplies column names and column types are inferred.
df = ses.read.csv("/content/employee.csv", header=True, inferSchema=True)
df.show(5)

+---+---+------+------+-------------------+-----------------+-----------------+
| No|Age|Gender|Salary|Monthly Expenditure|       Occupation|Healthy Lifestyle|
+---+---+------+------+-------------------+-----------------+-----------------+
|  1| 34|     0| 35760|              34908|          Teacher|        Moderate |
|  2| 21|     0| 23500|              20950|          Analyst|             Good|
|  3| 59|     1| 21000|              12080| Graphic Designer|              Bad|
|  4| 45|     1| 45000|              34090|Software Engineer|             Good|
|  5| 37|     0| 67050|              45780|          Manager|         Moderate|
+---+---+------+------+-------------------+-----------------+-----------------+
only showing top 5 rows



In [None]:
# without caching: two separate actions on the same, uncached DataFrame

df.count()   # First action (its value is not displayed because it is not the cell's last expression)

df.select("Occupation").distinct().show()  # Second action; nothing is cached, so df is evaluated again


+-----------------+
|       Occupation|
+-----------------+
|          Teacher|
| Graphic Designer|
|          Analyst|
|          Manager|
|Software Engineer|
+-----------------+



In [None]:
# with cache(): mark df for caching. No action runs in this cell; the output is just the DataFrame's repr.
df.cache()

DataFrame[No: int, Age: int, Gender: int, Salary: int, Monthly Expenditure: int, Occupation: string, Healthy Lifestyle: string]

In [None]:
df.count()   # First action after df.cache() in the previous cell

df.select("Occupation").distinct().show()  # Expected to reuse the cached df rather than recompute it (the old "Recomputes again" note was copied from the uncached cell)

+-----------------+
|       Occupation|
+-----------------+
|          Teacher|
| Graphic Designer|
|          Analyst|
|          Manager|
|Software Engineer|
+-----------------+



In [None]:
# With persist()

# df was already cached by df.cache() above. persist() on an already-cached
# DataFrame only logs a warning and keeps the existing storage level, so the
# old cache is released first; otherwise MEMORY_ONLY would never take effect.
df.unpersist()
df.persist(StorageLevel.MEMORY_ONLY)
df.count()  # action that materialises the persisted data


15

In [None]:
from pyspark.sql import SparkSession
import time
# Session handle for the caching benchmark.
spark = (
    SparkSession.builder
    .appName("CachingBenchmark")
    .getOrCreate()
)

# Simulate a large dataset: a single 'id' column holding 0 .. 99,999,999
df = spark.range(100_000_000)  # 100 million rows

In [None]:
# Derive a 'squared' column from 'id' (stands in for an expensive transformation).
df_transformed = df.withColumn(colName="squared", col=df["id"] ** 2)


In [None]:
def _time_two_filters(frame):
    """Run the two filter + count actions on `frame`; return the elapsed seconds.

    Both measurements go through this one helper so the uncached and cached
    runs execute exactly the same work. perf_counter() is used instead of
    time.time() because it is a monotonic clock intended for measuring intervals.
    """
    started = time.perf_counter()
    frame.filter("squared > 500000000").count()
    frame.filter("squared < 1000").count()
    return time.perf_counter() - started


# --- Without caching ---
print("Time without cache:", _time_two_filters(df_transformed))

# --- With caching ---
df_transformed.cache()  # or df_transformed.persist()
df_transformed.count()  # trigger caching (kept outside the timed section)

print("Time with cache:", _time_two_filters(df_transformed))


Time without cache: 11.28271770477295
Time with cache: 7.267090797424316


In [None]:
# unpersist: release the cached data of df_transformed

df_transformed.unpersist()

DataFrame[id: bigint, squared: double]

### different storage levels

In [None]:
# persist() does not replace a storage level that df already has, so clear any
# existing one first; this keeps the cell correct when it is re-run or run out of order.
df.unpersist()
df.persist(StorageLevel.MEMORY_ONLY)

DataFrame[id: bigint]

In [None]:
# df may still carry the storage level from an earlier persist(); cache() would
# then be a no-op, so release it first.
df.unpersist()
df.cache()  # Same as df.persist() with the default level (MEMORY_AND_DISK; shown as MEMORY_AND_DISK_DESER on Spark 3.x)

DataFrame[id: bigint]

In [None]:
# Release the previous storage level first; persist() on an already-persisted
# DataFrame keeps the old level, so DISK_ONLY would not be applied.
df.unpersist()
df.persist(StorageLevel.DISK_ONLY)

DataFrame[id: bigint]

In [None]:
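# Not run: MEMORY_ONLY_SER and MEMORY_AND_DISK_SER are Scala/Java storage levels.
# PySpark 3.x does not define them (Python data is always stored serialized),
# so these calls would raise AttributeError there.
# df.persist(StorageLevel.MEMORY_ONLY_SER)

# df.persist(StorageLevel.MEMORY_AND_DISK_SER)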

In [None]:
# Release the previous storage level first; persist() on an already-persisted
# DataFrame keeps the old level, so OFF_HEAP would not be applied.
df.unpersist()
df.persist(StorageLevel.OFF_HEAP)

DataFrame[id: bigint]