<a href="https://colab.research.google.com/github/kareemullah123456789/bigdatafoundation-june/blob/main/rdd_ptimized.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark



In [2]:
#RDD (Resilient Distributed Dataset) is the fundamental data structure in Apache Spark. It's an immutable, distributed collection of objects that can be processed in parallel across a cluster.

## Setting Up PySpark
#sc.stop

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import time

# Initialize Spark
conf = SparkConf().setAppName("RDD Tutorial").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName("RDD Tutorial").getOrCreate()


In [4]:
## Basic RDD Operations

### Creating RDDs

# Create RDD from a list
numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Create RDD from a text file
# text_rdd = sc.textFile("path/to/your/file.txt")

# Create RDD with specific number of partitions
numbers_partitioned = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], numSlices=4)


### Transformations and Actions


# Transformations (lazy evaluation)
squared = numbers.map(lambda x: x * x)
evens = numbers.filter(lambda x: x % 2 == 0)
doubled = numbers.map(lambda x: x * 2)

# Actions (trigger computation)
result = squared.collect()
count = evens.count()
sum_result = numbers.reduce(lambda a, b: a + b)

print(f"Squared numbers: {result}")
print(f"Count of even numbers: {count}")
print(f"Sum: {sum_result}")


## Understanding Lazy Evaluation




Squared numbers: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
Count of even numbers: 5
Sum: 55
Creating transformations...
Transformations created, but no computation yet!
Computation completed in 0.41 seconds
First 10 results: [2000, 4000, 6000, 8000, 10000, 12000, 14000, 16000, 18000, 20000]


In [5]:
# Demonstrate lazy evaluation
print("Creating transformations...")
large_numbers = sc.parallelize(range(1, 1000000))
filtered = large_numbers.filter(lambda x: x % 1000 == 0)
mapped = filtered.map(lambda x: x * 2)

print("Transformations created, but no computation yet!")

# Only when we call an action, computation happens
start_time = time.time()
result = mapped.collect()
end_time = time.time()

print(f"Computation completed in {end_time - start_time:.2f} seconds")
print(f"First 10 results: {result[:10]}")

Creating transformations...
Transformations created, but no computation yet!
Computation completed in 0.47 seconds
First 10 results: [2000, 4000, 6000, 8000, 10000, 12000, 14000, 16000, 18000, 20000]


In [7]:
def demonstrate_caching():
    # Create a dataset
    large_data = sc.parallelize(range(1, 1000000))

    # Complex transformation that we'll reuse
    processed_data = large_data.map(lambda x: x * x).filter(lambda x: x % 1000 == 0)

    print("=== Without Caching ===")

    # First operation
    start = time.time()
    count1 = processed_data.count()
    time1 = time.time() - start

    # Second operation (recomputes everything)
    start = time.time()
    sum1 = processed_data.reduce(lambda a, b: a + b)
    time2 = time.time() - start

    print(f"First operation (count): {time1:.2f}s")
    print(f"Second operation (sum): {time2:.2f}s")
    print(f"Total time: {time1 + time2:.2f}s")

    print("\n=== With Caching ===")

    # Cache the RDD
    processed_data.cache()

    # First operation (computes and caches)
    start = time.time()
    count2 = processed_data.count()
    time3 = time.time() - start

    # Second operation (uses cached data)
    start = time.time()
    sum2 = processed_data.reduce(lambda a, b: a + b)
    time4 = time.time() - start

    print(f"First operation (count + cache): {time3:.2f}s")
    print(f"Second operation (sum from cache): {time4:.2f}s")
    print(f"Total time: {time3 + time4:.2f}s")

    # Clean up cache
    processed_data.unpersist()

demonstrate_caching()

=== Without Caching ===
First operation (count): 0.69s
Second operation (sum): 0.50s
Total time: 1.19s

=== With Caching ===
First operation (count + cache): 0.72s
Second operation (sum from cache): 0.26s
Total time: 0.98s


In [9]:
from pyspark import StorageLevel

# Create a complex RDD that we'll use multiple times
data = sc.parallelize(range(1, 100000))
expensive_rdd = data.map(lambda x: x * x).filter(lambda x: x % 100 == 0)

# Method 1: cache() - stores in memory only
expensive_rdd.cache()

# Method 2: persist() - more control over storage
expensive_rdd.persist(StorageLevel.MEMORY_AND_DISK)

UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level

In [10]:
## RDD Caching and Persistence

### Why Cache?

#When you perform multiple actions on the same RDD, Spark recomputes the entire lineage each time. Caching stores the RDD in memory/disk to avoid recomputation.

### Cache vs Persist




