#Homework: Spark SQL

In this homework you will gain a mastery of using Spark SQL. The homework can be run locally or on an EMR cluster.  The current version is for running locally.  

The goal of the homework will be to create a training dataset for a Random Forest Machine learning model. The training data set will contain the monthly number of employees hired by companies in `linkedin.json` and their corresponding closing stock prices over a 10+ year period (1970-2018 `stock_prices.csv`). We will try and predict, based on this data, if the company will have a positive or negative growth in stock in the first quarter of the next year. Who's ready to make some money?

## Notes
Before we begin here are some important notes to keep in mind,

1. You are **required** to use Spark SQL queries to handle the data in the assignment. Mastering SQL is more beneficial than being able to use Spark commands (functions) as it will show up in more areas of programming and data science/analytics than just Spark. Use the following [function list](https://spark.apache.org/docs/latest/api/sql/index.html#) to see all the SQL functions avaliable in Spark.

2. There are portions of this homework that are _very_ challenging. 


In [0]:
%%capture
!apt update
!apt install gcc python-dev libkrb5-dev
!pip install sparkmagic
!pip install pyspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SQLContext
import json
import urllib.request

from datetime import datetime

try:
    if(spark == None):
        spark = SparkSession.builder.appName('Graphs').getOrCreate()
        sqlContext=SQLContext(spark)
        
except NameError:
    spark = SparkSession.builder.appName('Graphs').getOrCreate()
    sqlContext=SQLContext(spark)
        
from pyspark.sql.types import *

## Step 1: Data Cleaning and Shaping

The data you will use is stored in an S3 bucket, a cloud storage service. You now need to download it onto the nodes of your [EMR cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html). 

### Step 1.1: The Stupendous Schema

When loading data, Spark will try to infer the structure. This process is faulty because it will sometimes infer the type incorrectly. JSON documents, like the one we will use, can have nested types, such as: arrays, arrays of dictionaries, dictionaries of dictionaries, etc. Spark's ability to determine these nested types is not reliable, thus you will define a schema for `linkedin.json`.

A schema is a description of the structure of data. You will be defining an explicit schema for `linkedin.json`. In Spark, schemas are defined using a `StructType` object. This is a collection of data types, termed `StructField`s, that specify the structure and variable type of each component of the dataset. For example, suppose we have the following simple JSON object,


```
{
 "student_name": "Leonardo Murri",
 "GPA": 1.4,
 "courses": [
    {"department": "Computer and Information Science",
     "course_id": "CIS 545",
     "semester": "Fall 2018"},
    {"department": "Computer and Information Science",
     "course_id": "CIS 520",
     "semester": "Fall 2018"},
    {"department": "Electrical and Systems Engineering",
     "course_id": "ESE 650",
     "semester": "Spring 2018"}
 ],
 "grad_year": 2019
 }
```

We would define its schema as follows,

```       
schema = StructType([
           StructField("student_name", StringType(), nullable=True),
           StructField("GPA", FloatType(), nullable=True),
           StructField("courses", ArrayType(
                StructType([
                  StructField("department", StringType(), nullable=True),
                  StructField("course_id", StringType(), nullable=True),
                  StructField("semester", StringType(), nullable=True)
                ])
           ), nullable=True),
           StructField("grad_year", IntegerType(), nullable=True)
         ])
```


Each `StructField` has the following structure: `(name, type, nullable)`. The `nullable` flag defines that the specified field may be empty. Your first task is to define the `schema` of `linkedin.json`. 

_Note_: In `linkedin.json` the field `specilities` is spelled incorrectly. This is **not** a typo. 


In [0]:
# TODO: Define [linkedin.json] schema
# YOUR CODE HERE
### BEGIN SOLUTION
schema = StructType([
    StructField("_id", StringType(), nullable=True),
    StructField("education", ArrayType(
      StructType([
          StructField("start", StringType(), nullable=True),
          StructField("major", StringType(), nullable=True),
          StructField("end", StringType(), nullable=True),
          StructField("name", StringType(), nullable=True),
          StructField("desc", StringType(), nullable=True),
          StructField("degree", StringType(), nullable=True)
      ])
    ), nullable=True),
    StructField("group", StructType([
          StructField("affilition", ArrayType(StringType()), nullable=True),
          StructField("member", StringType(), nullable=True)
    ]), nullable=True),
    StructField("name", StructType([
        StructField("family_name", StringType(), nullable=True),
        StructField("given_name", StringType(), nullable=True)
    ]), nullable=True),
    StructField("locality", StringType(), nullable=True),
    StructField("skills", ArrayType(StringType()), nullable=True),
    StructField("industry", StringType(), nullable=True),
    StructField("interval", IntegerType(), nullable=True),
    StructField("experience", ArrayType(
      StructType([
          StructField("org", StringType(), nullable=True),
          StructField("title", StringType(), nullable=True),
          StructField("end", StringType(), nullable=True),
          StructField("start", StringType(), nullable=True),
          StructField("desc", StringType(), nullable=True)
      ])), nullable=True),
    StructField("summary", StringType(), nullable=True),
    StructField("interests", StringType(), nullable=True),
    StructField("overview_html", StringType(), nullable=True),
    StructField("specilities", StringType(), nullable=True),
    StructField("homepage", ArrayType(StringType()), nullable=True),
    StructField("honors", ArrayType(StringType()), nullable=True),
    StructField("url", StringType(), nullable=True),
    StructField("also_view", ArrayType(
      StructType([
          StructField("id", StringType(), nullable=True),
          StructField("url", StringType(), nullable=True)
      ])
    ), nullable=True),
    StructField("events", ArrayType(
      StructType([
          StructField("from", StringType(), nullable=True),
          StructField("to", StringType(), nullable=True),
          StructField("title1", StringType(), nullable=True),
          StructField("start", IntegerType(), nullable=True),
          StructField("title2", StringType(), nullable=True),
          StructField("end", IntegerType(), nullable=True)
      ])), nullable=True)
])
  ### END SOLUTION

### Step 1.2: The Laudable Loading

Load the `linkedin.json` dataset into a Spark dataframe (sdf) called `raw_data_sdf`. If you have constructed `schema` correctly `spark.read.json()` will read in the dataset. ***You do not need to edit this cell***.

In [0]:
import urllib
#url = 'X'
url = 'https://upenn-bigdataanalytics.s3.amazonaws.com/linkedin.zip'
filehandle, _ = urllib.request.urlretrieve(url,filename='local.zip')

In [0]:
import zipfile
zip_file_object = zipfile.ZipFile(filehandle, 'r')
zip_file_object.extractall()

In [0]:
!rm -f local.zip

In [0]:
raw_data_sdf = spark.read.json('linkedin.json', schema=schema)

The cell below shows how to run SQL commands on Spark tables. Use this as a template for all your SQL queries in this notebook. ***You do not need to edit this cell***.

In [0]:
# Create SQL-accesible table
raw_data_sdf.createOrReplaceTempView("raw_data")

# Declare SQL query to be excecuted
query = '''SELECT * 
           FROM raw_data'''

# Save the output sdf of spark.sql() as answer_sdf
answer_sdf = spark.sql(query)

# Display the first 10 rows
answer_sdf.show(10)

+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|               _id|           education|               group|                name|            locality|              skills|            industry|interval|          experience|             summary|           interests|       overview_html|         specilities|homepage|              honors|                 url|           also_view|              events|
+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--

### Step 1.3: The Extravagent Extraction

In our training model, we are interested in when individuals began working at a company.  From creating the schema, you should notice that the collection of companies inviduals worked at are contained in the `experience` field as an array of dictionaries. You should use the `org` for the company name and `start` for the start date. Here is an example of an `experience` field,

```
{
   "experience": [
     {
        "org": "The Walt Disney Company", 
        "title" : "Mickey Mouse",
        "end" : "Present",
        "start": "November 1928",
        "desc": "Sailed a boat."
     },
     {
        "org": "Walt Disney World Resort",
        "title": "Mickey Mouse Mascot",
        "start": "January 2005",
        "desc": "Took pictures with kids."
     }
   ]
}
```

Your task is to extract each pair of company and start date from these arrays. In Spark, this is known as "exploding" a row. An explode will seperate the elements of an array into multiple rows.

Create an sdf called `raw_start_dates_sdf` that contains the company and start date for every experience of every individual in `raw_data_sdf`. Drop any row that contains a `null` in either column with `dropna()`. You can sort the elements however you wish (you don't need to if you don't want to). The sdf should look as follows:

```
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |January 2005   | 
|The Walt Disney Company   |November 1928  |
|...                       |...            |
+--------------------------+---------------+
```

_Hint_: You may want to do two seperate explodes for `org` and `start`. In an explode, the position of the element in the array can be extracted as well, and used to merge two seperate explodes. Reference the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html).

