<center>
    <img src="https://gitlab.com/ibm/skills-network/courses/placeholder101/-/raw/master/labs/module%201/images/IDSNlogo.png" width="300" alt="cognitiveclass.ai logo"  />
</center>


# **Getting Started with Apache Spark on IBM Cloud**


Estimated time needed: **15** minutes


![](http://spark.apache.org/images/spark-logo.png)


## Objectives


In this lab, we cover some data partitioning strategies and methods in Apache Spark and PySpark. We start by creating the SparkContext and SparkSession. We then create an RDD and apply data partitioning. Finally, we demonstrate data partitioning with DataFrames and Spark SQL.

After this lab you will be able to:

*   Create the SparkContext and SparkSession
*   Create an RDD and apply data partitioning to it
*   Create a DataFrame and apply data partitioning to it


***


## Setup


For this lab, we are going to be using Python and Spark (PySpark). These libraries should be installed in your lab environment or in SN Labs.


In [3]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Exercise 1: Spark Context and Spark Session


In this exercise, you will create the SparkContext and initialize the SparkSession needed for Spark SQL and DataFrames.
SparkContext is the entry point for Spark applications and provides methods for creating RDDs, such as `parallelize()`. SparkSession is the entry point for Spark SQL and DataFrame operations.


#### Task 1: Creating the spark session and context


In [4]:
# Get the running SparkContext, or create one if none exists.
# Calling SparkContext() directly raises "ValueError: Cannot run multiple
# SparkContexts at once" when the notebook kernel has already started one.
sc = SparkContext.getOrCreate()

# Creating a spark session (the .config() key/value pair is a placeholder option)
spark = SparkSession \
    .builder \
    .appName("Python Spark IBM Cloud Example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#### Task 2: Initialize Spark session

To work with DataFrames, we only need to verify that the SparkSession instance has been created.
Feel free to click the "Spark UI" button to explore the Spark UI.


In [5]:
spark

## Exercise 2: RDDs

In this exercise we work with Resilient Distributed Datasets (RDDs). RDDs are Spark's primitive data abstraction and we use concepts from functional programming to create and manipulate RDDs.


#### Task 1: Create an RDD.

For demonstration purposes, we create an RDD by calling `sc.parallelize()` on a list of the integers 0 to 9.

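We then get the number of partitions with `getNumPartitions()`, check the RDD's `partitioner` attribute, and inspect the partition contents with `glom()`, which turns each partition into a list so that `collect()` returns one list per partition.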


In [6]:
nums = list(range(10))

rdd = sc.parallelize(nums)
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

Number of partitions: 2
Partitioner: None
Partitions structure: [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]


The cell above shows the default partitioning of the RDD: when no partition count is given, `parallelize()` uses `sc.defaultParallelism`, which is 2 in this environment. We can override this by passing an optional second argument to `parallelize()`.
Let us now try 2 and 15 partitions and see how the elements are distributed.


In [7]:
rdd = sc.parallelize(nums, 2)
    
print("Default parallelism: {}".format(sc.defaultParallelism))
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

Default parallelism: 2
Number of partitions: 2
Partitioner: None
Partitions structure: [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]


In [8]:
rdd.count()

10

In [9]:
rdd = sc.parallelize(nums, 15)

print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

Number of partitions: 15
Partitioner: None
Partitions structure: [[], [0], [1], [], [2], [3], [], [4], [5], [], [6], [7], [], [8], [9]]
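The empty partitions above come from how Spark slices a local collection. As a Spark-free sketch, assuming the equal-range slicing Spark applies to a parallelized collection (partition `i` receives the elements from index `i * n // k` up to, but not including, `(i + 1) * n // k`), the following reproduces both layouts shown so far. `simulate_parallelize` is a hypothetical helper, not a Spark API, and PySpark also groups elements into serialized batches before slicing, so for other list sizes the real layout may differ.

```python
def simulate_parallelize(data, num_slices):
    """Hypothetical helper: split `data` into `num_slices` contiguous ranges.

    Partition i gets data[i * n // num_slices : (i + 1) * n // num_slices],
    so when num_slices > n, some ranges are empty.
    """
    n = len(data)
    return [
        data[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]


nums = list(range(10))
print(simulate_parallelize(nums, 2))
# [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
print(simulate_parallelize(nums, 15))
# [[], [0], [1], [], [2], [3], [], [4], [5], [], [6], [7], [], [8], [9]]
```

With more partitions than elements, some ranges are necessarily empty, which is why asking for 15 partitions over 10 numbers leaves five of them with nothing to process.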


Another way to partition data is with the `partitionBy()` function. In this case, each element of the RDD must be a key/value tuple, because the default partitioner hashes the key to assign each element to a partition.


In [10]:
rdd = sc.parallelize(nums) \
        .map(lambda el: (el, el)) \
        .partitionBy(2) \
        .persist()
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

for i, partition in enumerate(rdd.glom().collect(), start=1):
    print("partition: " + str(i) + " " + str(partition))

Number of partitions: 2
Partitioner: <pyspark.rdd.Partitioner object at 0x7f87cbadcad0>
Partitions structure: [[(6, 6), (8, 8), (0, 0), (2, 2), (4, 4)], [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]]
partition: 1 [(6, 6), (8, 8), (0, 0), (2, 2), (4, 4)]
partition: 2 [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]


The elements are now distributed differently. Here is what each step does:

*   `parallelize(nums)`: turns the Python list into an RDD with no partitioner,
*   `map(lambda el: (el, el))`: transforms each element into a key/value tuple,
*   `partitionBy(2)`: splits the data into 2 partitions using the default hash partitioner,
*   `persist()`: caches the partitioned RDD so that later actions reuse the cached partitions instead of recomputing them.

With integer keys, the hashing strategy is easy to see. The default partitioner places each key in partition `hash(key) % 2`, and for small integers Python's `hash()` returns the integer itself, so even keys land in the first partition and odd keys in the second.
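The bucket assignment can be sketched in plain Python, assuming the rule PySpark's default partitioner follows: hash the key (for an integer key this is Python's `hash()`, which returns a small non-negative integer unchanged) and take the result modulo the number of partitions. `hash_partition` is a hypothetical helper; the order of elements inside each partition can differ from Spark's, which depends on the order in which the shuffle delivers records.

```python
def hash_partition(pairs, num_partitions):
    """Hypothetical helper: put each (key, value) pair in bucket hash(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[hash(key) % num_partitions].append((key, value))
    return buckets


pairs = [(el, el) for el in range(10)]
for index, bucket in enumerate(hash_partition(pairs, 2), start=1):
    print("partition: " + str(index) + " " + str(bucket))
# partition: 1 [(0, 0), (2, 2), (4, 4), (6, 6), (8, 8)]
# partition: 2 [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]
```

The membership of each bucket matches the Spark output above; only the ordering within a partition differs.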


Let us now create a more practical dataset of transactions. We have 8 transactions from 4 different geographies as shown below.


In [11]:
transactions = [
    {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'},
    {'name': 'James', 'amount': 15, 'country': 'United Kingdom'},
    {'name': 'Marek', 'amount': 51, 'country': 'Poland'},
    {'name': 'Johannes', 'amount': 200, 'country': 'Germany'},
    {'name': 'Thomas', 'amount': 30, 'country': 'Germany'},
    {'name': 'Paul', 'amount': 75, 'country': 'Poland'},
    {'name': 'Pierre', 'amount': 120, 'country': 'France'},
    {'name': 'Frank', 'amount': 180, 'country': 'France'}
]

Suppose that further analysis will compare many records within the same country. To reduce network traffic, it makes sense to keep all records from one country in the same partition. To meet this requirement, we need a custom partitioner: a function that returns an integer for a given key (the first element of each tuple). Spark takes that integer modulo the number of partitions to choose where each record goes.


In [13]:
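# Simple custom partitioner: the same country name always produces the same integer,
# so all records for a country end up in the same partition.
# Spark reduces the returned value modulo the number of partitions given to partitionBy().
def country_partitioner(country):
    return hash(country) % (10**7 + 1)
    # Alternative: return portable_hash(country)  # from pyspark.rdd import portable_hash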
    

# Validate results
print(country_partitioner("Poland"))
print(country_partitioner("Germany"))
print(country_partitioner("United Kingdom"))
print(country_partitioner("France"))

5901469
1535451
6933473
2199756


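Our custom partitioner returns a different integer for each of the four country names and the same integer every time it sees the same name, so it can be passed to `partitionBy()` to keep each country's records together. Two caveats apply. First, distinct values do not guarantee distinct partitions: after Spark reduces them modulo the number of partitions, two countries can still share a partition. Second, Python randomizes string hashes per process unless `PYTHONHASHSEED` is set, so the values printed above may differ on your machine.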
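Because Spark reduces the partitioner's return value modulo the number of partitions, distinct hashes can still collide. One way to guarantee one country per partition, sketched here under the assumption that the set of countries is known in advance, is an explicit lookup table. `COUNTRY_INDEX`, `lookup_country_partitioner`, and `simulate_partition_by` are hypothetical names; the simulation applies the same `partition_func(key) % num_partitions` step described above, without needing Spark.

```python
# Hypothetical lookup table: one partition index per known country.
COUNTRY_INDEX = {"United Kingdom": 0, "France": 1, "Germany": 2, "Poland": 3}


def lookup_country_partitioner(country):
    # Countries missing from the table share one extra "overflow" index.
    return COUNTRY_INDEX.get(country, len(COUNTRY_INDEX))


def simulate_partition_by(pairs, num_partitions, partition_func):
    # Mirrors the partitionBy() rule: bucket = partition_func(key) % num_partitions.
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets


countries = ["United Kingdom", "United Kingdom", "Poland", "Germany",
             "Germany", "Poland", "France", "France"]
pairs = [(country, None) for country in countries]
for index, bucket in enumerate(simulate_partition_by(pairs, 5, lookup_country_partitioner), start=1):
    print(index, sorted({key for key, _ in bucket}))
```

In the lab's Spark session, the function would be used as `sc.parallelize(transactions).map(lambda el: (el['country'], el)).partitionBy(5, lookup_country_partitioner)`. The trade-off is that the table must be kept up to date; any country not listed falls into the shared overflow partition.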


In [14]:
rdd = sc.parallelize(transactions) \
        .map(lambda el: (el['country'], el)) \
        .partitionBy(5, country_partitioner)
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

print("\n--\n")
for i, j in enumerate(rdd.glom().collect()):
    print("\npartition: " + str(i+1) + "\n"+ str(j))

Number of partitions: 5
Partitioner: <pyspark.rdd.Partitioner object at 0x7f87cbb969d0>
Partitions structure: [[('United Kingdom', {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'}), ('United Kingdom', {'name': 'James', 'amount': 15, 'country': 'United Kingdom'})], [('France', {'name': 'Pierre', 'amount': 120, 'country': 'France'}), ('France', {'name': 'Frank', 'amount': 180, 'country': 'France'})], [('Germany', {'name': 'Thomas', 'amount': 30, 'country': 'Germany'}), ('Germany', {'name': 'Johannes', 'amount': 200, 'country': 'Germany'})], [('Poland', {'name': 'Marek', 'amount': 51, 'country': 'Poland'}), ('Poland', {'name': 'Paul', 'amount': 75, 'country': 'Poland'})], []]

--


partition: 1
[('United Kingdom', {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'}), ('United Kingdom', {'name': 'James', 'amount': 15, 'country': 'United Kingdom'})]

partition: 2
[('France', {'name': 'Pierre', 'amount': 120, 'country': 'France'}), ('France', {'name': 'Frank', 'amount': 18

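With this partitioning scheme, we can now compute aggregates such as total sales per partition. Because each partition here holds the records of exactly one country, each partition total is also that country's total sales.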


In [15]:
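# Called once per partition with an iterator over its (country, transaction) tuples;
# yields a single value: the sum of the 'amount' fields in that partition.
def sum_sales(iterator):
    yield sum(transaction[1]['amount'] for transaction in iterator)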

In [16]:
by_country = sc.parallelize(transactions) \
        .map(lambda el: (el['country'], el)) \
        .partitionBy(5, country_partitioner)
    
print("Partitions structure: {}".format(by_country.glom().collect()))

# Sum sales in each partition
sum_amounts = by_country \
    .mapPartitions(sum_sales) \
    .collect()

print("Total sales for each partition: {}".format(sum_amounts))

Partitions structure: [[('United Kingdom', {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'}), ('United Kingdom', {'name': 'James', 'amount': 15, 'country': 'United Kingdom'})], [('France', {'name': 'Pierre', 'amount': 120, 'country': 'France'}), ('France', {'name': 'Frank', 'amount': 180, 'country': 'France'})], [('Germany', {'name': 'Johannes', 'amount': 200, 'country': 'Germany'}), ('Germany', {'name': 'Thomas', 'amount': 30, 'country': 'Germany'})], [('Poland', {'name': 'Marek', 'amount': 51, 'country': 'Poland'}), ('Poland', {'name': 'Paul', 'amount': 75, 'country': 'Poland'})], []]
Total sales for each partition: [115, 300, 230, 126, 0]
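`mapPartitions()` calls `sum_sales` once per partition with an iterator over that partition's records and concatenates whatever each call yields, which is why the result has one total per partition, including `0` for the empty fifth partition. The Spark-free sketch below uses a hypothetical `simulate_map_partitions` helper and the partition contents shown above, reduced to the `amount` field. It also illustrates the caveat: the totals equal per-country sales only because each country sits in its own partition.

```python
def sum_sales(iterator):
    # Same function as in the lab: one total per partition.
    yield sum(transaction[1]['amount'] for transaction in iterator)


def simulate_map_partitions(partitions, func):
    # Hypothetical stand-in for RDD.mapPartitions(func).collect():
    # call func once per partition and concatenate the yielded values.
    results = []
    for partition in partitions:
        results.extend(func(iter(partition)))
    return results


partitions = [
    [('United Kingdom', {'amount': 100}), ('United Kingdom', {'amount': 15})],
    [('France', {'amount': 120}), ('France', {'amount': 180})],
    [('Germany', {'amount': 200}), ('Germany', {'amount': 30})],
    [('Poland', {'amount': 51}), ('Poland', {'amount': 75})],
    [],
]
print(simulate_map_partitions(partitions, sum_sales))  # [115, 300, 230, 126, 0]

# If France and Germany had collided in one partition, their totals would merge.
merged = [partitions[0], partitions[1] + partitions[2], partitions[3], [], []]
print(simulate_map_partitions(merged, sum_sales))  # [115, 530, 126, 0, 0]
```

When per-country totals are needed regardless of layout, a key-based aggregation such as `by_country.mapValues(lambda t: t['amount']).reduceByKey(lambda a, b: a + b)` avoids depending on the partitioner.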


## Exercise 3: DataFrames

In this exercise we work with DataFrames.


#### Task 1: Create the DataFrame

We now create a DataFrame from the `transactions` list defined earlier.


In [17]:
df = spark.createDataFrame(transactions)



In [18]:
df.show()

+------+--------------+--------+
|amount|       country|    name|
+------+--------------+--------+
|   100|United Kingdom|     Bob|
|    15|United Kingdom|   James|
|    51|        Poland|   Marek|
|   200|       Germany|Johannes|
|    30|       Germany|  Thomas|
|    75|        Poland|    Paul|
|   120|        France|  Pierre|
|   180|        France|   Frank|
+------+--------------+--------+



In [19]:
print("Number of partitions: {}".format(df.rdd.getNumPartitions()))
print("Partitioner: {}".format(df.rdd.partitioner))
print("Partitions structure: {}".format(df.rdd.glom().collect()))

Number of partitions: 2
Partitioner: None
Partitions structure: [[Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James'), Row(amount=51, country='Poland', name='Marek'), Row(amount=200, country='Germany', name='Johannes')], [Row(amount=30, country='Germany', name='Thomas'), Row(amount=75, country='Poland', name='Paul'), Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')]]


In [20]:
for i, j in enumerate(df.rdd.glom().collect()):
    print("partition: " + str(i+1) + "\n"+ str(j))

partition: 1
[Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James'), Row(amount=51, country='Poland', name='Marek'), Row(amount=200, country='Germany', name='Johannes')]
partition: 2
[Row(amount=30, country='Germany', name='Thomas'), Row(amount=75, country='Poland', name='Paul'), Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')]


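For DataFrames, we can repartition the data by column with the `repartition()` function. Passing a number of partitions and a column name hash-partitions the rows on that column, so all rows for the same country end up in the same partition. The cell below partitions the dataset by country into 10 partitions. Note that the `Partitioner` line of its output reads `None`: the DataFrame's hash partitioning is handled by Spark SQL and is not exposed through the `partitioner` attribute of `df2.rdd`.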


In [21]:
# Repartition by column
df2 = df.repartition(10,"country")

print("\nAfter 'repartition()'")
print("Number of partitions: {}".format(df2.rdd.getNumPartitions()))
print("Partitioner: {}".format(df2.rdd.partitioner))
print("Partitions structure: {}".format(df2.rdd.glom().collect()))


After 'repartition()'
Number of partitions: 10
Partitioner: None
Partitions structure: [[Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')], [], [Row(amount=200, country='Germany', name='Johannes'), Row(amount=30, country='Germany', name='Thomas')], [], [Row(amount=51, country='Poland', name='Marek'), Row(amount=75, country='Poland', name='Paul')], [Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James')], [], [], [], []]


In [22]:
for i, j in enumerate(df2.rdd.glom().collect()):
    print("partition: " + str(i+1) + "\n"+ str(j))

partition: 1
[Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')]
partition: 2
[]
partition: 3
[Row(amount=200, country='Germany', name='Johannes'), Row(amount=30, country='Germany', name='Thomas')]
partition: 4
[]
partition: 5
[Row(amount=51, country='Poland', name='Marek'), Row(amount=75, country='Poland', name='Paul')]
partition: 6
[Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James')]
partition: 7
[]
partition: 8
[]
partition: 9
[]
partition: 10
[]
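With 4 countries hashed into 10 partitions, 6 of the partitions above are empty. On larger data, a cheaper balance check is to collect only the partition sizes, for example with `df2.rdd.glom().map(len).collect()`, instead of the full contents. The hypothetical `summarize_partition_sizes` helper below is a plain-Python sketch that turns such a list of sizes into a few balance figures; the sizes used here are read off the output above.

```python
def summarize_partition_sizes(sizes):
    """Hypothetical helper: basic balance figures for a list of partition sizes."""
    total = sum(sizes)
    largest = max(sizes, default=0)
    return {
        "partitions": len(sizes),
        "empty": sum(1 for size in sizes if size == 0),
        "largest": largest,
        # Largest partition relative to the mean size (1.0 means perfectly even).
        "skew": largest * len(sizes) / total if total else 0.0,
    }


sizes = [2, 0, 2, 0, 2, 2, 0, 0, 0, 0]
print(summarize_partition_sizes(sizes))
# {'partitions': 10, 'empty': 6, 'largest': 2, 'skew': 2.5}
```

A high `empty` count suggests fewer partitions would do; a high `skew` suggests one key dominates and a different partitioning column or scheme may balance the work better.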


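These are some of the partitioning strategies for DataFrames and RDDs. As data size grows, the choice of partitioning strategy matters more: keeping related records in the same partition can reduce data movement across the network, while unbalanced partitions can leave a few tasks doing most of the work.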


***


## Authors


[Karthik Muthuraman](https://www.linkedin.com/in/karthik-muthuraman/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0225ENSkillsNetwork25716109-2021-01-01)


### Other Contributors


[Jerome Nilmeier](https://github.com/nilmeier)


## Change Log


| Date (YYYY-MM-DD) | Version | Changed By | Change Description |
| ----------------- | ------- | ---------- | ------------------ |
| 2021-07-02        | 0.2     | Karthik    | Beta launch        |
| 2021-06-30        | 0.1     | Karthik    | First Draft        |


Copyright © 2021 IBM Corporation. All rights reserved.
