In [1]:
from pyspark.sql import SparkSession

# Start a Spark session for this notebook, or attach to the one already running.
spark = (
    SparkSession.builder
    .appName('rdd_practicals')
    .getOrCreate()
)

# Display the session's Spark configuration as a list of (key, value) pairs.
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.eventLog.dir',
  'gs://dataproc-temp-us-west1-423929572205-dq3ru5gd/aebf22d7-7e3e-417d-be10-34d127f8f21e/spark-job-history'),
 ('spark.sql.warehouse.dir', 'file:/spark-warehouse'),
 ('spark.sql.autoBroadcastJoinThreshold', '21m'),
 ('spark.yarn.historyServer.address', 'jobs-cluster-m:18080'),
 ('spark.yarn.am.memory', '640m'),
 ('spark.history.fs.logDirectory',
  'gs://dataproc-temp-us-west1-423929572205-dq3ru5gd/aebf22d7-7e3e-417d-be10-34d127f8f21e/spark-job-history'),
 ('spark.driver.appUIAddress',
  'http://jobs-cluster-m.us-west1-a.c.dev-de-training.internal:34521'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'jobs-cluster-m'),
 ('spark.executor.instances', '2'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.unmanagedAM.enabled', 'true'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES'

In [1]:
from pyspark import SparkContext

# Bind `sc` explicitly so this cell (and the later cells that use `sc`) do not
# depend on a kernel-injected global that is absent after a plain restart.
# getOrCreate() returns the already-running SparkContext when there is one.
sc = SparkContext.getOrCreate()

# "Large" RDD of (name, number) pairs -- tiny here for demonstration purposes
data_rdd = sc.parallelize([("John", 25), ("Jane", 30), ("Jake", 35), ("Jill", 40), ("Joe", 45)])

# Small set of allowed names
allowed_names = {"John", "Jane", "Jake"}

# Broadcast the set once so tasks read a shared read-only copy via `.value`
# instead of each task closure carrying its own copy.
broadcast_names = sc.broadcast(allowed_names)

# Keep only records whose first field (the name) is in the broadcast set
filtered_rdd = data_rdd.filter(lambda x: x[0] in broadcast_names.value)
print(filtered_rdd.collect())  # Output: [('John', 25), ('Jane', 30), ('Jake', 35)]


[Stage 0:>                                                          (0 + 1) / 2]

[('John', 25), ('Jane', 30), ('Jake', 35)]


24/08/24 02:18:23 WARN org.apache.spark.storage.BlockManagerMasterEndpoint: No more replicas available for broadcast_0_python !


In [2]:
# Accumulator that tasks increment for every value that fails to parse
error_accumulator = sc.accumulator(0)

# Sample RDD: three numeric strings plus two strings int() cannot parse
data_rdd = sc.parallelize(["100", "200", "three hundred", "400", "five hundred"])


def safe_convert(x):
    """Parse ``x`` with ``int()``.

    Returns the parsed integer, or None when ``int()`` raises ValueError; in
    that case ``error_accumulator`` is incremented by one.
    """
    try:
        result = int(x)
    except ValueError:
        # Caveat: this update happens inside a transformation (map), so Spark
        # may apply it more than once if a task or stage is re-executed.
        error_accumulator.add(1)
        result = None
    return result


# Parse every element, then drop the None markers left by failed parses
converted_rdd = (
    data_rdd
    .map(safe_convert)
    .filter(lambda value: value is not None)
)
print(converted_rdd.collect())  # Output: [100, 200, 400]
# The accumulator is read after collect() above has actually run the tasks
print(f"Number of errors: {error_accumulator.value}")  # Output: Number of errors: 2


[Stage 1:>                                                          (0 + 1) / 2]

[100, 200, 400]
Number of errors: 2


                                                                                

In [None]:
# Large RDD of sales transactions: (product_id, amount)
sales_rdd = sc.parallelize([
    ("p1", 100), ("p2", 150), ("p3", 200),
    ("p4", 250), ("p1", 300), ("p5", 350),
    ("invalid", 400)
])

# Small dictionary mapping product IDs to categories
product_categories = {"p1": "electronics", "p2": "electronics", "p3": "clothing", "p4": "clothing", "p5": "groceries"}

# Broadcast the lookup so tasks read a shared read-only copy via `.value`
broadcast_categories = sc.broadcast(product_categories)

# Accumulator counting transactions whose product_id is not in the lookup
invalid_transactions = sc.accumulator(0)


def map_to_category(sale):
    """Map a ``(product_id, amount)`` sale to ``(category, amount)``.

    Returns None and increments ``invalid_transactions`` when ``product_id``
    is not a key of the broadcast category lookup.
    """
    product_id, amount = sale
    categories = broadcast_categories.value  # dereference the broadcast once
    if product_id not in categories:
        # Updated inside a transformation: Spark may re-apply this increment
        # if a task or stage is retried, so treat the count as best-effort.
        invalid_transactions.add(1)
        return None
    return (categories[product_id], amount)


# Total sales per category, skipping transactions with an unknown product_id
category_sales_rdd = (
    sales_rdd
    .map(map_to_category)
    .filter(lambda x: x is not None)
    .reduceByKey(lambda x, y: x + y)
)
# reduceByKey shuffles by key, so the collected order is not the input order;
# sort on the driver so the printed result is reproducible.
print(sorted(category_sales_rdd.collect()))  # Output: [('clothing', 450), ('electronics', 550), ('groceries', 350)]
print(f"Number of invalid transactions: {invalid_transactions.value}")  # Output: Number of invalid transactions: 1
