In [22]:
import time
from pyspark.sql import SparkSession
# Obtain a SparkSession named "CreateRDD" via getOrCreate() (returns an existing
# session if one is already active), then expose its SparkContext as `sc`,
# which is the entry point for the RDD API used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("CreateRDD")
    .getOrCreate()
)

sc = spark.sparkContext

In [18]:
# Seven short two-word phrases; three of them contain "apple", which the
# filter example later in the notebook selects on.
raw_data = [
    "apple core",
    "banana peel",
    "orange zest",
    "grape skin",
    "melon rind",
    "apple juice",
    "apple pie",
]

# Turn the local Python list into a distributed RDD.
rdd = sc.parallelize(raw_data)

In [19]:
# collect() is an action: it runs the job and returns every RDD element to the
# driver as a Python list. Fine for this tiny demo; avoid on large datasets.
print(rdd.collect())

['apple core', 'banana peel', 'orange zest', 'grape skin', 'melon rind', 'apple juice', 'apple pie']


In [20]:
# map() is a lazy one-to-one transformation: each phrase becomes its
# upper-cased form. Nothing executes until collect() is called.
rdd_map = rdd.map(str.upper)
print(rdd_map.collect())

[Stage 1:>                                                        (0 + 10) / 10]

['APPLE CORE', 'BANANA PEEL', 'ORANGE ZEST', 'GRAPE SKIN', 'MELON RIND', 'APPLE JUICE', 'APPLE PIE']


                                                                                

In [21]:
# flatMap() may emit several outputs per input element: each phrase is split
# on whitespace, and the resulting word lists are flattened into one RDD.
rdd_flatmap = rdd.flatMap(str.split)
print(rdd_flatmap.collect())

['apple', 'core', 'banana', 'peel', 'orange', 'zest', 'grape', 'skin', 'melon', 'rind', 'apple', 'juice', 'apple', 'pie']


In [10]:
# Chain of lazy transformations, executed only when collect() runs:
#   1. upper-case every phrase (so the "APPLE" match below is case-insensitive
#      with respect to the original data),
#   2. keep phrases containing "APPLE",
#   3. break the surviving phrases into individual words.
# split() with no argument splits on any run of whitespace and never produces
# empty tokens; split(" ") would emit "" for doubled, leading or trailing spaces.
filtered_rdd = (
    rdd
    .map(str.upper)
    .filter(lambda phrase: "APPLE" in phrase)
    .flatMap(str.split)
)

print(filtered_rdd.collect())

['APPLE', 'CORE', 'APPLE', 'JUICE', 'APPLE', 'PIE']


In [24]:
# (category, amount) tuples. An RDD of 2-tuples is a "pair RDD", which is what
# enables the *ByKey operations used in the following cells.
sales_data = [
    ("Electronics", 1200),
    ("Furniture", 300),
    ("Electronics", 800),
    ("Furniture", 450),
    ("Grocery", 100),
]

sales_rdd = sc.parallelize(sales_data)

In [25]:
# Sum sales per category with reduceByKey, which (per the PySpark docs) merges
# values locally within each partition before shuffling, like a MapReduce combiner.
# Timing notes:
#   - perf_counter() is a monotonic, high-resolution clock intended for
#     measuring intervals; time.time() is wall-clock time and can jump.
#   - reduceByKey is lazy, so the work actually happens inside collect();
#     the result is printed only after the timer stops, so console I/O is
#     excluded from the measurement.
# NOTE(review): a single run over 5 records mostly measures job-scheduling
# overhead and says little about reduceByKey vs groupByKey at scale.
start_time = time.perf_counter()
total_sales_by_category = sales_rdd.reduceByKey(lambda x, y: x + y)
reduce_results = total_sales_by_category.collect()
end_time = time.perf_counter()

print(reduce_results)
print(f"Execution time: {end_time - start_time:.4f} seconds")

[('Electronics', 2000), ('Furniture', 750), ('Grocery', 100)]
Execution time: 0.17885589599609375 seconds


In [None]:
# Same aggregation via groupByKey().mapValues(sum). groupByKey first
# materializes every value for a key and only then sums them. The PySpark docs
# recommend reduceByKey/aggregateByKey for aggregations like this because they
# perform much better on real data volumes.
# Timing notes:
#   - perf_counter() is the monotonic interval clock; time.time() can jump.
#   - The print happens after the timer stops, so console I/O is excluded.
# NOTE(review): with 5 records and one run, this timing is dominated by
# scheduling/warm-up noise. A lower number here than in the reduceByKey cell
# does NOT mean groupByKey is faster.
start_time = time.perf_counter()
total_sales_by_category = sales_rdd.groupByKey().mapValues(sum)
group_results = total_sales_by_category.collect()
end_time = time.perf_counter()

print(group_results)
print(f"Execution time: {end_time - start_time:.4f} seconds")

[('Electronics', 2000), ('Furniture', 750), ('Grocery', 100)]
Execution time: 0.12992382049560547 seconds


26/02/18 12:07:16 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 934562 ms exceeds timeout 120000 ms
26/02/18 12:07:16 WARN SparkContext: Killing executors is not supported by current scheduler.
26/02/18 12:07:21 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:359)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [14]:
# Two pair RDDs sharing integer keys (presumably user ids). join() is an inner
# join: for each key present in both RDDs it yields (key, (user, city)).
users = sc.parallelize([
    (1, "Alice"),
    (2, "Bob"),
])
cities = sc.parallelize([
    (1, "New York"),
    (2, "Los Angeles"),
])

user_city = users.join(cities)
print(user_city.collect())

[(1, ('Alice', 'New York')), (2, ('Bob', 'Los Angeles'))]


In [15]:
# Shut down the SparkSession together with its underlying SparkContext.
# After this, `sc` and the RDDs created above can no longer run jobs until a
# new session is created.
spark.stop()