# Learn the basics about notebooks and Apache Spark

This notebook introduces you to the basics of analytics notebooks and explains what Apache Spark is and how to use Spark in notebooks. The notebook shows you how to load data into the notebook, parse and explore the data, run queries on the data to extract information, plot your analysis results, and save your result in Object Storage.

This notebook runs on Python 3.9 with Spark 3.2, and Cloud Object Storage.

## Table of contents
- [What is Apache Spark](#apache_spark)
- [Employee Attrition Data](#Employee_Attrition_Data)
- [Load data](#load_data)
- [Working with RDD](#RDD)
- [Working with DataFrame](#DataFrame)
- [Use Spark SQL](#use_spark_sql)

<a id="apache_spark"></a>
## What is Apache Spark

[Spark](http://spark.apache.org/) is a fast open-source engine for large-scale data processing. It is built for speed and ease of use. Through the advanced DAG (directed acyclic graph) execution engine that supports acyclic data flow and in-memory computing, programs can run up to 100 times faster than Hadoop MapReduce in memory, or 10 times faster on disk.

Spark consists of the following components:

* Spark Core is the underlying computation engine with the fundamental programming abstraction called resilient distributed datasets (RDDs)
* Spark SQL provides a new data abstraction called DataFrame for structured data processing with SQL and domain-specific language
* MLlib is a scalable machine learning framework for delivering fast distributed algorithms for mining big data
* Streaming leverages Spark's fast scheduling capability to perform real-time analysis on streams of new data
* GraphX is the graph processing framework for the analysis of graph structured data

<img src='https://github.com/carloapp2/SparkPOT/blob/master/spark.png?raw=true' width="50%" height="50%"></img>

The Apache Spark driver application uses the predefined `SparkContext` object (`sc`) to interact with the cluster. The `SparkContext` object tells Spark how and where to access a cluster.

To check the Spark version, run the `sc.version` command.

In [None]:
sc.version

In [None]:
#Current date and time
import time

print(time.asctime(time.localtime(time.time())))

<a id="Employee_Attrition_Data"></a>
## Employee Attrition data
In this notebook, you will focus on Spark Core and Spark SQL by using the Python API. You will analyze the employee attrition data at https://www.kaggle.com/analystanand/employee-attrition/data.

`[, 0]	ID                     Employee Number`  
`[, 1]	satisfaction_level     Job Satisfaction Level`  
`[, 2]	last_eval_rating	   Time since last evaluation in years`  
`[, 3]	projects_worked        Number of projects completed while at work`  
`[, 4]	average_monthly_hours  Average number of working hours per month`  
`[, 5]	time_spend_company     Time spent at the company in years`  
`[, 6]	work_accident          Whether the employee had a workplace accident`  
`[, 7]	promotion_last_5year   Was an employee promoted over the last 5 years`  
`[, 8]	Department             Department in which an employee works`  
`[, 9]	salary                 Salary level (low, medium, high)`  
`[,10]	Attrition              0=Stayed  1=left the workplace`

<a id="load_data"></a>
## Load data
To load the CSV file to your notebook: 

1. Click the **Data** icon on the notebook action bar. 
2. Click **browse** to add the `empattrition.csv` file.

The data file is now listed on the **Files** tab of the **Data** panel and is stored in Object Storage.
Place the cursor in the code cell below.  Click on an arrow next to the file name in the right panel and select Insert SparkSession DataFrame.

**Navigate to Manage -> Environmental Runtimes and record 1) Total account capacity unit hours used and 2) Remaining account capacity unit hours on the Spark Tutorial answer sheet.**

<a id="RDD"></a>
## Working with RDD

We use `SparkContext` to load the data into a `Spark RDD` named `ea`.
Resilient Distributed Dataset (RDD) is a collection of elements that can be operated on in parallel. RDDs are immutable.  An update requires creating a new RDD.  The Spark driver application distributes the work across the cluster.

You can construct RDDs by parallelizing existing Python collections (lists), by transforming existing RDDs, or by loading files from HDFS or any other storage system.

You can run these types of methods on RDDs: 
 - Actions: query the data and return values
 - Transformations: manipulate data values and return pointers to new RDDs. 

Find more information on Python methods in the <a href="http://spark.apache.org/docs/latest/api/python/pyspark.html" target="_blank" rel="noopener noreferrer">PySpark documentation</a>.

In [None]:
#Replace 'BUCKET' with the bucket name for your project
ea = sc.textFile(cos.url('empattrition.csv', 'BUCKET'))
ea.take(5)

### Instantiate RDD

You can now access the data by using the preconfigured `SparkContext` object in your notebook.

The RDD you created is a collection of strings corresponding to the individual lines in the raw data file. It is also important to remember that the RDD is defined but not instantiated. By applying an action like `count` to the RDD, you effectively instantiate the RDD.
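Python's built-in `map` is also lazy, so it can serve as a rough, Spark-free analogy for this behavior. The sketch below is only an illustration: the sample lines are made up (they are not rows from the real file), and `split_line` is a hypothetical helper that records when it runs.

```python
# Spark-free analogy: Python's built-in map() is lazy, much like an RDD transformation.
sample_lines = ["ID,satisfaction_level", "1,0.38", "2,0.80"]  # made-up sample data

calls = []  # records every line that actually gets processed


def split_line(line):
    """Hypothetical helper: split one CSV line and note that it ran."""
    calls.append(line)
    return line.split(",")


parsed = map(split_line, sample_lines)  # defined, but nothing has run yet
calls_before = len(calls)

rows = list(parsed)  # consuming the iterator triggers the work, like an action
calls_after = len(calls)

print("Lines processed before consuming:", calls_before)
print("Lines processed after consuming:", calls_after)
```

In Spark, actions such as `count` or `first` play the role that `list(...)` plays here: they force the deferred work to run.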

In [None]:
# We subtract 1 from the count to obtain the number of employees because the first row is the column headings
print ("Total number of employee records:", ea.count()-1)

Apply another action to the same RDD.  Read the first row.

In [None]:
print ("The first row:", ea.first())

### Parse Data

To begin working with the data, you need to parse it into columns. You can do this by mapping each line in the RDD to a function that splits the line by commas.

map(func): transformation function that returns a new RDD with the results of running the specified function on each element

The lambda notation in Python is used to create anonymous functions which are not bound to a name. The function is passed as a parameter to the `map` function. The anonymous function splits each line from the `ea` RDD  at comma boundaries. Hence, the new `eaParse` RDD is a list of sub-lists. Each parent list in `eaParse` corresponds to a line in `ea`. The strings in each sub-list are the individual row elements.
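To see the shape of the result without a cluster, the same splitting lambda can be applied to a small Python list. This is a minimal sketch that assumes three made-up lines with only three columns; the real file has eleven columns.

```python
# Made-up sample lines with a shortened header (assumption: the real file has more columns)
sample_lines = [
    "ID,satisfaction_level,Department",
    "1,0.38,sales",
    "2,0.80,IT",
]

# The same anonymous function that is passed to map() in the notebook
split_by_comma = lambda line: line.split(",")

# A list comprehension stands in for the RDD map transformation
parsed = [split_by_comma(line) for line in sample_lines]

header = parsed[0]        # the first sub-list holds the column names
first_name = header[0]    # elements are numbered from zero
last_name = header[-1]    # index -1 refers to the last element

print(parsed)
print(first_name, last_name)
```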

In [None]:
eaParse = ea.map(lambda line : line.split(","))

In [None]:
# The first row in eaParse RDD contains the column names
eaParse.first()

The row elements are numbered starting from zero.  Display the first element in the first row.

In [None]:
eaParse.first()[0]

Display the third element in the first row

In [None]:
eaParse.first()[2]

Display the last element. The index `-1` refers to the last element.

In [None]:
eaParse.first()[-1]

### Filtering Data
filter(func): transformation function that returns a new RDD with the elements for which the specified function is true
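Python's built-in `filter` follows the same idea and can be used to try out a predicate on a small list first. The rows below are made up for illustration, and `is_data_row` is a hypothetical name for the predicate.

```python
# Made-up parsed rows; the first one is the header row
parsed = [
    ["ID", "satisfaction_level"],
    ["1", "0.38"],
    ["2", "0.80"],
]

# Keep only the rows whose first field is not the header label "ID"
is_data_row = lambda row: row[0] != "ID"
data_rows = list(filter(is_data_row, parsed))

print(data_rows)
print(len(parsed))  # the original list is unchanged; filtering produced a new one
```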

In [None]:
# eaSatisfaction is a copy of eaParse with the first row removed
#You need to specify a new RDD name because RDDs are immutable
eaSatisfaction = eaParse.filter(lambda x: x[0] !="ID")
eaSatisfaction.take(5)

### Calculate the average job satisfaction per department

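Each row in the `eaSatisfaction` RDD holds, among other fields, the department name (index 8) and the job satisfaction level (index 1) for one employee. You can think of these two fields as a pair (v1, v2), where v1 is the department name and v2 is the job satisfaction level. Table 1 illustrates this structure.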

#### Table 1.

<table border=1 style="width:80%" align="left">
  <tr>
    <th>Key</th><th>Value</th>
  </tr>
  <tr>
    <td>Sales</td><td>Value 1</td>
  </tr>
  <tr>
    <td>IT</td><td>Value 2</td>
  </tr>
    <tr>
    <td>Sales</td><td>Value 3</td>
  </tr>
    <tr>
    <td>IT</td><td>Value 4</td>
  </tr>
    <tr>
    <td>hr</td><td>Value 5</td>
  </tr>
  <tr>
    <td>...</td><td>...</td>
  </tr>
</table>
<p>

Transform (map) this data set into a new one where each value is paired with the count `1`. Table 2 shows this new structure.

In [None]:
eaCountByKey = eaSatisfaction.map(lambda x : (x[8], (float(x[1]), 1)))
eaCountByKey.take(5)

#### Table 2.

<table border="1" style="width:80%" align="left">
  <tr>
    <th>Key</th><th>Value</th>
  </tr>
  <tr>
    <td>Sales</td><td>(Value 1,1)</td>
  </tr>
  <tr>
    <td>IT</td><td>(Value 2,1)</td>
  </tr>
    <tr>
    <td>Sales</td><td>(Value 3,1)</td>
  </tr>
    <tr>
    <td>IT</td><td>(Value 4,1)</td>
  </tr>
    <tr>
    <td>hr</td><td>(Value 5,1)</td>
  </tr>
  <tr>
    <td>...</td><td>...</td>
  </tr>
</table>
<p>

Reduce the data representation in Table 2 into the Table 3 representation.

In [None]:
eaAddByKey = eaCountByKey.reduceByKey(lambda v1,v2 : (v1[0]+v2[0], v1[1]+v2[1]))
eaAddByKey.take(10)

#### Table 3.

<table border="1" style="width:80%" align="left">
  <tr>
    <th>Key</th><th>Value</th>
  </tr>
  <tr>
    <td>Sales</td><td>(Value 1 + Value 3,2)</td>
  </tr>
  <tr>
    <td>IT</td><td>(Value 2 + Value 4,2)</td>
  </tr>
    <tr>
    <td>hr</td><td>(Value 5,1)</td>
  </tr>
  <tr>
    <td>...</td><td>...</td>
  </tr>
</table>
<p>

Compute the average job satisfaction per department. Create the `eaAverages` RDD by mapping the `eaAddByKey` RDD through a function that divides the sum of satisfaction scores by the number of employees in the department.
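The three steps (pair each score with a count of 1, add the pairs per key, divide sum by count) can be traced in plain Python on a handful of values. This is only a sketch of the logic, not Spark code: the department scores are made up, and a dictionary stands in for the grouping that `reduceByKey` performs across the cluster.

```python
# Made-up (department, satisfaction) pairs, as in Table 1
rows = [("sales", 0.5), ("IT", 0.25), ("sales", 0.75), ("IT", 0.75), ("hr", 0.5)]

# Step 1 (Table 2): pair every score with the count 1
pairs = [(dept, (score, 1)) for dept, score in rows]

# Step 2 (Table 3): add sums and counts per key, using the same
# combining function that is passed to reduceByKey in the notebook
combine = lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1])
totals = {}
for dept, value in pairs:
    totals[dept] = combine(totals[dept], value) if dept in totals else value

# Step 3: divide the sum of scores by the number of employees
averages = {dept: total / count for dept, (total, count) in totals.items()}

for dept, avg in sorted(averages.items()):
    print("Department:", dept, "Average Satisfaction Score:", avg)
```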

In [None]:
eaAverages = eaAddByKey.map(lambda k: (k[0], k[1][0] / float(k[1][1] ) ) )
eaAverages.take(10)

Print the department names and corresponding average job satisfaction

In [None]:
for pair in eaAverages.top(10):
    print ("Department: ", pair[0], " Average Satisfaction Score: ", pair[1])

### Plot the results

In [None]:
department=[]
satisfaction=[]
for pair in eaAverages.top(10):
    department.append(pair[0])
    satisfaction.append(pair[1])

In [None]:
#import the matplotlib pyplot module and specify display plots inline
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

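# One bar per department collected in the previous cell
N = len(department)
index = np.arange(N)
bar_width = 0.5

plt.bar(index, satisfaction, bar_width,
                 color='r')
plt.xlabel('Department')
plt.ylabel('Average Satisfaction')
plt.title('Average Job Satisfaction per Department')
# Bars are centered on their x positions by default, so the labels go at index
plt.xticks(index, department, rotation=90)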
plt.show()

<a id="DataFrame"></a>
## Working with DataFrame
A DataFrame is a two-dimensional data structure organized into rows and named columns.
We will explore the data loaded in the `df_data_1` DataFrame. We start by inspecting the data.

### Inspecting Data

In [None]:
# Number of rows in a DataFrame
print('Number of employees:', df_data_1.count())

In [None]:
# The columns list
df_data_1.columns

In [None]:
# Number of columns
len(df_data_1.columns)

In [None]:
# Returns the columns and their data types
df_data_1.printSchema()

In [None]:
# Print the (logical and physical) plans
df_data_1.explain()

In [None]:
#returns first observation
df_data_1.head()

In [None]:
#Returns the first 2 observations
df_data_1.head(2)

In [None]:
#Returns the first 2 rows.  The data is displayed under the column names
df_data_1.show(2)

In [None]:
#summary statistics for 1 variable
df_data_1.describe( 'satisfaction_level').show()

In [None]:
#summary statistics for 2 variables
df_data_1.describe( 'satisfaction_level', 'time_spend_company').show()

In [None]:
#Describe for a non-numeric column - minimum and maximum are based on lexicographic (ASCII) order
df_data_1.describe( 'Department').show()

### <span style="color:red">Graded Exercise 1</span>
Write and run the code to show the summary statistics for average_monthly_hours

### Selecting Data

In [None]:
#Show the specified columns
df_data_1.select('Department', 'satisfaction_level', 'salary', 'Attrition').show(10)

### <span style="color:red">Graded Exercise 2</span>
Write and run the code to select ID, department, salary, Attrition and show the first 5 rows.

In [None]:
# Rename the column in the output.  For instance, abbreviate satisfaction_level as sl 
df_data_1.select('Department', 'satisfaction_level', 'salary', 'Attrition').withColumnRenamed('satisfaction_level', 'sl').show(10)

In [None]:
# List the distinct values.  For instance, list the department names
df_data_1.select('Department').distinct().show()

### <span style="color:red">Graded Exercise 3</span>
Write and run the code to display the distinct salary values.

In [None]:
#Return the number of distinct values in a column.  For instance, return the number of departments
df_data_1.select('Department').distinct().count()

In [None]:
# First 20 employees with job satisfaction above 0.98 (satisfaction_level ranges up to 1)
df_data_1.select('Department', 'satisfaction_level', 'salary', 'Attrition').filter(df_data_1['satisfaction_level']>0.98).show()

### <span style="color:red">Graded Exercise 4</span>
Modify and run the code from Graded Exercise 2 to return the ID, department, salary, and Attrition for the first 5 employees in the hr department.

In [None]:
# Department name ends with 'ing'
df_data_1.select('Department', 'satisfaction_level', 'salary', 'Attrition').filter(df_data_1.Department.endswith("ing")).show()

In [None]:
#Satisfaction between 0.1 and 1
df_data_1.select('department', 'satisfaction_level', 'salary', 'Attrition').filter(df_data_1.satisfaction_level.between(0.1, 1)).show()

### Pairwise Frequency

In [None]:
#Pairwise Frequency for categorical variables
df_data_1.crosstab('Department', 'salary').show()

In [None]:
df_data_1.crosstab('attrition', 'salary').show()

In [None]:
df_data_1.crosstab('attrition', 'promotion_last_5years').show()

### GroupBy

In [None]:
#Number of employees in each department
df_data_1.groupby('Department').count().show()

In [None]:
#sort the output by count descending
df_data_1.groupby('Department').count().sort('count', ascending=False).show()

In [None]:
# Average job satisfaction in each department
df_data_1.groupby(['Department'])\
.agg({"satisfaction_level": "AVG"}).show()
# Average job satisfaction in each department per salary level
df_data_1.groupby(['Department', 'salary'])\
.agg({"satisfaction_level": "AVG"}).show()

### <span style="color:red">Graded Exercise 5</span>
Write and run the code to return the average job satisfaction for each salary level.  Display the salary level and corresponding average satisfaction.

### Plot Average Job Satisfaction per department

In [None]:
dasl = df_data_1.groupby(['Department'])\
.agg({"satisfaction_level": "AVG"}).collect()

depart = []
satisfaction = []
for row in dasl:
    depart.append(row["Department"])
    satisfaction.append(row["avg(satisfaction_level)"])

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

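# One bar per department collected in the previous cell
N = len(depart)
index = np.arange(N)
bar_width = 0.5

plt.bar(index, satisfaction, bar_width,
                 color='b')
plt.xlabel('Department')
plt.ylabel('Average Satisfaction')
plt.title('Average Job Satisfaction per Department')
# Bars are centered on their x positions by default, so the labels go at index
plt.xticks(index, depart, rotation=90)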
plt.show()


<a id="use_spark_sql"></a>
## Use Spark SQL

`Spark SQL` lets you query structured data, for example, data in a relational table and can be a very powerful tool for performing complex aggregations.
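The shape of such an aggregation query can be tried on a few rows without a cluster. The sketch below uses Python's built-in `sqlite3` module purely as a stand-in: SQLite is not Spark, the rows are made up, and only the table name `ea` and the column names `department` and `Attrition` are borrowed from this notebook.

```python
import sqlite3

# In-memory stand-in table (assumption: two columns are enough to show the idea)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ea (department TEXT, Attrition INTEGER)")
conn.executemany(
    "INSERT INTO ea VALUES (?, ?)",
    [("sales", 1), ("IT", 0), ("sales", 1), ("hr", 1), ("IT", 1)],  # made-up rows
)

# Count the employees who left (Attrition = 1) in each department,
# sorted by department name regardless of letter case
left_counts = conn.execute(
    "SELECT department, COUNT(*) AS numemp "
    "FROM ea WHERE Attrition = 1 "
    "GROUP BY department ORDER BY upper(department)"
).fetchall()
conn.close()

for department, numemp in left_counts:
    print(department, numemp)
```

The `WHERE` clause restricts the rows before grouping, and `ORDER BY upper(department)` keeps mixed-case names such as `hr` and `IT` in alphabetical order.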

To query the employee data with `Spark SQL`, register the `df_data_1` DataFrame as a temporary view named `ea`. The view exposes each DataFrame column under its own name, so you can reference columns such as `department` and `Attrition` directly in SQL statements. Caching the view keeps the data in memory for the queries that follow.

In [None]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
df_data_1.createOrReplaceTempView("ea")
sqlContext.cacheTable("ea")

In [None]:
# Column 1 - Department
# Column 2 - Number of employees who worked for the department in column 1 and left
empl1 = sqlContext.sql("SELECT department, COUNT(*) AS numemp FROM ea WHERE Attrition=1 GROUP BY department ORDER BY upper(department)").collect()
for row in empl1:
    print (row)

In [None]:
# Column 1 - Department
# Column 2 - Number of retained employees from the department in column 1
empl0 = sqlContext.sql("SELECT department, COUNT(*) AS numemp FROM ea WHERE Attrition=0 GROUP BY department ORDER BY upper(department)").collect()

In [None]:
for row in empl0:
    print (row)

In [None]:
department = []
Attrition0 = []
Attrition1 = []
for row in empl0:
    department.append(row.department)
    Attrition0.append(row.numemp)

for row in empl1:
    Attrition1.append(row.numemp)

Use the bar() function to plot a vertical bar chart.

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

# One group of bars per department collected in the previous cell
N = len(department)
ind = np.arange(N)
width = 0.35
ea0 = plt.bar(ind, Attrition0, width, color='g', label='Retained')
ea1 = plt.bar(ind+width, Attrition1, width, color='y', label='Left')
plt.ylabel('Number of Employees')
plt.xlabel('Department')
plt.title('Number of Retained and Not Retained Employees per Department', fontsize=15)
plt.xticks(ind+width/2, department, rotation='vertical')
plt.legend()
plt.show()

You can create a pie chart with the pie() function. The function does not display legends or label names by default. You can use the input parameters to change the default colors, set legends, labels, the starting angle, shadowing, and whether any section is exploded.

The default starting angle is 0, which starts the first slice on the x-axis. If you set startangle=90, the first slice starts on the positive y-axis.
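The effect of `startangle` is easier to see as arithmetic. The sketch below is not matplotlib code: `wedge_angles` is a hypothetical helper that mimics the default counterclockwise layout of a pie chart, and the slice values are made up.

```python
def wedge_angles(values, startangle=0.0):
    """Hypothetical helper: (start, end) angle in degrees for each slice,
    laid out counterclockwise from startangle."""
    total = sum(values)
    angles = []
    theta = startangle
    for value in values:
        sweep = 360.0 * value / total  # each slice spans its share of the circle
        angles.append((theta, theta + sweep))
        theta += sweep
    return angles


values = [1, 1, 2]  # made-up slice sizes

default_angles = wedge_angles(values)                 # first slice starts on the x-axis
rotated_angles = wedge_angles(values, startangle=90)  # first slice starts on the y-axis

print(default_angles)
print(rotated_angles)
```

Changing `startangle` rotates every slice by the same amount; the slice sizes themselves do not change.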

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np

data = Attrition1  
labels = department
plt.figure(1, figsize=(6,6))  # make it square
colors = ['yellowgreen', 'gold', 'lightskyblue', 
          'lightcoral', 'green', 'red', 'purple', 'cyan', 'magenta', 'orange']
plt.pie(data, labels=labels, colors= colors, startangle=90)
plt.title('Attrition by Department')
plt.show()

### <span style="color:red">Graded Exercise 6</span>
Use Spark SQL to return <br>
Column 1 - salary level <br>
Column 2 - number of employees with a salary level in column 1

**Navigate to Manage -> Environmental Runtimes and record 1) Total account capacity unit hours used and 2) Remaining account capacity unit hours on the Spark Tutorial answer sheet. 3) Answer the follow-up questions on the assignment answer sheet.**

### <span style="color:Blue">Ungraded Exercise 1</span>
Use Spark SQL to return <br> 
Column 1 - department name <br>
Column 2 - salary level  <br>
Column 3 - number of retained employees who work for the department in column 1 and have a salary level in column 2 <br>
Sort the returned data by department name.

### <span style="color:Blue">Ungraded Exercise 2</span>
Use Spark SQL to return <br> 
Column 1 - department name <br>
Column 2 - salary level  <br>
Column 3 - attrition<br>
Column 4 - number of employees who work/worked for the department in column 1 with a salary level in column 2 and attrition status in column 3 <br>
Sort the returned data by department name.

### <span style="color:Blue">Ungraded Exercise 3</span>
Use Spark SQL to return <br> 
Column 1 - department name <br>
Column 2 - number of retained employees from department in column 1 <br>
Column 3 - number of employees who left from department in column 1 <br>
Column 4 - total number of employees from department in column 1 <br>
Sort the returned data by department name.