_Note_: Some of the entires in `org` are "weird", i.e. made up of non-english letters and characters. Keep them. **DO NOT** edit any name in the original dataframe unless we specify. **DO NOT** drop any row unless there is a `null` value as stated before. This goes for the rest of the homework as well, unless otherwise specified.

In [0]:
# TODO: Create [raw_start_dates_sdf]

  ##YOUR ANSWER HERE
  ### BEGIN SOLUTION

# POSEXPLODE() will explode a specified column of an sdf and return two rows
# corresponding to the index of an element in an array "pos" and the element
# itself.

# Explode the org array in experience
query = '''SELECT _id AS id,
                  POSEXPLODE(experience.org) AS (pos, org)
           FROM raw_data'''

# Save as a SQL-accesible table called "orgs"
spark.sql(query).dropna().createOrReplaceTempView("orgs")

# Explode the start_date array in experience
query = '''SELECT _id AS id,
                  POSEXPLODE(experience.start) AS (pos, start_date)
           FROM raw_data'''
spark.sql(query).dropna().createOrReplaceTempView("start_dates")

# Join each explode based on a person's id and the index of each element in the
# original array
query = '''SELECT o.org AS org,
                  s.start_date AS start_date
           FROM orgs AS o
           JOIN start_dates AS s
           ON o.id = s.id AND o.pos = s.pos'''

