# Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a <b>fault-tolerant collection of elements that can be operated on in parallel.</b> <br>There are two ways to create RDDs: <ul><li>parallelizing an existing collection in your driver program</li><li>referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.</li></ul>

<br><br>
<b>Parallelized collections</b> are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.

For example, here is how to create a parallelized collection holding the numbers 1 to 5:

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark import SQLContext
from itertools import islice
from pyspark.sql.functions import col

In [2]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. <br>For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list:

In [3]:
distData.reduce(lambda a, b: a + b) 

15
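The function passed to reduce is expected to be commutative and associative, because partial results are combined across partitions. A plain-Python model can make that shape visible. The sketch below is not Spark's implementation: the helper name `partitioned_reduce` and the way the list is split into partitions are assumptions chosen only for illustration.

```python
from functools import reduce

def partitioned_reduce(data, func, num_partitions=2):
    """Model of a distributed reduce: reduce each partition, then combine."""
    if not data:
        raise ValueError("cannot reduce an empty collection")
    # Split the data into contiguous chunks (hypothetical partitioning scheme).
    size = -(-len(data) // num_partitions)  # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    # Each "worker" reduces its own partition locally.
    partials = [reduce(func, part) for part in partitions]
    # The "driver" combines the per-partition results.
    return reduce(func, partials)

data = [1, 2, 3, 4, 5]
total = partitioned_reduce(data, lambda a, b: a + b)
print(total)
```

Because addition is commutative and associative, the result does not depend on how the list is partitioned. A function such as subtraction would not have that property.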

# External Datasets

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc. URI) and reads it as a collection of lines. Below is an example invocation using the sc object that the PySpark shell creates by default:

In [4]:
file = sc.textFile("file.txt")

<b>NOTE:</b> <br>
   The above code defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: file is merely a pointer to the file on disk.

In [5]:
file.take(10) #take the first 10 lines of that file

['<?xml version=\\"1.0\\" encoding=\\"utf-8\\"?>',
 '<rss version=\\"2.0\\" xmlns:content=\\"http://purl.org/rss/1.0/modules/content/\\" xmlns:dc=\\"http://purl.org/dc/elements/1.1/\\" xmlns:media=\\"http://search.yahoo.com/mrss/\\" xmlns:snf=\\"http://www.smartnews.be/snf\\">',
 '<channel>',
 '<title>Kyodo News</title>',
 '<link>https://this.kiji.is/-/units/288215928582407265</link>',
 '<description>Kyodo News</description>',
 '<pubDate>Tue, 19 Nov 2019 15:55:37 +0000</pubDate>',
 '<language>en</language>',
 '<copyright>(c) 一般社団法人共同通信社</copyright>',
 '<ttl>1</ttl>']

# <b>Count all the words in this text file.</b>

In [6]:
counts = file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

To view the result, again  <span style="color:green">use take! </span> 

In [7]:
counts.take(40)

[('<?xml', 1),
 ('version=\\"1.0\\"', 1),
 ('<rss', 1),
 ('version=\\"2.0\\"', 1),
 ('xmlns:content=\\"http://purl.org/rss/1.0/modules/content/\\"', 1),
 ('xmlns:dc=\\"http://purl.org/dc/elements/1.1/\\"', 1),
 ('xmlns:snf=\\"http://www.smartnews.be/snf\\">', 1),
 ('<channel>', 1),
 ('<title>Kyodo', 1),
 ('News</title>', 1),
 ('<link>https://this.kiji.is/-/units/288215928582407265</link>', 1),
 ('<description>Kyodo', 1),
 ('19', 28),
 ('2019', 50),
 ('<language>en</language>', 1),
 ('<copyright>(c)', 1),
 ('一般社団法人共同通信社</copyright>', 1),
 ('<ttl>1</ttl>', 1),
 ('', 7205),
 ('Laos,', 4),
 ('Thailand,', 3),
 ('Vietnam', 4),
 ('warn</title>', 1),
 ('<guid>http://this.kiji.is/569551319185638497</guid>', 1),
 ('Severe', 1),
 ('extreme', 2),
 ('drought', 6),
 ('is', 84),
 ('four', 6),
 ('lower', 22),
 ('Basin', 2),
 ('countries', 3),
 ('of', 257),
 ('Cambodia...', 1),
 ('<pubDate>Wed,', 2),
 ('20', 6),
 ('class=\\"ma__p\\">Severe', 1),
 ('Cambodia,', 2),
 ('Thailand', 6),
 ('now', 11)]
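The counts above include XML tags glued to words and 7205 occurrences of the empty string, most likely because splitting on a single space yields empty tokens for empty lines and wherever spaces repeat. One possible cleanup, sketched here under the assumption that markup and punctuation are not wanted in the counts, is a small tokenizer. The name `tokenize` and both regular expressions are illustrative choices, not part of the original notebook.

```python
import re

TAG_RE = re.compile(r"<[^>]+>")          # anything that looks like an XML/HTML tag
WORD_RE = re.compile(r"[A-Za-z0-9']+")   # ASCII letters, digits, apostrophes only

def tokenize(line):
    """Return lowercase word tokens from a line, ignoring markup."""
    text = TAG_RE.sub(" ", line)          # replace tags with a space
    return [w.lower() for w in WORD_RE.findall(text)]

sample = "<title>Kyodo News</title>  Severe   drought"
print(tokenize(sample))
```

With Spark available, the function could replace the lambda in the cell above, as in `file.flatMap(tokenize).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)`. Note that this pattern drops non-ASCII text such as the Japanese copyright line, so it would need adjusting for multilingual feeds.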

Once created, file can be acted on by dataset operations. For example, we can add up the lengths of all the lines using the map and reduce operations as follows: file.map(lambda s: len(s)).reduce(lambda a, b: a + b).

Some notes on reading files with Spark:

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

Apart from text files, Spark’s Python API also supports several other data formats:

SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
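The difference in record shape between the two methods can be modelled in plain Python, without Spark. The two helpers below are hypothetical stand-ins that only mimic the shape of the records (one per line versus one per file); they say nothing about how Spark reads or partitions the data.

```python
import os
import tempfile

def records_per_line(directory):
    """Model of textFile on a directory: one record per line across all files."""
    records = []
    for name in sorted(os.listdir(directory)):
        with open(os.path.join(directory, name), encoding="utf-8") as fh:
            records.extend(fh.read().splitlines())
    return records

def records_per_file(directory):
    """Model of wholeTextFiles: one (filename, content) pair per file."""
    pairs = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        with open(path, encoding="utf-8") as fh:
            pairs.append((path, fh.read()))
    return pairs

# Build a throwaway directory with two small files (made-up contents).
with tempfile.TemporaryDirectory() as tmp:
    for name, text in [("a.txt", "one\ntwo\n"), ("b.txt", "three\n")]:
        with open(os.path.join(tmp, name), "w", encoding="utf-8") as fh:
            fh.write(text)
    lines = records_per_line(tmp)
    files = records_per_file(tmp)

print(lines)
print([(os.path.basename(p), c) for p, c in files])
```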

RDD.saveAsPickleFile and SparkContext.pickleFile support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.

SequenceFile and Hadoop Input/Output Formats are also supported. Note that this feature is currently marked Experimental and is intended for advanced users. It may be replaced in the future with read/write support based on Spark SQL, in which case Spark SQL is the preferred approach.

# SparkContext Example – PySpark

For the first example, we'll count the number of lines containing the words 'news' or 'Kyodo' in file.txt. For instance, if a file has 5 lines and 3 of them contain the word 'news', the output will be:<br> $\rightarrow$ Lines with 'news': 3. <br>Likewise for 'Kyodo'.<br><br>
<b>Note: we do not create a SparkContext object in the following example because <span style="color:blue">Spark automatically creates a <span style="color:green">SparkContext object</span> named <span style="color:green">sc</span> when the PySpark shell starts.</span> <br><br>If you try to create another SparkContext object, you will get the following error: <span style="color:red">"ValueError: Cannot run multiple SparkContexts at once".</span></b>

In [8]:
file

file.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
file.cache()

file.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [10]:
numNews = file.filter(lambda s: 'news' in s).count()
numKyodo = file.filter(lambda s: 'Kyodo' in s).count()

In [11]:
print("Lines with 'news': %i\nLines with 'Kyodo': %i" % (numNews, numKyodo))

Lines with 'news': 6
Lines with 'Kyodo': 4
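The `in` test used in the filters above is case-sensitive, so a line such as `<title>Kyodo News</title>` is not counted for 'news'. If case-insensitive matching is wanted, one option is a small predicate factory like the sketch below. The name `contains_word` is hypothetical, the first two sample lines are taken from the file preview earlier, and the third is invented for the demonstration.

```python
def contains_word(word, ignore_case=True):
    """Build a predicate that could be passed to filter."""
    needle = word.lower() if ignore_case else word

    def predicate(line):
        haystack = line.lower() if ignore_case else line
        return needle in haystack

    return predicate

sample_lines = [
    "<title>Kyodo News</title>",
    "<description>Kyodo News</description>",
    "breaking news from Laos",   # invented line
]
# Case-sensitive count, as in the notebook cell.
strict = sum(1 for s in sample_lines if "news" in s)
# Case-insensitive count using the predicate.
relaxed = sum(1 for s in sample_lines if contains_word("news")(s))
print(strict, relaxed)
```

With Spark, the predicate would be used as `file.filter(contains_word("news")).count()`.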


# Counting elements in RDD

In [12]:
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

In [13]:
counts = words.count()

In [14]:
print("Number of elements in RDD: %i" % (counts))

Number of elements in RDD: 8


In [15]:
lineLengths = file.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

NOTE: <br>
    The base RDD file was defined earlier from an external file. Defining it did not load the data or otherwise act on it: file is merely a pointer to the file on disk. <b>The first line above defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. </b><span style="color:green">Finally, we run reduce, which is an action.</span> <span style="color:red">At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.</span>
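The split between lazy transformations and eager actions has a rough analogue in plain Python, where `map` returns a lazy iterator and `reduce` consumes it. The sketch below is only an analogy, not Spark code; the `calls` list is a hypothetical probe that records when the mapped function actually runs, and the sample lines are copied from the file preview earlier.

```python
from functools import reduce

calls = []  # records each time the mapped function actually runs

def line_length(s):
    calls.append(s)
    return len(s)

lines = ["<channel>", "<title>Kyodo News</title>", "<language>en</language>"]

# "Transformation": map() returns a lazy iterator, so nothing runs yet.
line_lengths = map(line_length, lines)
calls_before_action = len(calls)

# "Action": reduce() consumes the iterator and forces the computation.
total_length = reduce(lambda a, b: a + b, line_lengths)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, total_length)
```

One difference worth noting: a Python iterator is exhausted after one pass, whereas an RDD is recomputed for each action unless it has been persisted.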

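If we also wanted to use lineLengths again later, we could call lineLengths.persist() before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed: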

In [16]:
lineLengths.persist()

PythonRDD[15] at RDD at PythonRDD.scala:53

# Using the "Collect" method

<u>Collect() – Retrieve data from Spark RDD/DataFrame</u>

collect() is an action that retrieves all the elements of an RDD/DataFrame/Dataset (from all nodes) to the driver node; collectAsList() is the equivalent in the Java Dataset API. Use collect() on smaller datasets, usually after operations that shrink the data, such as filter() or groupBy().count(). Collecting a large dataset can cause the driver to run out of memory.

In [17]:
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print("Elements in RDD: %s" % (coll))

Elements in RDD: ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


# Using <b>"Filter"</b>

filter returns a new RDD containing only the elements that satisfy the function passed to it.<br>
The example below filters out the strings that do not contain "spark":

In [18]:
words = sc.parallelize(
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD: %s" % (filtered))

Filtered RDD: ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


# Saving and Loading SequenceFiles

In [19]:
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))

In [20]:
rdd.saveAsSequenceFile("myNewSeqFile") ### save sequence to file

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/Users/okara/Downloads/PySpark_for_mentees/myNewSeqFile already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:289)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:604)
	at org.apache.spark.api.python.PythonRDD$.saveAsSequenceFile(PythonRDD.scala:573)
	at org.apache.spark.api.python.PythonRDD.saveAsSequenceFile(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
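The trace above reports that the output directory already exists, which is what happens when the save cell is run a second time: the Hadoop output format refuses to overwrite an existing directory. A minimal sketch of one workaround, assuming the output path is on the local filesystem (it would not apply to HDFS or S3 paths), is to delete the directory before saving. The helper name `clear_output_dir` is hypothetical.

```python
import os
import shutil
import tempfile

def clear_output_dir(path):
    """Delete a local output directory if it exists; return True if removed."""
    if os.path.isdir(path):
        shutil.rmtree(path)
        return True
    return False

# Demonstration against a throwaway directory standing in for "myNewSeqFile".
base = tempfile.mkdtemp()
target = os.path.join(base, "myNewSeqFile")
os.makedirs(target)
removed_first = clear_output_dir(target)    # the directory existed
removed_second = clear_output_dir(target)   # nothing left to remove
shutil.rmtree(base)
print(removed_first, removed_second)
```

In the notebook, calling `clear_output_dir("myNewSeqFile")` before `rdd.saveAsSequenceFile("myNewSeqFile")` would allow the cell to be rerun. Since this deletes data, it is only appropriate for scratch output.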


In [None]:
sorted(sc.sequenceFile("myNewSeqFile").collect()) ### load sequence from file and collect

# Passing Functions to Spark

Spark's API relies heavily on passing functions in the driver program to run on the cluster. There are three recommended ways to do this:


<ul>
<li>Lambda expressions, for simple functions that can be written as an expression. (Lambdas do not support multi-statement functions or statements that do not return a value.)</li>
<li>Local defs inside the function calling into Spark, for longer code.</li>
<li>Top-level functions in a module.</li>
</ul><br>
For example, to pass a longer function than can be supported using a lambda, consider the code below:


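In [None]:
"""MyScript.py"""
from pyspark import SparkContext

if __name__ == "__main__":
    def myFunc(s):
        # Count the space-separated words in one line.
        words = s.split(" ")
        return len(words)

    # Reuse the running SparkContext if there is one, otherwise create it.
    sc = SparkContext.getOrCreate()
    # map is lazy; an action such as collect() is needed to run it.
    wordsPerLine = sc.textFile("filenew.txt").map(myFunc)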

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:

In [None]:
class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

Here, if we create a new MyClass and call doStuff on it, the map inside there references the func method of that MyClass instance, so the whole object needs to be sent to the cluster.

In a similar way, accessing fields of the outer object will reference the whole object:

In [None]:
class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:

In [None]:
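class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        field = self.field  # local copy, so the lambda does not capture self
        return rdd.map(lambda s: field + s)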
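What each lambda captures can be checked in plain Python by inspecting its closure through the standard `__closure__` attribute. The sketch below does not involve Spark's serializer; it only shows which objects each function holds on to, which is what would have to be shipped to the cluster. The method names and the helper `captured_values` are hypothetical.

```python
class MyClass(object):
    def __init__(self):
        self.field = "Hello"

    def lambda_using_self(self):
        # The lambda refers to self, so the whole object is captured.
        return lambda s: self.field + s

    def lambda_using_local(self):
        field = self.field  # copy into a local variable first
        return lambda s: field + s

def captured_values(fn):
    """Return the objects held in a function's closure cells."""
    return [cell.cell_contents for cell in (fn.__closure__ or ())]

obj = MyClass()
via_self = captured_values(obj.lambda_using_self())
via_local = captured_values(obj.lambda_using_local())
print(via_self[0] is obj, via_local)
```

The first lambda holds a reference to the entire instance, while the second holds only the string, which is why the local-variable version is cheaper to send to workers.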