# Spark Daily Show RDD
(c) Miradiz Rakhmatov


These are my notes and experiments running Spark on my local machine.

In [1]:
import findspark

In [2]:
findspark.init('/users/miradiz/Downloads/spark-3.1.2-bin-hadoop3.2/')

In [3]:
## PySpark is the Python API for Spark, which is written in Scala
import pyspark

In [4]:
## A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs
sc = pyspark.SparkContext()

In [5]:
## Let's read the TSV file into an RDD object
## The RDD object raw_data resembles a list of strings, with one string for each line in the data set

raw_data = sc.textFile("daily_show.tsv")

In [6]:
## Let's show the first 5 elements of the RDD
## TSV files use "\t" as the delimiter

raw_data.take(5)

['YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
 '1999\tactor\t1/11/99\tActing\tMichael J. Fox',
 '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard',
 '1999\ttelevision actress\t1/13/99\tActing\tTracey Ullman',
 '1999\tfilm actress\t1/14/99\tActing\tGillian Anderson']

## Note:
So, if an RDD resembles a Python list, why don't we just use bracket notation to access elements in the RDD?

The answer is that Spark splits an RDD's data across many partitions, possibly on different machines, so there is no single in-memory sequence to index into. That is why an RDD can't rely on the standard list implementation.

Spark offers advantages over plain Python even on a single machine. Thanks to the RDD abstraction, you can run Spark locally on your own computer: Spark splits the data into partitions and processes them in parallel on local threads, mimicking the way it would distribute the work across a cluster.

Spark's RDD implementation also lets us evaluate code "lazily," meaning we can postpone running a calculation until absolutely necessary. Above, Spark waited to load the TSV file until raw_data.take(5) executed. When our code called raw_data = sc.textFile("daily_show.tsv"), Spark only recorded a pointer to the file; it didn't actually read any data until raw_data.take(5) needed it.

The advantage of "lazy" evaluation is that we can build up a queue of tasks and let Spark optimize the workflow in the background. In regular Python, the interpreter can't do much workflow optimization.
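The same idea can be seen in miniature without Spark. The sketch below is only a plain-Python analogy built on generators, not Spark's actual mechanism; `read_lines`, `lines_read`, and `fake_file` are made-up names for illustration.

```python
# Plain-Python analogy for lazy evaluation (generators, not Spark itself).
from itertools import islice

lines_read = []  # records which lines were actually processed


def read_lines(lines):
    """Yield lines one at a time, logging each one as it is produced."""
    for line in lines:
        lines_read.append(line)
        yield line


# Hypothetical in-memory stand-in for the contents of a TSV file.
fake_file = [
    "YEAR\tShow",
    "1999\t1/11/99",
    "1999\t1/12/99",
    "1999\t1/13/99",
]

lazy_lines = read_lines(fake_file)  # builds the generator; nothing runs yet
read_before = len(lines_read)

first_two = list(islice(lazy_lines, 2))  # consuming items forces the work
read_after = len(lines_read)

print(read_before, read_after, first_two)
```

Creating the generator does no work, and asking for two items processes only two lines, which is roughly how take(5) avoids touching more data than it needs.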

In [7]:
## As you can see, it is not a list. It looks like a Python list, but it is an RDD object
type(raw_data)

pyspark.rdd.RDD

## Transformations and Actions

There are two types of methods in Spark:

1. Transformations: map(), reduceByKey()
2. Actions: take(), reduce(), saveAsTextFile(), collect()

Transformations are lazy operations that always return a reference to an RDD object. Spark doesn't actually run the transformations, however, until an action needs to use the RDD resulting from a transformation. Any function that returns an RDD is a transformation, and any function that returns a value is an action. These concepts will become clear as we work through this lesson and practice writing PySpark code.

In [8]:
## Let's use map with a function that splits each string line into a list
daily_show = raw_data.map(lambda line: line.split('\t'))
daily_show.take(5)

[['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
 ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
 ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
 ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
 ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']]

## Immutability
RDD objects are immutable, meaning that we can't change their values once we've created them. In Python, list and dictionary objects are mutable (we can change their values), while tuple objects are immutable. The only way to "modify" a tuple in Python is to create a new tuple with the necessary updates. Spark uses the immutability of RDDs to improve calculation speeds. That's why we had to assign the result of map() to a new variable called daily_show.
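The tuple comparison can be checked in plain Python. This is a small sketch of tuple immutability only; the `guest` values are taken from the first data row shown earlier, and the variable names are just for illustration.

```python
# A tuple cannot be changed in place, much like an RDD.
guest = ("1999", "actor", "Michael J. Fox")

try:
    guest[1] = "comedian"  # tuples do not support item assignment
    mutated = True
except TypeError:
    mutated = False

# "Modifying" a tuple means building a new one; the original is untouched.
updated = guest[:1] + ("comedian",) + guest[2:]

print(mutated, guest, updated)
```

Transformations such as map() follow the same pattern: they leave the source RDD alone and hand back a new one.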

In [9]:
## As you can see, map() returned a new object of type PipelinedRDD (a subclass of RDD)
type(daily_show)

pyspark.rdd.PipelinedRDD

## Not iterable with for loops
RDD objects cannot be iterated over directly with for loops the way a Python list can. Instead, we use map() or other transformations to process each element. Let's see how to build a frequency table, similar to pandas.Series.value_counts(), from an RDD. Specifically, we want to count how many guests visited the show in each year.

In [10]:
## Use map() followed by reduceByKey()
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
tally

PythonRDD[8] at RDD at PythonRDD.scala:53

As you can see above, it didn't return the frequency table we were hoping for. Because of lazy evaluation, PySpark delays executing the map() and reduceByKey() steps until we actually need them. Let's use an action to see our results.

In [11]:
## Use the take() action to see the output; tally.collect() would also return every element
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
tally.take(tally.count())

[('YEAR', 1),
 ('2002', 159),
 ('2003', 166),
 ('2004', 164),
 ('2007', 141),
 ('2010', 165),
 ('2011', 163),
 ('2012', 164),
 ('2013', 166),
 ('2014', 163),
 ('2015', 100),
 ('1999', 166),
 ('2000', 169),
 ('2001', 157),
 ('2005', 162),
 ('2006', 161),
 ('2008', 164),
 ('2009', 163)]
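To make the map() and reduceByKey() steps concrete, here is a rough single-machine sketch of what they compute. It is not how Spark implements reduceByKey() (Spark combines values within each partition and then merges across partitions); `reduce_by_key` and the `years` sample are made up for illustration.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key, then fold each group's values with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}


# Made-up sample standing in for the YEAR column of a few rows.
years = ["1999", "1999", "2000", "1999", "2000"]

# map step: turn every year into a (year, 1) pair
pairs = [(year, 1) for year in years]

# reduceByKey step: add up the 1s that share the same year
counts = reduce_by_key(pairs, lambda x, y: x + y)

print(counts)
```

Because Spark groups keys by hash rather than by value, the real output is not sorted by year, which is why 2002 appears before 1999 in the result above.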

### Remove headers:
Unlike pandas, Spark knows nothing about column headers and didn't set them aside. We need a way to remove the element ('YEAR', 1) from our collection. We'll need a workaround, though, because RDD objects are immutable once we create them. The only way to remove that tuple is to create a new RDD object that doesn't have it. Just like Python's built-in filter(), Spark provides filter(f), which is called on an RDD and takes a function f. It returns a new RDD containing only the elements for which f returns True.

In [12]:
## line[0] is the first element of each (year, count) tuple
no_header = tally.filter(lambda line: line[0] != "YEAR")

## A named function works in place of the lambda: tally.filter(filter_year)
def filter_year(line):
    return line[0] != "YEAR"

no_header.take(no_header.count())

[('2002', 159),
 ('2003', 166),
 ('2004', 164),
 ('2007', 141),
 ('2010', 165),
 ('2011', 163),
 ('2012', 164),
 ('2013', 166),
 ('2014', 163),
 ('2015', 100),
 ('1999', 166),
 ('2000', 169),
 ('2001', 157),
 ('2005', 162),
 ('2006', 161),
 ('2008', 164),
 ('2009', 163)]
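For comparison, Python's built-in filter() behaves the same way on an ordinary list. This is a plain-Python sketch, not Spark code; the pairs are copied from the tally output above, and `is_data_row` is a made-up helper name.

```python
# A few (year, count) pairs copied from the tally output, header included.
tally_sample = [("YEAR", 1), ("1999", 166), ("2000", 169)]


def is_data_row(pair):
    """Return True for every (year, count) pair except the header."""
    return pair[0] != "YEAR"


# filter() builds a new sequence and leaves the original list untouched.
no_header_sample = list(filter(is_data_row, tally_sample))

print(no_header_sample)
```

The predicate returns an explicit boolean for every input, which is also the safest shape for a function passed to an RDD's filter().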

## Succession
We can perform successive transformations on RDD objects without saving each outcome into a new variable.
To demonstrate Spark's capability, let's see how to chain together a series of data transformations into a pipeline, and then we'll observe Spark managing everything in the background. The developers who wrote Spark specifically designed this functionality and optimized it for running tasks in succession.

Before Spark came along, running many tasks in succession with Hadoop MapReduce was incredibly time-consuming. It had to write intermediate results to disk after each step, and it wasn't aware of the full pipeline.

In [13]:
daily_show.take(5)

[['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
 ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
 ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
 ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
 ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']]

In [14]:
## Let's transform the above RDD
daily_show.filter(lambda line: line[1] != '') \
                   .map(lambda line: (line[1].lower(), 1)) \
                   .reduceByKey(lambda x,y: x+y) \
                   .take(10)

# filter() -> keeps only the rows where "occupation" is not blank ("")
# map() -> creates a tuple for each row, e.g. (occupation_name, 1)
# reduceByKey() -> adds up the 1s for each occupation

[('actor', 596),
 ('film actress', 21),
 ('model', 9),
 ('stand-up comedian', 44),
 ('actress', 271),
 ('television personality', 13),
 ('comic', 2),
 ('musician', 19),
 ('film actor', 19),
 ('journalist', 253)]
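Note that take(10) returns the first ten pairs Spark happens to produce, not the ten most common occupations. Once a small result like this is back in Python as a list, it can be ranked with ordinary sorting. The sketch below assumes the pairs have already been collected; the variable names are made up for illustration.

```python
# Pairs copied from the take(10) output above.
occupation_counts = [
    ("actor", 596),
    ("film actress", 21),
    ("model", 9),
    ("stand-up comedian", 44),
    ("actress", 271),
    ("television personality", 13),
    ("comic", 2),
    ("musician", 19),
    ("film actor", 19),
    ("journalist", 253),
]

# Sort by the count (second tuple element), largest first.
ranked = sorted(occupation_counts, key=lambda pair: pair[1], reverse=True)
top_three = ranked[:3]

print(top_three)
```

This ranks only the ten pairs shown here, so it says nothing about occupations that did not appear in that sample.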

## Conclusion:

This is my first experiment with Spark. It was pretty interesting to work with RDDs. An RDD somewhat resembles a pandas.DataFrame, but it is a far more scalable object.


# THE END