# Define and save raw_start_dates_sdf
raw_start_dates_sdf = spark.sql(query)
raw_start_dates_sdf.createOrReplaceTempView("raw_start_dates")
  ### END SOLUTION

In [0]:
raw_start_dates_sdf.show(4)

+--------------------+--------------+
|                 org|    start_date|
+--------------------+--------------+
|             Spot.us|September 2009|
|Pfizer Ltd. (WPO ...|          2005|
|              ALSTOM|          2008|
|DSM i-Nutrition B...|  January 2009|
+--------------------+--------------+
only showing top 4 rows



### Step 1.4: The Fortuitous Formatting

There are two issues with the values in our `date` column. First, the values are saved as strings, not datetime types. This keeps us from running functions such as `ORDER BY` or `GROUP BY` on common months or years. Second, some values do not have both month and year information or are in other languages. Your task is to filter out and clean the `date` column. We are interested in only those rows that have date in the following format "(month_name) (year)", e.g. "October 2010".

Create an sdf called `filtered_start_dates_sdf` from `raw_start_dates_sdf` with the `date` column filtered in the manner above. Keep only those rows with a start date between January 2000 to December 2011, inclusive. Ensure that any dates that are not in our desired format are ommitted. Drop any row that contains a `null` in either column. The format of the sdf is shown below:
```
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |2005-01-01     | 
|...                       |...            |
+--------------------------+---------------+
```
_Hint_: Refer to the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html) to format the `date` column. In Spark SQL the date format we are interested in is `"MMM y"`.

_Note_: Spark will return the date in the format above, with the day as `01`. This is ok, since we are interested in the month and year each individual began working and all dates will have `01` as their day.

In [0]:
# TODO: Create [filtered_start_dates_sdf]

## YOUR ANSWER HERE
### BEGIN SOLUTION
# TO_DATE() will convert a string to a datetime object. The string's format,
# i.e. "MMM y" must be provided. Any string that does not have the specified
# format will be returned as null.

# Use TO_DATE() to convert the start_date column from a string to a datetime
# object. Keep only dates that are between January 2000 and December 2011,
# inclusive.
query = '''SELECT org,
                  TO_DATE(start_date, "MMM y") AS start_date
           FROM raw_start_dates
           WHERE TO_DATE(start_date, "MMM y") >= "2000-01-01"
                 AND TO_DATE(start_date, "MMM y") <= "2011-12-01"'''

# Define and save filtered_start_dates_sdf
filtered_start_dates_sdf = spark.sql(query)
filtered_start_dates_sdf.createOrReplaceTempView("filtered_start_dates")
### END SOLUTION

### Step 1.5 The Gregarious Grouping

We now want to collect the number of individuals that started in the same month and year for each company. Create an sdf called `start_dates_sdf` that has the total number of employees who began working at the same company on the same start date. The format of the sdf is shown below:

```
+--------------------------+---------------+---------------+
|org                       |start_date     |num_employees  |
+--------------------------+---------------+---------------+
|Walt Disney World Resort  |2005-01-01     |1              |
|...                       |...            |...            |
+--------------------------+---------------+---------------+
```

In [0]:
# TODO: Create [start_dates_sdf]

## YOUR ANSWER HERE
### BEGIN SOLUTION
# GROUP BY on org and start_date, in that order.
query = '''SELECT org,
                  start_date,
                  COUNT(*) AS num_employees
           FROM filtered_start_dates
           GROUP BY org, start_date'''

# Define and save start_dates_sdf
start_dates_sdf = spark.sql(query)
start_dates_sdf.createOrReplaceTempView("start_dates")
### END SOLUTION

## Step 2: Hiring Trends Analysis

Now we will analyze `start_dates_sdf` to find monthly and annual hiring trends.

### Step 2.1: The Marvelous Months

Your task is to answer the question: "On average, what month do most employees start working?" Create an sdf called `monthly_hires_sdf` which contains the total number of employees that started working on a specific month, at any company and on any year. The `month` column should be of type `int`, i.e. 1-12. The format of the sdf is shown below:

```
+---------------+---------------+
|month          |num_employees  |
+---------------+---------------+
|1              |...            |
|2              |...            |
|3              |...            |
|...            |...            |
+---------------+---------------+
```

Find the month in which the most employees start working and save its number as an integer to the variable `most_common_month`.

_Hint_: Be careful. The start dates we have right now have both month and year. We only want the common months. See if you can find something in the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html) that will help you do this.

In [0]:
# TODO: Create [monthly_hire_sdf] and find the most common month people were
# hired. Save its number as an integer to [most_common_month]

## YOUR ANSWER HERE
### BEGIN SOLUTION
# MONTH() will return the month for any datetime object.

