In [None]:
pip install pyspark



# PySpark Demo Notebook

This notebook provides practical examples covering PySpark fundamentals, including RDDs, DataFrames, SparkSession setup, partitioning, caching, joins, broadcast variables, accumulators, and dynamic resource allocation.

## Setup SparkSession

Initialize a `SparkSession`, which serves as the entry point for DataFrame and SQL operations in Spark 2.x and later.

In [None]:
from pyspark.sql import SparkSession

# Dynamic-allocation options, applied to the builder in this order.
# NOTE(review): the master is local[*]; confirm whether these executor
# settings have any effect when running in local mode.
_dynamic_allocation_options = (
    ("spark.dynamicAllocation.enabled", "true"),
    ("spark.dynamicAllocation.initialExecutors", "2"),
    ("spark.dynamicAllocation.minExecutors", "1"),
    ("spark.dynamicAllocation.maxExecutors", "4"),
    ("spark.dynamicAllocation.executorIdleTimeout", "60s"),
)

_builder = SparkSession.builder.appName("PySpark Demo").master("local[*]")
for _option_key, _option_value in _dynamic_allocation_options:
    _builder = _builder.config(_option_key, _option_value)

# Reuse an already-running session if there is one, otherwise start a new one.
spark = _builder.getOrCreate()

# The SparkContext is the handle used by the RDD examples.
sc = spark.sparkContext

# Echo the effective settings, one per line.
print(
    f"AppName: {sc.appName}",
    f"Master: {sc.master}",
    f"Dynamic Allocation Enabled: {spark.conf.get('spark.dynamicAllocation.enabled')}",
    sep="\n",
)

AppName: PySpark Demo
Master: local[*]
Dynamic Allocation Enabled: true


---
## RDD: Resilient Distributed Datasets
RDD is Spark's low-level immutable distributed collection. It supports fine-grained control over transformations and actions.

In [None]:
# The integers 1 through 10, distributed across three partitions.
data = list(range(1, 11))
rdd = sc.parallelize(data, numSlices=3)

# getNumPartitions reports the partition count; collect() is an action that
# brings every element back to the driver.
_partition_count = rdd.getNumPartitions()
_all_elements = rdd.collect()
print("Number of partitions:", _partition_count)
print("Collect:", _all_elements)

Number of partitions: 3
Collect: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


### Transformations & Actions

- Transformations are lazy (e.g., `map`, `filter`).
- Actions trigger execution (e.g., `collect`, `count`).

In [None]:
def _split_on_spaces(line):
    """Split a sentence into words on single space characters."""
    return line.split(" ")


def _to_lowercase_pair(word):
    """Pair the lower-cased word with an initial count of 1."""
    return (word.lower(), 1)


def _sum_counts(left, right):
    """Combine two partial counts for the same word."""
    return left + right


_sentences = [
    "Apache Spark is fast",
    "PySpark brings Python to Spark",
    "Spark runs in memory",
    "Spark supports RDD and DataFrame APIs",
]
text_data = sc.parallelize(_sentences)

# Lazy transformations: nothing executes until the collect() action below.
words = text_data.flatMap(_split_on_spaces)
word_pairs = words.map(_to_lowercase_pair)
counts = word_pairs.reduceByKey(_sum_counts)

print(counts.collect())

[('apache', 1), ('fast', 1), ('brings', 1), ('python', 1), ('to', 1), ('runs', 1), ('memory', 1), ('supports', 1), ('rdd', 1), ('and', 1), ('dataframe', 1), ('spark', 4), ('is', 1), ('pyspark', 1), ('in', 1), ('apis', 1)]


---
## DataFrames & Spark SQL
DataFrames are Spark's high-level API for structured data. They provide schema-based transformations and SQL querying capabilities.

In [None]:
from pyspark.sql import SparkSession

# One dict per person; the schema is inferred from these records.
_people = (
    (1, "Alice", 30),
    (2, "Bob", 25),
    (3, "Charlie", 35),
)
json_data = [
    {"id": person_id, "name": person_name, "age": person_age}
    for person_id, person_name, person_age in _people
]

df = spark.createDataFrame(json_data)