# Available storage levels:
# - MEMORY_ONLY (default for cache())
# - MEMORY_AND_DISK
# - MEMORY_ONLY_SER (serialized)
# - MEMORY_AND_DISK_SER
# - DISK_ONLY
# - MEMORY_ONLY_2 (replicated)
# - MEMORY_AND_DISK_2


### Practical Caching Example





## Partition Optimization

### Understanding Partitions


# Check number of partitions
data = sc.parallelize(range(1, 1000), numSlices=8)
print(f"Number of partitions: {data.getNumPartitions()}")

# View data distribution across partitions
def show_partition_distribution(rdd):
    partition_sizes = rdd.mapPartitionsWithIndex(
        lambda idx, iterator: [(idx, len(list(iterator)))]
    ).collect()

    for partition_id, size in partition_sizes:
        print(f"Partition {partition_id}: {size} elements")

show_partition_distribution(data)

Number of partitions: 8
Partition 0: 124 elements
Partition 1: 125 elements
Partition 2: 125 elements
Partition 3: 125 elements
Partition 4: 125 elements
Partition 5: 125 elements
Partition 6: 125 elements
Partition 7: 125 elements


In [11]:


### Repartitioning Strategies


# Create unevenly distributed data
uneven_data = sc.parallelize([1]*100 + [2]*200 + [3]*50, numSlices=4)
print("Original distribution:")
show_partition_distribution(uneven_data)

# Repartition to balance load
balanced_data = uneven_data.repartition(6)
print("\nAfter repartitioning:")
show_partition_distribution(balanced_data)

# Coalesce to reduce partitions (more efficient than repartition for reduction)
coalesced_data = balanced_data.coalesce(3)
print("\nAfter coalescing:")
show_partition_distribution(coalesced_data)

Original distribution:
Partition 0: 87 elements
Partition 1: 87 elements
Partition 2: 87 elements
Partition 3: 89 elements

After repartitioning:
Partition 0: 67 elements
Partition 1: 57 elements
Partition 2: 49 elements
Partition 3: 50 elements
Partition 4: 60 elements
Partition 5: 67 elements

After coalescing:
Partition 0: 124 elements
Partition 1: 117 elements
Partition 2: 109 elements


In [13]:
## Advanced Optimization Techniques

### 1. Avoiding Shuffles


# Bad: This causes a shuffle
def bad_approach():
    data = sc.parallelize([(i, i*2) for i in range(1000)])
    # groupByKey causes expensive shuffle
    grouped = data.groupByKey()
    result = grouped.map(lambda x: (x[0], sum(x[1])))
    return result.collect()

# Good: Use reduceByKey instead
def good_approach():
    data = sc.parallelize([(i, i*2) for i in range(1000)])
    # reduceByKey combines locally before shuffling
    result = data.reduceByKey(lambda a, b: a + b)
    return result.collect()

print("Demonstrating shuffle optimization...")
print("bad approach :",bad_approach())
print("good approach :",good_approach())

Demonstrating shuffle optimization...
bad approach : [(0, 0), (2, 4), (4, 8), (6, 12), (8, 16), (10, 20), (12, 24), (14, 28), (16, 32), (18, 36), (20, 40), (22, 44), (24, 48), (26, 52), (28, 56), (30, 60), (32, 64), (34, 68), (36, 72), (38, 76), (40, 80), (42, 84), (44, 88), (46, 92), (48, 96), (50, 100), (52, 104), (54, 108), (56, 112), (58, 116), (60, 120), (62, 124), (64, 128), (66, 132), (68, 136), (70, 140), (72, 144), (74, 148), (76, 152), (78, 156), (80, 160), (82, 164), (84, 168), (86, 172), (88, 176), (90, 180), (92, 184), (94, 188), (96, 192), (98, 196), (100, 200), (102, 204), (104, 208), (106, 212), (108, 216), (110, 220), (112, 224), (114, 228), (116, 232), (118, 236), (120, 240), (122, 244), (124, 248), (126, 252), (128, 256), (130, 260), (132, 264), (134, 268), (136, 272), (138, 276), (140, 280), (142, 284), (144, 288), (146, 292), (148, 296), (150, 300), (152, 304), (154, 308), (156, 312), (158, 316), (160, 320), (162, 324), (164, 328), (166, 332), (168, 336), (170, 340

In [None]:
### 2. Broadcast Variables



# When you need to use a large dataset in transformations
lookup_table = {i: f"value_{i}" for i in range(1000)}

# Broadcast the lookup table
broadcast_lookup = sc.broadcast(lookup_table)

# Use broadcast variable in transformations
data = sc.parallelize(range(100))
enriched = data.map(lambda x: (x, broadcast_lookup.value.get(x, "unknown")))

print("Sample enriched data:", enriched.take(5))

# Clean up broadcast variable
broadcast_lookup.unpersist()