In [1]:
# Set the PySpark environment variables.
import os

# Respect a SPARK_HOME that is already exported in the environment and fall
# back to this machine-specific install path only when none is set, so the
# notebook is not tied to a single user's directory layout.
os.environ.setdefault('SPARK_HOME', "/Users/patricia/Apps/Spark")
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'  # only if you use jupyter notebook
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'  # only if you use jupyter notebook
os.environ['PYSPARK_PYTHON'] = 'python'

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Get or create the SparkSession for this notebook, registered under the
# application name "RDD-Demo".
spark = (
    SparkSession.builder
    .appName("RDD-Demo")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/06 15:46:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

24/11/06 15:46:41 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### How to create RDDs

In [7]:
# Build a small local list of integers (1 through 4) and distribute it as an RDD.
numbers = list(range(1, 5))
rdd = spark.sparkContext.parallelize(numbers)

In [8]:
# Collect action: retrieve all elements of the RDD as a local list
rdd.collect()

[1, 2, 3, 4]

In [9]:
# Create an RDD from a list of (name, age) tuples and show its contents.
# Note: this rebinds `rdd`, which the following cells operate on.
data = [
    ("Alice", 25),
    ("Ali", 22),
    ("Alice", 29),
    ("Ali", 30),
]
rdd = spark.sparkContext.parallelize(data)
rdd.collect()

[('Alice', 25), ('Ali', 22), ('Alice', 29), ('Ali', 30)]

### RDD Operations: Actions

In [11]:
# Count action: count the number of elements in the RDD.
# The result is kept in `count` and then printed with a label.
count = rdd.count()
print("The total number of elements in rdd: ", count)



The total number of elements in rdd:  4


In [12]:
# First action: retrieve the first element of the RDD (a single tuple here).
first_element = rdd.first()
print("The first element of the rdd: ", first_element)

The first element of the rdd:  ('Alice', 25)


In [13]:
# Take action: retrieve the first n elements of the RDD as a list (here n = 2).
taken_elements = rdd.take(2)
print("The first two elements of the rdd: ", taken_elements)

The first two elements of the rdd:  [('Alice', 25), ('Ali', 22)]


In [14]:
# Foreach action: apply `print` to every element of the RDD.
# The printed order is not the list order: the recorded run below shows the
# tuples in a different sequence than they were defined.
rdd.foreach(print)

('Ali', 30)
('Ali', 22)
('Alice', 29)
('Alice', 25)


### RDD Operations: Transformations

In [15]:
# Map transformation: upper-case the name (first tuple field) and keep the
# age (second field) unchanged.
mapped_rdd = rdd.map(lambda person: (person[0].upper(), person[1]))

In [17]:
# Collect the mapped RDD into a local list and print it with a label.
print("RDD with uppercase name: ", mapped_rdd.collect())

RDD with uppercase name:  [('ALICE', 25), ('ALI', 22), ('ALICE', 29), ('ALI', 30)]


In [20]:
# Filter transformation: keep only the records where age is greater than 25
filtered_rdd = rdd.filter(lambda x:x[1]>25)
filtered_rdd.collect()


[('Alice', 29), ('Ali', 30)]

In [21]:
# ReduceByKey transformation: calculate the total age for each name by
# summing the values that share the same key.
reduced_rdd = rdd.reduceByKey(lambda total, age: total + age)
reduced_rdd.collect()

[('Alice', 54), ('Ali', 52)]

In [22]:
# SortBy transformation: sort the RDD by age (second tuple field) in
# descending order.
sorted_rdd = rdd.sortBy(lambda person: person[1], ascending=False)
sorted_rdd.collect()

[('Ali', 30), ('Alice', 29), ('Alice', 25), ('Ali', 22)]

### Save RDDs to text file and read RDDs from text file

In [23]:
# Save action: write the RDD out as text under the path "output.txt".
import shutil

# saveAsTextFile raises if the target path already exists, so re-running this
# cell (or the whole notebook) would fail. Remove any previous output first.
# NOTE(review): assumes the relative path resolves on the local filesystem,
# as in a local Spark setup — confirm if a different default filesystem is used.
shutil.rmtree("output.txt", ignore_errors=True)
rdd.saveAsTextFile("output.txt")

In [None]:
# Create an RDD from the text file
rdd_text = spark.sparkContext.textFile("output.txt")
rdd_te