# **Note:**

# **This notebook is a demo of some Spark RDD commands from Ch 11 of Asllani's Big Data Technologies for Business.**

In [8]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
     |████████████████████████████████| 212.3MB 72kB/s
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 48.2MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=c956e3b5cba63dceba7e749684f4e645779389565f72646680a6104451bd7a14
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


**Creating SparkSession object**

A SparkSession object is created by calling SparkSession.builder.getOrCreate(). You also specify an application name (APP_NAME here) for the session through appName().

In [9]:
from pyspark.sql import SparkSession
APP_NAME = "Ch11_RDD_Example"
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
spark

**Mounting your Google Drive**

Mount your Google Drive in the notebook to access the data files that are stored there.

In [4]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


**Import necessary libraries**

SparkSession is used to create Spark DataFrames. It was already imported above, so the import below is commented out.

In [4]:
#from pyspark.sql import SparkSession

Check which files exist in the path that you want to access.

In [10]:
import os
os.listdir('./gdrive/My Drive/Asllani/ch11_data')

['myBlog.txt',
 'SalesRecords.csv',
 'EmployeePT.tsv',
 'customerFiles.json',
 '.DS_Store',
 'items',
 'items2']

# **Loading Data into RDDs and Saving RDDs to Files**

RDDs have several data sources. RDDs can read data from files in text or other formats. They also can capture data from memory, other RDDs, or DataFrames and Datasets. You can use SparkContext.textFile to read a text file where each line is terminated with a line break.

Load the text file

The following commands create an RDD named myBlogRDD whose elements are the lines of the text file myBlog.txt, and then print its contents:


In [11]:
myBlogRDD = spark.sparkContext.textFile('./gdrive/My Drive/Asllani/ch11_data/myBlog.txt')
print(myBlogRDD.collect())

['Big Data techniques offer several advantages over traditional techniques.', 'Hadoop is one Big Data analytics platform.  Hadoop has several ecosystems to perform Big Data Analytics.', 'Spark is one of such ecosystems.  Spark can also run on Hadoop for Big Data Analitycs.']


For files with a multiple-line input format, such as JSON or XML, you can use **wholeTextFiles** instead of textFile.

wholeTextFiles maps each file to a single RDD element: a (path, content) pair that holds the file's path and its entire content as one string. In the output below, both customer records therefore arrive inside the same string.

In [7]:
customerRDD = spark.sparkContext.wholeTextFiles('./gdrive/My Drive/Asllani/ch11_data/customerFiles.json')
customerRDD.collect()

[('file:/content/gdrive/My Drive/Asllani/ch11_data/customerFiles.json',
  '{\n"FirstName":"John",\n"LastName":"McDonalds",\n"Phone":"+1(223)-324-1212"\n}\n{\n"FirstName":"Mary",\n"LastName":"Jones",\n"Phone":"+355-69-234653"\n}\n')]
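
Because the whole file arrives as one string, splitting it into one record per customer takes an extra step. The sketch below is one possible approach, assuming the file holds JSON objects written back to back as in the output above. The helper name split_json_objects is hypothetical, and the content string is copied from that output so the example runs without Spark.

```python
import json

# Content string copied from the collect() output above.
content = ('{\n"FirstName":"John",\n"LastName":"McDonalds",\n'
           '"Phone":"+1(223)-324-1212"\n}\n'
           '{\n"FirstName":"Mary",\n"LastName":"Jones",\n'
           '"Phone":"+355-69-234653"\n}\n')

def split_json_objects(text):
    """Parse JSON objects written back to back into a list of dicts (hypothetical helper)."""
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so step over it first.
        if text[pos].isspace():
            pos += 1
            continue
        obj, pos = decoder.raw_decode(text, pos)
        records.append(obj)
    return records

customers = split_json_objects(content)
print(customers)

# With Spark, the same helper could be applied to the content half of each pair:
# customersRDD = customerRDD.flatMap(lambda pair: split_json_objects(pair[1]))
```

Using flatMap rather than map in the commented Spark line would yield one RDD element per customer instead of one list per file.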

You can also create RDDs from collections, especially when generating data programmatically. For example, the following commands create a list of strings called myNames.

In [8]:
myNames = ["Frank","Steve","Lisa","Mark"]
myNames

['Frank', 'Steve', 'Lisa', 'Mark']

PySpark **parallelize()** is a function in SparkContext that creates an RDD from a list collection.

By using the parallelize command, you can create an RDD that contains the names.

Spark **collect()** is an action that retrieves all the elements of the RDD/DataFrame/Dataset (from all nodes) to the driver node; collectAsList() is its counterpart in the Java Dataset API. Use it only on data small enough to fit in the driver's memory.


In [9]:
myNamesRDD = spark.sparkContext.parallelize(myNames)
myNamesRDD.collect()
#myNamesRDD

['Frank', 'Steve', 'Lisa', 'Mark']

You can use the following command to save myNamesRDD as text. saveAsTextFile creates a directory at the given path (here named rdd5.txt) and writes one part file per partition into it. The destination directory cannot already exist.

In [11]:
myNamesRDD.saveAsTextFile("./gdrive/My Drive/Asllani/rdd5.txt")

In [12]:
os.listdir('./gdrive/My Drive/Asllani')

['ch10_data',
 'ch11_data',
 'rdd4.txt',
 'ch12_example.py',
 'ch12_data',
 'address_list3',
 'ch10_DF_example.ipynb',
 'ch11_RDD_example.ipynb',
 'rdd5.txt']
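
Since saveAsTextFile fails when the destination already exists, rerunning the save cell raises an error unless the old output is removed first. Below is a minimal sketch of one way to handle that, assuming the output lives on a local or mounted file system. The helper name clear_output_dir is hypothetical, and a temporary directory stands in for the Drive path.

```python
import os
import shutil
import tempfile

def clear_output_dir(path):
    """Remove a previous output directory so it can be written again (hypothetical helper)."""
    if os.path.isdir(path):
        shutil.rmtree(path)
    return not os.path.exists(path)

# Demo with a temporary directory standing in for the real destination.
base = tempfile.mkdtemp()
target = os.path.join(base, "rdd5.txt")
os.mkdir(target)                      # simulate output left by an earlier run
was_present = os.path.exists(target)
ready = clear_output_dir(target)      # True once the path is free
print(was_present, ready)

# In the notebook this would precede the save, for example:
# clear_output_dir("./gdrive/My Drive/Asllani/rdd5.txt")
# myNamesRDD.saveAsTextFile("./gdrive/My Drive/Asllani/rdd5.txt")
shutil.rmtree(base)
```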

# **Transforming Data with RDDs**

Load the data from the SalesRecords.csv file.

**collect()** is an action operation and returns a list of all elements.

In [13]:
allSalesRDD = spark.sparkContext.textFile("./gdrive/My Drive/Asllani/ch11_data/SalesRecords.csv")
allSalesRDD.collect()

['Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit',
 'Australia and Oceania,Tuvalu,Baby Food,Offline,H,5/28/10,669165933,6/27/10,9925,255.28,159.42,2533654,1582243.5,951410.5',
 'Central America and the Caribbean,Grenada,Cereal,Online,C,8/22/12,963881480,9/15/12,2804,205.7,117.11,576782.8,328376.44,248406.36',
 'Europe,Russia,Office Supplies,Offline,L,5/2/14,341417157,5/8/14,1779,651.21,524.96,1158502.59,933903.84,224598.75',
 'Sub-Saharan Africa,Sao Tome and Principe,Fruits,Online,C,6/20/14,514321792,7/5/14,8102,9.33,6.92,75591.66,56065.84,19525.82',
 'Sub-Saharan Africa,Rwanda,Office Supplies,Offline,L,2/1/13,115456712,2/6/13,5062,651.21,524.96,3296425.02,2657347.52,639077.5',
 'Australia and Oceania,Solomon Islands,Baby Food,Online,C,2/4/15,547995746,2/21/15,2974,255.28,159.42,759202.72,474115.08,285087.64',
 'Sub-Saharan Africa,Angola,Household,Offline,M,4/23/11,135425221,4/27/

Print the first line of the CSV file, which is the header row

In [14]:
allSalesRDD.first()

'Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit'

The following command returns an integer (a Long in Scala) with the number of lines in the file, including the header line:

In [15]:
allSalesRDD.count()

101

In [16]:
header = allSalesRDD.first()
salesRDD = allSalesRDD.filter (lambda line: line != header)
salesRDD.count()

100

In [17]:
offlineSalesRDD =salesRDD.filter(lambda line : "Offline" in line)
offlineSalesRDD.count()


50

In [18]:
First5offlineSalesRDD = offlineSalesRDD.take(5)
print(First5offlineSalesRDD)

['Australia and Oceania,Tuvalu,Baby Food,Offline,H,5/28/10,669165933,6/27/10,9925,255.28,159.42,2533654,1582243.5,951410.5', 'Europe,Russia,Office Supplies,Offline,L,5/2/14,341417157,5/8/14,1779,651.21,524.96,1158502.59,933903.84,224598.75', 'Sub-Saharan Africa,Rwanda,Office Supplies,Offline,L,2/1/13,115456712,2/6/13,5062,651.21,524.96,3296425.02,2657347.52,639077.5', 'Sub-Saharan Africa,Angola,Household,Offline,M,4/23/11,135425221,4/27/11,4187,668.27,502.54,2798046.49,2104134.98,693911.51', 'Sub-Saharan Africa,Republic of the Congo,Personal Care,Offline,M,7/14/15,770463311,8/25/15,6070,81.73,56.67,496101.1,343986.9,152114.2']
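
The test "Offline" in line matches the word anywhere in the line, so it would also select a record whose country or item name happened to contain it. A stricter variant compares only the Sales Channel field (index 3). The sketch below uses three lines copied from the output above and plain Python lists, so it runs without Spark; is_offline is a hypothetical helper name.

```python
SALES_CHANNEL = 3  # position of the Sales Channel field in each record

lines = [
    'Australia and Oceania,Tuvalu,Baby Food,Offline,H,5/28/10,669165933,6/27/10,9925,255.28,159.42,2533654,1582243.5,951410.5',
    'Central America and the Caribbean,Grenada,Cereal,Online,C,8/22/12,963881480,9/15/12,2804,205.7,117.11,576782.8,328376.44,248406.36',
    'Europe,Russia,Office Supplies,Offline,L,5/2/14,341417157,5/8/14,1779,651.21,524.96,1158502.59,933903.84,224598.75',
]

def is_offline(line):
    """True when the Sales Channel field is exactly 'Offline'."""
    return line.split(",")[SALES_CHANNEL] == "Offline"

offline = [line for line in lines if is_offline(line)]
print(len(offline))

# The same predicate could be passed to the RDD filter:
# offlineSalesRDD = salesRDD.filter(is_offline)
```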


Use map() to convert each line into an array of its comma-separated fields

In [19]:
arraySalesRDD = salesRDD.map (lambda line: line.split(","))
arraySalesRDD.count()

100

In [20]:
First5ArraySalesRDD = arraySalesRDD.take(5)
print(First5ArraySalesRDD)

[['Australia and Oceania', 'Tuvalu', 'Baby Food', 'Offline', 'H', '5/28/10', '669165933', '6/27/10', '9925', '255.28', '159.42', '2533654', '1582243.5', '951410.5'], ['Central America and the Caribbean', 'Grenada', 'Cereal', 'Online', 'C', '8/22/12', '963881480', '9/15/12', '2804', '205.7', '117.11', '576782.8', '328376.44', '248406.36'], ['Europe', 'Russia', 'Office Supplies', 'Offline', 'L', '5/2/14', '341417157', '5/8/14', '1779', '651.21', '524.96', '1158502.59', '933903.84', '224598.75'], ['Sub-Saharan Africa', 'Sao Tome and Principe', 'Fruits', 'Online', 'C', '6/20/14', '514321792', '7/5/14', '8102', '9.33', '6.92', '75591.66', '56065.84', '19525.82'], ['Sub-Saharan Africa', 'Rwanda', 'Office Supplies', 'Offline', 'L', '2/1/13', '115456712', '2/6/13', '5062', '651.21', '524.96', '3296425.02', '2657347.52', '639077.5']]
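
Splitting on every comma works for this file, but it would break a field that itself contains a comma inside quotes. A hedged alternative is Python's csv module, sketched below. The name parse_csv_line is a hypothetical helper, and the quoted sample line is made up for illustration (it is not taken from SalesRecords.csv).

```python
import csv

def parse_csv_line(line):
    """Split one CSV line into fields, honoring double-quoted fields (hypothetical helper)."""
    return next(csv.reader([line]))

plain = 'Europe,Russia,Office Supplies,Offline,L,5/2/14,341417157,5/8/14,1779'
quoted = 'Asia,"Korea, South",Cereal,Online,M'   # made-up line with a quoted comma

print(parse_csv_line(plain)[:3])
print(parse_csv_line(quoted))
print(quoted.split(","))  # the naive split cuts the country name in two

# The helper could replace the lambda in the map step:
# arraySalesRDD = salesRDD.map(parse_csv_line)
```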


Create a new RDD that stores only the item names by picking the third field (index 2) of each line, which holds the item type

In [21]:
itemsRDD = salesRDD.map(lambda line: line.split(",")[2])
itemsRDD.take(10)

['Baby Food',
 'Cereal',
 'Office Supplies',
 'Fruits',
 'Office Supplies',
 'Baby Food',
 'Household',
 'Vegetables',
 'Personal Care',
 'Cereal']

In [22]:
itemsRDD.saveAsTextFile("./gdrive/My Drive/Asllani/ch11_data/items2")
os.listdir('./gdrive/My Drive/Asllani/ch11_data')

['myBlog.txt',
 'SalesRecords.csv',
 'EmployeePT.tsv',
 'customerFiles.json',
 '.DS_Store',
 'items',
 'items2']

# **Creating a DataFrame from an RDD**

Start by invoking the Spark session and importing several libraries that contain spark.sql functions that are needed to create the DataFrames. You can import the needed functions by using the following commands:

In [23]:
# Import the type classes needed to define a DataFrame schema
from pyspark.sql.types import *

Since RDDs have no previously defined structure, you need to define a schema to transform an RDD into a DataFrame. You will define a schema, called mySchema, that has three fields: Country of type string, Item also of type string, and Quantity of type integer (IntegerType). You can create the schema by using the following commands:

In [30]:
#Define a schema to add to RDD
mySchema = (StructType([StructField("Country", StringType(), True),
                        StructField("Item", StringType(), True),
                        StructField("Quantity", IntegerType(), True)]))
mySchema

StructType(List(StructField(Country,StringType,true),StructField(Item,StringType,true),StructField(Quantity,IntegerType,true)))

Now create the RDD named allSalesRDD. As shown below, you first read the data from the SalesRecords.csv file. After reading the data, you remove the header and proceed only with the rows that contain data. You can do that using the following commands:

In [2]:
allSalesRDD = spark.sparkContext.textFile("./gdrive/My Drive/Asllani/ch11_data/SalesRecords.csv")
header = allSalesRDD.first()
salesRDD = allSalesRDD.filter(lambda line: line != header)

In [1]:
# sc is shorthand for the SparkContext; define it from the session before using it
sc = spark.sparkContext
allSalesRDD = sc.textFile("./gdrive/My Drive/Asllani/ch11_data/SalesRecords.csv")
header = allSalesRDD.first()
salesRDD = allSalesRDD.filter(lambda line: line != header)

Spark map() is a **transformation operation** that applies a function to every element of an RDD, DataFrame, or Dataset and returns a new RDD or Dataset.

Then you apply two map functions. First, you map each line into a list of fields by splitting it at each comma. Second, you keep only the values in the second, third, and ninth fields (values(1), values(2), and values(8) in Scala; values[1], values[2], and values[8] in Python). Since the quantity field is an integer, you apply the toLong function to store the value as a long integer when using Scala, and int() when using Python:



In [34]:
customSalesRDD = (salesRDD.map(lambda line: line.split(","))
                          .map(lambda values: [str(values[1]), str(values[2]),int(values[8])]))
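
int(values[8]) raises a ValueError if the Units Sold field is ever empty or non-numeric, which would fail the whole job. One defensive option is sketched here, under the assumption that a missing quantity should become None (which the nullable Quantity field of mySchema permits). The names to_int_or_none and to_record are hypothetical helpers, and the second sample line is made up with a blank quantity.

```python
def to_int_or_none(text):
    """Return int(text), or None when text is blank or not an integer."""
    try:
        return int(text)
    except ValueError:
        return None

def to_record(line):
    """Keep Country, Item Type, and Units Sold (fields 1, 2, and 8)."""
    values = line.split(",")
    return [values[1], values[2], to_int_or_none(values[8])]

good = 'Europe,Russia,Office Supplies,Offline,L,5/2/14,341417157,5/8/14,1779,651.21,524.96,1158502.59,933903.84,224598.75'
# Made-up line: same record with the Units Sold field left blank.
blank = 'Europe,Russia,Office Supplies,Offline,L,5/2/14,341417157,5/8/14,,651.21,524.96,0,0,0'

print(to_record(good))
print(to_record(blank))

# The helper could replace both map steps:
# customSalesRDD = salesRDD.map(to_record)
```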

In [35]:
customSalesRDD.collect()

[['Tuvalu', 'Baby Food', 9925],
 ['Grenada', 'Cereal', 2804],
 ['Russia', 'Office Supplies', 1779],
 ['Sao Tome and Principe', 'Fruits', 8102],
 ['Rwanda', 'Office Supplies', 5062],
 ['Solomon Islands', 'Baby Food', 2974],
 ['Angola', 'Household', 4187],
 ['Burkina Faso', 'Vegetables', 8082],
 ['Republic of the Congo', 'Personal Care', 6070],
 ['Senegal', 'Cereal', 6593],
 ['Kyrgyzstan', 'Vegetables', 124],
 ['Cape Verde', 'Clothes', 4168],
 ['Bangladesh', 'Clothes', 8263],
 ['Honduras', 'Household', 8974],
 ['Mongolia', 'Personal Care', 4901],
 ['Bulgaria', 'Clothes', 1673],
 ['Sri Lanka', 'Cosmetics', 6952],
 ['Cameroon', 'Beverages', 5430],
 ['Turkmenistan', 'Household', 3830],
 ['East Timor', 'Meat', 5908],
 ['Norway', 'Baby Food', 7450],
 ['Portugal', 'Baby Food', 1273],
 ['Honduras', 'Snacks', 2225],
 ['New Zealand', 'Fruits', 2187],
 ['Moldova ', 'Personal Care', 5070],
 ['France', 'Cosmetics', 1815],
 ['Kiribati', 'Fruits', 5398],
 ['Mali', 'Fruits', 5822],
 ['Norway', 'Beverag

Now, use the createDataFrame function to transform the RDD into a DataFrame. The function has two input parameters: customSalesRDD and mySchema. That means the newly created DataFrame named salesDF uses the data recorded in the RDD named customSalesRDD and stores it using the schema named mySchema.

You can show the first five records of the DataFrame by using the following commands:

In [None]:
salesDF = spark.createDataFrame(customSalesRDD, mySchema)
salesDF.show(5)

+--------------------+---------------+--------+
|             Country|           Item|Quantity|
+--------------------+---------------+--------+
|              Tuvalu|      Baby Food|    9925|
|             Grenada|         Cereal|    2804|
|              Russia|Office Supplies|    1779|
|Sao Tome and Prin...|         Fruits|    8102|
|              Rwanda|Office Supplies|    5062|
+--------------------+---------------+--------+
only showing top 5 rows



Now you can create an RDD from a DataFrame. The process is straightforward, since RDDs do not require a static schema: each record of the DataFrame simply becomes a Row object. The following commands create the RDD from the DataFrame and print the first records of the newly created RDD:

In [None]:
# Create an RDD from a DF
reCreatedSalesRDD = salesDF.rdd
reCreatedSalesRDD.take(5)

[Row(Country='Tuvalu', Item='Baby Food', Quantity=9925),
 Row(Country='Grenada', Item='Cereal', Quantity=2804),
 Row(Country='Russia', Item='Office Supplies', Quantity=1779),
 Row(Country='Sao Tome and Principe', Item='Fruits', Quantity=8102),
 Row(Country='Rwanda', Item='Office Supplies', Quantity=5062)]

# **Pair RDDs and MapReduce**

A pair RDD is a special RDD whose elements are (key, value) pairs, which makes it useful for MapReduce algorithms.

The formatting can be achieved using several functions, such as map, flatMap, flatMapValues, or keyBy. For example, the following commands use the map function to create a pair RDD from a tab-delimited file named EmployeePT.tsv that contains the fields id, name, age, and rating.

As a result, the pair RDD named empRDD contains (id, name) key-value pairs:

In [None]:
empRDD = spark.sparkContext.textFile("./gdrive/My Drive/Asllani/ch11_data/EmployeePT.tsv").map(lambda line: line.split('\t')).map(lambda fields: (str(fields[0]),str(fields[1])))
print(empRDD.collect())

[('01', 'Amanda'), ('02', 'Benjamin'), ('03', 'Chris'), ('04', 'James'), ('05', 'Amanda'), ('06', 'Andy'), ('07', 'Gabriel')]


Assume that you want to create another pair RDD that contains the same key, but whose value is a combination of both name and age, separated by a dash. In that case, you would execute the following commands:

In [None]:
empAgeRDD = spark.sparkContext.textFile("./gdrive/My Drive/Asllani/ch11_data/EmployeePT.tsv").map(lambda line: line.split('\t')).map(lambda fields: (str(fields[0]),str(fields[1])+ "-" + str(fields[2])))
print(empAgeRDD.collect())

[('01', 'Amanda-21'), ('02', 'Benjamin-'), ('03', 'Chris-21'), ('04', 'James-24'), ('05', 'Amanda-19'), ('06', 'Andy-23'), ('07', 'Gabriel-24')]
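
In the output above, the record for Benjamin has no age, so its value ends with a dangling dash. Below is a small sketch of one way to handle that case, assuming a placeholder such as "unknown" is acceptable. The helper name name_age_pair is hypothetical, and the sample lines contain only the id, name, and age columns, reconstructed from the printed output (the rating column is omitted because its values are not shown).

```python
def name_age_pair(line, missing="unknown"):
    """Build (id, 'name-age') from a tab-delimited line, filling in a blank age."""
    fields = line.split("\t")
    age = fields[2].strip() if len(fields) > 2 else ""
    return (fields[0], fields[1] + "-" + (age or missing))

# Three sample lines reconstructed from the output above (rating column omitted).
lines = ["01\tAmanda\t21", "02\tBenjamin\t", "03\tChris\t21"]
pairs = [name_age_pair(line) for line in lines]
print(pairs)

# With Spark, the helper could be applied directly to the lines of the file:
# empAgeRDD = spark.sparkContext.textFile(path).map(name_age_pair)
```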


# **WordCount with Pair RDDs**

You want to find the most used words in the myBlog.txt document. To do that, you perform a chain of RDD transformations. Start by reading the document and storing it in an RDD named WCread. Then, check whether the file has been read correctly by printing the newly created RDD, as shown here:

In [None]:
WCread = spark.sparkContext.textFile("./gdrive/My Drive/Asllani/ch11_data/myBlog.txt")
WCread.collect()

['Big Data techniques offer several advantages over traditional techniques.',
 'Hadoop is one Big Data analytics platform.  Hadoop has several ecosystems to perform Big Data Analytics.',
 'Spark is one of such ecosystems.  Spark can also run on Hadoop for Big Data Analitycs.']

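The complete word count can be written as one chain of transformations, shown in the first cell below; the cells after it repeat the chain one step at a time. The first step uses the flatMap function, which splits each line at every single space and emits each word as its own element. This RDD transformation creates a new RDD named WCsplit. For demonstration purposes, use take(10) to show only the first ten elements of the new RDD.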

In [None]:
#Word count program using RDD
WC = (WCread.flatMap(lambda line: str(line).split(' '))
            .map(lambda word: (word, 1))
            .reduceByKey(lambda v1,v2: v1+v2)
            .sortBy((lambda x: x[1]), False))
WC.collect()

[('Big', 4),
 ('Data', 4),
 ('Hadoop', 3),
 ('several', 2),
 ('is', 2),
 ('', 2),
 ('Spark', 2),
 ('one', 2),
 ('techniques', 1),
 ('advantages', 1),
 ('traditional', 1),
 ('analytics', 1),
 ('platform.', 1),
 ('ecosystems', 1),
 ('perform', 1),
 ('Analytics.', 1),
 ('of', 1),
 ('run', 1),
 ('offer', 1),
 ('over', 1),
 ('techniques.', 1),
 ('has', 1),
 ('to', 1),
 ('such', 1),
 ('ecosystems.', 1),
 ('can', 1),
 ('also', 1),
 ('on', 1),
 ('for', 1),
 ('Analitycs.', 1)]

In [None]:
WCsplit = WCread.flatMap(lambda line: str(line).split(' '))
WCsplit.take(10)

['Big',
 'Data',
 'techniques',
 'offer',
 'several',
 'advantages',
 'over',
 'traditional',
 'techniques.',
 'Hadoop']

In this step, you use a map function to transform each element of the RDD into a key-value pair, where the key is the word itself and the value is the number 1. The new RDD is a pair RDD named WCword, and the result of take(10) is shown below:

In [None]:
WCword = WCsplit.map(lambda word: (word, 1))
WCword.take(10)

[('Big', 1),
 ('Data', 1),
 ('techniques', 1),
 ('offer', 1),
 ('several', 1),
 ('advantages', 1),
 ('over', 1),
 ('traditional', 1),
 ('techniques.', 1),
 ('Hadoop', 1)]

The reduceByKey function in this step finds identical keys and merges their values pairwise with the given function (v1+v2), until each key is left with a single value. The new pair RDD named WCreduce therefore contains every distinct word (key) and its frequency (the sum of all its 1s).

In [None]:
WCreduce = WCword.reduceByKey(lambda v1,v2: v1+v2)
WCreduce.take(10)

[('techniques', 1),
 ('several', 2),
 ('advantages', 1),
 ('traditional', 1),
 ('is', 2),
 ('analytics', 1),
 ('platform.', 1),
 ('', 2),
 ('ecosystems', 1),
 ('perform', 1)]
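
To see what reduceByKey computes, the following plain-Python sketch folds the values of each key pairwise with the same lambda. It is only a single-machine illustration (Spark performs the merge within each partition and then across partitions), and reduce_by_key is a hypothetical helper. The input is the three lines of myBlog.txt shown earlier.

```python
lines = [
    'Big Data techniques offer several advantages over traditional techniques.',
    'Hadoop is one Big Data analytics platform.  Hadoop has several ecosystems to perform Big Data Analytics.',
    'Spark is one of such ecosystems.  Spark can also run on Hadoop for Big Data Analitycs.',
]

def reduce_by_key(pairs, func):
    """Merge the values of identical keys pairwise with func (hypothetical helper)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

words = [word for line in lines for word in line.split(' ')]   # flatMap step
pairs = [(word, 1) for word in words]                          # map step
counts = reduce_by_key(pairs, lambda v1, v2: v1 + v2)          # reduceByKey step

print(counts['Big'], counts['Hadoop'], counts[''])
```

The empty-string key comes from the double spaces after the periods in the second and third lines, which is why it also appears in the Spark output.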

Finally, in this step, you apply the sortBy function to create a new pair RDD named finalResult, which orders the elements of the previous RDD by the value element in the key-value structure. By default, sortBy orders the elements in ascending order, so by providing false (or False for Python) as the second parameter, the result is sorted in descending order.

In [None]:
# sortBy is an RDD transformation, so apply it before collect()
finalResult = WCreduce.sortBy(lambda x: x[1], False)
finalResult.collect()

[('Big', 4),
 ('Data', 4),
 ('Hadoop', 3),
 ('several', 2),
 ('is', 2),
 ('', 2),
 ('Spark', 2),
 ('one', 2),
 ('techniques', 1),
 ('advantages', 1),
 ('traditional', 1),
 ('analytics', 1),
 ('platform.', 1),
 ('ecosystems', 1),
 ('perform', 1),
 ('Analytics.', 1),
 ('of', 1),
 ('run', 1),
 ('offer', 1),
 ('over', 1),
 ('techniques.', 1),
 ('has', 1),
 ('to', 1),
 ('such', 1),
 ('ecosystems.', 1),
 ('can', 1),
 ('also', 1),
 ('on', 1),
 ('for', 1),
 ('Analitycs.', 1)]
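
The result above counts the empty string (produced by double spaces) and treats techniques and techniques. as different words. A possible refinement, sketched in plain Python so it runs without Spark, lowercases the text and keeps only runs of letters. The name tokenize is a hypothetical helper, and this normalization is an assumption rather than part of the book's example.

```python
import re
from collections import Counter

lines = [
    'Big Data techniques offer several advantages over traditional techniques.',
    'Hadoop is one Big Data analytics platform.  Hadoop has several ecosystems to perform Big Data Analytics.',
    'Spark is one of such ecosystems.  Spark can also run on Hadoop for Big Data Analitycs.',
]

def tokenize(line):
    """Lowercase a line and return its alphabetic words, dropping punctuation and blanks."""
    return re.findall(r"[a-z]+", line.lower())

counts = Counter(word for line in lines for word in tokenize(line))
print(counts.most_common(5))

# The same tokenizer could replace the split in the Spark chain:
# WC = (WCread.flatMap(tokenize)
#             .map(lambda word: (word, 1))
#             .reduceByKey(lambda v1, v2: v1 + v2)
#             .sortBy(lambda x: x[1], False))
```

Note that the misspelled Analitycs in the source text still counts as its own word, since the tokenizer does no spelling correction.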