In [1]:
import ast
import shutil

from pyspark.sql import SparkSession

# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("RDD-Demo")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Turn a small local list (1 through 5) into an RDD.
numbers = list(range(1, 6))
rdd = spark.sparkContext.parallelize(numbers)

In [3]:
# collect() is an action: it brings every element back to the driver as a list.
rdd.collect()

[1, 2, 3, 4, 5]

In [4]:
# Sample (name, number) pairs; "Alice" appears twice so that key-based
# operations have something to merge.
data = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
    ("Alice", 40),
]
# NOTE: this rebinds `rdd`, replacing the RDD of plain numbers created earlier.
rdd = spark.sparkContext.parallelize(data)

In [5]:
# Materialise the whole RDD on the driver, then report it.
all_elements = rdd.collect()
print(f"All elements of the rdd: {all_elements}")

All elements of the rdd: [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


# RDD Operations: Actions

In [6]:
# count() is an action that returns how many elements the RDD holds.
element_count = rdd.count()
print(f"The total number of elements in rdd: {element_count}")

[Stage 2:>                                                        (0 + 16) / 16]

The total number of elements in rdd: 4


                                                                                

In [7]:
# first() is an action that returns a single element rather than a list.
first_element = rdd.first()
print(f"The first element of the rdd: {first_element}")

The first element of the rdd: ('Alice', 25)


In [9]:
# take(2) is an action that returns the first two elements as a list.
first_two = rdd.take(2)
print(f"The first two elements of the rdd: {first_two}")

The first two elements of the rdd: [('Alice', 25), ('Bob', 30)]


In [10]:
# foreach() applies the function to each element for its side effect only.
# The elements may be printed in a different order than they were defined.
rdd.foreach(print)

('Bob', 30)
('Charlie', 35)
('Alice', 25)
('Alice', 40)


# RDD Operations: Transformations

In [11]:
# Upper-case the name in each pair and leave the number untouched.
# map() is a transformation: it only describes the work; an action runs it.
mapped_rdd = rdd.map(
    lambda pair: (pair[0].upper(), pair[1])
)

In [12]:
# Displaying a transformed RDD shows only its handle, not its data; call an
# action such as collect() to see the elements.
mapped_rdd

PythonRDD[9] at RDD at PythonRDD.scala:53

In [13]:
# Run the map transformation by collecting its result, then report it.
upper_elements = mapped_rdd.collect()
print(f"rdd with uppercase name: {upper_elements}")

rdd with uppercase name: [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [15]:
# Keep only the pairs whose number is strictly greater than 30.
filtered_rdd = rdd.filter(
    lambda pair: pair[1] > 30
)
filtered_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

In [16]:
# Sum the numbers that share the same name. Keys may come back in a
# different order than they appear in the input.
reduced_rdd = rdd.reduceByKey(
    lambda left, right: left + right
)
reduced_rdd.collect()

[('Charlie', 35), ('Bob', 30), ('Alice', 65)]

In [17]:
# Order the pairs by their number, largest first.
sorted_rdd = rdd.sortBy(
    lambda pair: pair[1],
    ascending=False,
)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

# Save an RDD to a text file and read it back

In [18]:
# saveAsTextFile() writes a *directory* named "output.txt" containing part
# files, and it fails if that path already exists. Remove any output left by a
# previous run first so this cell can be re-run (Restart Kernel -> Run All).
# NOTE(review): shutil.rmtree only touches the local filesystem; this assumes
# Spark is writing to local disk rather than HDFS/S3 -- confirm for your setup.
shutil.rmtree("output.txt", ignore_errors=True)
rdd.saveAsTextFile("output.txt")

In [None]:
# textFile() reads the part files under "output.txt" and yields one *string*
# per line, so the tuples saved earlier come back in their text form.
rdd_text = spark.sparkContext.textFile("output.txt")

# Parse each line back into a tuple. literal_eval accepts only Python
# literals, so it does not execute arbitrary code from the file.
rdd_restored = rdd_text.map(ast.literal_eval)

# textFile() and map() are lazy; collect() triggers the read and shows the
# restored elements.
rdd_restored.collect()