# GROUP BY on the month of start_date using MONTH(). Sum the num_employees
# column to find the total number of employees that started on that month
query = '''SELECT MONTH(start_date) AS month,
                  SUM(num_employees) AS num
           FROM start_dates
           GROUP BY MONTH(start_date)'''

# Define and save monthly_hires_sdf
monthly_hires_sdf = spark.sql(query)
monthly_hires_sdf.createOrReplaceTempView("monthly_hires")

most_common_month = 1
### END SOLUTION

### Step 2.2: The Preposterous Percentages

The next question we will answer is "What is the percentage change in hires between 2010 and 2011 for each company?" Create an sdf called `percentage_change_sdf` that has the percentage change between 2010 and 2011 for each company. The sdf should look as follows:

```
+---------------------------+--------------------+
|org                        |percentage_change   |
+---------------------------+--------------------+
|Walt Disney World Resort   |12.3                |
|...                        |...                 |
+---------------------------+--------------------+
```

_Note_: A percentage change can be positive or negative depending 
on the difference between the two years.The formula for percent change is given below,

$$\text{% change} = \frac{P_f-P_i}{P_f} \times 100$$

Here, $P_f$ is the final element (in this case the number of hires in 2011) and $P_i$ is initial element (the number of hires in 2010).

_Hint_: This is a **difficult** question. We recommend using a combination of `GROUP BY` and `JOIN`. Keep in mind that operations between columns in SQL dataframes are often easier than those between rows. 

In [0]:
# TODO: Create [percentage_change_sdf]

## YOUR ANSWER HERE
### BEGIN SOLUTION
# YEAR() will return the year of a datetime object.

# The column percentage_change is calculated by doing a JOIN. We join an sdf
# "y1" created by doing a GROUP BY on start_dates by org and the year of
# start_date where the year is 2010. We sum the number of employees. That sdf is
# joined with a similar sdf, "y2", but for the case when year is 2011. Then,
# using the formula in the description above we find the percentage_change of
# the two sum columns.
query = '''SELECT y1.org AS org,
                  (y2.num_employees-y1.num_employees)/(y2.num_employees)*100
                      AS percentage_change
           FROM (SELECT org,
                        YEAR(start_date) AS year,
                        SUM(num_employees) AS num_employees
                 FROM start_dates
                 WHERE YEAR(start_date) = 2010
                 GROUP BY org, YEAR(start_date)) AS y1
           JOIN (SELECT org,
                        YEAR(start_date) AS year,
                        SUM(num_employees) AS num_employees
                 FROM start_dates
                 WHERE YEAR(start_date) = 2011
                 GROUP BY org, YEAR(start_date)) AS y2
           ON y1.org = y2.org'''

# Define and save percentage_change_sdf
percentage_change_sdf = spark.sql(query)
percentage_change_sdf.createOrReplaceTempView("percentage_change")
### END SOLUTION

## Step 3: Formatting the Training Data


Our overaching goal is to train a machine learning (ML) model that will use the monthly hiring trends of a company to predict a positive or negative gain in the company's stock in the first quarter of the following year. A ML model is trained on a set of observations. Each observation contains a set of features, `X`, and a label, `y`. The goal of the ML model is to create a function that takes any `X` as an input and outputs a predicted `y`. 

The machine learning model we will use is a [Random Forest Classifier](https://builtin.com/data-science/random-forest-algorithm). Each observation we will pass in will have 24 features (columns). These are the number of people hired from Jan to Dec and the company stock price on the last day of each month. The label will be the direction of the company's stock percentage change (positive, `1`, or negative, `-1`) in the first quarter of the following year. Each observation will correspond to a specified company's trends on a specified year. The format of our final training sdf is shown below. The first 26 columns define our observations, `X`, and the last column the label, `y`.
```
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|org |year |jan_hired |   ...   |dec_hired |jan_stock |   ...   |dec_stock |stock_result |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |...       |   ...   |...       |1            |
|IBM |2009 |...       |   ...   |...       |...       |   ...   |...       |-1           |
|... |...  |...       |   ...   |...       |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
```

_Note_: We will use the first three letters of each month in naming, i.e. `jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec`



### Step 3.1: The Harmonious Hires

Your first task is to create the first half of the training table, i.e. the `jan_hired` through `dec_hired` columns. This will involve reshaping `start_dates_sdf`. Currently, `start_dates_sdf` has columns `org`, `start_date`, and `num_employees`. We want to group the rows together based on common `org` and years and create new columns for the number of employees that started working in each month of that year.

Create an sdf called `raw_hirings_for_training_sdf` that has for a single company and a single year, the number of hires in Jan through Dec, and the total number of hires that year. Note that for each company you will have several rows corresponding to years between 2000 and 2011. It is ok if for a given company you don't have a given year. However, ensure that for a given company and given year, each month column has an entry, i.e. if no one was hired the value should be `0`. The format of the sdf is shown below: 
```
+----+-----+----------+---------+----------+----------+
|org |year |jan_hired |   ...   |dec_hired |total_num |
+----+-----+----------+---------+----------+----------+
|IBM |2008 |...       |   ...   |...       |...       |
|IBM |2009 |...       |   ...   |...       |...       |
|... |...  |...       |   ...   |...       |...       |
+----+-----+----------+---------+----------+----------+
```
_Hint_: This is a **difficult** question. The tricky part is creating the additional columns of monthly hires, specifically when there are missing dates. In our dataset, if a company did not hire anybody in a given date, it will not appear in `start_dates_sdf`. We suggest you look into `CASE` and `WHEN` statements in the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html).

