## Introduction to Spark

Industries use Hadoop extensively to analyze their data sets. The Hadoop framework is based on a simple programming model (MapReduce) and enables a computing solution that is scalable, flexible, fault-tolerant and cost-effective. The main concern is processing speed on large datasets: both the waiting time between queries and the time needed to run a program.

Problems of Hadoop: it offers no way to minimize shuffling or storage on hard disk, and it has no efficient scheduling mechanism, so workflow management is weak (Apache Oozie is a workflow manager for Hadoop, but it is inefficient compared to Spark’s DAG).


## Connection to Spark Cluster (PySpark)

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('jupyter-spark') \
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Checking if spark is working or not
spark

In [4]:
!pwd

/notebook


## Loading file on HDFS

In [None]:
# !hadoop fs -mkdir -p /tmp/spark/

In [5]:
!hdfs dfs -put /notebook/input.txt /tmp/spark/
!hadoop fs -ls /tmp/spark

put: `/tmp/spark/input.txt': File exists
Found 1 items
-rw-r--r--   2 jovyan supergroup        114 2023-12-23 06:53 /tmp/spark/input.txt


In [6]:
rdd = sc.textFile('hdfs:///tmp/spark/input.txt')
# rdd = spark.read.text('file:///notebook/input.txt')
# with open('input.txt', 'r') as reader:
#     rdd = sc.parallelize(reader.readlines())

In [7]:
rdd.foreach(lambda f: print(f))

In [8]:
# foreach() runs on the executors, so its printed output may not appear in the notebook;
# collect() returns the elements to the driver instead
rdd.collect()

['hey i did not want this but that is how the world works',
 'do you know what that means',
 'the world is round and turning']

### Get the number of partitions:

In [9]:
rdd.getNumPartitions()

2

Re-partition:

In [10]:
reparRDD = rdd.repartition(4)
reparRDD.getNumPartitions()

4

## map():

The map() transformation takes a function and applies it to every element of the RDD, producing a new RDD with exactly one output element per input element.

With map(), the input and output element types may differ. For example, an RDD of strings can be mapped to an RDD of Booleans.

For example, applying “rdd.map(lambda x: x + 2)” to an RDD containing [1, 2, 3, 4, 5] gives [3, 4, 5, 6, 7].


In [11]:
mapFile = rdd.map(lambda line: (line, len(line)))
# mapFile.foreach(lambda f: print(f))
mapFile.collect()

[('hey i did not want this but that is how the world works', 55),
 ('do you know what that means', 27),
 ('the world is round and turning', 30)]

## flatMap():

With flatMap(), each input element can produce zero or more elements in the output RDD. The simplest use of flatMap() is to split each input string into words. map() and flatMap() are similar in that both take an element from the input RDD and apply a function to it. The key difference is that map() returns exactly one element per input, while flatMap() returns a sequence whose items are flattened into the output RDD.
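The difference can be seen without a cluster. The following is a minimal plain-Python sketch that mimics the two transformations on an ordinary list; the `lines` sample and the variable names are made up for illustration and are not part of the notebook.

```python
from itertools import chain

lines = ["the world is round", "do you know"]

# map(): exactly one output element per input element (here, one list per line)
mapped = [line.split(" ") for line in lines]

# flatMap(): the per-element results are flattened into a single sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['the', 'world', 'is', 'round'], ['do', 'you', 'know']]
print(flat_mapped)  # ['the', 'world', 'is', 'round', 'do', 'you', 'know']
```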


In [12]:
for line in rdd.collect():
    print(line)
rdd2 = rdd.flatMap(lambda f: f.split(" "))
# rdd2.foreach(lambda f: print(f))
rdd2.collect()

hey i did not want this but that is how the world works
do you know what that means
the world is round and turning


['hey',
 'i',
 'did',
 'not',
 'want',
 'this',
 'but',
 'that',
 'is',
 'how',
 'the',
 'world',
 'works',
 'do',
 'you',
 'know',
 'what',
 'that',
 'means',
 'the',
 'world',
 'is',
 'round',
 'and',
 'turning']

In [13]:
# Create another RDD (rdd3) by applying the map transformation
rdd3 = rdd2.map(lambda m: (m, 1))

# Use foreach to print each element in the new RDD (rdd3)
rdd3.foreach(lambda x: print(x))
rdd3.collect()

[('hey', 1),
 ('i', 1),
 ('did', 1),
 ('not', 1),
 ('want', 1),
 ('this', 1),
 ('but', 1),
 ('that', 1),
 ('is', 1),
 ('how', 1),
 ('the', 1),
 ('world', 1),
 ('works', 1),
 ('do', 1),
 ('you', 1),
 ('know', 1),
 ('what', 1),
 ('that', 1),
 ('means', 1),
 ('the', 1),
 ('world', 1),
 ('is', 1),
 ('round', 1),
 ('and', 1),
 ('turning', 1)]

## Filter transformation:

Returns a new RDD, containing only the elements that meet a predicate. For example, suppose RDD contains the first five natural numbers (1, 2, 3, 4, and 5) and the predicate is a check for an even number. The resulting RDD after the filter will contain only the even numbers i.e., 2 and 4.


In [14]:
# Create another RDD (rdd4) by applying the filter transformation
rdd4 = rdd3.filter(lambda a: a[0].startswith("t"))

# Use foreach to print each element in the filtered RDD (rdd4)
# rdd4.foreach(lambda x: print(x))
rdd4.collect()

[('this', 1), ('that', 1), ('the', 1), ('that', 1), ('the', 1), ('turning', 1)]

## mapPartitions(func):

Converts each partition of the source RDD into zero or more elements of the result. mapPartitions() is like map(), but func is called once per partition (block) of the RDD and receives an iterator over that partition's elements, rather than being called once per element.


## mapPartitionsWithIndex()

It is like mapPartitions(), but func also receives an integer representing the index of the partition, so the function can tell which partition it is processing.

In [15]:
# Create an RDD using parallelize with 3 partitions
rdd1 = sc.parallelize(["yellow", "red", "blue", "cyan", "black"], 3)
print("rdd: >>> ", rdd1)

# Print the elements of the RDD
# rdd1.foreach(lambda x: print(x))
rdd1.collect()

rdd: >>>  ParallelCollectionRDD[14] at readRDDFromFile at PythonRDD.scala:274


['yellow', 'red', 'blue', 'cyan', 'black']
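The cell above only creates the RDD. As a rough illustration of what the two partition-level transformations do, here is a plain-Python model in which an RDD is represented as a list of partitions. The helper names `map_partitions` and `map_partitions_with_index` and the way the five colours are split across partitions are assumptions for this sketch; in PySpark the corresponding calls are `rdd1.mapPartitions(f)` and `rdd1.mapPartitionsWithIndex(f)`.

```python
# Hypothetical stand-in for an RDD: a list of partitions, each a list of elements.
# The actual split chosen by sc.parallelize may differ.
partitions = [["yellow"], ["red", "blue"], ["cyan", "black"]]

def map_partitions(func, parts):
    """Call func once per partition, passing an iterator over its elements."""
    return [list(func(iter(part))) for part in parts]

def map_partitions_with_index(func, parts):
    """Like map_partitions, but func also receives the partition index."""
    return [list(func(idx, iter(part))) for idx, part in enumerate(parts)]

# One output element per partition: the number of elements it holds
sizes = map_partitions(lambda it: [sum(1 for _ in it)], partitions)

# Tag every element with the index of the partition it lives in
tagged = map_partitions_with_index(lambda idx, it: ((idx, x) for x in it), partitions)

print(sizes)   # [[1], [2], [2]]
print(tagged)  # [[(0, 'yellow')], [(1, 'red'), (1, 'blue')], [(2, 'cyan'), (2, 'black')]]
```

Because func sees a whole partition at once, per-partition setup work (for example opening a connection) can be done once per partition instead of once per element.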

## reduceByKey()

In [17]:
# Create another RDD (rdd5) by applying the reduceByKey transformation
rdd5 = rdd3.reduceByKey(lambda x, y: x + y)

# Use foreach to print each element in the reduced RDD (rdd5)
rdd5.foreach(lambda x: print(x))
rdd5.collect()

[('is', 2),
 ('round', 1),
 ('turning', 1),
 ('hey', 1),
 ('i', 1),
 ('did', 1),
 ('want', 1),
 ('this', 1),
 ('but', 1),
 ('works', 1),
 ('know', 1),
 ('not', 1),
 ('that', 2),
 ('how', 1),
 ('the', 2),
 ('world', 2),
 ('do', 1),
 ('you', 1),
 ('what', 1),
 ('means', 1),
 ('and', 1)]

## sortByKey():

The (word, count) pairs are swapped to (count, word) so that sortByKey() sorts by the count. The order of words that share the same count is not guaranteed.

In [18]:
# Create another RDD (rdd6) by applying the map and sortByKey transformations
rdd6 = rdd5.map(lambda a: (a[1], a[0])).sortByKey()

# Use foreach to print each element in the sorted RDD (rdd6)
rdd6.foreach(lambda x: print(x))
rdd6.collect()

[(1, 'round'),
 (1, 'turning'),
 (1, 'hey'),
 (1, 'i'),
 (1, 'did'),
 (1, 'want'),
 (1, 'this'),
 (1, 'but'),
 (1, 'works'),
 (1, 'know'),
 (1, 'not'),
 (1, 'how'),
 (1, 'do'),
 (1, 'you'),
 (1, 'what'),
 (1, 'means'),
 (1, 'and'),
 (2, 'that'),
 (2, 'the'),
 (2, 'world'),
 (2, 'is')]

## Actions - top:

If the elements of an RDD have an ordering, top(n) returns the n largest elements. By default top() uses the natural ordering of the data; a key function can be passed to order by something else.


In [19]:
# Read the text file from HDFS into an RDD
data = spark.read.text("hdfs:///tmp/spark/input.txt").rdd

In [20]:
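# Map each line to a tuple of the line and its length.
# Note: each element here is a Row with a single field, so len(line) is the
# number of fields (1), not the character count; len(line.value) gives the latter.
map_file = data.map(lambda line: (line, len(line)))

# Get the top 3 elements, ordered by the second tuple element
res = map_file.top(3, key=lambda x: x[1])

# Print each element in the result
for element in res:
    print(element)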

(Row(value='hey i did not want this but that is how the world works'), 1)
(Row(value='do you know what that means'), 1)
(Row(value='the world is round and turning'), 1)

## Action – foreach

Use foreach() when an operation should be applied to each element of an RDD without returning a value to the driver, for example when inserting records into a database.


In [21]:
rdd6.foreach(lambda x: print(x))  # runs on the executors, so the output may not appear here
rdd6.collect()

[(1, 'not'),
 (1, 'how'),
 (1, 'do'),
 (1, 'you'),
 (1, 'what'),
 (1, 'means'),
 (1, 'and'),
 (1, 'round'),
 (1, 'turning'),
 (1, 'hey'),
 (1, 'i'),
 (1, 'did'),
 (1, 'want'),
 (1, 'this'),
 (1, 'but'),
 (1, 'works'),
 (1, 'know'),
 (2, 'is'),
 (2, 'that'),
 (2, 'the'),
 (2, 'world')]

In [26]:
# foreach() is an action and returns None, so there is no RDD to call collect() on
rdd_test = rdd6.foreach(lambda x: "sdfkhdfgkjhdfj")
rdd_test.collect()

AttributeError: 'NoneType' object has no attribute 'collect'

In [27]:
# Count the occurrences of each unique tuple
result = map_file.countByValue()

# Print each unique tuple and its count
for (value, count) in result.items():
    print(f"{value}: {count}")

(Row(value='hey i did not want this but that is how the world works'), 1): 1
(Row(value='do you know what that means'), 1): 1
(Row(value='the world is round and turning'), 1): 1


## Action – count

count() returns the number of elements in the RDD.


In [28]:
print("Count : " + str(rdd6.count()))

Count : 21


## Action – max

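Returns the maximum element of the RDD. The elements here are (count, word) tuples, which are compared position by position, so the maximum is the pair with the highest count and, among ties, the word that sorts last.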


In [None]:
datMax = rdd6.max()
print("Max Record : {}, {}".format(datMax[0], datMax[1]))

## Action – reduce

The reduce() function takes two elements of the RDD as input and produces an output of the same type as the input elements. The simplest such function is addition: we can sum the elements of an RDD or count the number of words. The function passed to reduce() must be commutative and associative.

Here we add up the total number of words in the file by combining the (count, word) pairs two at a time. The lambda also concatenates the words in the second position, but only the summed count in position 0 is printed.


In [None]:
totalWordCount = rdd6.reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))

print("dataReduce Record : {}".format(totalWordCount[0]))
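To see why the function must be commutative and associative, consider this plain-Python sketch. It models an RDD as a list of partitions and reduces each partition before merging the partial results; the helper `partitioned_reduce` and the two splits are hypothetical and only approximate how Spark schedules the work.

```python
from functools import reduce

def partitioned_reduce(op, parts):
    """Reduce each non-empty partition, then reduce the per-partition results."""
    partials = [reduce(op, part) for part in parts if part]
    return reduce(op, partials)

# The same six numbers, split across partitions in two different ways
split_a = [[1, 2, 3], [4, 5, 6]]
split_b = [[1, 2], [3, 4], [5, 6]]

add = lambda a, b: a + b   # commutative and associative
sub = lambda a, b: a - b   # neither

print(partitioned_reduce(add, split_a), partitioned_reduce(add, split_b))  # 21 21
print(partitioned_reduce(sub, split_a), partitioned_reduce(sub, split_b))  # 3 1
```

Addition gives the same answer however the data is partitioned, while subtraction does not, which is why only operations like addition are safe to pass to reduce().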

## Action – take(n)

The action take(n) returns the first n elements of the RDD. It tries to access as few partitions as possible, so the result is a biased selection rather than a random sample. After a shuffle the order of the elements is not guaranteed, so we should not rely on which elements come first. For example, for an RDD created with sc.parallelize([1, 2, 2, 3, 4, 5, 5, 6]), “take(4)” returns [1, 2, 2, 3].


In [None]:
data3 = rdd6.take(3)

# Print each of the first 3 records
for f in data3:
    print("data3 Key: {}, Value: {}".format(f[0], f[1]))

As you can see above, the selection is not random.
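A simplified plain-Python model of this behaviour is sketched below. It reads partitions in order and stops as soon as enough elements have been gathered. The function `take` and the partition layout are assumptions for illustration; the real implementation scans partitions in growing batches rather than strictly one at a time.

```python
def take(parts, n):
    """Collect the first n elements, reading partitions in order and stopping early."""
    taken = []
    scanned = 0
    for part in parts:
        if len(taken) >= n:
            break
        scanned += 1
        taken.extend(part[: n - len(taken)])
    return taken, scanned

# Hypothetical layout of [1, 2, 2, 3, 4, 5, 5, 6] over three partitions
parts = [[1, 2, 2], [3, 4, 5], [5, 6]]
result, scanned = take(parts, 4)

print(result)   # [1, 2, 2, 3]
print(scanned)  # 2
```

Only two of the three partitions are touched, which is what makes take(n) cheap compared with collect().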

## Action – collect

The action collect() is the simplest and most common operation: it returns the entire content of the RDD to the driver program. A typical use of collect() is unit testing, where the entire RDD is expected to fit in memory, which makes it easy to compare the content of the RDD with the expected result. The constraint of collect() is that all the data is copied to the driver and must fit in the memory of that single machine.

In [None]:
data = rdd6.collect()

# Use foreach to print each element in the collected list
for f in data:
    print("Key: {}, Value: {}".format(f[0], f[1]))

## union(dataset)

Returns a new RDD containing the elements of both RDDs. The key rule of this function is that the two RDDs should be of the same type. Duplicates are kept: if RDD1 holds (Spark, Spark, Hadoop, Flink) and RDD2 holds (Big data, Spark, Flink), then rdd1.union(rdd2) holds (Spark, Spark, Hadoop, Flink, Big data, Spark, Flink).


In [None]:
# Get the SparkContext from the SparkSession
sc = spark.sparkContext

# Create RDDs
rdd1 = sc.parallelize([(1, "jan", 2016), (3, "nov", 2014), (16, "feb", 2014)])
rdd2 = sc.parallelize([(5, "dec", 2014), (17, "sep", 2015)])
rdd3 = sc.parallelize([(6, "dec", 2011), (16, "may", 2015)])

# Use union to combine the RDDs
rdd_union = rdd1.union(rdd2).union(rdd3)

# Use foreach to print each element in the combined RDD
rdd_union.foreach(lambda x: print(x))  # runs on the executors, so the output may not appear here
rdd_union.collect()

## intersection(dataset)

In [None]:
# Create RDDs
rdd1 = sc.parallelize([(1, "jan", 2016), (3, "nov", 2014), (16, "feb", 2014)])
rdd2 = sc.parallelize([(5, "dec", 2014), (1, "jan", 2016)])

# Use intersection to find common elements between rdd1 and rdd2
common = rdd1.intersection(rdd2)

# Use foreach to print each common element
common.foreach(lambda x: print(x))
common.collect()

## distinct()

In [None]:
# Create an RDD
rdd1 = sc.parallelize([(1, "jan", 2016), (3, "nov", 2014), (16, "feb", 2014), (3, "nov", 2014)])

# Use distinct to obtain distinct elements
result = rdd1.distinct()

# Collect and print the distinct elements
print(result.collect())

## groupByKey():

When we use groupByKey() on a dataset of (K, V) pairs, the data is shuffled by the key K so that all values for a key end up together in the new RDD. Because every pair is sent as-is, this transformation can transfer a lot of unnecessary data over the network.

Spark can spill data to disk when more data is shuffled onto a single executor machine than fits in memory.


In [None]:
# Create an RDD
data = sc.parallelize([('k', 5), ('s', 3), ('s', 4), ('p', 7), ('p', 5), ('t', 8), ('k', 6)], 3)

# Use groupByKey to group elements by key
group = data.groupByKey().collect()

# Use foreach to print each group
for key, values in group:
    print("Key: {}, Values: {}".format(key, list(values)))

## reduceByKey():

When we use reduceByKey() on a dataset of (K, V) pairs, the pairs on the same machine with the same key are combined before the data is shuffled, so less data crosses the network.


In [None]:
# Create an array of words
words = ["one", "two", "two", "four", "five", "six", "six", "eight", "nine", "ten"]

# Create an RDD from the array and perform the required transformations
data = sc.parallelize(words).map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)

# Use foreach to print each word and its count
# data.foreach(lambda x: print(x))
print(data.collect())
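The saving from combining before the shuffle can be illustrated with a plain-Python model. The partition layout and the helper `combine_locally` below are hypothetical; the sketch only counts how many records would have to cross the network in each style.

```python
from collections import defaultdict

# Hypothetical layout: (word, 1) pairs spread over two partitions
partitions = [
    [("two", 1), ("two", 1), ("six", 1)],
    [("six", 1), ("two", 1), ("ten", 1)],
]

def combine_locally(part):
    """Map-side combine: merge values that share a key within one partition."""
    combined = defaultdict(int)
    for key, value in part:
        combined[key] += value
    return list(combined.items())

# groupByKey-style: every pair is shuffled unchanged
shuffled_group = [pair for part in partitions for pair in part]

# reduceByKey-style: pairs are pre-aggregated per partition before the shuffle
shuffled_reduce = [pair for part in partitions for pair in combine_locally(part)]

# Final merge on the receiving side
totals = defaultdict(int)
for key, value in shuffled_reduce:
    totals[key] += value

print(len(shuffled_group), len(shuffled_reduce))  # 6 5
print(dict(totals))  # {'two': 3, 'six': 2, 'ten': 1}
```

The gap grows with the number of repeated keys per partition, which is why reduceByKey() is usually preferred over groupByKey() followed by a sum.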

## sortByKey():

In [None]:
# Create an RDD with tuples
data = sc.parallelize([("maths", 52), ("english", 75), ("science", 82), ("computer", 65), ("maths", 85)])

# Use sortByKey to sort the RDD by the key (the first element in each tuple)
sorted_data = data.sortByKey()

# Use foreach to print each element in the sorted RDD
# sorted_data.foreach(lambda x: print(x))
sorted_data.collect()

## join():

Join is a database term: it combines the fields of two tables using common values. The join() operation in Spark is defined on pair RDDs, that is, RDDs in which each element is a tuple whose first element is the key and whose second element is the value.

The benefit of keyed data is that we can combine datasets. The join() operation combines two datasets on the basis of the key.

Note – The reduceByKey() example earlier parallelizes the list of strings, maps each word to a count of 1, and then reduceByKey() merges the counts of identical keys.


In [None]:
# Create two RDDs
data = sc.parallelize([('A', 1), ('b', 2), ('c', 3)])
data2 = sc.parallelize([('A', 4), ('A', 6), ('b', 7), ('c', 3), ('c', 8)])

# Use join to join the two RDDs based on keys
result = data.join(data2)

# Collect and print the result
print(result.collect())
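What the join produces for these two datasets can be worked out with a small plain-Python model of an inner join on the key. The function `inner_join` is a hypothetical helper written for this sketch; Spark may return the same pairs in a different order.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

data = [("A", 1), ("b", 2), ("c", 3)]
data2 = [("A", 4), ("A", 6), ("b", 7), ("c", 3), ("c", 8)]

joined = inner_join(data, data2)
print(joined)
# [('A', (1, 4)), ('A', (1, 6)), ('b', (2, 7)), ('c', (3, 3)), ('c', (3, 8))]
```

A key that appears several times on one side yields one output pair per matching combination, and keys present on only one side are dropped.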

## coalesce():

To avoid a full shuffle of the data we use the coalesce() function. coalesce() reuses existing partitions so that less data is moved, and it lets us cut the number of partitions. Suppose we have four partitions and want only two: the data of the extra partitions is merged into the partitions that are kept.

coalesce() decreases the number of partitions of the source RDD to the numPartitions given as its argument.


In [None]:
# Create an RDD
rdd1 = sc.parallelize(["jan", "feb", "mar", "april", "may", "jun"], 3)

# Use coalesce to reduce the number of partitions to 2
result = rdd1.coalesce(2)

# Print each element in the coalesced RDD
for x in result.collect():
    print(x)
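The idea of merging whole partitions instead of redistributing individual records can be sketched in plain Python. The `coalesce` helper and the contiguous grouping rule below are assumptions made for illustration; Spark chooses which partitions to merge based on data locality.

```python
def coalesce(parts, num_partitions):
    """Merge whole partitions into fewer groups without splitting any of them."""
    if num_partitions >= len(parts):
        return parts  # without a shuffle the partition count cannot grow
    merged = [[] for _ in range(num_partitions)]
    for idx, part in enumerate(parts):
        # Assumption: neighbouring partitions are grouped together
        merged[idx * num_partitions // len(parts)].extend(part)
    return merged

# Hypothetical layout of the six months over three partitions
parts = [["jan", "feb"], ["mar", "april"], ["may", "jun"]]

print(coalesce(parts, 2))  # [['jan', 'feb', 'mar', 'april'], ['may', 'jun']]
```

No partition is ever split, so the resulting partitions can be uneven in size; repartition() performs a full shuffle when balanced partitions matter more than avoiding data movement.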

## Actions - Fold:

Since RDDs are partitioned, fold() takes full advantage of this by first aggregating the elements in each partition and then aggregating the results of all partitions to get the final result. The result has the same type as the elements of the RDD.

fold() is like reduce(), except that it takes a “zero value” as input, which is used for the initial call on each partition. The zero value should be the identity element of the operation. The key difference between fold() and reduce() is that reduce() throws an exception for an empty collection, while fold() is defined for an empty collection.

For example, zero is the identity for addition and one is the identity element for multiplication. The return type of fold() is the same as the element type of the RDD we are operating on. For example, rdd.fold(0, lambda x, y: x + y).


In [None]:
# Create an RDD
list_rdd = sc.parallelize([1, 2, 3, 4, 5, 3, 2])

# Get the number of partitions
print("Partitions: " + str(list_rdd.getNumPartitions()))

# Calculate the total sum
total_sum = list_rdd.fold(0, lambda acc, ele: acc + ele)
print("Total: " + str(total_sum))

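# Calculate the sum with an initial value of 2; the 2 is applied once per
# partition and once more in the final merge, so the result depends on the partition count
total_sum_with_init = list_rdd.fold(2, lambda acc, ele: acc + ele)
print("Total with init value 2: " + str(total_sum_with_init))

# Find the minimum (the zero value must be the identity for min, hence +infinity;
# a zero value of 0 would always win against these positive numbers)
min_value = list_rdd.fold(float('inf'), lambda acc, ele: min(acc, ele))
print("Min: " + str(min_value))

# Find the maximum (the identity for max is -infinity)
max_value = list_rdd.fold(float('-inf'), lambda acc, ele: max(acc, ele))
print("Max: " + str(max_value))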

In [None]:
# Create an RDD
input_rdd = sc.parallelize([("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60)])

# Calculate the total sum
total_sum = input_rdd.fold(("", 0), lambda acc, ele: ("Total", acc[1] + ele[1]))
print("Total: " + str(total_sum))

# Calculate the minimum
min_value = input_rdd.fold(("", float('inf')), lambda acc, ele: ("Min", min(acc[1], ele[1])))
print("Min: " + str(min_value))

# Calculate the maximum
max_value = input_rdd.fold(("", float('-inf')), lambda acc, ele: ("Max", max(acc[1], ele[1])))
print("Max: " + str(max_value))



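In the example below, additional_marks is the zero value used for folding: it is the starting accumulator for each partition and is applied once more when the partition results are merged. It is therefore added once per partition plus once overall, not once per record.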


In [None]:
# Create an RDD
rdd1 = sc.parallelize([("maths", 10), ("science", 20)])

# Print the number of partitions
print("Partitions: " + str(rdd1.getNumPartitions()))

# Print the elements in each partition
print("Elements in each partition: " + str(rdd1.glom().collect()))

# Define additional marks
additional_marks = ("extra", 1)

# Calculate the sum of marks including additional marks
sum_result = rdd1.fold(additional_marks, lambda acc, marks: ("total", acc[1] + marks[1]))

# Print the result
print(sum_result)

*In the above, assuming the RDD has 8 partitions, 1 is added to both 10 and 20 to yield 11 and 21, and the 6 empty partitions contribute 1 each. This gives a subtotal of 11+21+6=38. Then 1 is added once more in the final aggregation of the partitions to yield 39.*
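This arithmetic can be checked with a plain-Python model of fold(), in which the zero value seeds every partition and is used once more for the final merge. The `fold` helper and both partition layouts are assumptions for this sketch; the actual number of partitions depends on the cluster configuration.

```python
from functools import reduce

def fold(parts, zero, op):
    """Fold each partition starting from zero, then fold the partial results from zero."""
    partials = [reduce(op, part, zero) for part in parts]
    return reduce(op, partials, zero)

op = lambda acc, marks: ("total", acc[1] + marks[1])
zero = ("extra", 1)

# Assumed layout: 8 partitions, 2 holding one record each and 6 empty
eight = [[], [], [], [("maths", 10)], [], [], [], [("science", 20)]]
# The same records in only 2 partitions
two = [[("maths", 10)], [("science", 20)]]

print(fold(eight, zero, op))  # ('total', 39)
print(fold(two, zero, op))    # ('total', 33)
```

With a zero value that is not the identity of the operation, the answer changes with the partition count (30 plus the number of partitions plus 1 here), which is why the zero value should normally be the identity element.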

## CountByValue()

It returns the count of each unique value in an RDD to the driver program as a local map of (value, count) pairs.

Take care when using this API: because it returns the result to the driver program, it is suitable only when the number of distinct values is small.

For example, for an RDD with values {1, 2, 2, 3, 4, 5, 5, 6}, “rdd.countByValue()” gives the result {(1,1), (2,2), (3,1), (4,1), (5,2), (6,1)}.


In [None]:
# Create an RDD
rdd1 = sc.parallelize([("HR", 5), ("RD", 4), ("ADMIN", 5), ("SALES", 4), ("SER", 6), ("MAN", 8)])

# Use countByValue to get the count of each unique value
result = rdd1.countByValue()

# Print the result
for key, count in result.items():
    print("{}: {}".format(key, count))
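The counting itself behaves like Python's collections.Counter applied to the elements. The short sketch below reproduces the numeric example from the text and shows that, for tuple elements, the whole tuple is the value being counted; the `pairs` sample is made up for illustration.

```python
from collections import Counter

values = [1, 2, 2, 3, 4, 5, 5, 6]
counts = Counter(values)
print(dict(counts))  # {1: 1, 2: 2, 3: 1, 4: 1, 5: 2, 6: 1}

# Whole tuples are counted, not just their first or second component
pairs = [("HR", 5), ("RD", 4), ("HR", 5)]
pair_counts = Counter(pairs)
print(dict(pair_counts))  # {('HR', 5): 2, ('RD', 4): 1}
```

In the cell above every (department, number) tuple is distinct, so each one is reported with a count of 1.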

## Aggregate()

Since RDDs are partitioned, aggregate() takes full advantage of this by first aggregating the elements in each partition and then aggregating the results of all partitions to get the final result. Unlike fold(), the result can be of a different type than the elements of the RDD.


In [None]:
# Create an RDD
list_rdd = sc.parallelize([1, 2, 3, 4, 5, 3, 2])

# Define the zero value and two functions for aggregation
zero_value = 0
param0 = lambda acc, v: acc + v
param1 = lambda acc1, acc2: acc1 + acc2

# Use aggregate to calculate the sum of elements
result = list_rdd.aggregate(zero_value, param0, param1)

# Print the result
print("output 1 =>", result)
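The example above returns an int from an RDD of ints. To illustrate a result of a different type, here is a plain-Python model that aggregates ints into a (sum, count) tuple. The `aggregate` helper and the partition split are assumptions for this sketch; in PySpark the same two lambdas could be passed to `list_rdd.aggregate((0, 0), seq_op, comb_op)`.

```python
from functools import reduce

def aggregate(parts, zero, seq_op, comb_op):
    """Apply seq_op within each partition, then comb_op across the partial results."""
    partials = [reduce(seq_op, part, zero) for part in parts]
    return reduce(comb_op, partials, zero)

# Hypothetical split of [1, 2, 3, 4, 5, 3, 2] over three partitions
parts = [[1, 2], [3, 4], [5, 3, 2]]

# Fold one int into a running (sum, count) pair
seq_op = lambda acc, v: (acc[0] + v, acc[1] + 1)
# Merge two (sum, count) pairs
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

total, count = aggregate(parts, (0, 0), seq_op, comb_op)
print(total, count)  # 20 7
```

Two functions are needed because the accumulator type (a tuple) differs from the element type (an int): one merges an element into an accumulator, the other merges two accumulators.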



## Closing Spark Session <font color='red'>Important!</font>

In [None]:
# Stopping Spark Session
spark.stop()