<a href="https://colab.research.google.com/github/ngtianxun08/bigDataworkshops/blob/main/Workshop_03_Apache_Spark_Core_Actions_and_Transformations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# Uninstall existing pyspark and findspark
!pip uninstall -y pyspark findspark
# install pyspark using pip
!pip install --ignore-install -q pyspark==3.5.1
# install findspark using pip
!pip install --ignore-install -q findspark

Found existing installation: pyspark 4.0.0
Uninstalling pyspark-4.0.0:
  Successfully uninstalled pyspark-4.0.0
Found existing installation: findspark 2.0.1
Uninstalling findspark-2.0.1:
  Successfully uninstalled findspark-2.0.1


In [5]:
from pyspark.sql import SparkSession
import collections

# Build (or reuse) the notebook's Spark session: local master, named app,
# and the Spark UI port set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab Demo for Actions and Transofrmations")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

Map

In [6]:
# map(): apply a function to every element, producing one output per input.
data = list(range(1, 6))
rdd = spark.sparkContext.parallelize(data)
multiplied = rdd.map(lambda n: 2 * n)  # double each value
print(multiplied.collect())  # [2, 4, 6, 8, 10]

[2, 4, 6, 8, 10]


In [7]:
# map() over (number, letter) tuples: increment the number, keep the letter.
data_tuples = [
    (1, 'a'),
    (2, 'b'),
    (3, 'c'),
]
rdd_tuples = spark.sparkContext.parallelize(data_tuples)
added = rdd_tuples.map(lambda pair: (pair[0] + 1, pair[1]))
print(added.collect())  # [(2, 'a'), (3, 'b'), (4, 'c')]

[(2, 'a'), (3, 'b'), (4, 'c')]


In [8]:
# map() over strings: upper-case every element.
data_strings = [
    "Hello",
    "PySpark",
    "World",
]
rdd_strings = spark.sparkContext.parallelize(data_strings)
# Every element is a str, so the unbound method can be passed directly.
uppercase = rdd_strings.map(str.upper)
print(uppercase.collect())  # ['HELLO', 'PYSPARK', 'WORLD']

['HELLO', 'PYSPARK', 'WORLD']


In [9]:
# Chain two map() calls: the first squares, the second adds 2.
numbers = list(range(1, 6))
rdd_numbers = spark.sparkContext.parallelize(numbers)

result = (
    rdd_numbers
    .map(lambda n: n ** 2)
    .map(lambda squared: squared + 2)
)
print(result.collect())  # [3, 6, 11, 18, 27]

[3, 6, 11, 18, 27]


In [10]:
def square_and_add_one(n):
  """Square ``n`` and add one to the result."""
  squared = n ** 2
  return squared + 1

# map() also accepts a named function; here it replaces the inline lambda.
numbers = list(range(1, 6))
rdd_numbers = spark.sparkContext.parallelize(numbers)
result = rdd_numbers.map(square_and_add_one)
print(result.collect())  # [2, 5, 10, 17, 26]

[2, 5, 10, 17, 26]


In [11]:
# Square every element with map().
# The list defined here is parallelized directly, so the cell does not depend
# on whichever `rdd` another cell happened to leave in the kernel.
x = [1, 2, 3, 4, 5]
# The lambda parameter is named `n` so it does not shadow the list `x`.
squared_rdd = spark.sparkContext.parallelize(x).map(lambda n: n * n)
print(squared_rdd.collect()) # [1, 4, 9, 16, 25]

[1, 4, 9, 16, 25]


flatMap

In [12]:
# flatMap(): each sentence yields several words, and the per-sentence
# results are flattened into a single RDD of words.
sentences = [
    "Hello world",
    "I am learning PySpark",
]
rdd_sentences = spark.sparkContext.parallelize(sentences)
words = rdd_sentences.flatMap(lambda sentence: sentence.split(" "))
print(words.collect())  # ['Hello', 'world', 'I', 'am', 'learning', 'PySpark']

['Hello', 'world', 'I', 'am', 'learning', 'PySpark']


In [13]:
# flatMap() with a generated sequence: every n expands to 0..n inclusive.
numbers = [1, 3, 4]
rdd_numbers = spark.sparkContext.parallelize(numbers)
ranges = rdd_numbers.flatMap(lambda upper: range(0, upper + 1))
print(ranges.collect())  # [0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4]

[0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4]


In [14]:
# flatMap() with the identity function flattens a list of lists by one level.
lists = [
    [1, 2, 3],
    [4, 5],
    [6, 7, 8, 9],
]
rdd_lists = spark.sparkContext.parallelize(lists)
flattened = rdd_lists.flatMap(lambda inner: inner)
print(flattened.collect())  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

[1, 2, 3, 4, 5, 6, 7, 8, 9]


In [16]:
# Expand each (key, list) record into one (key, item) pair per list item.
pairs = [
    ("a", [1, 2, 3]),
    ("b", [4, 5]),
]
rdd_pairs = spark.sparkContext.parallelize(pairs)
expanded_pairs = rdd_pairs.flatMap(
    lambda record: [(record[0], item) for item in record[1]]
)
print(expanded_pairs.collect())
# [('a', 1), ('a', 2), ('a', 3), ('b', 4), ('b', 5)]

[('a', 1), ('a', 2), ('a', 3), ('b', 4), ('b', 5)]


In [18]:
# Emit a (word, length) pair for every word in every sentence.
sentences = [
    "Hello world",
    "PySpark is awesome",
]
rdd_sentences = spark.sparkContext.parallelize(sentences)
word_lengths = rdd_sentences.flatMap(
    lambda sentence: [(token, len(token)) for token in sentence.split(" ")]
)
print(word_lengths.collect())  # [('Hello', 5), ('world', 5), ('PySpark', 7), ('is', 2), ('awesome', 7)]