In [0]:
# TODO: Create [raw_hire_train_sdf]

## YOUR SOLUTION HERE
### BEGIN SOLUTION
# CASE() statements are SQL's equivalent of if else statements. WHEN a CASE is
# true THEN we define a function. ELSE we do another function and then END the
# statement.

# The query is a GROUP BY. We group data based on the same company and year, as
# in the previous step. We then do a CASE statement. This will seperate out the
# sets of data corresponding to the same month using MONTH() in the WHEN clause.
# If we have a piece of data, it will be the number of employees that started
# working at a given company on a given year and a given month and we will save
# it with a corresponding column name. If there is no piece of data here, as per
# the question, we need to add a 0. This is the ELSE clause. Lastly, we do a
# SUM() to find total_num
query = '''SELECT org, 
                  YEAR(start_date) AS year,
                  MAX((CASE WHEN (MONTH(start_date) == 1) 
                                  THEN num_employees
                                  ELSE 0 END)) AS jan_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 2)
                                  THEN num_employees
                                  ELSE 0 END)) AS feb_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 3)
                                  THEN num_employees
                                  ELSE 0 END)) AS mar_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 4)
                                  THEN num_employees
                                  ELSE 0 END)) AS apr_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 5)
                                  THEN num_employees 
                                  ELSE 0 END)) AS may_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 6)
                                  THEN num_employees
                                   ELSE 0 END)) AS jun_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 7)
                                  THEN num_employees 
                                  ELSE 0 END)) AS jul_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 8)
                                  THEN num_employees
                                  ELSE 0 END)) AS aug_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 9)
                                  THEN num_employees
                                  ELSE 0 END)) AS sep_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 10)
                                  THEN num_employees
                                  ELSE 0 END)) AS oct_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 11)
                                  THEN num_employees
                                  ELSE 0 END)) AS nov_hired,
                  MAX((CASE WHEN (MONTH(start_date) == 12)
                                  THEN num_employees
                                  ELSE 0 END)) AS dec_hired,
                  SUM(num_employees) AS total_num
           FROM start_dates
           GROUP BY org, YEAR(start_date)'''

# Define and save raw_hire_train_sdf
raw_hire_train_sdf = spark.sql(query)
raw_hire_train_sdf.createOrReplaceTempView("raw_hire_train")
### END SOLUTION

### Step 3.2: The Formidable Filters