# Display the rows, then the inferred column types.
df.show()
df.printSchema()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 30|  1|  Alice|
| 25|  2|    Bob|
| 35|  3|Charlie|
+---+---+-------+

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [None]:
# Projection followed by a row filter, using the DataFrame API.
_older_than_28 = df["age"] > 28
_names_and_ages = df.select("name", "age")
_names_and_ages.where(_older_than_28).show()

# Number of rows per distinct age.
_rows_per_age = df.groupBy("age").count()
_rows_per_age.show()

# Register the DataFrame as a temp view so it can be queried with SQL.
df.createOrReplaceTempView("people")
_mid_range_names = spark.sql("SELECT name FROM people WHERE age BETWEEN 26 AND 34")
_mid_range_names.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 30|
|Charlie| 35|
+-------+---+

+---+-----+
|age|count|
+---+-----+
| 30|    1|
| 25|    1|
| 35|    1|
+---+-----+

+-----+
| name|
+-----+
|Alice|
+-----+



---
## Partitioning
Spark divides data into partitions. Control partitions with `repartition` and `coalesce`.

In [None]:
# Start with 20 integers in 4 partitions, then derive two re-partitioned views.
rdd_part = sc.parallelize(range(20), 4)
rdd_repart = rdd_part.repartition(6)
rdd_coalesce = rdd_part.coalesce(2)

# Report the partition count of each RDD in the same order they were defined.
for _label, _rdd in (
    ("Original partitions:", rdd_part),
    ("Repartitioned (6):", rdd_repart),
    ("Coalesced (2):", rdd_coalesce),
):
    print(_label, _rdd.getNumPartitions())

Original partitions: 4
Repartitioned (6): 6
Coalesced (2): 2


---
## Caching & Persistence
Persist datasets in memory for faster reuse.

In [None]:
# Mark the RDD for caching; it is materialised by the first action below.
rdd_cached = rdd_part.cache()

# Run the same action twice: the first pass computes the data, the second
# pass can be served from the cached copy.
for _pass_number in range(2):
    print(rdd_cached.count())

# Release the cached data (as the last expression, its result is displayed).
rdd_cached.unpersist()

20
20


PythonRDD[97] at RDD at PythonRDD.scala:53

---
## Broadcast Variables & Accumulators
Share read-only data efficiently and aggregate values across tasks.

In [None]:
# Broadcast variable: wraps a read-only value; here it is only read back on the driver.
broadcast_list = sc.broadcast([2, 4, 6])
print(broadcast_list.value)

# Accumulator: a counter that tasks add to and the driver reads.
acc = sc.accumulator(0)


def add_if_even(x):
    """Add 1 to the `acc` accumulator when `x` is even; always return `x`."""
    if x % 2 != 0:
        return x
    acc.add(1)
    return x


rdd_nums = sc.parallelize(range(10))

# foreach is an action: it runs add_if_even on every element for its side effect.
rdd_nums.foreach(add_if_even)

print("Even count via accumulator:", acc.value)

[2, 4, 6]
Even count via accumulator: 5


---
## Join Strategies
Demonstrate broadcast hash join vs sort-merge join using DataFrames.

In [None]:
from pyspark.sql.functions import broadcast

# Left side: keys 1..1000. Right side: 20 (key, value) rows.
df_large = spark.range(1, 1001).withColumnRenamed("id", "key")

df_small = spark.createDataFrame([(i, f"val_{i}") for i in range(1, 21)], ["key", "value"])

# Broadcast hash join: broadcast() hints that df_small should be shipped to
# every task instead of being shuffled.
broadcast_join = df_large.join(broadcast(df_small), on="key")
broadcast_join.explain(True)