[('Hello', 5), ('world', 5), ('PySpark', 7), ('is', 2), ('awesome', 7)]


Filter

In [21]:
# Short alias for the session's SparkContext, used by the cells that follow.
sc = spark.sparkContext

In [19]:
# filter(): keep only the elements for which the predicate is true.
rdd = sc.parallelize(list(range(1, 11)))
even_rdd = rdd.filter(lambda n: n % 2 == 0)  # even numbers only
print(even_rdd.collect())  # [2, 4, 6, 8, 10]

[2, 4, 6, 8, 10]


In [23]:
# Keep only the words that are longer than four characters.
words_rdd = sc.parallelize([
    "Spark",
    "Python",
    "Data",
    "ML",
    "AI",
])
long_words_rdd = words_rdd.filter(lambda word: len(word) >= 5)
print(long_words_rdd.collect())  # ['Spark', 'Python']

['Spark', 'Python']


In [24]:
# Filter by membership: keep the languages that appear in the target list.
targets = [
    "Python",
    "Java",
    "Scala",
]
languages_rdd = sc.parallelize([
    "Python",
    "Java",
    "C++",
    "Scala",
    "Go",
])
filtered_rdd = languages_rdd.filter(lambda language: language in targets)
print(filtered_rdd.collect())  # ['Python', 'Java', 'Scala']

['Python', 'Java', 'Scala']


groupByKey

In [25]:
# groupByKey(): gather every value that shares a key into one iterable.
rdd = spark.sparkContext.parallelize([
    (1, "apple"),
    (2, "banana"),
    (3, "cherry"),
    (1, "apricot"),
    (2, "blueberry"),
])
grouped_rdd = rdd.groupByKey()
# Each group's values are turned into a list so they print readably.
for key, values in grouped_rdd.collect():
  print(f"{key}: {list(values)}")

1: ['apple', 'apricot']
2: ['banana', 'blueberry']
3: ['cherry']


reduceByKey

In [26]:
# reduceByKey(): combine the values of each key with a binary function (a sum here).
rdd = spark.sparkContext.parallelize([
    (1, 1),
    (2, 1),
    (3, 1),
    (1, 1),
    (2, 1),
])
summed_rdd = rdd.reduceByKey(lambda left, right: left + right)
print(summed_rdd.collect())
# [(1, 2), (2, 2), (3, 1)]

[(1, 2), (2, 2), (3, 1)]


sortByKey

In [27]:
# sortByKey(): order the (key, value) pairs by key, ascending.
rdd = spark.sparkContext.parallelize([
    (2, "banana"),
    (1, "apple"),
    (3, "cherry"),
])
sorted_rdd = rdd.sortByKey(ascending=True)
print(sorted_rdd.collect())
# [(1, 'apple'), (2, 'banana'), (3, 'cherry')]

[(1, 'apple'), (2, 'banana'), (3, 'cherry')]


In [28]:
# Total the sales amounts per id, then list the totals in key order.
sales_rdd = sc.parallelize([
    (101, 200.0),
    (102, 150.0),
    (101, 250.0),
    (103, 300.0),
])
total_sales = sales_rdd.reduceByKey(lambda running, amount: running + amount)
sorted_sales = total_sales.sortByKey()
print(sorted_sales.collect())
# [(101, 450.0), (102, 150.0), (103, 300.0)]

[(101, 450.0), (102, 150.0), (103, 300.0)]


Join

In [29]:
# join(): pair up the values of keys that appear in both RDDs.
rdd1 = sc.parallelize([(1, "apple"), (2, "banana"), (3, "cherry")])
rdd2 = sc.parallelize([(1, "fruit"), (2, "fruit"), (4, "vegetable")])
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())
# Only keys 1 and 2 occur in both inputs, so only they appear in the result.
# The collected pairs are not sorted by key; the recorded run printed:
# [(2, ('banana', 'fruit')), (1, ('apple', 'fruit'))]

[(2, ('banana', 'fruit')), (1, ('apple', 'fruit'))]


Union

In [30]:
# union(): concatenate two RDDs into one containing the elements of both.
rdd3 = sc.parallelize([
    (1, "apple"),
    (2, "banana"),
])
rdd4 = sc.parallelize([
    (3, "cherry"),
    (4, "date"),
])
union_rdd = rdd3.union(rdd4)
print(union_rdd.collect())
# [(1, 'apple'), (2, 'banana'), (3, 'cherry'), (4, 'date')]

[(1, 'apple'), (2, 'banana'), (3, 'cherry'), (4, 'date')]


Actions

Collect


In [31]:
# collect(): action that returns all elements of the RDD as a Python list.
# `rdd` is whatever was last bound to that name; in the recorded run it is the
# three-pair RDD from the sortByKey example.
print(rdd.collect())

[(2, 'banana'), (1, 'apple'), (3, 'cherry')]


Count

In [32]:
# count(): action that returns the number of elements in the RDD.
print(rdd.count())

3


first

In [33]:
# first(): action that returns the first element of the RDD.
print(rdd.first())

(2, 'banana')


Take

In [34]:
# take(n): action that returns the first n elements as a list (n = 3 here).
print(rdd.take(3))

[(2, 'banana'), (1, 'apple'), (3, 'cherry')]


reduce

In [35]:
# reduce(): action that folds all elements into one value with a binary function.
# The elements here are tuples, so `+` concatenates them into one flat tuple.
# The result is stored under a name that does not shadow the built-in sum().
reduced = rdd.reduce(lambda a, b: a + b)
print(reduced)

(2, 'banana', 1, 'apple', 3, 'cherry')