Create an sdf called `hire_train_sdf` that contains all the observations in `raw_hire_train_sdf` with `total_num` greater than or equal to 500. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+----------+
|org |year |jan_hired |   ...   |dec_hired |total_num |
+----+-----+----------+---------+----------+----------+
|IBM |2008 |...       |   ...   |...       |...       |
|IBM |2009 |...       |   ...   |...       |...       |
|... |...  |...       |   ...   |...       |...       |
+----+-----+----------+---------+----------+----------+
```


In [0]:
# TODO: Create [hire_train_sdf]

##YOUR SOLUTION HERE
### BEGIN SOLUTION
# Keep all rows where total_num >= 500
query = '''SELECT *
           FROM raw_hire_train
           WHERE total_num >= 500'''

# Define and save hire_train_sdf
hire_train_sdf = spark.sql(query)
hire_train_sdf.createOrReplaceTempView("hire_train")
### END SOLUTION

### Step 3.3: The Stupendous Stocks

Now we are ready for the stock data. The stock data we will use is saved in the same S3 bucket as `linkedin.json`. Load the data into the EMR cluster. Run the cell below. ***You do not need to edit this cell***.

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
# Load stock data
raw_stocks_sdf = spark.read.format("csv") \
              .option("header", "true") \
              .load("./gdrive/My Drive/Colab Notebooks/stock_prices.csv")
              # .load("s3a://545emr/stock_prices.csv")

# Creates SQL-accesible table
raw_stocks_sdf.createOrReplaceTempView('raw_stocks')

# Display the first 10 rows
query = '''SELECT *
           FROM raw_stocks'''
spark.sql(query).show(10)


+------+-----------------+----------+
|ticker|    closing_price|      date|
+------+-----------------+----------+
|   AHH| 8.49315452575684|2013-05-08|
|   AHH| 8.47115135192871|2013-05-09|
|   AHH| 8.50782203674316|2013-05-10|
|   AHH| 8.54449367523193|2013-05-13|
|   AHH|8.456483840942381|2013-05-14|
|   AHH| 8.50782203674316|2013-05-15|
|   AHH| 8.61050128936768|2013-05-16|
|   AHH|8.625171661376951|2013-05-17|
|   AHH| 8.60316944122314|2013-05-20|
|   AHH|8.676511764526369|2013-05-21|
+------+-----------------+----------+
only showing top 10 rows



Run the cell below to see the types of the columns in our data frame. These are not correct. We could have defined a schema when reading in data but we will handle this issue in another manner. You will do this in Step 3.4.2.

In [0]:
# Print types of SDF
raw_stocks_sdf.dtypes

[('ticker', 'string'), ('closing_price', 'string'), ('date', 'string')]

### Step 3.4 The Clairvoyant Cleaning

We now want to format the stock data set into the second half of the training table. We will then merge it with `hire_train` based off the common `org` and `year` fields.

#### Step 3.4.1 The Ubiquitous UDF

The companies in our stock dataset are defined by their stock tickers. Thus, we would not be able to merge it with the `org` field in `hire_train_sdf`. We must convert them to that format. Often times when using Spark, there may not be a built-in SQL function that can do the operation we desired. Instead, we can create one on our own with a user-defined function (udf).

A udf is defined as a normal Python function and then registered to be used as a Spark SQL function. Your task is to create a udf, `TICKER_TO_NAME()` that will convert the ticker field in `raw_stocks` to the company's name. This will be done using the provided `ticker_to_name_dict` dictionary. We are only interested in the companies in that dictionary.

Fill out the function `ticker_to_name()` below. Then use `spark.udf.register()` to register it as a SQL function. The command is provided. ***You do not need to edit it***. Note, we have defined the udf as returning `StringType()`. Ensure that your function returns this. You must also deal with any potential `null` cases.

In [0]:
# TODO: Fill out [ticker_to_name()] and register it as a udf.

## YOUR SOLUTION HERE
### BEGIN SOLUTION
# Dictionary linking stock ticker's to their name
ticker_to_name_dict = {'NOK': 'Nokia',
                       'UN': 'Unilever',
                       'BP': 'BP',
                       'JNJ': 'Johnson & Johnson',
                       'TCS': 'Tata Consultancy Services',
                       'SLB': 'Schlumberger',
                       'NVS': 'Novartis',
                       'CNY': 'Huawei',
                       'PFE': 'Pfizer',
                       'ACN': 'Accenture',
                       'DELL': 'Dell',
                       'MS': 'Morgan Stanley',
                       'ORCL': 'Oracle',
                       'BAC': 'Bank of America',
                       'PG': 'Procter & Gamble',
                       'CGEMY': 'Capgemini',
                       'GS': 'Goldman Sachs',
                       'C': 'Citi',
                       'IBM': 'IBM',
                       'CS': 'Credit Suisse',
                       'MDLZ': 'Kraft Foods',
                       'WIT': 'Wipro Technologies',
                       'CSCO': 'Cisco Systems',
                       'PWC': 'PwC',
                       'GOOGL': 'Google',
                       'CTSH': 'Cognizant Technology Solutions',
                       'HSBC': 'HSBC',
                       'DB': 'Deutsche Bank',
                       'MSFT': 'Microsoft',
                       'HPE': 'Hewlett-Packard',
                       'ERIC': 'Ericsson',
                       'BCS': 'Barclays Capital',
                       'GSK': 'GlaxoSmithKline'}

# Fill out ticker_to_name()

# In UDFs we have to cover all possible output cases, or else the function will
# crash. Specifically, this means we need to handle the case when "ticker" is
# not in "ticker_to_name_dict". We use a try and except statement to return null
# for this case.
def ticker_to_name(ticker):
  try:
    return ticker_to_name_dict[ticker]
  except:
    return None

### END SOLUTION
# Register udf as a SQL function. DO NOT EDIT
spark.udf.register("TICKER_TO_NAME", ticker_to_name, StringType())


<function __main__.ticker_to_name>

#### Step 3.4.2: The Fastidious Filters

With our new `TICKER_TO_NAME()` function we will begin to wrangle `raw_stocks_sdf`.

Create an sdf called `filter_1_stocks_sdf` as follows. Convert all the ticker names in `raw_stocks_sdf` to the company names and save it as `org`. Next, convert the `date` field to a datetime type. As explained before this will help order and group the rows in future steps. Then, convert the type of the values in `closing_price` to `float`. This will take care of the `dtypes` issue we saw in Step 3.3.

Drop any company names that do not appear in `ticker_to_name_dict`. Keep any date between January 1st 2001 and December 4th 2012 inclusive, in the format shown below (note this is a datetime object not a string):

```
+----+------------+--------------+
|org |date        |closing_price |
+----+------------+--------------+
|IBM |2000-01-03  |...           |
|... |...         |...           |
+----+------------+--------------+
```
_Hint_: You will use a similar function to filter the dates as in Step 1.4. In Spark SQL the format for the `date` field in `raw_stocks_sdf` is `"yyyy-MM-dd"`.

In [0]:
# TODO: Create [filter_1_stocks_sdf]

## YOUR SOLUTION HERE
### BEGIN SOLUTION
# Format the "ticker" column using our UDF, TICKER_TO_NAME. Use TO_DATE() to
# convert the string date column to datetime object and filter on this in the
# same way as Step 1.4
query = '''SELECT TICKER_TO_NAME(ticker) AS org,
                  TO_DATE(date, "yyyy-MM-dd") AS date,
                  CAST(closing_price AS FLOAT)
           FROM raw_stocks
           WHERE TO_DATE(date, "yyyy-MM-dd") >= "2001-01-01"
                 AND TO_DATE(date, "yyyy-MM-dd") <= "2012-12-04"'''

# Define and save filter_1_stocks_sdf
filter_1_stocks_sdf = spark.sql(query).dropna()
filter_1_stocks_sdf.createOrReplaceTempView("filter_1_stocks")
### END SOLUTION

#### Step 3.4.3: The Magnanimous Months

The data in `filter_1_stocks_sdf` gives closing prices on a daily basis. Since we are interested in monthly trends, we will only keep the closing price on the **last trading day of each month**.

Create an sdf `filter_2_stocks_sdf` that contains only the closing prices for the last trading day of each month. Note that a trading day is not simply the last day of each month, as this could be on a weekend when the market is closed . The format of the sdf is shown below:

```
+----+------------+--------------+
|org |date        |closing_price |
+----+------------+--------------+
|IBM |2000-01-31  |...           |
|... |...         |...           |
+----+------------+--------------+
```

  _Hint_: It may be helpful to create an intermediate dataframe that will help you filter out the specific dates you desire.

In [0]:
# TODO: Create [filter_2_stocks_sdf]

## YOUR SOLUTION HERE
### BEGIN SOLUTION
# Create sdf that has for each company, the closing day for each month. We need
# to preform a GROUP BY on three features, org, YEAR(date), and MONTH(date).
# This will give us aggregations of the closing stock price for every day of a
# specified month and a specified year. Since these are all datetime objects,
# taking MAX() will give us the highest, i.e. last, one.
query = '''SELECT org,
                  MAX(date) AS close_date
           FROM filter_1_stocks
           GROUP BY org, YEAR(date), MONTH(date)'''

# Save as a temporary sdf, desired_months
spark.sql(query).createOrReplaceTempView("desired_months")

# Merge desired_months with filter_1_stocks. This will allow us to keep the
# closing prices for only those dates that were the closing date for a given
# month.
query = '''SELECT f.org AS org,
                  m.close_date AS date,
                  f.closing_price AS closing_price
           FROM filter_1_stocks AS f
           JOIN desired_months AS m
           ON f.org = m.org AND f.date = m.close_date'''

# Define and save filter_2_stocks_sdf
filter_2_stocks_sdf = spark.sql(query)
filter_2_stocks_sdf.createOrReplaceTempView("filter_2_stocks")
### END SOLUTION

#### Step 3.4.4: The Rambunctious Reshape

Now, we will begin to shape our dataframe into the format of the final training sdf.

Create an sdf `filter_3_stocks_sdf` that has for a single company and a single year, the closing stock price for the last trading day of each month in that year. This is similar to the table you created in Step 3.1. In this case since we cannot make a proxy for the closing price if the data is not avaliable, drop any rows containing any `null` values, in any column. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+
|org |year |jan_stock |   ...   |dec_stock |
+----+-----+----------+---------+----------+
|IBM |2008 |...       |   ...   |...       |
|IBM |2009 |...       |   ...   |...       |
|... |...  |...       |   ...   |...       |
+----+-----+----------+---------+----------+
```


