In [1]:
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Obtain the session for this script (getOrCreate() reuses one if it already
# exists) and keep a handle on its SparkContext for the RDD examples.
spark = (
    SparkSession.builder
    .appName("Advanced Spark RDD and DataFrame Examples")
    .getOrCreate()
)
sc = spark.sparkContext

# (name, age) records used by every example below. The repeated "Alice" row
# is what distinct() and groupBy() later collapse.
data = [
    ("Alice", 21),
    ("Bob", 22),
    ("Charlie", 23),
    ("Alice", 21),
]
# Distribute the records over four partitions.
rdd = sc.parallelize(data, numSlices=4)

# ============ RDD PART ============

# a) Repartition and Coalesce, Shuffle Partitions
print("\n=== a) Repartition & Coalesce ===")
initial_count = rdd.getNumPartitions()
print("Initial Partitions:", initial_count)

# Go from the initial partition count up to six partitions...
rdd_repartitioned = rdd.repartition(6)
repartitioned_count = rdd_repartitioned.getNumPartitions()
print("After Repartition:", repartitioned_count)

# ...and then back down to two.
rdd_coalesced = rdd_repartitioned.coalesce(2)
coalesced_count = rdd_coalesced.getNumPartitions()
print("After Coalesce:", coalesced_count)

# Shuffle Partitions (default is 200, can be changed).
# Read the setting, override it for this session, then read it back.
shuffle_key = "spark.sql.shuffle.partitions"
print("Default Shuffle Partitions:", spark.conf.get(shuffle_key))
spark.conf.set(shuffle_key, "10")
print("Updated Shuffle Partitions:", spark.conf.get(shuffle_key))

# b) Broadcast Variables and Accumulator Variables
print("\n=== b) Broadcast & Accumulator ===")
# name -> gender lookup table, shared with the tasks as a broadcast variable.
broadcast_var = sc.broadcast({"Alice": "F", "Bob": "M", "Charlie": "M"})
# Counter of records whose name is present in the lookup table.
acc = sc.accumulator(0)


def gender_mapper(name_age):
    """Attach the broadcast gender to one ``(name, age)`` record.

    Args:
        name_age: A ``(name, age)`` tuple.

    Returns:
        ``(name, age, gender)``, where gender is ``"Unknown"`` when the name
        is not in the broadcast lookup table.
    """
    name, age = name_age
    return (name, age, broadcast_var.value.get(name, "Unknown"))


def count_known_name(name_age):
    """Add 1 to ``acc`` when the record's name is in the broadcast table."""
    if name_age[0] in broadcast_var.value:
        acc.add(1)


rdd_with_gender = rdd.map(gender_mapper)
print("With Gender Info:", rdd_with_gender.collect())

# The accumulator is updated from an action (foreach) rather than from inside
# the map() above: a transformation is re-run every time its lineage is
# evaluated (or a task is retried), which would inflate the count, whereas
# Spark applies accumulator updates made in actions exactly once per task.
rdd.foreach(count_known_name)
print("Accumulator count:", acc.value)

# c) Convert RDD to DataFrame
print("\n=== c) RDD to DataFrame ===")
# Column layout shared by both DataFrames built in this script.
column_names = ["name", "age"]
df = rdd.toDF(column_names)
df.show()

# ============ DATAFRAME PART ============

# createDataFrame()
print("\n=== createDataFrame() ===")
# A second, smaller frame built directly from local tuples; it is reused by
# the join() and union() examples further down.
extra_people = [("David", 24), ("Eva", 25)]
df2 = spark.createDataFrame(extra_people, schema=column_names)
df2.show()

# where() and filter()
print("\n=== where() and filter() ===")
# The same predicate expressed twice: once as a Column expression passed to
# filter(), once as a SQL string passed to where().
older_than_21 = col("age") > 21
df.filter(older_than_21).show()
df.where("age > 21").show()

# withColumn()
print("\n=== withColumn() ===")
# The derived column goes through a full life cycle: it is added, renamed,
# and dropped again, so df ends this section with its original two columns.
added_name, renamed_name = "age_plus_1", "age_next_year"
age_plus_one = col("age") + 1
df = df.withColumn(added_name, age_plus_one)
df.show()

# withColumnRenamed()
df = df.withColumnRenamed(added_name, renamed_name)
df.show()

# drop()
df = df.drop(renamed_name)
df.show()

# distinct()
print("\n=== distinct() ===")
# Show df with duplicate rows removed.
unique_rows = df.distinct()
unique_rows.show()

# groupBy()
print("\n=== groupBy() ===")
# Number of rows per name.
rows_per_name = df.groupBy("name").count()
rows_per_name.show()

# join()
print("\n=== join() ===")
# df holds Alice/Bob/Charlie and df2 holds David/Eva, so the two frames share
# no names: an inner join on "name" always prints an empty table. A full
# outer join keeps every row from both sides, so the example shows how
# unmatched keys are filled with nulls. df2's "age" is renamed first so the
# result does not end up with two columns that are both called "age".
df_join = df.join(
    df2.withColumnRenamed("age", "age_df2"),
    on="name",
    how="outer",
)
df_join.show()

# map() vs mapPartitions()
print("\n=== map() vs mapPartitions() ===")
# map() calls its function once per record.
rdd_mapped = rdd.map(lambda x: (x[0], x[1] + 10))
# mapPartitions() calls its function once per partition with an iterator over
# that partition's records. Returning a generator keeps the processing lazy
# instead of building a full list per partition, and the parameter is named
# `records` so it no longer shadows the builtin `iter`.
rdd_partitioned = rdd.mapPartitions(
    lambda records: ((x[0], x[1] * 2) for x in records)
)
print("map:", rdd_mapped.collect())
print("mapPartitions:", rdd_partitioned.collect())

# foreach() vs foreachPartition()
print("\n=== foreach() vs foreachPartition() ===")


def print_each(record):
    """Print a single record, prefixed with the ``foreach:`` label."""
    label = "foreach:"
    print(label, record)


def print_partition(partition):
    """Print every record of one partition with a ``foreachPartition:`` label.

    Args:
        partition: An iterable over the records of a single partition; it is
            consumed once. An empty partition prints nothing.
    """
    label = "foreachPartition:"
    for item in partition:
        print(label, item)

# Both calls are run purely for their printing side effect; their return
# values are not used. foreach() hands print_each one record at a time, while
# foreachPartition() hands print_partition one iterator per partition.
# NOTE(review): the print() calls execute wherever the tasks run, so on a real
# cluster the lines presumably end up in executor output rather than in this
# console -- confirm before relying on them.
rdd.foreach(print_each)
rdd.foreachPartition(print_partition)

# pivot()
print("\n=== pivot() ===")
# Group by name, turn the age values into columns, and count the rows that
# fall into each (name, age) cell.
grouped_by_name = df.groupBy("name")
df_pivot = grouped_by_name.pivot("age").count()
df_pivot.show()

# union()
print("\n=== union() ===")
# Stack df2's rows underneath df's rows; both frames have the columns
# (name, age) at this point in the script.
df_union = df.union(df2)
df_union.show()

# collect()
print("\n=== collect() ===")
# Bring every row of df back to the driver as a local Python list.
collected_rows = df.collect()
print("Collect result:", collected_rows)

# cache() and persist()
print("\n=== cache() and persist() ===")
# cache() only marks df for storage at the default level; the data is
# materialised by the next action.
df.cache()
df.count()  # Triggers caching

# Drop the cached copy before calling persist(): persist() on a DataFrame
# that is already cached does not store it again, so without this step the
# second count() would not demonstrate anything. An explicit storage level
# shows what persist() offers beyond cache().
df.unpersist(blocking=True)
df.persist(StorageLevel.MEMORY_ONLY)
df.count()  # Triggers persist

# udf()
print("\n=== udf() ===")


def upper_name(name):
    """Return ``name`` upper-cased, passing ``None`` through unchanged.

    A null column value reaches a Python UDF as ``None``; calling ``.upper()``
    on it would raise ``AttributeError`` and fail the whole job, so nulls are
    propagated instead.

    Args:
        name: The string to convert, or ``None``.

    Returns:
        The upper-cased string, or ``None`` when ``name`` is ``None``.
    """
    if name is None:
        return None
    return name.upper()

# Wrap the plain Python function as a string-returning UDF, then apply it to
# the "name" column to add an "upper_name" column.
upper_udf = udf(upper_name, returnType=StringType())
name_in_upper_case = upper_udf(col("name"))
df = df.withColumn("upper_name", name_in_upper_case)
df.show()

# Stop Spark session
# Last statement of the script: nothing after this point uses `spark`, `sc`,
# `rdd` or any of the DataFrames created above.
spark.stop()



=== a) Repartition & Coalesce ===
Initial Partitions: 4
After Repartition: 6
After Coalesce: 2
Default Shuffle Partitions: 200
Updated Shuffle Partitions: 10

=== b) Broadcast & Accumulator ===
With Gender Info: [('Alice', 21, 'F'), ('Bob', 22, 'M'), ('Charlie', 23, 'M'), ('Alice', 21, 'F')]
Accumulator count: 4

=== c) RDD to DataFrame ===
+-------+---+
|   name|age|
+-------+---+
|  Alice| 21|
|    Bob| 22|
|Charlie| 23|
|  Alice| 21|
+-------+---+


=== createDataFrame() ===
+-----+---+
| name|age|
+-----+---+
|David| 24|
|  Eva| 25|
+-----+---+


=== where() and filter() ===
+-------+---+
|   name|age|
+-------+---+
|    Bob| 22|
|Charlie| 23|
+-------+---+

+-------+---+
|   name|age|
+-------+---+
|    Bob| 22|
|Charlie| 23|
+-------+---+


=== withColumn() ===
+-------+---+----------+
|   name|age|age_plus_1|
+-------+---+----------+
|  Alice| 21|        22|
|    Bob| 22|        23|
|Charlie| 23|        24|
|  Alice| 21|        22|
+-------+---+----------+

+-------+---+-------