# Sort-merge join: without a hint the planner chooses the strategy from its
# size estimates and may still pick a broadcast join for inputs this small,
# so request sort-merge explicitly with the "merge" join hint.
merge_join = df_large.join(df_small.hint("merge"), on="key")
merge_join.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [key])
:- Project [id#230L AS key#232L]
:  +- Range (1, 1001, step=1, splits=Some(2))
+- ResolvedHint (strategy=broadcast)
   +- LogicalRDD [key#234L, value#235], false

== Analyzed Logical Plan ==
key: bigint, value: string
Project [key#232L, value#235]
+- Join Inner, (key#232L = key#234L)
   :- Project [id#230L AS key#232L]
   :  +- Range (1, 1001, step=1, splits=Some(2))
   +- ResolvedHint (strategy=broadcast)
      +- LogicalRDD [key#234L, value#235], false

== Optimized Logical Plan ==
Project [key#232L, value#235]
+- Join Inner, (key#232L = key#234L), rightHint=(strategy=broadcast)
   :- Project [id#230L AS key#232L]
   :  +- Range (1, 1001, step=1, splits=Some(2))
   +- Filter isnotnull(key#234L)
      +- LogicalRDD [key#234L, value#235], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [key#232L, value#235]
   +- BroadcastHashJoin [key#232L], [key#234L], Inner, BuildRight, false
      :- Project [id#230L 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, floor, rand, when, explode, sequence, lit

# Tunables for the salting demo. The salt count is used both when salting the
# skewed (credits) side and when replicating the other (titles) side, so it is
# defined once to keep the two in sync.
NUM_SALTS = 10            # salt values are 0 .. NUM_SALTS - 1
SKEW_COUNT_THRESHOLD = 5  # ids with more rows than this are treated as skewed
SALT_SEED = 68            # fixed seed so the random salt assignment is repeatable

# Credits: id 101 is deliberately over-represented (11 rows) to create skew.
credits_data = [(101, "Tom Hardy", "Actor")] * 11 + [
    (102, "Emma Stone", "Actress"),
    (103, "Chris Evans", "Actor"),
    (104, "Scarlett Johansson", "Actress"),
    (105, "Mark Ruffalo", "Actor"),
]
credits_df = spark.createDataFrame(credits_data, ["id", "name", "role"])

titles_data = [
    (101, "Inception", "movie", 2010),
    (102, "La La Land", "movie", 2016),
    (103, "Captain America", "movie", 2011),
    (104, "Lucy", "movie", 2014),
    (105, "Hulk", "movie", 2003),
]
titles_df = spark.createDataFrame(titles_data, ["id", "title", "type", "release_year"])

# Find the skewed join keys and bring them to the driver as a plain list.
skewed_keys_df = credits_df.groupBy("id").count().filter(col("count") > SKEW_COUNT_THRESHOLD)
skewed_ids = [row["id"] for row in skewed_keys_df.collect()]

# Credits side: skewed ids get a random "<id>_<salt>" key; other ids keep
# their id. The id is cast to string so both branches produce the same type
# instead of relying on implicit long/string coercion.
credits_salted = credits_df.withColumn("original_id", col("id")).withColumn(
    "id_salted",
    when(
        col("id").isin(skewed_ids),
        concat_ws("_", col("id"), floor(rand(seed=SALT_SEED) * NUM_SALTS).cast("int")),
    ).otherwise(col("id").cast("string")),
).alias("credits")

# Titles side: each skewed title is replicated once per salt value so every
# salted credits key finds a match.
titles_skewed = titles_df.filter(col("id").isin(skewed_ids))
titles_salted = titles_skewed.withColumn(
    "salt", explode(sequence(lit(0), lit(NUM_SALTS - 1)))
).withColumn(
    "id_salted", concat_ws("_", col("id"), col("salt"))
).drop("salt")

# Non-skewed titles keep their id as the key, cast to string so the union
# below combines columns of the same type.
titles_not_skewed = titles_df.filter(~col("id").isin(skewed_ids)).withColumn(
    "id_salted", col("id").cast("string")
)

titles_salted_final = titles_salted.unionByName(titles_not_skewed).alias("titles")

# Join on the salted key and project the original id back out.
joined_df = credits_salted.join(
    titles_salted_final,
    on="id_salted",
    how="inner",
).select(
    col("credits.original_id").alias("credit_id"),
    col("credits.name"),
    col("credits.role"),
    col("titles.title"),
    col("titles.type"),
    col("titles.release_year"),
)

joined_df.show(20, truncate=False)

+---------+------------------+-------+---------------+-----+------------+
|credit_id|name              |role   |title          |type |release_year|
+---------+------------------+-------+---------------+-----+------------+
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |Inception      |movie|2010        |
|101      |Tom Hardy         |Actor  |