In [0]:
# TODO: Create [filter_3_stocks_sdf]

## YOUR SOLUTION HERE
### BEGIN SOLUTION
# We will do the same operation we did in Step 3.1. In this case, however, as
# the question specifies, any missing entry in a given month are set to null.
query = '''SELECT org, 
                  YEAR(date) AS year,
                  MAX((CASE WHEN (MONTH(date) == 1) 
                                  THEN closing_price
                                  ELSE NULL END)) AS jan_stock,
                  MAX((CASE WHEN (MONTH(date) == 2)
                                  THEN closing_price
                                  ELSE NULL END)) AS feb_stock,
                  MAX((CASE WHEN (MONTH(date) == 3)
                                  THEN closing_price
                                  ELSE NULL END)) AS mar_stock,
                  MAX((CASE WHEN (MONTH(date) == 4)
                                  THEN closing_price
                                  ELSE NULL END)) AS apr_stock,
                  MAX((CASE WHEN (MONTH(date) == 5)
                                  THEN closing_price 
                                  ELSE NULL END)) AS may_stock,
                  MAX((CASE WHEN (MONTH(date) == 6)
                                  THEN closing_price
                                   ELSE NULL END)) AS jun_stock,
                  MAX((CASE WHEN (MONTH(date) == 7)
                                  THEN closing_price 
                                  ELSE NULL END)) AS jul_stock,
                  MAX((CASE WHEN (MONTH(date) == 8)
                                  THEN closing_price
                                  ELSE NULL END)) AS aug_stock,
                  MAX((CASE WHEN (MONTH(date) == 9)
                                  THEN closing_price
                                  ELSE NULL END)) AS sep_stock,
                  MAX((CASE WHEN (MONTH(date) == 10)
                                  THEN closing_price
                                  ELSE NULL END)) AS oct_stock,
                  MAX((CASE WHEN (MONTH(date) == 11)
                                  THEN closing_price
                                  ELSE NULL END)) AS nov_stock,
                  MAX((CASE WHEN (MONTH(date) == 12)
                                  THEN closing_price
                                  ELSE NULL END)) AS dec_stock
           FROM filter_2_stocks
           GROUP BY org, YEAR(date)
           ORDER BY org, year ASC'''

