In [1]:
from pyspark.sql import SparkSession

In [3]:
# Create the SparkSession for this notebook (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("RDD-manipulation")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/25 07:57:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Display the SparkSession object.
spark

# Create RDDs
`SparkContext.parallelize` turns a local Python collection into an RDD.

In [8]:
# parallelize turns a local Python collection into an RDD.
numbers = list(range(1, 6))
rdd = spark.sparkContext.parallelize(numbers)

In [9]:
# Displaying an RDD shows only its description, not its contents.
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

In [10]:
# collect() is an action: it returns every element to the driver as a Python
# list, so only use it on small RDDs.
rdd.collect()

[1, 2, 3, 4, 5]

In [15]:
# Sample dataset: (country, number of championships) pairs.
world_champions = list(zip(
    ["Brasil", "Alemania", "Italia", "Argentina",
     "Uruguay", "Francia", "Inglaterra", "España"],
    [5, 4, 4, 3, 2, 2, 1, 1],
))

In [16]:
# Rebind `rdd` to a pair RDD of (country, count) tuples; the numbers RDD from
# the previous section is no longer referenced by this name.
rdd = spark.sparkContext.parallelize(world_champions)

In [17]:
# Again only the RDD's description is shown; nothing has been computed.
rdd

ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:289

In [18]:
# Bring all (country, count) pairs back to the driver as a list.
rdd.collect()

[('Brasil', 5),
 ('Alemania', 4),
 ('Italia', 4),
 ('Argentina', 3),
 ('Uruguay', 2),
 ('Francia', 2),
 ('Inglaterra', 1),
 ('España', 1)]

# RDD Operations: Actions
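Actions compute a result from an RDD and return it to the driver (or write it out). They are evaluated eagerly: calling one triggers execution immediately.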

In [19]:
# Count: number of elements in the RDD.
rdd.count()

                                                                                

8

In [20]:
# First: the first element of the RDD.
rdd.first()

('Brasil', 5)

In [21]:
# Take: the first n elements, returned as a list.
rdd.take(2)

[('Brasil', 5), ('Alemania', 4)]

In [23]:
# Foreach: run a function on every element purely for its side effect (it
# returns nothing). The function runs on the executors, so partitions are
# processed in parallel and the print order is not guaranteed (see the shuffled
# output). On a real cluster the prints land in executor logs, not here.
total_titles = rdd.values().sum()  # sum of all counts in the dataset (22 here)
rdd.foreach(lambda x: print(x[0], x[1] / total_titles))

Brasil 0.22727272727272727
Alemania 0.18181818181818182
Inglaterra 0.045454545454545456
España 0.045454545454545456
Uruguay 0.09090909090909091
Francia 0.09090909090909091
Italia 0.18181818181818182
Argentina 0.13636363636363635


# RDD Operations: Transformations
Transformations return a new RDD and are evaluated lazily: Spark only records them, and nothing runs until an action is called.

In [24]:
# Map: turn each (country, count) pair into (country, share of all counts).
# The total is derived from the data instead of hard-coding 22. Note that
# sum() is an action and runs immediately, while map() below is only recorded.
total_titles = rdd.values().sum()
mapper_rdd = rdd.map(lambda x: (x[0], x[1] / total_titles))

In [25]:
# Only the RDD description is shown: the map above has not been executed yet.
mapper_rdd

PythonRDD[7] at RDD at PythonRDD.scala:53

In [26]:
# Transformations are lazy: the map only runs once an action (collect) asks for results.
mapper_rdd.collect()

[('Brasil', 0.22727272727272727),
 ('Alemania', 0.18181818181818182),
 ('Italia', 0.18181818181818182),
 ('Argentina', 0.13636363636363635),
 ('Uruguay', 0.09090909090909091),
 ('Francia', 0.09090909090909091),
 ('Inglaterra', 0.045454545454545456),
 ('España', 0.045454545454545456)]

In [27]:
# Filter: keep only the countries whose count is greater than two.
filter_rdd = rdd.filter(lambda pair: 2 < pair[1])

In [28]:
# Action that triggers the filter.
filter_rdd.collect()

[('Brasil', 5), ('Alemania', 4), ('Italia', 4), ('Argentina', 3)]

In [35]:
# Sort by: order the pairs by their count, smallest first.
sorted_rdd = rdd.sortBy(lambda pair: pair[1], ascending=True)

In [36]:
# Action that triggers the sort.
sorted_rdd.collect()

[('Inglaterra', 1),
 ('España', 1),
 ('Uruguay', 2),
 ('Francia', 2),
 ('Argentina', 3),
 ('Alemania', 4),
 ('Italia', 4),
 ('Brasil', 5)]

In [33]:
# Reduce by key: merge all values sharing the same key by adding them together.
# Every country appears only once in this dataset, so each count passes through
# unchanged; duplicate keys would be needed to see an actual reduction.
reduced_rdd = rdd.reduceByKey(lambda left, right: left + right)

In [34]:
# reduceByKey shuffles data by key, so the result order differs from the input.
reduced_rdd.collect()

[('Francia', 2),
 ('Inglaterra', 1),
 ('Brasil', 5),
 ('Uruguay', 2),
 ('España', 1),
 ('Alemania', 4),
 ('Italia', 4),
 ('Argentina', 3)]

# Save RDDs to a text file, read RDDs from a text file

In [37]:
import shutil

# saveAsTextFile writes a *directory* (one part-* file per partition plus a
# _SUCCESS marker) and fails if the target path already exists. Remove any
# previous output first so the notebook survives "Restart & Run All".
# NOTE: shutil.rmtree only works when Spark writes to the local filesystem,
# as it does in this session.
output_path = "world_champions.txt"
shutil.rmtree(output_path, ignore_errors=True)
rdd.saveAsTextFile(output_path)

Inspect the output. `world_champions.txt` is actually a directory: the RDD was partitioned and written as one `part-*` file per partition, plus an empty `_SUCCESS` marker file.
~~~
[nic0der@fedora first_project]$ ls
README.md  SparkSession_SparkContext.ipynb  Untitled.ipynb  venv  world_champions.txt
[nic0der@fedora first_project]$ cd world_champions.txt/
[nic0der@fedora world_champions.txt]$ ls
part-00000  part-00001  part-00002  part-00003  _SUCCESS
[nic0der@fedora world_champions.txt]$ cat part-0000
cat: part-0000: No such file or directory
[nic0der@fedora world_champions.txt]$ cat part-00000
('Brasil', 5)
('Alemania', 4)
[nic0der@fedora world_champions.txt]$ cat part-00001
('Italia', 4)
('Argentina', 3)
[nic0der@fedora world_champions.txt]$ cat part-00002
('Uruguay', 2)
('Francia', 2)
[nic0der@fedora world_champions.txt]$ cat part-00003
('Inglaterra', 1)
('España', 1)
[nic0der@fedora world_champions.txt]$ cat _SUCCESS
[nic0der@fedora world_champions.txt]$

~~~

In [40]:
# textFile reads every part-* file in the directory and yields one plain string
# per line, e.g. "('Brasil', 5)". The tuple structure is not restored, and the
# line order depends on the order in which the part files are read back.
sc = spark.sparkContext
rdd_text = sc.textFile("world_champions.txt")

In [41]:
# Each element is a line of text (a str), not the original tuple.
rdd_text.collect()

["('Inglaterra', 1)",
 "('España', 1)",
 "('Brasil', 5)",
 "('Alemania', 4)",
 "('Italia', 4)",
 "('Argentina', 3)",
 "('Uruguay', 2)",
 "('Francia', 2)"]

# Shut down the SparkSession

In [42]:
# Stop the session; `spark` (and its SparkContext) should not be used afterwards.
spark.stop()