## LAB 1 - INTRO TO SPARK RDD

####  **1. Creating a `SparkContext`**

**1.1 Create a `SparkContext`**

* `conf = SparkConf().setAppName(appName).setMaster(master)`
* `sc = SparkContext(conf=conf)`

**1.2 Check if there is a `SparkContext`**

In [0]:
sc # sc is the SparkContext. Databricks creates a SparkContext automatically.

**1.3 Check `SparkContext` attributes**

In [0]:
# List sc's attributes
dir(sc)

**1.4 How to Get help**

In [0]:
# Use help to obtain more detailed information
#help(sc)
help(map)

In [0]:
# After reading the help we've decided we want to use sc.version to see what version of Spark we are running
sc.version

#### **2. Using RDDs & Chaining Together Transformations & Actions**

**2.1 Create your first RDD**

In [0]:
# Parallelize the data into 8 partitions
# parallelize() creates an RDD from a local collection
# Spark uses lazy evaluation, so no Spark jobs are run at this point. Jobs will be run when you show or collect results
import numpy as np # imports the numpy library
np.random.seed(10000) # seeds NumPy's random generator (the one used below) so we all get the same random numbers
data = np.random.uniform(0, 999, 10000) # generates 10000 random numbers between 0 and 999
data_RDD = sc.parallelize(data, 8) # divides the data into 8 partitions and stores them in a distributed manner
data_RDD # shows what data_RDD is. This does not display what the RDD holds because of lazy evaluation


In [0]:
data_RDD.collect() # collect() is an action. Calling an action on the RDD triggers all the pending operations.
                   # This returns the contents of data_RDD to the driver and displays them
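
The idea of lazy evaluation can be tried without a cluster. The sketch below is a plain-Python analogy, not Spark code: Python's built-in `map` is also lazy, so it plays the role of a transformation, and `list()` plays the role of an action such as `collect()`. The names `times_two_logged` and `calls` are made up for this illustration.

```python
# Plain-Python analogy for lazy evaluation (no Spark required).
calls = []  # records every value the function is actually applied to


def times_two_logged(x):
    """Double x and remember that the function ran."""
    calls.append(x)
    return x * 2


numbers = [10, 20, 30]

# Like a transformation: building the lazy map object runs nothing yet.
lazy_result = map(times_two_logged, numbers)
calls_before = len(calls)

# Like an action: materializing the results forces the computation.
materialized = list(lazy_result)
calls_after = len(calls)

print(calls_before, calls_after, materialized)
```

The function is not called at all until the results are requested, which is the same reason `data_RDD` shows only a description of the RDD rather than its contents.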

In [0]:
def divide_by_5(x): # creates a function named divide_by_5
  return x/5

In [0]:
def is_even(number): # creates a function named is_even
  if number%2 == 0: # checks if number is even
    return "Even" # if even, returns the string "Even"
  else:
    return "Odd" # if not even, returns the string "Odd"

In [0]:
data_RDD.map(divide_by_5).collect() # this applies the divide_by_5 function to each element within data_RDD
                                    # in other words, divide_by_5 is executed for each number within data_RDD
                                    # then the results are displayed because we use collect()
                                    # equivalent with a lambda: data_RDD.map(lambda x: x/5).collect()

**2.2 Get type of Python object**

In [0]:
# Let's see what type sc.parallelize() returned
type(data_RDD) # tells us what the data type of data_rdd is. 

**2.3 Get id of RDD**

In [0]:
# Each RDD is assigned an id
data_new =  np.random.uniform(-999,999,10000) # creates 10000 random numbers between -999 and 999
datanew_RDD = sc.parallelize(data_new, 8) # distributes the data across 8 partitions in the cluster
# Each RDD gets a unique ID
print(data_RDD.id()) # prints the id of the data_RDD
print(datanew_RDD.id()) # prints the id of the datanew_RDD

**2.4 Name RDDs and get number of partitions**

In [0]:
# We can name each newly created RDD using the setName() method
data_RDD.setName('sales_amount') # sets the name for data_RDD
print(data_RDD.name()) # prints the name of data_RDD
# Let's see how many partitions the RDD is split into by using getNumPartitions()
print(data_RDD.getNumPartitions()) # prints the number of partitions the dataset is split into and distributed across.

**2.5: Calculate the sales tax with `map`**

In [0]:
# Create a function to compute the sales tax
def get_tax(value): # computes the tax on a price (the tax rate is assumed to be 6%)
  """Multiplies by 0.06.

    Args:
       value (float): A sales amount.

    Returns:
        float: `value` times 0.06, which is the tax paid.
    """
  return value * 0.06

In [0]:
# Because map is a transformation and Spark uses lazy evaluation, no jobs, stages, or tasks will be launched when we run this code.
tax_RDD = data_RDD.map(get_tax) # distributes the get_tax function into all the elements of the data_RDD
# Let's see the RDD transformation hierarchy
#print(tax_RDD.toDebugString())

**2.6 Perform action `collect` to view results**

In [0]:
# Let's collect the data
print(tax_RDD.collect()) # displays tax_RDD. Recall that calling collect() often is not recommended because it brings the whole RDD into the driver's memory.

**2.7 Perform action `count` to view counts**

In [0]:
print(tax_RDD.count()) # prints the number of elements (numbers) in the tax_RDD.
print(data_RDD.count()) # prints the number of elements (numbers) in the  data_RDD

**2.8 Apply transformation `filter` and view results with `collect`**

In [0]:
# Define a function to filter a single value
def islessthan50(value): # creates a function that checks whether a number is less than a fixed threshold
    """Return whether value is below 50.

    Args:
        value (float): A number.

    Returns:
        bool: Whether `value` is less than 50.
    """
    threshold = 50 # assigns 50 to threshold
    return value < threshold # True if the value is below the threshold, otherwise False

In [0]:
# islessthan50 keeps values below 50 and drops values that are 50 or above
# Pass the islessthan50 function to the filter transformation
# Filter is a transformation so no tasks are run
filteredRDD = tax_RDD.filter(islessthan50) # applies islessthan50 to each value in tax_RDD and keeps the values for which it returns True. Nothing is displayed because no action has been called yet.

# get the count of transactions where tax is less than $50
print(filteredRDD.count()) # counts the elements in filteredRDD, i.e. the elements of tax_RDD that are below the threshold.

# View the results using collect()
# Collect is an action and triggers the filter transformation to run
print(filteredRDD.collect())

#### **3: Using Lambda Function**

**3.1 Use lambda function to filter**

In [0]:
filteredRDD = tax_RDD.filter(lambda x: x < 50) # instead of defining a named function, just use a lambda. A lambda is an anonymous, single-expression function.
                                # A lambda is typically used once. This one checks whether x is less than 50,
                                # and filter applies it to every element in tax_RDD.
print(filteredRDD.collect())

**3.2 Use lambda with a function**

In [0]:
# Define a function to filter a single value
def isless(value, threshold): # a more general version of islessthan50: the threshold is now a parameter
    """Return whether value is below threshold.

    Args:
        value (float): A number.
        threshold (float): The cutoff to compare against.

    Returns:
        bool: Whether `value` is less than `threshold`.
    """
    return value < threshold # True if the value is below the threshold, otherwise False

In [0]:
filteredRDD = tax_RDD.filter(lambda value: isless(value, 500)) # this calls the isless function for each value with the threshold set to 500.
# View the results using collect()
# Collect is an action and triggers the filter transformation to run
print(filteredRDD.collect())
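
Wrapping a two-argument function in a lambda is one way to fix the extra argument. As a hedged alternative, `functools.partial` from the standard library does the same job. The sketch below uses plain Python's `filter` on a small made-up list instead of an RDD, and redefines `isless` so it runs on its own.

```python
from functools import partial


def isless(value, threshold):
    """Return True when value is strictly below threshold."""
    return value < threshold


# Two equivalent ways to fix the threshold at 500.
below_500_lambda = lambda value: isless(value, 500)
below_500_partial = partial(isless, threshold=500)

sample = [12.5, 499.9, 500.0, 731.0]  # made-up values for illustration
kept_lambda = list(filter(below_500_lambda, sample))
kept_partial = list(filter(below_500_partial, sample))

print(kept_lambda)
print(kept_partial)
```

Either callable could be passed to an RDD's `filter()` in the same way, since both take a single value and return a boolean.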

#### **4. Other Common Actions**

In [0]:
filteredRDD.collect()

**4.1 Apply first(), take(n), takeOrdered(n), top(n) functions**

In [0]:
# Let's get the first element
print(filteredRDD.first())

# The first n
print(filteredRDD.take(4))

# Retrieve the n smallest elements
print(filteredRDD.takeOrdered(3))

# the largest n numbers
print(filteredRDD.top(4))

# Pass a lambda function to takeOrdered to reverse the order (this gives the same result as top(4))
print(filteredRDD.takeOrdered(4,  lambda x: -x))
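
To see why negating the key turns "smallest n" into "largest n", here is a plain-Python analogy using the standard `heapq` module on a small made-up list. It only mirrors the ordering semantics of `takeOrdered` and `top`; it says nothing about how Spark implements them.

```python
import heapq

values = [7.5, 1.2, 9.9, 3.3, 5.0, 8.1]  # made-up values for illustration

# Analogous to takeOrdered(3): the three smallest, in ascending order.
three_smallest = heapq.nsmallest(3, values)

# Analogous to top(4): the four largest, in descending order.
four_largest = heapq.nlargest(4, values)

# Analogous to takeOrdered(4, lambda x: -x): ordering by the negated value
# puts the largest numbers first, so the result matches four_largest.
four_largest_via_key = heapq.nsmallest(4, values, key=lambda x: -x)

print(three_smallest)
print(four_largest)
print(four_largest_via_key)
```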

**4.2 Apply reduce() functions**

In [0]:
# How reduce folds [4, 3, 5, 6] with addition, one pair at a time:
# [4, 3, 5, 6]  accumsum = 4,  number = 3
# [7, 5, 6]     accumsum = 7,  number = 5
# [12, 6]       accumsum = 12, number = 6
# 18

In [0]:
# Obtain Python's add function
from operator import add

# Efficiently sum the RDD using reduce
print(filteredRDD.reduce(add)) # reduce combines all the values and returns one number: the elements of the RDD are reduced to a single result.

# reduce() is an aggregating action that reduces the elements of an RDD using the specified commutative and associative binary operator.

# Sum using reduce with a lambda function
print(filteredRDD.reduce(lambda accumsum, number: accumsum + number))

# print(filteredRDD.reduce(lambda a, b: a + b))
# print(filteredRDD.reduce(lambda a, b: a * b))

# you can repartition the data first and then add too
print(filteredRDD.repartition(3).reduce(lambda accumsum, number: accumsum + number))
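
The requirement that the operator be commutative and associative can be illustrated without Spark. The sketch below assumes a simplified model in which each partition is reduced on its own and the partial results are then combined; `reduce_in_partitions` is a hypothetical helper written for this illustration. Addition gives the same answer however the data is partitioned, while subtraction does not.

```python
from functools import reduce
from operator import add, sub

values = [4, 3, 5, 6]

# Sequential fold: ((4 + 3) + 5) + 6
total = reduce(add, values)


def reduce_in_partitions(op, partitions):
    """Reduce each partition first, then combine the partial results."""
    partials = [reduce(op, part) for part in partitions]
    return reduce(op, partials)


two_parts = [[4, 3], [5, 6]]
three_parts = [[4], [3, 5], [6]]

# Addition is commutative and associative: partitioning does not matter.
add_two = reduce_in_partitions(add, two_parts)
add_three = reduce_in_partitions(add, three_parts)

# Subtraction is neither: the answer changes with the partitioning.
sub_sequential = reduce(sub, values)
sub_two = reduce_in_partitions(sub, two_parts)
sub_three = reduce_in_partitions(sub, three_parts)

print(total, add_two, add_three)
print(sub_sequential, sub_two, sub_three)
```

This is why repartitioning before `reduce` is safe for a sum but would give unreliable results for an operator such as subtraction.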

**4.3 Apply takeSample() and countByValue() actions**

In [0]:
# takeSample can sample with replacement (elements may be reused) or without replacement
print(filteredRDD.takeSample(withReplacement=True, num=6, seed=500)) # samples the filtered dataset by taking 6 random elements with replacement

print('\n') # prints an empty line

# Create new base RDD to show countByValue
repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6]) # parallelize the list

print(repetitiveRDD.countByValue()) # it counts how many times a particular element occurs in an RDD
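
As a local cross-check, the same ideas exist in Python's standard library. This is a plain-Python analogy, not Spark code: `collections.Counter` counts occurrences much like `countByValue()`, and `random.choices` and `random.sample` illustrate sampling with and without replacement. The `population` list and the seed are made up for this illustration.

```python
import random
from collections import Counter

repetitive = [1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6]

# Analogous to countByValue(): how many times each value occurs.
counts = dict(Counter(repetitive))
print(counts)

rng = random.Random(500)      # seeded so the run is repeatable
population = list(range(10))  # ten distinct made-up values

# With replacement: the same element may be drawn more than once.
with_replacement = rng.choices(population, k=6)

# Without replacement: every drawn element is distinct.
without_replacement = rng.sample(population, k=6)

print(with_replacement)
print(without_replacement)
```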

In [0]:
# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat'] # Creates a list 

wordsRDD = sc.parallelize(wordsList, 4) # distributes the list

# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's')) # creates a tuple for each element in the RDD. The first item in the tuple is the
# original word, the second item is the plural version of the same word.

# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's')) # this also makes the words plural, but instead of storing each singular/plural pair in a tuple, it flattens the pairs into individual elements.

# flatMap() is a transformation that applies the function to every element and then flattens the results, returning a new RDD.

# View the results
print(singularAndPluralWordsRDDMap.collect())
print(singularAndPluralWordsRDD.collect())

# View the number of elements in the RDD
print(singularAndPluralWordsRDDMap.count())
print(singularAndPluralWordsRDD.count())
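
The difference in element counts can be reproduced locally. The following plain-Python sketch mirrors `map` with a list comprehension and `flatMap` with `itertools.chain.from_iterable`; it is an analogy for the semantics only.

```python
from itertools import chain

words = ['cat', 'elephant', 'rat', 'rat', 'cat']

# Like map: one output element (a tuple) per input element.
mapped = [(w, w + 's') for w in words]

# Like flatMap: each tuple is unpacked into the output, so the
# number of elements doubles.
flat_mapped = list(chain.from_iterable((w, w + 's') for w in words))

print(mapped)
print(flat_mapped)
print(len(mapped), len(flat_mapped))
```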

#### **5: Additional RDD transformations**

**5.1 Apply `groupByKey()` and `reduceByKey()` functions**

In [0]:
from operator import add

pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)]) # create a list of tuples and parallelize it

# mapValues only used to improve format for printing
print(pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()) # group the keys, then map the list function to the values of the keys

# Using mapValues, which is recommended when the key doesn't change
print(pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()) # group the keys, then map the sum function to the values of the keys

# reduceByKey is more efficient / scalable
print(pairRDD.reduceByKey(add).collect()) #reducebykey will return as many elements as the number of keys. It will reduce the unique keys by summing its values

print(pairRDD.reduceByKey(lambda x, y: x+y).collect())  # same as above, but instead of the add function we use a lambda function to add the values

# to see what groupByKey does
pairRDD.groupByKey().mapValues(lambda x: x).collect() # it gives the key and the pyspark object for its value
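
One way to picture why `reduceByKey` scales better is to compare what each approach has to hold. The plain-Python sketch below is an analogy under a simplifying assumption (a single machine, no shuffle): the grouping style keeps every value per key before summing, while the reducing style keeps only one running total per key.

```python
from collections import defaultdict

pairs = [('a', 1), ('a', 2), ('b', 1)]

# groupByKey-style: collect every value for a key, then aggregate.
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)
sums_from_groups = {key: sum(values) for key, values in grouped.items()}

# reduceByKey-style: merge values as they arrive, one total per key.
running_totals = {}
for key, value in pairs:
    if key in running_totals:
        running_totals[key] = running_totals[key] + value
    else:
        running_totals[key] = value

print(dict(grouped))
print(sums_from_groups)
print(running_totals)
```

Both routes give the same sums; the second never materializes the full list of values for a key.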

**5.2 Apply `mapPartitions()` and `mapPartitionsWithIndex()` functions**

In [0]:
# mapPartitions takes a function that takes an iterator and returns an iterator
print(wordsRDD.collect())

itemsRDD = wordsRDD.mapPartitions(lambda x: [','.join(x)]) # processes each partition as a whole.
# The function is called once per partition and receives an iterator over that partition's elements.
# Because join consumes the whole iterator, a partition's words are held in memory until its joined string is built.
print(itemsRDD.collect())

In [0]:
"""
mapPartitions and mapPartitionsWithIndex call your function once per partition rather than once per element (all elements are still processed).
This matters when the function needs expensive setup, such as creating a parser instance: with x partitions the setup runs x times, far fewer than with map, where it would run once per element.
The function you pass to these methods must accept an iterator, which yields all the elements of one partition. Because the setup cost is paid once per partition, these methods can improve the performance of your application significantly.

"""


itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))]) 

# We can see that three of the partitions hold one element each and the fourth partition holds two
# elements, although things may not bode well for the rat...
print(itemsByPartRDD.collect())

# Rerun without wrapping the result in a list: the index and the list are emitted as separate elements (acts more like flatMap)
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))
print(itemsByPartRDD.collect())
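
The per-partition behaviour can be imitated locally. The sketch below is a plain-Python analogy: `slice_into_partitions` is a hypothetical helper that assumes contiguous, near-equal slices, which happens to reproduce the layout described above for five words in four partitions but is not a statement about Spark's internals. `setup_log` stands in for any expensive per-partition setup.

```python
def slice_into_partitions(items, num_partitions):
    """Split items into contiguous, near-equal slices (an assumed layout)."""
    n = len(items)
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


words = ['cat', 'elephant', 'rat', 'rat', 'cat']
partitions = slice_into_partitions(words, 4)

setup_log = []  # one entry per call, standing in for expensive setup


def join_partition(iterator):
    """Called once per partition; consumes the whole iterator."""
    setup_log.append('setup')
    return [','.join(iterator)]


# Like mapPartitions: apply the function to each partition's iterator.
joined = [item for part in partitions for item in join_partition(iter(part))]

# Like mapPartitionsWithIndex returning a one-element list per partition.
indexed = [(index, list(part)) for index, part in enumerate(partitions)]

# Returning a bare tuple instead: its two members become separate elements.
flattened = [item for index, part in enumerate(partitions)
             for item in (index, list(part))]

print(partitions)
print(joined, len(setup_log))
print(indexed)
print(flattened)
```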

#### **6. Caching RDDs and storage options**

**6.1 Caching RDDs**

In [0]:
# Caching is one mechanism to speed up applications that access the same RDD multiple times. An RDD that is neither cached nor checkpointed is re-evaluated each time an action is invoked on it.

# Name the RDD
filteredRDD.setName('My Filtered RDD')

# Cache the RDD
filteredRDD.cache()

# Trigger an action
filteredRDD.collect()

# Is it cached
print(filteredRDD.is_cached)
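
The cost of not caching can be made visible with a small local experiment. This is a plain-Python analogy, not Spark code: `expensive_filter` is a made-up stand-in for an RDD's lineage, and `compute_log` counts how often it is recomputed.

```python
compute_log = []  # one entry each time the "lineage" is recomputed


def expensive_filter(values):
    """Stand-in for recomputing an RDD from its lineage."""
    compute_log.append('run')
    return [v for v in values if v < 50]


data = [12.0, 75.5, 49.9, 50.0]  # made-up values for illustration

# Without caching: every "action" recomputes the filter from scratch.
count_uncached = len(expensive_filter(data))
first_uncached = expensive_filter(data)[0]
runs_without_cache = len(compute_log)

# With caching: compute once, keep the result, reuse it for each action.
cached = expensive_filter(data)
count_cached = len(cached)
first_cached = cached[0]
runs_with_cache = len(compute_log) - runs_without_cache

print(runs_without_cache, runs_with_cache)
```

Two actions cost two computations without the cached copy and only one with it, which is the saving `cache()` aims for when an RDD is reused.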

**6.2 Unpersist and storage options**

In [0]:
# Note that toDebugString also provides storage information.
# It lets us inspect the lineage graph of a Spark RDD.

# Whenever a series of transformations is performed on an RDD, they are not evaluated immediately, but lazily.
# When a new RDD is created from an existing RDD, the new RDD contains a pointer to its parent RDD. All the dependencies between the RDDs are logged in a graph, rather than the actual data. This graph is called the lineage graph.

# If we are done with the RDD we can unpersist it so that its memory can be reclaimed

# http://slideplayer.com/10638371/36/images/41/RDD+Lineage+Graph+%28RDD+operator+graph%29.jpg

print(filteredRDD.toDebugString())

In [0]:
# getStorageLevel() returns the RDD's current StorageLevel.
# Storage level for a non-cached RDD
filteredRDD.unpersist()
print(filteredRDD.getStorageLevel())

# Storage level for a cached RDD
filteredRDD.cache()
print(filteredRDD.getStorageLevel())

In [0]:
# more on caching: persist() accepts an explicit storage level
from pyspark import StorageLevel
data_RDD.unpersist()
print(data_RDD.getStorageLevel())
data_RDD.persist(StorageLevel.MEMORY_AND_DISK_2) # keep in memory, spill to disk if needed, replicate on two nodes
print(data_RDD.getStorageLevel())
data_RDD.unpersist()

#### **7. Debugging Spark applications and lazy evaluation**

**7.1 Challenges with lazy evaluation using transformations and actions**
 
* Spark's use of lazy evaluation can make debugging more difficult because code is not always executed immediately. To see an example of how this can happen, let's first define a broken filter function.
Next we perform a `filter()` operation using the broken filtering function.  No error will occur at this point due to Spark's use of lazy evaluation.
 
* The `filter()` method will not be executed *until* an action operation is invoked on the RDD.  We will perform an action by using the `collect()` method to return a list that contains all of the elements in this RDD.

In [0]:
def brokenTen(value):
    """Incorrect implementation of the ten function.

    Note:
        The `if` statement checks an undefined variable `val` instead of `value`.

    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than ten.

    Raises:
        NameError: The function references `val`, which is not available in the local or global
            namespace, so a `NameError` is raised.
    """
    if (val < 10):
        return True
    else:
        return False

brokenRDD = data_RDD.filter(brokenTen)

In [0]:
# Now we'll see the error
brokenRDD.collect()

**7.2 Finding the bug**
 
* When the `filter()` method is executed, Spark evaluates the RDD by executing the `parallelize()` and `filter()` methods. Since our `filter()` method has an error in the filtering function `brokenTen()`, an error occurs.
 
* Scroll through the "Py4JJavaError     Traceback (most recent call last)" part of the cell output. First you will see that the line that generated the error is the `collect()` line. There is *nothing wrong with this line*. However, it is an action, and it caused the other methods to be executed. Continue scrolling through the traceback and you will see the following error line:
 
* `NameError: name 'val' is not defined` (Python 2 words this as `global name 'val' is not defined`)
 
* Looking at this error line, we can see that we used the wrong variable name in our filtering function `brokenTen()`.
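
The delayed error can be reproduced without a cluster. In the plain-Python sketch below (an analogy, not Spark code), the built-in `filter` is lazy like the RDD transformation, so the broken function raises only when the results are materialized. `broken_ten` is a cut-down copy of the function above that refers to the undefined name `val`.

```python
def broken_ten(value):
    """Deliberately broken: refers to `val`, which is never defined."""
    return val < 10


numbers = [3, 12, 7]

# Building the lazy filter raises nothing, just like the RDD filter().
lazy_filtered = filter(broken_ten, numbers)
error_message = None

# Materializing the results is what finally runs broken_ten.
try:
    list(lazy_filtered)
except NameError as exc:
    error_message = str(exc)

print(error_message)
```

As in Spark, the traceback points at the line that forces evaluation, while the actual mistake sits in the function that was passed in earlier.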

#### **8. Your Turn**

**8.1 Convert temperature from Celsius to Fahrenheit**

In [0]:
# use the map function and use the formula T(°F) = T(°C) × 9/5 + 32 (remember you need to parallelize the temp_c first)
temp_c = [10, 3, -5, 25, 1, 9, 29, -10, 5]



In [0]:
# use the map function and transform kg to lbs (lbs = kg*2.205) (remember you need to parallelize the weight_kg first)
weight_kg = [52, 85, 62,48, 74]


In [0]:
# use the map function and formula area = r*r*pi (remember you need to parallelize r)
from math import pi
r = [7, 4, 3, 2, 6]



In [0]:
# use the reduce function to find the maximum value in data (remember you need to parallelize data first)
data = [5, 1, 29, 10, 6, 30, 25, 100, -5]


**8.2 Dealing with Text data**

In [0]:
# Create a SQLContext and read the data.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc) 
review_text_score = sqlContext.sql("Select Score, Text from amazon_reviews limit 1000").rdd
review_text_score.collect()

In [0]:
# let's create a key - value pair (key score and value is text)
# Combine text based on score
# tokenize the reviews for each score (split the text into words)
# Get the word count for each review score

In [0]:
# let's create a key - value pair (key score and value is text)
review_text_score = review_text_score.map(lambda x: (x[0], x[1]))
review_text_score.collect()

In [0]:
# Combine text based on score
combined_by_key = review_text_score.reduceByKey(lambda a, b: a+ " " + b)
combined_by_key.collect()

In [0]:
# tokenize the reviews for each score (split the text into words)
split_by_key = combined_by_key.map(lambda x: (x[0], x[1].split()))

In [0]:
# Get the word count for each review score
aa = split_by_key.map(lambda x: (x[0], len(x[1]))) # (score, total number of words across that score's reviews)
aa.collect()

In [0]:
# Get the number of reviews for each score
from operator import add
count_=review_text_score.map(lambda x: (x[0], 1)).reduceByKey(add)
count_.collect()

In [0]:
# get the number of reviews in each score - another way
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
review_text_score = sqlContext.sql("Select Score, Text from amazon_reviews limit 1000").rdd
review_text_score = review_text_score.map(lambda x: (x[0], x[1]))
bb = review_text_score.reduceByKey(lambda a, b: a+"___"+b).map(lambda x: (x[0], len(x[1].split("___"))))
bb.collect()

In [0]:
cc = aa.join(count_)
cc.collect()

In [0]:
# average word count per review for each score (total words / number of reviews)
cc = aa.join(count_)
cc.map(lambda x: (x[0], x[1][0]/x[1][1])).collect()
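
To check the logic of this pipeline on something small, the same computation can be done in plain Python. The reviews below are made-up toy data, not rows from `amazon_reviews`, and the dictionaries stand in for the `aa`, `count_` and joined RDDs.

```python
# Made-up (score, text) pairs standing in for the review data.
reviews = [
    (5, "great tasty snack"),
    (1, "bad"),
    (5, "love it"),
    (1, "stale and bland"),
]

words_per_score = {}    # like aa: total words for each score
reviews_per_score = {}  # like count_: number of reviews for each score

for score, text in reviews:
    words_per_score[score] = words_per_score.get(score, 0) + len(text.split())
    reviews_per_score[score] = reviews_per_score.get(score, 0) + 1

# Like the join followed by the division: average words per review.
average_words = {
    score: words_per_score[score] / reviews_per_score[score]
    for score in words_per_score
}

print(words_per_score)
print(reviews_per_score)
print(average_words)
```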