# Define and save filter_3_stocks_sdf
filter_3_stocks_sdf = spark.sql(query).dropna()
filter_3_stocks_sdf.createOrReplaceTempView("filter_3_stocks")

### END SOLUTION

#### Step 3.4.5: The Decisive Direction

The final element in our training set is the binary output for each case, i.e. the `y` label. 

Create an sdf `stocks_train_sdf` from `filter_3_stocks_sdf` with an additional column `direction`. This should be the direction of percentage change in the closing stock price, i.e. `1` for positive or `-1` for negative, in the first quarter of a given year. The quarter of a year begins in January and ends in April, inclusive. We want to know the percent change between these two months. Reference Step 2.2 for the percent change formula. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+-------------+
|org |year |jan_stock |   ...   |dec_stock |direction    |
+----+-----+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |1.0          |
|IBM |2009 |...       |   ...   |...       |-1.0         |
|... |...  |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+-------------+
```

In [0]:
# TODO: Create [stocks_train_sdf]

 ## YOUR SOLUTION HERE
 ### BEGIN SOLUTION
# SIGN() will return -1 if the input is negative, 0 if the input is zero, and 1
# if the input is positive.

# Keep all rows in filter_3_stocks and add another based on the sign of the
# percentage change in stock
query = '''SELECT *,
                  SIGN(apr_stock-jan_stock) AS direction
           FROM filter_3_stocks'''

# Define and save stocks_train_sdf
stocks_train_sdf = spark.sql(query)
stocks_train_sdf.createOrReplaceTempView("stocks_train")
### END SOLUTION

### Step 3.5: The Capricious Combination

Now that we have individually created the two halfs of our training data we will merge them together to create the final training sdf we showed in the beginning of Step 3.

Create an sdf called `training_sdf` in the format of the one shown at the beginning of Step 3. Note that in our definition for the `stock_result` column, the `stock_result` value for a particular year corresponds to the direction of the stock percentage change in the **following** year. For example, the stock_result in the `2008` row for `IBM` will contain the direction of IBM's stock in the first quarter of 2009. The format of the sdf is shown below:
```
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|org |year |jan_hired |   ...   |dec_hired |jan_stock |   ...   |dec_stock |stock_result |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |...       |   ...   |...       |-1.0         |
|IBM |2009 |...       |   ...   |...       |...       |   ...   |...       |1.0          |
|... |...  |...       |   ...   |...       |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
```

In [0]:
# TODO: Create [training_sdf]

## YOUR SOLUTION HERE
### BEGIN SOLUTION
# Our merge will consist of two joins. The first will use filter_3_stocks to
# join the monthly hiring rates and closing prices. The next join will be with
# stock_train and to find stock_result. This join will be done such that the
# correct years are matched between hire_train and stocks_train
query = '''SELECT t1.*,
                  s.direction AS stock_result
           FROM (SELECT h.org,
                        h.year,
                        h.jan_hired,
                        h.feb_hired,
                        h.mar_hired,
                        h.apr_hired,
                        h.may_hired,
                        h.jun_hired,
                        h.jul_hired,
                        h.aug_hired,
                        h.sep_hired,
                        h.oct_hired,
                        h.nov_hired,
                        h.dec_hired,
                        f.jan_stock,
                        f.feb_stock,
                        f.mar_stock,
                        f.apr_stock,
                        f.may_stock,
                        f.jun_stock,
                        f.jul_stock,
                        f.aug_stock,
                        f.sep_stock,
                        f.oct_stock,
                        f.nov_stock,
                        f.dec_stock
                 FROM hire_train AS h
                 JOIN filter_3_stocks AS f
                 ON h.org = f.org AND h.year = f.year) AS t1
           JOIN stocks_train AS s
           ON t1.org = s.org AND s.year = (t1.year + 1)'''

# Define and save training_sdf
training_sdf = spark.sql(query)
training_sdf.createOrReplaceTempView("training")
### END SOLUTION