# Spark Tutorial: Learning Apache Spark
## Part 1: Basic notebook usage and Python integration
### (1a) Notebook usage

In [None]:
# This is a Python cell. You can run normal Python code here.
print('hello world')

In [None]:
# Here is another Python cell.
x = 42
if x > 40:
    print('x is {0}. It is larger than 40.'.format(x))

### (1b) Notebook state

In [None]:
# This cell relies on x being defined in the previous cell.
# If we didn't run the previous cell, then this code would fail.
print(x * 2)

### (1c) Library imports

In [None]:
# Import the regular expression library.
import re
m = re.search('(?<=abc)def', 'abcdef')
m.group(0)

In [None]:
# Import the datetime library
import datetime
print('This was last run on : {0}'.format(datetime.datetime.now()))

## Part 2: An introduction to using Apache Spark with the PySpark SQL API running in a notebook
### Spark Context
In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.
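
As a rough plain-Python analogy (not Spark itself), the sketch below has a "driver" function split a job into tasks, hand them to a small thread pool standing in for the executors, and combine the results. The names `run_task` and `run_job` and the chunking scheme are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(chunk):
    # Work done by one "executor" on its slice of the data.
    return sum(chunk)


def run_job(data, num_tasks=4):
    # The "driver" splits the job into tasks, one per chunk of the data.
    chunks = [data[i::num_tasks] for i in range(num_tasks)]
    # The pool stands in for the executors; map() returns results in task order.
    with ThreadPoolExecutor(max_workers=num_tasks) as pool:
        partial_sums = list(pool.map(run_task, chunks))
    # The driver combines the per-task results.
    return sum(partial_sums)


total = run_job(list(range(100)))
```

In real Spark the tasks run in separate executor processes, usually on other machines, but the split, submit, and gather pattern is the same.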

In Part 1, we saw that normal Python code can be executed via cells. When using Databricks, this code is executed on the Spark driver and not on an executor; when using a Jupyter notebook, it is executed within the kernel associated with the notebook. Since no Spark functionality is actually being used, no tasks are launched on the executors.

In order to use Spark and its DataFrame API we will need to use a `SQLContext`. When running Spark, you start a new Spark application by creating a `SparkContext`. You can then create a `SQLContext` from the `SparkContext`. When the `SparkContext` is created, it asks the master for some cores to use to do work. The master sets these cores aside just for you; they won't be used for other applications. When using Databricks, both a `SparkContext` and a `SQLContext` are created for you automatically: `sc` is your `SparkContext`, and `sqlContext` is your `SQLContext`.
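
Outside Databricks (for example in a local Jupyter kernel), `sc` and `sqlContext` may not be predefined. Below is a minimal sketch of creating them by hand, assuming PySpark is installed. The helper name `make_contexts`, the application name, and the `local[2]` master URL are illustrative choices, and `SQLContext` is deprecated in newer Spark releases in favor of `SparkSession`.

```python
def make_contexts(app_name="spark-tutorial", master="local[2]"):
    """Create (or reuse) a SparkContext and wrap it in a SQLContext.

    The helper name and default arguments are assumptions for illustration.
    """
    # Imported inside the function so that defining it does not require Spark.
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    conf = SparkConf().setAppName(app_name).setMaster(master)
    # getOrCreate() reuses a running SparkContext instead of failing.
    spark_context = SparkContext.getOrCreate(conf)
    return spark_context, SQLContext(spark_context)
```

Calling `sc, sqlContext = make_contexts()` once at the top of a notebook would provide the two names used in the rest of this tutorial.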

### (2a) SQLContext type

In [None]:
# Display the type of the SQLContext
type(sqlContext)

### (2b) SQLContext attributes

In [None]:
# List sqlContext's attributes
dir(sqlContext)

### (2c) Getting help

In [None]:
help(sqlContext)

In [None]:
# Get the version of Spark we are using
sc.version

In [None]:
# Help can be used on any Python object
help(map)

## Part 3: Using DataFrames and chaining together transformations and actions 

### (3a) Create a Python collection of 10,000 people
We will use a third-party Python testing library called fake-factory to create a collection of fake person records.

In [None]:
# Create a fake name
from faker import Factory
fake = Factory.create()
fake.seed(4321)
fake.name()

In [None]:
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
    name = fake.name().split()
    return Row(name[1], name[0], fake.ssn(), fake.job(), abs(2017-fake.date_time().year)+1)

In [None]:
# A helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in range(times):
        yield func(*args, **kwargs)

In [None]:
# Generate fake data of 10,000 people
data = list(repeat(10000, fake_entry))

In [None]:
# Print one entry
data[0][0],data[0][1],data[0][2],data[0][3],data[0][4]

In [None]:
# Number of entries
len(data)

### (3b) Distributed data and using a collection to create a DataFrame 

In [None]:
# Create a DataFrame in Spark
dataDF = sqlContext.createDataFrame(data,('last_name', 'first_name','ssn','occupation','age'))

In [None]:
# Check the type of the new DataFrame 
print('type of dataDF: {0}'.format(type(dataDF)))

In [None]:
# Check the DataFrame's schema
dataDF.printSchema()

In [None]:
# Register the new DataFrame as a named table
sqlContext.registerDataFrameAsTable(dataDF, 'dataframe')

In [None]:
# Display 20 lines of the new table
sqlContext.sql("select * from dataframe").show()

In [None]:
# Check how many partitions the DataFrame will be split into.
dataDF.rdd.getNumPartitions()

In [None]:
# Check operations about DataFrame
newDF = dataDF.distinct().select('*')
newDF.explain(True)

### (3c) Subtract one from each value using `select`

In [None]:
# Subtract 1 from the age of each entry using the select transformation. A new column is created by
# subtracting 1 from the 'age' column, and it is then renamed to 'age'.
subDF = dataDF.select('last_name', 'first_name', 'ssn', 'occupation', (dataDF.age-1).alias('age'))

In [None]:
# Query plan
subDF.explain(True)

### (3d) Use `collect` to view results

In [None]:
# Using collect to gather data from multiple partitions
results = subDF.collect()
# Because the collected data may be too large to print, use show() to display a small amount of it
# print(results)
subDF.show()

### (3e) Use `count` to get total

In [None]:
print(dataDF.count())
print(subDF.count())

### (3f) Apply transformation `filter` and view results with `collect`
Each task makes a new partition with entries from the original partition that have an "age" column value less than 10. 

In [None]:
from pyspark.sql.types import IntegerType
# Note: cast() returns a new Column and does not modify subDF; the result is not used here.
subDF.age.cast(IntegerType())
filteredDF = subDF.filter(subDF.age < 10)
filteredDF.show(truncate=False)
filteredDF.count()

## Part 4: Python Lambda functions and User Defined Functions
We can define a lambda function and then register it as a Spark *User Defined Function* (UDF) to filter data.

In [None]:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
less_ten = udf(lambda s: s<10, BooleanType())
lambdaDF = subDF.filter(less_ten(subDF.age))
lambdaDF.show()
lambdaDF.count()
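
The lambda wrapped by `udf` is an ordinary Python function, so its logic can be checked on plain values before Spark is involved. The sketch below also shows a null-tolerant variant (`less_ten_safe` is a hypothetical name), on the assumption that a UDF may receive `None` for null column values and that the code runs under Python 3, where comparing `None` with an integer raises `TypeError`.

```python
# The same predicate that Part 4 wraps in udf(), as a plain function.
less_ten_py = lambda s: s < 10

# Hypothetical null-tolerant variant: treat missing values as "not less than ten".
less_ten_safe = lambda s: s is not None and s < 10

checks = [less_ten_py(5), less_ten_py(15), less_ten_safe(None)]
```

The generated ages in this tutorial are never null, so the simpler lambda is sufficient here; the variant only matters for data with missing values.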

## Part 5: Additional DataFrame actions

### `first()` & `take()`
We can look at the first few entries to get a rough idea of the data. *first()* returns the first entry and *take(n)* returns the first n entries.

In [None]:
print("first: {0}\n".format(filteredDF.first()))

In [None]:
print("Four of them: {0}\n".format(filteredDF.take(4)))

## Part 6: Additional DataFrame transformations

### (6a) `orderBy()`
`orderBy()` can be used to sort a DataFrame by one or more columns, producing a new DataFrame.
To access a `Column` object, we have two notations on the DataFrame:
* Pandas-style notation: `filteredDF.age`
* Subscript notation: `filteredDF['age']`

However, Pandas-style notation can misbehave when a column name collides with an existing DataFrame attribute or method (for example, a column named `count`), whereas subscript notation is always safe. Both notations return a `Column`, which provides additional methods such as `desc()` and `asc()`.
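
One such side effect is a name collision between a column and an existing attribute or method. It can be illustrated with a toy class (not Spark's implementation; `ToyFrame` is a hypothetical name) that falls back to column lookup only when normal attribute lookup fails:

```python
class ToyFrame(object):
    """Toy stand-in for a DataFrame; NOT Spark's implementation."""

    def __init__(self, columns):
        self._columns = dict(columns)

    def count(self):
        # A regular method whose name also happens to be a column name below.
        lengths = [len(values) for values in self._columns.values()]
        return max(lengths) if lengths else 0

    def __getattr__(self, name):
        # Python calls this only when normal attribute lookup fails,
        # so existing methods such as count() always win.
        columns = self.__dict__.get('_columns', {})
        if name in columns:
            return columns[name]
        raise AttributeError(name)

    def __getitem__(self, name):
        # Subscript notation always looks in the columns.
        return self._columns[name]


frame = ToyFrame({'age': [31, 7, 52], 'count': [1, 2, 3]})
by_attribute = frame.count     # the bound method, not the column
by_subscript = frame['count']  # the column values
```

Here `frame.age` works as expected, but `frame.count` silently returns the method rather than the column, which is why subscript notation is the safer habit.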

In [None]:
# Sort by age in ascending order; returns a new DataFrame
dataDF.orderBy(dataDF.age).show(n=5)

# Sort by last name in descending order
dataDF.orderBy(dataDF.last_name.desc()).show(n=5)

### (6b) `distinct()` and `dropDuplicates()`
`distinct()` filters out duplicate rows, considering all columns. `dropDuplicates()` is similar to `distinct()`, except that it lets you specify which columns to compare.

Below we compare the row count of the original DataFrame with the count after `distinct()`. Because the DataFrame here is generated randomly, it is unlikely to contain duplicate entries.

In [None]:
print(dataDF.count())
print(dataDF.distinct().count())

We create a dummy DataFrame to show the effect of `distinct()` and `dropDuplicates()`.

In [None]:
# A dummy DataFrame with duplicated entries.
tempDF = sqlContext.createDataFrame([("Joe", 1), ("Joe", 1), ("Anna", 15), ("Anna", 12), ("Ravi", 5)], ('name', 'score'))
# Original DataFrame
tempDF.show()
# Distinct DataFrame
tempDF.distinct().show()

We can use `dropDuplicates()` to remove rows that are duplicated in specific columns.

In [None]:
print(tempDF.count())
print(tempDF.dropDuplicates(['name']).count())
tempDF.dropDuplicates(['name']).show()
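
The difference between the two can be sketched in plain Python on the same five rows, without Spark. This sketch keeps the first row seen for each name; Spark may keep a different one of the duplicate rows, so only the counts should be relied on.

```python
rows = [("Joe", 1), ("Joe", 1), ("Anna", 15), ("Anna", 12), ("Ravi", 5)]

# distinct(): two rows are duplicates only if every column matches.
distinct_rows = sorted(set(rows))

# dropDuplicates(['name']): rows are duplicates if the chosen column matches.
first_by_name = {}
for name, score in rows:
    first_by_name.setdefault(name, (name, score))  # keep the first row per name
deduped_rows = sorted(first_by_name.values())

counts = (len(rows), len(distinct_rows), len(deduped_rows))
```

The two `("Joe", 1)` rows collapse under both operations, while the two "Anna" rows collapse only when duplicates are judged by name alone.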

### (6c) `drop()`
We can use `drop()` to drop some columns from a DataFrame.

In [None]:
dataDF.drop('occupation').drop('age').show()

### (6d) `groupBy()`
`groupBy()` lets you perform aggregations on a DataFrame. It returns a special `GroupedData` object that supports various aggregation operations such as `count()`, `sum()`, `max()`, and `avg()`. These aggregation functions typically create a new column and return a new DataFrame.

In [None]:
# Count total number of people of each occupation.
dataDF.groupBy('occupation').count().show(truncate=False)

In [None]:
# Compute average age.
dataDF.groupBy().avg('age').show(truncate=False)

In [None]:
# Compute max and min ages.
print('Maximum age: {0}'.format(dataDF.groupBy().max('age').first()[0]))
print('Minimum age: {0}'.format(dataDF.groupBy().min('age').first()[0]))

### (6e) `sample()`
`sample()` returns a random sample of a DataFrame. The `withReplacement` argument specifies whether to sample with or without replacement. The `fraction` parameter specifies the fraction of elements to return (`fraction=0.2` returns roughly 20% of the elements in the DataFrame). `seed` sets the seed for the random number generator.

In [None]:
sampledDF = dataDF.sample(withReplacement=False, fraction=0.10)
print(sampledDF.count())
sampledDF.show()

In [None]:
print(dataDF.sample(withReplacement=False,fraction=0.05).count())
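
The counts printed above are unlikely to be exactly 1,000 and 500: sampling without replacement does not guarantee an exact fraction. A plain-Python sketch of the idea, assuming each row is kept independently with probability `fraction` (`bernoulli_sample` is a hypothetical helper, not a Spark function):

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    # Keep each row independently with probability `fraction`.
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(10000))
sample_a = bernoulli_sample(rows, 0.10, seed=42)
sample_b = bernoulli_sample(rows, 0.10, seed=42)
```

With the same seed the two samples are identical, and their size is close to, but generally not exactly, 10% of the input.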

## Part 7: Caching DataFrames and storage options

### (7a) Caching DataFrames
`cache()` marks a DataFrame to be cached. Caching can improve efficiency when you plan to use a DataFrame multiple times. However, because Spark evaluates lazily, the data is not actually cached until an action is triggered on the DataFrame.

In [None]:
# Cache the DataFrame
filteredDF.cache()
# Trigger an action
print(filteredDF.count())
# The DataFrame is indeed cached
print(filteredDF.is_cached)

### (7b) Unpersist and storage options

Use `unpersist()` to reclaim the memory used by a previously cached DataFrame.

In [None]:
filteredDF.unpersist()
print(filteredDF.is_cached)

## Part 8: Debugging Spark applications and lazy evaluation
### How Python is Executed in Spark
Internally, Spark executes using a Java Virtual Machine (JVM). PySpark uses Py4J to let the Python driver program communicate with that JVM. Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine. Methods are called as if the Java objects resided in the Python interpreter, and Java collections can be accessed through standard Python collection methods. Py4J also enables Java programs to call back Python objects.

### (8a) Challenges with lazy evaluation using transformations and actions

The `brokenTen` function has a mistake in its `if` statement. However, due to lazy evaluation in Spark, the code will not actually be executed until an *action* is called on the DataFrame. Notice that `filter()` does not trigger execution of the code.

In [None]:
def brokenTen(value):
    ''' Check whether a variable is less than ten.
    Input:
        value: the value to be compared with ten
    Output: 
        boolean: True if value < 10 else False
    Note:
        The if statement refers to the undefined variable val, which raises an error when the function runs.
    
    '''
    if (val < 10):
        return True
    else:
        return False

btUDF = udf(brokenTen)
brokenDF = subDF.filter(btUDF(subDF.age)==True)

When an action is called on the DataFrame, the `brokenTen` function is called and an error occurs.

In [None]:
brokenDF.count()
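
The same deferral can be reproduced in plain Python with a generator, which, like a transformation, does no work until it is consumed. This is an analogy only; `broken_ten` mirrors `brokenTen` above.

```python
def broken_ten(value):
    # Same mistake as brokenTen: 'val' is not defined anywhere.
    return val < 10


values = [3, 12, 7]

# Building the generator is like calling filter(): nothing is evaluated yet,
# so the mistake in broken_ten goes unnoticed at this point.
lazy = (v for v in values if broken_ten(v))

# Consuming the generator is like calling an action: the function finally runs.
try:
    result = list(lazy)
    error = None
except NameError as exc:
    error = exc
```

The error surfaces only at the point of consumption, which is why a failure in Spark can appear far from the line that introduced the bug.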

In [None]:
# We can use a lambda function to improve readability and conciseness
myUDF = udf(lambda v: v < 10, BooleanType())
subDF.filter(myUDF(subDF.age) == True).show(truncate=False)

### (8d) Readability and code style
To make code more readable, a statement with multiple methods, transformations, and actions can be enclosed in parentheses, with each method, transformation, or action on a separate line.

In [None]:
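# Import only the functions we need; a wildcard import would shadow Python built-ins such as max and sum.
from pyspark.sql.functions import concat, lit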
(dataDF
    .filter(dataDF.age > 20)
    .select(concat(dataDF.first_name, lit(' '), dataDF.last_name), dataDF.occupation)
    .show(truncate=False)
)