## Step 0: Set up EMR

Follow the [AWS Academy Getting Started](https://drive.google.com/file/d/1kWReqxb5hfEH3CA-dQYUKWsXsEOuRZGF/view) instructions.

Move on to Step 0.1 after you have completed all the steps in the document.

![ACME GIANT RUBBER BAND](https://pbs.twimg.com/media/DRqbJh7UMAE2z4o?format=jpg&name=4096x4096)


### 0.1: The Superfluous Setup

Run the following two cells. They allow your Colab notebook to connect to and use your EMR cluster.

In [None]:
#%%capture
!apt install libkrb5-dev
!pip install sparkmagic
!pip install -i https://test.pypi.org/simple/ penn-grader==0.5.0

Reading package lists... Done
Building dependency tree       
Reading state information... Done
libkrb5-dev is already the newest version (1.17-6ubuntu4.3).
libkrb5-dev set to manually installed.
0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded.
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sparkmagic
  Downloading sparkmagic-0.20.4.tar.gz (44 kB)
  Preparing metadata (setup.py) ... done
Collecting hdijupyterutils>=0.6
  Downloading hdijupyterutils-0.20.4.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... done
Collecting autovizwidget>=0.6
  Downloading autovizwidget-0.20.4.tar.gz (9.0 kB)
  Preparing metadata (setup.py) ... done
Collecting requests_kerberos>=0.8.0
  Downloading requests_kerberos-0.14.0-py2.py3-none-any.whl (11 kB)
Collecting nest_a

In [None]:
%load_ext sparkmagic.magics

### 0.2: The Sharp Spark

Now, connect your notebook to the EMR cluster you created. In the cell below, paste the Master Public DNS listed in the setup document, adding `http://` to the beginning of the address and the authentication details to the end.

For example, if the DNS (copied directly from the AWS EMR console) is `ec2-3-15-237-211.us-east-2.compute.amazonaws.com`, the address would be:

`http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com -a cis545-livy -p password1 -t Basic_Access`

Insert this in the cell marked `# TODO` below. For our example, the cell would read:

```
%spark add -s spark_session -l python -u http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com -a cis545-livy -p password1 -t Basic_Access
```

In [None]:
# TODO: Copy the line above, enter your Master Public DNS with the proper formatting and host, and update the password

%spark add -s spark_session -l python -u http://ec2-3-91-61-35.compute-1.amazonaws.com -a cis545-livy -p satyajeet -t Basic_Access


Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 1 | application_1679433775678_0002 | pyspark | idle | Link | Link |  | ✔ |



SparkSession available as 'spark'.


In [None]:
# If you ever need to restart, you may need to delete the session:
# %spark delete -s spark_session
# OR just factory reset the runtime under the Runtime tab

## Step 1: Data Wrangling, Cleaning, and Shaping

In this homework, we will work with two datasets: (1) LinkedIn data containing information about its users, such as education, experience, and industry; and (2) stock price information, over the years 2000-2011, for the companies where these users have worked.


The data you will use is stored in an S3 bucket, a cloud storage service. Below, with our help, you will load it into Spark on your [EMR cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html).

### 1.1: The Stupendous Schema

When loading data, Spark can infer the data's structure on its own, but this inference is unreliable: Spark sometimes assigns a field the wrong type. You will therefore define an explicit schema for both the LinkedIn and the stock prices datasets.

A schema is a description of the structure of data. In Spark, schemas are defined using a `StructType` object, a collection of `StructField`s that specify the name and type of each component of the dataset. For example, suppose we have the following simple JSON object:


```
{
 "student_name": "Alpha Beta",
 "GPA": 3.6,
 "courses": [
    {"department": "Computer and Information Science",
     "course_id": "CIS 5450",
     "semester": "Fall 2021"},
    {"department": "Computer and Information Science",
     "course_id": "CIS 5550",
     "semester": "Fall 2021"}
 ],
 "grad_year": 2023
 }
```

We would define its schema as follows:

```python
from pyspark.sql.types import (ArrayType, FloatType, IntegerType,
                               StringType, StructField, StructType)

schema = StructType([
    StructField("student_name", StringType(), nullable=True),
    StructField("GPA", FloatType(), nullable=True),
    # "courses" is a list of objects: an ArrayType whose elements are StructTypes
    StructField("courses", ArrayType(
        StructType([
            StructField("department", StringType(), nullable=True),
            StructField("course_id", StringType(), nullable=True),
            StructField("semester", StringType(), nullable=True)
        ])
    ), nullable=True),
    StructField("grad_year", IntegerType(), nullable=True)
])
```


Each `StructField` has the structure `(name, type, nullable)`, where the `nullable` flag indicates whether the field may be null (empty). Your first task is to define the `schema` of `linkedin_small_real.json`. A smaller version of the JSON dataset can be found [here](https://drive.google.com/a/seas.upenn.edu/file/d/1yZ_0xz6uSJ8lAxhGzn2BVjCpDOjagcqb/view?usp=sharing). Inspect this JSON dataset to see its fields and their types.



You will now define an explicit schema for the `linkedin_small_real.json` dataset. We have defined most of the fields so you can compare the schema with the JSON dataset. Your task is to define the schema for the following 3 fields: `name`, `experience`, and `events`.

Make sure to use `nullable=True` for all the fields as well as **store dates as a StringType()**.



In [None]:
%%spark

from pyspark.sql.types import *

# TODO: Finish defining the linkedin_small_real.json schema
# We've provided most of the fiddly details, but you'll
# need to fill in the **name**, **experience**, and **events** fields!

schema = StructType([
    StructField("_id", StringType(), nullable=True),

    StructField("education", ArrayType(
        StructType([
          StructField("start", StringType(), nullable=True),
          StructField("major", StringType(), nullable=True),
          StructField("end", StringType(), nullable=True),
          StructField("name", StringType(), nullable=True),
          StructField("degree", StringType(), nullable=True),
          StructField("desc", StringType(), nullable=True)
    ])), nullable=True),

    StructField("group", StructType([
          StructField("affilition", ArrayType(StringType()), nullable=True),
          StructField("member", StringType(), nullable=True)
    ]), nullable=True),

    StructField("locality", StringType(), nullable=True),
    StructField("skills", ArrayType(StringType()), nullable=True),
    StructField("industry", StringType(), nullable=True),
    StructField("interval", IntegerType(), nullable=True),

    StructField("summary", StringType(), nullable=True),
    StructField("interests", StringType(), nullable=True),
    StructField("overview_html", StringType(), nullable=True),
    StructField("specilities", StringType(), nullable=True),
    StructField("homepage", ArrayType(StringType()), nullable=True),
    StructField("honors", ArrayType(StringType()), nullable=True),
    StructField("url", StringType(), nullable=True),
    StructField("also_view", ArrayType(
      StructType([
          StructField("id", StringType(), nullable=True),
          StructField("url", StringType(), nullable=True)
      ])
    ), nullable=True),


    # TODO: fill in the necessary structure for name
    StructField("name",
                StructType([
                  StructField("family_name", StringType(), nullable=True),
                  StructField("given_name", StringType(), nullable=True),
                ])
           , nullable=True),
    # TODO: fill in the necessary structure for experience
    StructField("experience", ArrayType(
                StructType([
                  StructField("org", StringType(), nullable=True),
                  StructField("title", StringType(), nullable=True),
                  StructField("end", StringType(), nullable=True),
                  StructField("start", StringType(), nullable=True),
                  StructField("desc", StringType(), nullable=True),
                ])
           ), nullable=True),
    # TODO: fill in the necessary structure for events
    StructField("events", ArrayType(
                StructType([
                  StructField("from", StringType(), nullable=True),
                  StructField("to", StringType(), nullable=True),
                  StructField("title1", StringType(), nullable=True),
                  StructField("start", IntegerType(), nullable=True),
                  StructField("title2", StringType(), nullable=True),
                  StructField("end", IntegerType(), nullable=True),
                ])
           ), nullable=True),

])


### 1.2: The Languorous Load

#### 1.2.1: Load LinkedIn Dataset

In the following cell, you will load the `linkedin_small_real.json` dataset from your S3 bucket into a Spark dataframe (sdf) called `linkedin_data_sdf`. If you have constructed `schema` correctly then `spark.read.json()` will read in the dataset. ***You do not need to edit this cell***.

If this doesn't work, go back to the prior cell and update your `schema`!

Note that the cell below will load data even if your schema is incomplete and has left out some columns of the data, so be sure to check that you have included all of the fields from the JSON.
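Because an incomplete schema fails silently, it can help to compare the schema's top-level field names against the keys that actually occur in the data. The sketch below does this in plain Python. It assumes the file is JSON Lines (one object per line, the layout `spark.read.json()` reads by default); the helper `missing_schema_fields` and the sample records are hypothetical, and only top-level keys are checked, not nested ones.

```python
import json


def missing_schema_fields(json_lines, schema_field_names):
    """Return the top-level keys seen in the JSON records but absent from the schema."""
    seen = set()
    for line in json_lines:
        line = line.strip()
        if line:  # skip blank lines
            seen.update(json.loads(line).keys())
    return sorted(seen - set(schema_field_names))


# Hypothetical sample: two JSON Lines records and a schema that forgot "events"
sample = [
    '{"_id": "in-1", "name": {"given_name": "A", "family_name": "B"}, "events": []}',
    '{"_id": "in-2", "industry": "Research"}',
]
declared = ["_id", "name", "industry"]
print(missing_schema_fields(sample, declared))
```

On the cluster, `schema.fieldNames()` returns the declared top-level names, which could be passed as `schema_field_names`.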

In [None]:
%%spark

linkedin_data_sdf = spark.read.json("s3a://penn-cis545-files/linkedin_small_real.json", schema=schema)


In [None]:
%%spark
# Print the first few rows to see what the data looks like in tabular form
linkedin_data_sdf.show(5)


+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             _id|           education|               group|            locality|              skills|            industry|interval|             summary|           interests|       overview_html|         specilities|homepage|honors|                 url|           also_view|                name|          experience|              events|
+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------+------+--------------------+--------------------+--------------------+--------------------+--------

In [None]:
import pandas as pd

The cell below shows how to run SQL commands on Spark tables. **Use this as a template for all your SQL queries in this notebook.**

For almost all the questions, you will need to create a temporary view using `createOrReplaceTempView`, write your SQL `query` as a string, and then run it with `spark.sql(query)`. To see the result of your query, use `.show()`.

***You do not need to edit this cell***.

In [None]:
%%spark


# Create SQL-accessible table
linkedin_data_sdf.createOrReplaceTempView("linkedin_data")

# Declare SQL query to be executed
query = '''SELECT *
           FROM linkedin_data
           ORDER BY _id
           LIMIT 10'''

# Save the output sdf of spark.sql() as answer_sdf
answer_sdf = spark.sql(query)
answer_sdf.show()



+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               _id|           education|               group|            locality|              skills|            industry|interval|             summary|           interests|       overview_html|         specilities|homepage|honors|                 url|           also_view|                name|          experience|              events|
+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------+------+--------------------+--------------------+--------------------+--------------------+--

#### 1.2.2: SQL refresher


In the next cell, create `industry_family_name_df` by fetching data from the `linkedin_data` table created above, returning rows with schema `(_id, industry, family_name)`. Exclude rows where `family_name` or `industry` is NULL. Sort the rows by `_id`, `industry`, and `family_name`, all in ascending order. Limit your sdf to 100 rows.

Hint: To access a field inside a JSON object in SQL, use the syntax `parent_field.child_field`, e.g. `name.family_name`.
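The NULL handling in this query can be mirrored in plain Python, which is handy for reasoning about the expected output. This is an illustrative sketch on hypothetical records, not Spark code: the helper `industry_family_names` is made up, and it treats a missing `name` object as yielding a NULL `family_name`, which is what `name.family_name` returns in Spark SQL when `name` is NULL.

```python
# Hypothetical records shaped like rows of the linkedin_data view
records = [
    {"_id": "in-2", "industry": "Research", "name": {"family_name": "Douglas"}},
    {"_id": "in-1", "industry": None, "name": {"family_name": "Forslund"}},
    {"_id": "in-3", "industry": "Internet", "name": None},
    {"_id": "in-0", "industry": "Pharmaceuticals", "name": {"family_name": "Adam"}},
]


def industry_family_names(rows, limit=100):
    out = []
    for r in rows:
        # A NULL parent struct gives a NULL child field
        family = (r.get("name") or {}).get("family_name")
        if r.get("industry") is not None and family is not None:
            out.append((r["_id"], r["industry"], family))
    out.sort()  # tuples sort by _id, then industry, then family_name, ascending
    return out[:limit]


for row in industry_family_names(records):
    print(row)
```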

In [None]:
%%spark

#TODO
linkedin_data_sdf.createOrReplaceTempView("linkedin_data")

industry_family_name_df = '''SELECT _id, industry, name.family_name AS family_name
           FROM linkedin_data
           WHERE industry IS NOT NULL AND name.family_name IS NOT NULL
           ORDER BY _id, industry, name.family_name
           LIMIT 100'''


industry_family_name_df = spark.sql(industry_family_name_df)
industry_family_name_df.show()


+-------------------+--------------------+-------------+
|                _id|            industry|  family_name|
+-------------------+--------------------+-------------+
|        in-00000001|     Medical Devices|   Mazalu MBA|
|           in-00001|     Pharmaceuticals|     Forslund|
|           in-00006|            Research|      Douglas|
|   in-000montgomery|Information Techn...|     Kilimann|
| in-000vijaychauhan|Aviation & Aerospace| Chauhan, PMP|
|   in-001adambutler|Marketing and Adv...|         Adam|
|       in-001monica|Nonprofit Organiz...|       Andrus|
|  in-001neilpeacock|Computer & Networ...|      Peacock|
|           in-00666|    Telekomünikasyon|BOLUKBAS, PMP|
|        in-00789123|  Telecommunications|     Hoffmann|
|in-007katarzynagola|Zarządzanie dział...|         Gola|
|           in-00911|     Human Resources|      Smekens|
|in-00danielnewman00|   Computer Software|       Newman|
|   in-00jenmiller00|            Internet|       Miller|
|in-00stevensaunders|          

#### 1.2.3: Load Stock Prices Data

Just as you created a schema for the LinkedIn dataset, now create one for the stock prices data. This schema is much simpler than the LinkedIn one. A tiny version of the data, in CSV format, is [here](https://docs.google.com/spreadsheets/d/1TStiS-bwkCJR1w5rJ18QPlNe3SIK2Z8QS9gK6ltnjJQ/edit?usp=sharing), so you can see what the types of the different fields (the columns of the CSV) should be. Store the `Date` field as a string.
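To see what an explicit schema buys you, here is a plain-Python sketch that applies declared types to CSV cells instead of guessing them. The column list is an assumption based on the sample sheet (Date kept as a string, prices as floats, counts as integers), and `parse_stock_rows` is a hypothetical helper. Cells are matched to columns by position after skipping the header, which matches Spark's default of applying a user-supplied CSV schema positionally (its `enforceSchema` option defaults to true). Unlike Spark's default permissive mode, this sketch raises `ValueError` on a malformed number instead of producing `null`.

```python
import csv
import io

# Column name -> converter (assumed types, mirroring the intended stocks schema)
STOCKS_COLUMNS = [("Date", str), ("Open", float), ("High", float), ("Low", float),
                  ("Close", float), ("Volume", int), ("OpenInt", int), ("org", str)]


def parse_stock_rows(text):
    reader = csv.reader(io.StringIO(text))
    next(reader, None)  # skip the header row, like .option("header", "true")
    rows = []
    for cells in reader:
        row = {}
        for (name, convert), cell in zip(STOCKS_COLUMNS, cells):
            # Empty cells become None, like a nullable field
            row[name] = convert(cell) if cell != "" else None
        rows.append(row)
    return rows


sample_csv = (
    "Date,Open,High,Low,Close,Volume,OpenInt,org\n"
    "1999-11-18,30.713,33.754,27.002,29.702,66277506,0,A\n"
    "1999-11-19,28.986,,26.872,27.257,16142920,0,A\n"  # hypothetical missing High
)
for row in parse_stock_rows(sample_csv):
    print(row)
```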


In [None]:
%%spark

from pyspark.sql.types import *

# TODO
stocks_schema = StructType([
    StructField("Date", StringType(), nullable=True),
    StructField("Open", FloatType(), nullable=True),
    StructField("High", FloatType(), nullable=True),
    StructField("Low", FloatType(), nullable=True),
    StructField("Close", FloatType(), nullable=True),
    StructField("Volume", IntegerType(), nullable=True),
    StructField("OpenInt", IntegerType(), nullable=True),
    StructField("org", StringType(), nullable=True),

])


In the following cell, you will load the entire `stocks.csv` dataset from your S3 bucket into a Spark dataframe (sdf) called `stocks_sdf`. If you have constructed `stocks_schema` correctly, then `spark.read.format("csv")` will read in the dataset. **You do not need to edit this cell.**

If this doesn't work, go back to the prior cell and update your schema!

In [None]:
#Do not edit
%%spark

# Load stock data

stocks_sdf = spark.read.format("csv") \
              .option("header", "true") \
              .schema(stocks_schema) \
              .load("s3a://penn-cis545-files/stocks.csv")

# Creates SQL-accessible table
stocks_sdf.createOrReplaceTempView('stocks')

# Display the first 10 rows
query = '''SELECT *
           FROM stocks'''
answer_stocks_sdf = spark.sql(query)
answer_stocks_sdf.show(10)


+----------+------+------+------+------+--------+-------+---+
|      Date|  Open|  High|   Low| Close|  Volume|OpenInt|org|
+----------+------+------+------+------+--------+-------+---+
|1999-11-18|30.713|33.754|27.002|29.702|66277506|      0|  A|
|1999-11-19|28.986|29.027|26.872|27.257|16142920|      0|  A|
|1999-11-22|27.886|29.702|27.044|29.702| 6970266|      0|  A|
|1999-11-23|28.688|29.446|27.002|27.002| 6332082|      0|  A|
|1999-11-24|27.083|28.309|27.002|27.717| 5132147|      0|  A|
|1999-11-26|27.594|28.012|27.509|27.807| 1832635|      0|  A|
|1999-11-29|27.676| 28.65| 27.38|28.432| 4317826|      0|  A|
|1999-11-30| 28.35|28.986|27.634| 28.48| 4567146|      0|  A|
|1999-12-01| 28.48|29.324|28.273|28.986| 3133746|      0|  A|
|1999-12-02|29.532|30.375|29.155|29.786| 3252997|      0|  A|
+----------+------+------+------+------+--------+-------+---+
only showing top 10 rows

#### 1.2.4: Calculate Percentage Change

In the next cell, we want you to display the percentage change in the daily stock prices for each organization. In order to do so, we will need the data from the `stocks_sdf` table created above. Create a new column called `percentage_change` that uses the opening and closing stock prices for each organization, for each day, and calculates the percentage change in the stock price as follows:

\begin{align}
\text{percentage\_change} = \frac{\text{close} - \text{open}}{\text{open}} \times 100.0
\end{align}

To avoid nulls (Spark SQL returns `null` for division by zero unless ANSI mode is enabled), calculate the percentage change only for those organizations and days where the **opening price is NOT 0.0**. The `percentage_change` value is a float.

Your final dataframe should include all columns of the original `stocks_sdf`, as well as the new column you create called `percentage_change`. Sort the rows by **Date and org** ascending, in that order.
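As a worked check of the formula, GE on 1962-01-02 opened at 0.6277 and closed at 0.6201, giving (0.6201 - 0.6277) / 0.6277 × 100 ≈ -1.21. The plain-Python sketch below applies the same formula, skips zero opening prices, and sorts by date and then ticker. The helper `percentage_change` and the zero-open row are hypothetical; the other two rows use values from the stocks data.

```python
def percentage_change(open_price, close_price):
    """Percent change from open to close; None when the open price is 0."""
    if open_price == 0.0:
        return None  # division by zero: such rows are filtered out instead
    return (close_price - open_price) / open_price * 100.0


# (Date, org, Open, Close); the last row is a hypothetical zero-open day
rows = [
    ("1962-01-02", "IBM", 6.413, 6.3378),
    ("1962-01-02", "GE", 0.6277, 0.6201),
    ("1962-01-03", "GE", 0.0, 0.6201),
]

# Keep non-zero opens, then sort by Date and org ascending
result = sorted(
    (date, org, percentage_change(open_, close))
    for date, org, open_, close in rows
    if open_ != 0.0
)
for row in result:
    print(row)
```

Small differences in the trailing digits compared with Spark are expected, because the schema stores prices as `FloatType` (32-bit) while Python floats are 64-bit.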

In [None]:
%%spark
stocks_sdf.createOrReplaceTempView('stocks')
#ToDo
answer_sdf = '''
SELECT Date, Open, High, Low, Close, Volume, OpenInt, org,
       ((Close - Open) * 100) / Open AS percentage_change
FROM stocks
WHERE Open != 0.0
ORDER BY Date, org
'''
answer_sdf = spark.sql(answer_sdf)
answer_sdf.show()


+----------+------+------+------+------+-------+-------+---+-------------------+
|      Date|  Open|  High|   Low| Close| Volume|OpenInt|org|  percentage_change|
+----------+------+------+------+------+-------+-------+---+-------------------+
|1962-01-02|0.6277|0.6362|0.6201|0.6201|2575579|      0| GE|-1.2107615398186964|
|1962-01-02| 6.413| 6.413|6.3378|6.3378| 467056|      0|IBM|-1.1726193609712994|
|1962-01-03|0.6201|0.6201|0.6122|0.6201|1764749|      0| GE|                0.0|
|1962-01-03|6.3378|6.3963|6.3378|6.3963| 350294|      0|IBM| 0.9230302761273702|
|1962-01-04|0.6201|0.6201|0.6037|0.6122|2194010|      0| GE|-1.2739879610348466|
|1962-01-04|6.3963|6.3963|6.3295|6.3295| 314365|      0|IBM|-1.0443481752999644|
|1962-01-05|0.6122|0.6122|0.5798|0.5957|3255244|      0| GE|-2.6951969289881075|
|1962-01-05|6.3211|6.3211|6.1958|6.2041| 440112|      0|IBM|-1.8509452254730352|
|1962-01-08|0.5957|0.5957|0.5716|0.5957|3696430|      0| GE|                0.0|
|1962-01-08|6.2041|6.2041|6.

### 1.3: Cleaning LinkedIn Data

#### 1.3.1: Adding Experience

In this part, we are interested in when individuals began working at a particular company. When you created the schema, you might have noticed that the companies an individual worked at are contained in the `experience` field as an **array of dictionaries**. The company name is stored in the `org` field, and the start date is in the `start` field. Here is an example of an `experience` field:

```
{
   "experience": [
     {
        "org": "The Walt Disney Company",
        "title" : "Mickey Mouse",
        "end" : "Present",
        "start": "November 1928",
        "desc": "Sailed a boat."
     },
     {
        "org": "Walt Disney World Resort",
        "title": "Mickey Mouse Mascot",
        "start": "January 2005",
        "desc": "Took pictures with kids."
     }
   ]
}
```

Our task is to extract each (company, start date) pair from these arrays. In Spark, this is known as "exploding" a row: each element of the array becomes its own row. Recall how we modeled a nested list in relational data with a separate table; an explode does essentially the same thing to the nested data within `linkedin_data`.

Create an sdf called `raw_start_dates_sdf` that contains the company and start date for every experience of every individual in `linkedin_data_sdf`. Drop any row that contains a `null` in either column.
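The explode-and-filter logic can be sketched in plain Python to make its semantics concrete. Each element of the `experience` array becomes its own `(org, start)` row, a NULL array contributes no rows (as with Spark's `explode`, as opposed to `explode_outer`), and pairs with a null in either field are dropped. The records and the helper `explode_start_dates` are hypothetical.

```python
# Hypothetical profiles shaped like linkedin_data rows
profiles = [
    {"_id": "in-1", "experience": [
        {"org": "The Walt Disney Company", "start": "November 1928"},
        {"org": "Walt Disney World Resort", "start": "January 2005"},
        {"org": None, "start": "March 2001"},  # dropped: null org
    ]},
    {"_id": "in-2", "experience": None},  # NULL array: contributes no rows
]


def explode_start_dates(rows):
    out = []
    for row in rows:
        # One output row per array element
        for exp in row.get("experience") or []:
            org, start = exp.get("org"), exp.get("start")
            if org is not None and start is not None:
                out.append((org, start))
    return out


for pair in explode_start_dates(profiles):
    print(pair)
```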



In [None]:
%%spark

#ToDo

raw_start_dates_sdf = '''
SELECT exp.org, exp.start AS start_date
FROM (
  SELECT explode(experience) AS exp
  FROM linkedin_data
  WHERE experience IS NOT NULL
) temp
WHERE exp.org IS NOT NULL AND exp.start IS NOT NULL
'''

raw_start_dates_sdf = spark.sql(raw_start_dates_sdf)
raw_start_dates_sdf.show()


+--------------------+--------------+
|                 org|    start_date|
+--------------------+--------------+
| Johnson and Johnson| November 2009|
|Albert Einstein M...|September 2008|
| Columbia University|   August 2006|
|Memorial Sloan Ke...|  January 2003|
|Sahlgrenska Unive...| November 2001|
|                UCSF|September 2012|
|Wyss Institute fo...|      May 2009|
|<Online Recruitin...| December 2009|
|<Medical Testing ...| December 2009|
|   000Montgomery.Com|  January 2005|
|<Software Consult...|   August 2000|
|<Software Consult...| December 2008|
|<Temperature Sens...| February 2009|
|<Advertising Comp...|      May 2007|
|<Advertising Comp...|    March 2007|
|<Psychology Organ...|  January 2007|
|<Advertising Comp...| February 2007|
|<National Recruit...| February 2006|
| <Employment Agency>|  October 2005|
|<Biometrics Hardw...|     July 2006|
+--------------------+--------------+
only showing top 20 rows

In [None]:
#################     DO NOT EDIT      ##################
%%spark
raw_start_dates_sdf.createOrReplaceTempView("test_1_3_1")
test_1_3_1_sdf = spark.sql("SELECT * FROM test_1_3_1 ORDER BY org ASC, start_date DESC LIMIT 20")


#### 1.3.2: Filtering on Date

There are two issues with the values in our `start_date` column. First, the values are saved as strings, not date types, so `ORDER BY` and `GROUP BY` would operate on the text (for example, "April 2010" sorts before "January 2009") rather than on months or years. Second, some values lack month or year information or are in other languages. Your task is to filter and clean the `start_date` column. We are interested only in rows whose date is in the format `"(month_name) (year)"`, e.g. "October 2010".

Using `raw_start_dates_sdf`, create an sdf called `filtered_start_dates_sdf` with the `start_date` column filtered and converted in the manner above. **Keep only those rows with a start date between January 2000 ('2000-01-01') and December 2011 ('2011-12-01'), inclusive**. Ensure that any dates not in the desired format are omitted. Drop any row that contains a `null` in either column. The format of the sdf is shown below:
```
+--------------------------+---------------+
|org                       |start_date     |
+--------------------------+---------------+
|Walt Disney World Resort  |2005-01-01     |
|...                       |...            |
+--------------------------+---------------+
```
_Hint_: Refer to the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html) to format the `date` column. In Spark SQL the date format we are interested in is `"MMM y"`.

_Note_: Spark will return the date in the format above, with the day as `01`. This is ok, since we are interested in the month and year each individual began working and all dates will have `01` as their day.
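The parsing and range filter can be sketched in plain Python. The regular expression is a shape check for "word, space, digits"; trying both `%B` and `%b` imitates the legacy Java date parser, which accepts both full and abbreviated month names for `MMM` when parsing. The helpers `parse_month_year` and `filter_start_dates` are hypothetical, and `strptime` month names assume the default English (C) locale. As in Spark, a parsed month gets day 1.

```python
import re
from datetime import date, datetime

# "(month_name) (year)": letters, one space, digits
PATTERN = re.compile(r"^[A-Za-z]+ [0-9]+$")


def parse_month_year(text):
    if text is None or not PATTERN.match(text):
        return None
    for fmt in ("%B %Y", "%b %Y"):  # "October 2010" or "Oct 2010"
        try:
            return datetime.strptime(text, fmt).date()  # day defaults to 1
        except ValueError:
            continue
    return None  # e.g. a month name in another language


def filter_start_dates(pairs, lo=date(2000, 1, 1), hi=date(2011, 12, 1)):
    out = []
    for org, raw in pairs:
        d = parse_month_year(raw)
        if org is not None and d is not None and lo <= d <= hi:  # inclusive range
            out.append((org, d))
    return out


sample = [("A", "May 1999"), ("B", "December 2011"), ("C", "Mai 2009"),
          ("D", "January 2000"), (None, "March 2005")]
print(filter_start_dates(sample))
```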

In [None]:
%%spark

#Do not remove the line below
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

#ToDo
raw_start_dates_sdf.createOrReplaceTempView("raw_start_dates_sdf")
filtered_start_dates_sdf = '''
SELECT org, TO_DATE(start_date, 'MMM y') AS start_date
FROM raw_start_dates_sdf
WHERE start_date IS NOT NULL
      AND start_date RLIKE '^[a-zA-Z]+ [0-9]+$'
      AND TO_DATE(start_date, 'MMM y') >= '2000-01-01'
      AND TO_DATE(start_date, 'MMM y') <= '2011-12-01'

'''
filtered_start_dates_sdf = spark.sql(filtered_start_dates_sdf)
filtered_start_dates_sdf.show()


+--------------------+----------+
|                 org|start_date|
+--------------------+----------+
| Johnson and Johnson|2009-11-01|
|Albert Einstein M...|2008-09-01|
| Columbia University|2006-08-01|
|Memorial Sloan Ke...|2003-01-01|
|Sahlgrenska Unive...|2001-11-01|
|Wyss Institute fo...|2009-05-01|
|<Online Recruitin...|2009-12-01|
|<Medical Testing ...|2009-12-01|
|   000Montgomery.Com|2005-01-01|
|<Software Consult...|2000-08-01|
|<Software Consult...|2008-12-01|
|<Temperature Sens...|2009-02-01|
|<Advertising Comp...|2007-05-01|
|<Advertising Comp...|2007-03-01|
|<Psychology Organ...|2007-01-01|
|<Advertising Comp...|2007-02-01|
|<National Recruit...|2006-02-01|
| <Employment Agency>|2005-10-01|
|<Biometrics Hardw...|2006-07-01|
|<Product Manufact...|2001-08-01|
+--------------------+----------+
only showing top 20 rows

In [None]:
#################     DO NOT EDIT      ##################
%%spark
filtered_start_dates_sdf.createOrReplaceTempView("test_1_3_2")
test_1_3_2_sdf = spark.sql("SELECT * FROM ((SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') AS start_date FROM test_1_3_2 ORDER BY start_date DESC, org DESC LIMIT 10) UNION (SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') AS start_date FROM test_1_3_2 ORDER BY start_date ASC, org ASC LIMIT 10)) ORDER BY start_date ASC, org ASC")


### 1.4: Cleaning Stock Data

#### 1.4.1: Adding Company Names

In Part 4 of this homework, we will merge the stocks and LinkedIn dataframes. This would be difficult to do directly, because the companies in our stock dataset are identified by their stock tickers rather than their full names, so we could not match them against the `org` field in `hire_train_sdf`. We must first convert the tickers to company names. For this purpose, we can create a user-defined function (udf).

A udf is defined as a normal Python function and then registered for use as a Spark SQL function. Your task is to create a udf, `TICKER_TO_NAME()`, that converts the ticker field (`org`) in `stocks_sdf` to the company's name using the provided `ticker_to_name_dict` dictionary. We are only interested in the companies in that dictionary.

Fill out the function `ticker_to_name()` below. Then use `spark.udf.register()` to register it as a SQL function; this command is provided and ***you do not need to edit it***. Note that we have defined the udf as returning `StringType()`, so ensure your function returns a string. You must also handle any potential `null` inputs.

If the ticker value isn't in the table, set it to a string equal to `None`.

In [None]:
%%spark

# Dictionary linking stock ticker symbols to their names
ticker_to_name_dict = {'NOK': 'Nokia',
                       'UN': 'Unilever',
                       'BP': 'BP',
                       'JNJ': 'Johnson & Johnson',
                       'TCS': 'Tata Consultancy Services',
                       'SLB': 'Schlumberger',
                       'NVS': 'Novartis',
                       'CNY': 'Huawei',
                       'PFE': 'Pfizer',
                       'ACN': 'Accenture',
                       'DELL': 'Dell',
                       'MS': 'Morgan Stanley',
                       'ORCL': 'Oracle',
                       'BAC': 'Bank of America',
                       'PG': 'Procter & Gamble',
                       'CGEMY': 'Capgemini',
                       'GS': 'Goldman Sachs',
                       'C': 'Citi',
                       'IBM': 'IBM',
                       'CS': 'Credit Suisse',
                       'MDLZ': 'Kraft Foods',
                       'WIT': 'Wipro Technologies',
                       'CSCO': 'Cisco Systems',
                       'PWC': 'PwC',
                       'GOOGL': 'Google',
                       'CTSH': 'Cognizant Technology Solutions',
                       'HSBC': 'HSBC',
                       'DB': 'Deutsche Bank',
                       'MSFT': 'Microsoft',
                       'HPE': 'Hewlett-Packard',
                       'ERIC': 'Ericsson',
                       'BCS': 'Barclays Capital',
                       'GSK': 'GlaxoSmithKline'}

# TODO: Fill out [ticker_to_name()] and register it as a udf.


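def ticker_to_name(ticker):
    # dict.get returns None both for tickers missing from the dictionary and for
    # a null (None) ticker; Spark stores that None as null in the StringType() result
    return ticker_to_name_dict.get(ticker)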



# Register udf as a SQL function. DO NOT EDIT
spark.udf.register("TICKER_TO_NAME", ticker_to_name, StringType())



<function ticker_to_name at 0x7f2f0154f8c0>

In [None]:
#################     DO NOT EDIT      ##################
%%spark

ticker_to_name = [((str(ticker_to_name("GOOGL")),str(ticker_to_name("TSLA"))))]
columns = ['A', 'B']
dataframe = spark.createDataFrame(ticker_to_name, columns)


#### 1.4.2: Wrangling stocks data

We can now begin to wrangle `stocks_sdf` with our new `TICKER_TO_NAME()` function.

Create an sdf called `filter_1_stocks_sdf` as follows. Convert all the ticker symbols in `stocks_sdf` to company names and save them as `org`. Next, convert the `Date` field to a date type; as explained before, this will help order and group the rows in future steps.

Drop any rows whose ticker does not appear in `ticker_to_name_dict`. **Keep any date between January 1st 2001 ('2001-01-01') and December 4th 2012 ('2012-12-04') inclusive**, in the format shown below (note that `date` is a date object, not a string):

```
+----+------------+--------------+
|org |date        |Close         |
+----+------------+--------------+
|IBM |2000-01-03  |...           |
|... |...         |...           |
+----+------------+--------------+
```
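
The filtering rule above can be sketched in plain Python to make its edge cases explicit. This is an illustration, not the Spark implementation: the `(ticker, date_string, close)` tuple shape, the abbreviated dictionary, and the `filter_stocks` helper are all hypothetical. Because `ticker_to_name` uses `dict.get`, an unknown ticker maps to `None` (a `NULL` in Spark), so dropping those rows is the same as keeping only tickers in `ticker_to_name_dict`; both date bounds are inclusive.

```python
from datetime import date

# Hypothetical, abbreviated stand-in for the notebook's ticker_to_name_dict
ticker_to_name_dict = {'GOOGL': 'Google', 'MSFT': 'Microsoft', 'IBM': 'IBM'}

START, END = date(2001, 1, 1), date(2012, 12, 4)  # both bounds inclusive


def filter_stocks(rows):
    """Map tickers to names, parse 'yyyy-MM-dd' dates, keep rows in [START, END]."""
    kept = []
    for ticker, date_str, close in rows:
        org = ticker_to_name_dict.get(ticker)  # None (NULL in Spark) if unknown
        if org is None:
            continue
        day = date.fromisoformat(date_str)  # string -> datetime.date
        if START <= day <= END:
            kept.append((org, day, close))
    return sorted(kept, key=lambda r: (r[0], r[1]))  # ORDER BY org, date


# Toy rows for illustration only
rows = [('GOOGL', '2012-12-04', 700.0),  # last day in range -> kept
        ('TSLA', '2011-06-01', 30.0),    # ticker not in the dict -> dropped
        ('MSFT', '2000-12-29', 21.7),    # before START -> dropped
        ('IBM', '2001-01-01', 90.0)]     # first day in range -> kept
print(filter_stocks(rows))
```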


In [None]:
%%spark

# Register the raw stock data so it can be queried with SQL
answer_stocks_sdf.createOrReplaceTempView('stocks_sdf')
filter_1_stocks_sdf = '''
SELECT TICKER_TO_NAME(org) AS org,
       to_date(date, 'yyyy-MM-dd') AS date,
       Close
FROM stocks_sdf
-- TICKER_TO_NAME returns NULL for tickers missing from ticker_to_name_dict
WHERE TICKER_TO_NAME(org) IS NOT NULL
  AND date >= '2001-01-01'
  AND date <= '2012-12-04'
ORDER BY org, date
'''
filter_1_stocks_sdf = spark.sql(filter_1_stocks_sdf)
filter_1_stocks_sdf.show()


+---------+----------+------+
|      org|      date| Close|
+---------+----------+------+
|Accenture|2005-02-25|21.363|
|Accenture|2005-02-28|21.704|
|Accenture|2005-03-01|21.735|
|Accenture|2005-03-02|21.593|
|Accenture|2005-03-03|21.329|
|Accenture|2005-03-04|21.549|
|Accenture|2005-03-07|21.651|
|Accenture|2005-03-08|21.509|
|Accenture|2005-03-09|21.379|
|Accenture|2005-03-10|21.194|
|Accenture|2005-03-11|20.777|
|Accenture|2005-03-14|20.794|
|Accenture|2005-03-15| 20.37|
|Accenture|2005-03-16| 20.41|
|Accenture|2005-03-17|20.344|
|Accenture|2005-03-18|20.098|
|Accenture|2005-03-21|20.395|
|Accenture|2005-03-22|20.623|
|Accenture|2005-03-23|20.641|
|Accenture|2005-03-24|20.726|
+---------+----------+------+
only showing top 20 rows

In [None]:
#################     DO NOT EDIT      ##################
%%spark
filter_1_stocks_sdf.createOrReplaceTempView("test_1_4_2")
test_1_4_2_sdf = spark.sql("SELECT * FROM ((SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd') as date, Close FROM test_1_4_2 ORDER BY date DESC, org DESC LIMIT 10) UNION (SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd') as date, Close FROM test_1_4_2 ORDER BY date ASC, org ASC LIMIT 10)) ORDER BY date ASC, org DESC")

## Step 2: Analysis on LinkedIn Data

### 2.1: Counting Employees

For each company, we now want the number of individuals who started in the same month and year. Using `filtered_start_dates_sdf`, create a new sdf called `start_dates_sdf` that contains the total number of employees who began working at the same company on the same start date (name the new column `num_employees`). The format of the sdf is shown below:

```
+--------------------------+---------------+---------------+
|org                       |start_date     |num_employees  |
+--------------------------+---------------+---------------+
|Walt Disney World Resort  |2005-01-01     |1              |
|...                       |...            |...            |
+--------------------------+---------------+---------------+
```
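
Counting hires per company and start date is a group-and-count. A minimal plain-Python sketch, assuming one `(org, start_date)` pair per employee as a hypothetical stand-in for `filtered_start_dates_sdf`; `count_start_dates` is illustrative only. Ties on `start_date` are broken by `org` here only to make the output deterministic.

```python
from collections import Counter
from datetime import date


def count_start_dates(rows):
    """Count employees per (org, start_date), like GROUP BY org, start_date."""
    counts = Counter((org, start) for org, start in rows)
    # Sort by start_date, then org (the org tie-break only makes output deterministic)
    return sorted(((org, start, n) for (org, start), n in counts.items()),
                  key=lambda r: (r[1], r[0]))


# Toy rows: one (org, start_date) pair per employee
rows = [('Cisco Systems', date(2000, 1, 1)),
        ('Cisco Systems', date(2000, 1, 1)),
        ('Tectura', date(2000, 1, 1)),
        ('Cisco Systems', date(2000, 2, 1))]
for row in count_start_dates(rows):
    print(row)
```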

In [None]:
%%spark

# TODO:
filtered_start_dates_sdf.createOrReplaceTempView("filtered_start_dates_sdf")
start_dates_sdf = '''
SELECT org, start_date, COUNT(*) as num_employees
FROM filtered_start_dates_sdf
GROUP BY org, start_date
ORDER BY start_date
'''
start_dates_sdf = spark.sql(start_dates_sdf)
start_dates_sdf.show()



+--------------------+----------+-------------+
|                 org|start_date|num_employees|
+--------------------+----------+-------------+
|  MMA - Formerly WMA|2000-01-01|            1|
|   AMI Creative Ltd.|2000-01-01|            1|
|Technology Furnit...|2000-01-01|            1|
|             Tectura|2000-01-01|            1|
|       Atento Brasil|2000-01-01|            1|
|         Ana Galleta|2000-01-01|            1|
|Pioneer Computers...|2000-01-01|            1|
|       Elbit Systems|2000-01-01|            1|
|ExxonMobil - Cora...|2000-01-01|            1|
|            Citibank|2000-01-01|            2|
|       Cisco Systems|2000-01-01|            2|
|Quest Search Asso...|2000-01-01|            1|
|Catharina Ziekenh...|2000-01-01|            1|
|            Comforce|2000-01-01|            1|
|Wcenter Internet ...|2000-01-01|            1|
|          Sema Group|2000-01-01|            1|
|   Micron Technology|2000-01-01|            1|
|Professional Conn...|2000-01-01|       

In [None]:
#################     DO NOT EDIT      ##################
%%spark

start_dates_sdf.createOrReplaceTempView("test_2_1")
test_2_1_sdf = spark.sql("SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') as start_date, num_employees FROM test_2_1 ORDER BY num_employees DESC, org DESC, start_date ASC LIMIT 10")

### 2.2: Reshape DataFrame

Our next step is to use `start_dates_sdf` to create a new sdf called `raw_hire_train_sdf` in which each row holds, for a single company and a single year, the number of hires in each month from January through December, as well as the total number of hires that year (name it `total_num`). Each company will have several rows, corresponding to years between 2000 and 2011. It is fine if a company is missing a given year. However, for any company and year that does appear, every month column must have an entry, i.e. if no one was hired in a month, the value should be `0`.

_Note_: We will use the first three letters of each month in naming, i.e. `jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec`.

The format of the `raw_hire_train_sdf` is shown below:

```
+----+-----+----------+---------+----------+----------+
|org |year |jan_hired |   ...   |dec_hired |total_num |
+----+-----+----------+---------+----------+----------+
|IBM |2008 |...       |   ...   |...       |...       |
|IBM |2009 |...       |   ...   |...       |...       |
|... |...  |...       |   ...   |...       |...       |
+----+-----+----------+---------+----------+----------+
```
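
The reshape is a pivot with a zero default: each month column sums `num_employees` over that month's rows and falls back to `0` when there are none. A plain-Python sketch of that logic, assuming `(org, start_date, num_employees)` tuples like the rows of `start_dates_sdf`; `MONTHS` and `pivot_hires` are hypothetical names.

```python
from collections import defaultdict
from datetime import date

MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
          'jul', 'aug', 'sep', 'oct', 'nov', 'dec']


def pivot_hires(start_dates, first_year=2000, last_year=2011):
    """Turn (org, start_date, num_employees) rows into one dict per (org, year)
    with a <month>_hired entry for every month plus total_num."""
    table = defaultdict(lambda: [0] * 12)  # every month defaults to 0, like ELSE 0
    for org, start, n in start_dates:
        if first_year <= start.year <= last_year:
            table[(org, start.year)][start.month - 1] += n
    result = {}
    for key, counts in table.items():
        row = {f'{m}_hired': c for m, c in zip(MONTHS, counts)}
        row['total_num'] = sum(counts)
        result[key] = row
    return result


sample = [('IBM', date(2008, 1, 1), 3),
          ('IBM', date(2008, 6, 1), 2),
          ('IBM', date(2012, 3, 1), 5)]  # outside 2000-2011 -> ignored
rows = pivot_hires(sample)
print(rows[('IBM', 2008)])
```

Months without hires stay at `0`, while a year with no hires at all for a company simply produces no row.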


In [None]:
%%spark

# TODO:
start_dates_sdf.createOrReplaceTempView("start_dates_sdf")

raw_hire_train_sdf = '''
    SELECT org, YEAR(start_date) AS year,
           SUM(CASE WHEN MONTH(start_date) = 1 THEN num_employees ELSE 0 END) AS jan_hired,
           SUM(CASE WHEN MONTH(start_date) = 2 THEN num_employees ELSE 0 END) AS feb_hired,
           SUM(CASE WHEN MONTH(start_date) = 3 THEN num_employees ELSE 0 END) AS mar_hired,
           SUM(CASE WHEN MONTH(start_date) = 4 THEN num_employees ELSE 0 END) AS apr_hired,
           SUM(CASE WHEN MONTH(start_date) = 5 THEN num_employees ELSE 0 END) AS may_hired,
           SUM(CASE WHEN MONTH(start_date) = 6 THEN num_employees ELSE 0 END) AS jun_hired,
           SUM(CASE WHEN MONTH(start_date) = 7 THEN num_employees ELSE 0 END) AS jul_hired,
           SUM(CASE WHEN MONTH(start_date) = 8 THEN num_employees ELSE 0 END) AS aug_hired,
           SUM(CASE WHEN MONTH(start_date) = 9 THEN num_employees ELSE 0 END) AS sep_hired,
           SUM(CASE WHEN MONTH(start_date) = 10 THEN num_employees ELSE 0 END) AS oct_hired,
           SUM(CASE WHEN MONTH(start_date) = 11 THEN num_employees ELSE 0 END) AS nov_hired,
           SUM(CASE WHEN MONTH(start_date) = 12 THEN num_employees ELSE 0 END) AS dec_hired,
           SUM(num_employees) AS total_num
    FROM start_dates_sdf
    WHERE YEAR(start_date)>=2000 AND YEAR(start_date)<=2011
    GROUP BY org, YEAR(start_date)

'''
raw_hire_train_sdf = spark.sql(raw_hire_train_sdf)
raw_hire_train_sdf.show()


+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|                 org|year|jan_hired|feb_hired|mar_hired|apr_hired|may_hired|jun_hired|jul_hired|aug_hired|sep_hired|oct_hired|nov_hired|dec_hired|total_num|
+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|        Sondages BBM|2007|        0|        0|        0|        0|        0|        0|        0|        0|        1|        0|        0|        0|        1|
|      Active Website|2001|        1|        0|        0|        0|        0|        0|        0|        0|        0|        0|        0|        0|        1|
|   NigeriansTalk.Org|2009|        0|        0|        1|        0|        0|        0|        0|        0|        0|        0|        0|        0|        1|
|           Citigroup|2005|        2|        0|     

In [None]:
#################     DO NOT EDIT      ##################
%%spark
raw_hire_train_sdf.createOrReplaceTempView("test_2_2")
test_2_2_sdf = spark.sql("SELECT * FROM test_2_2 ORDER BY total_num DESC, org DESC, year ASC LIMIT 20")

### 2.3: Filtering on Company Size

Create an sdf called `hire_train_sdf` that contains all the observations in `raw_hire_train_sdf` with `total_num` greater than or equal to 20.

In [None]:
%%spark

# TODO
# Keep all rows where total_num >= 20
raw_hire_train_sdf.createOrReplaceTempView("raw_hire_train_sdf")

hire_train_sdf = '''
    SELECT *
    FROM raw_hire_train_sdf
    WHERE total_num >= 20
'''

hire_train_sdf = spark.sql(hire_train_sdf)
hire_train_sdf.show()


+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|                 org|year|jan_hired|feb_hired|mar_hired|apr_hired|may_hired|jun_hired|jul_hired|aug_hired|sep_hired|oct_hired|nov_hired|dec_hired|total_num|
+--------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|      JPMorgan Chase|2011|        3|        1|        1|        0|        1|        7|        6|        1|        1|        2|        1|        4|       28|
|     Thomson Reuters|2009|        2|        2|        3|        3|        4|        2|        3|        3|        2|        2|        2|        1|       29|
|            Deloitte|2008|        6|        4|        1|        3|        1|        9|        4|        3|       11|        4|        2|        0|       48|
|                KPMG|2011|        2|        4|     

In [None]:
#################     DO NOT EDIT      ##################
%%spark
hire_train_sdf.createOrReplaceTempView("test_2_3")
test_2_3_sdf = spark.sql("SELECT * FROM test_2_3 ORDER BY org ASC, year ASC LIMIT 10")

## Step 3: Analyzing Stock Data

### 3.1: Average Closing Price

The data in `filter_1_stocks_sdf` gives closing prices on a daily basis. Since we are interested in monthly trends, we will only keep the **average of the closing price of each month per year for each org**.

Create an sdf `filter_2_stocks_sdf` that contains only the average closing price for each org and month-year pair, rounded to 3 decimal places. Sort the rows alphabetically by org and then chronologically (earliest year and month first). The format of the sdf is shown below:

```
+----+------------+--------------+--------------+
|org |month       |year          |close         |
+----+------------+--------------+--------------+
|IBM |01          |2000          |...           |
|... |...         |...           |...           |
+----+------------+--------------+--------------+
```
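
A plain-Python sketch of the monthly averaging, assuming `(org, date, close)` tuples like the rows of `filter_1_stocks_sdf`; `round_half_up` and `monthly_average` are hypothetical helpers. One detail worth noting: Spark SQL's `ROUND` rounds half up, while Python's built-in `round()` rounds half to even on the binary float value and can disagree on ties, so the sketch rounds with `decimal` and `ROUND_HALF_UP` instead. Zero-padding the month keeps string sorting chronological.

```python
from collections import defaultdict
from datetime import date
from decimal import Decimal, ROUND_HALF_UP


def round_half_up(x, places=3):
    """Round half up (as Spark SQL's ROUND does), starting from the shortest repr of x."""
    quantum = Decimal(1).scaleb(-places)  # 1E-3 for places=3
    return float(Decimal(str(x)).quantize(quantum, rounding=ROUND_HALF_UP))


def monthly_average(daily_rows):
    """Average (org, date, close) rows per org, year, and month.
    Returns (org, 'MM', 'yyyy', close) tuples sorted by org, year, month."""
    groups = defaultdict(list)
    for org, day, close in daily_rows:
        groups[(org, day.year, day.month)].append(close)
    out = [(org, f'{month:02d}', str(year), round_half_up(sum(c) / len(c)))
           for (org, year, month), c in groups.items()]
    # Zero-padded strings sort chronologically: org, then year, then month
    return sorted(out, key=lambda r: (r[0], r[2], r[1]))


# Toy daily closes for illustration only
daily = [('IBM', date(2005, 3, 1), 1.0), ('IBM', date(2005, 3, 2), 2.0),
         ('IBM', date(2005, 3, 3), 2.0), ('IBM', date(2005, 2, 28), 21.0),
         ('IBM', date(2005, 2, 25), 22.0), ('Accenture', date(2006, 1, 3), 26.0)]
for row in monthly_average(daily):
    print(row)
```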

In [None]:
%%spark
filter_1_stocks_sdf.createOrReplaceTempView("filter_1_stocks_sdf")
# TODO

filter_2_stocks_sdf = '''
SELECT org,
       DATE_FORMAT(date, 'MM') AS month,
       DATE_FORMAT(date, 'yyyy') AS year,
       ROUND(AVG(Close), 3) AS close
FROM filter_1_stocks_sdf
GROUP BY org, year, month
ORDER BY org ASC, year ASC, month ASC

'''

filter_2_stocks_sdf = spark.sql(filter_2_stocks_sdf)
filter_2_stocks_sdf.show()

+---------+-----+----+------+
|      org|month|year| close|
+---------+-----+----+------+
|Accenture|   02|2005|21.534|
|Accenture|   03|2005|20.904|
|Accenture|   04|2005|19.467|
|Accenture|   05|2005|18.977|
|Accenture|   06|2005|19.892|
|Accenture|   07|2005|20.492|
|Accenture|   08|2005|21.238|
|Accenture|   09|2005|21.629|
|Accenture|   10|2005|21.897|
|Accenture|   11|2005|23.048|
|Accenture|   12|2005|24.367|
|Accenture|   01|2006|26.098|
|Accenture|   02|2006|27.378|
|Accenture|   03|2006|26.787|
|Accenture|   04|2006|25.187|
|Accenture|   05|2006|24.343|
|Accenture|   06|2006|22.946|
|Accenture|   07|2006|24.066|
|Accenture|   08|2006|24.245|
|Accenture|   09|2006|24.995|
+---------+-----+----+------+
only showing top 20 rows

### 3.2: Reshape DataFrame Again!

Now, we will begin to shape our dataframe into the format of the final training sdf.

Create an sdf `filter_3_stocks_sdf` that has, for a single company and a single year, the average stock price for each month of that year. This is similar to the table you created in Step 3.1, but with one column per month instead of one row per month. If the data for any month is unavailable, drop that row: the result must not contain `null` values in any column. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+
|org |year |jan_stock |   ...   |dec_stock |
+----+-----+----------+---------+----------+
|IBM |2008 |...       |   ...   |...       |
|IBM |2009 |...       |   ...   |...       |
|... |...  |...       |   ...   |...       |
+----+-----+----------+---------+----------+
```
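
The stock reshape follows the same pivot idea as the hiring data, except that a missing month must remove the whole row instead of becoming `0`. A plain-Python sketch, assuming `(org, 'MM', 'yyyy', close)` tuples like the rows of `filter_2_stocks_sdf`; `MONTH_NAMES` and `pivot_stocks` are hypothetical.

```python
MONTH_NAMES = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
               'jul', 'aug', 'sep', 'oct', 'nov', 'dec']


def pivot_stocks(monthly_rows):
    """Turn (org, 'MM', 'yyyy', close) rows into {(org, year): {'jan_stock': ..., ...}},
    keeping only (org, year) pairs that have a value for all twelve months."""
    table = {}
    for org, month, year, close in monthly_rows:
        table.setdefault((org, int(year)), {})[int(month)] = close
    result = {}
    for key, by_month in table.items():
        if all(by_month.get(m) is not None for m in range(1, 13)):  # no NULL months
            result[key] = {f'{name}_stock': by_month[m]
                           for m, name in enumerate(MONTH_NAMES, start=1)}
    return result


# Toy data: 2008 is complete; 2009 is missing December and is dropped
monthly = [('IBM', f'{m:02d}', '2008', 100.0 + m) for m in range(1, 13)]
monthly += [('IBM', f'{m:02d}', '2009', 50.0) for m in range(1, 12)]
result = pivot_stocks(monthly)
print(sorted(result))
```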


In [None]:
%%spark

# Pivot the monthly averages into one row per (org, year) with a column per month
filter_2_stocks_sdf.createOrReplaceTempView("filter_2_stocks_sdf")

filter_3_stocks_sdf = '''
    SELECT org, year,
           AVG(CASE WHEN month = '01' THEN close ELSE NULL END) AS jan_stock,
           AVG(CASE WHEN month = '02' THEN close ELSE NULL END) AS feb_stock,
           AVG(CASE WHEN month = '03' THEN close ELSE NULL END) AS mar_stock,
           AVG(CASE WHEN month = '04' THEN close ELSE NULL END) AS apr_stock,
           AVG(CASE WHEN month = '05' THEN close ELSE NULL END) AS may_stock,
           AVG(CASE WHEN month = '06' THEN close ELSE NULL END) AS jun_stock,
           AVG(CASE WHEN month = '07' THEN close ELSE NULL END) AS jul_stock,
           AVG(CASE WHEN month = '08' THEN close ELSE NULL END) AS aug_stock,
           AVG(CASE WHEN month = '09' THEN close ELSE NULL END) AS sep_stock,
           AVG(CASE WHEN month = '10' THEN close ELSE NULL END) AS oct_stock,
           AVG(CASE WHEN month = '11' THEN close ELSE NULL END) AS nov_stock,
           AVG(CASE WHEN month = '12' THEN close ELSE NULL END) AS dec_stock
    FROM filter_2_stocks_sdf
    WHERE year >= 2000 AND year <= 2011
    GROUP BY org, year
    -- Drop any (org, year) with a missing month, so that no column is NULL
    HAVING jan_stock IS NOT NULL AND feb_stock IS NOT NULL AND mar_stock IS NOT NULL
       AND apr_stock IS NOT NULL AND may_stock IS NOT NULL AND jun_stock IS NOT NULL
       AND jul_stock IS NOT NULL AND aug_stock IS NOT NULL AND sep_stock IS NOT NULL
       AND oct_stock IS NOT NULL AND nov_stock IS NOT NULL AND dec_stock IS NOT NULL
    '''
filter_3_stocks_sdf = spark.sql(filter_3_stocks_sdf)
filter_3_stocks_sdf.show()

+------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|               org|year|jan_stock|feb_stock|mar_stock|apr_stock|may_stock|jun_stock|jul_stock|aug_stock|sep_stock|oct_stock|nov_stock|dec_stock|
+------------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|                BP|2006|   47.607|   46.719|   46.802|   50.167|   50.168|   46.519|    48.36|   47.883|   45.212|   45.593|   46.271|   46.442|
|     Cisco Systems|2010|   19.825|    19.68|   21.432|   22.183|   20.523|   18.871|   18.859|   18.484|   17.656|   18.896|   17.566|   16.249|
|          Ericsson|2010|     8.67|    8.797|    9.312|    9.747|    9.132|    9.446|    9.913|    9.282|      9.5|    9.616|    9.291|     9.73|
|             Nokia|2011|    8.152|    7.593|    6.451|    6.737|     6.72|    4.973|    4.706|    4.543|    4.712|    5.035

In [None]:
%%spark

#ToDo
filter_2_stocks_sdf.createOrReplaceTempView("filter_2_stocks_sdf")

filter_3_stocks_sdf = '''
    SELECT org, year,
           AVG(CASE WHEN month = 01 THEN close ELSE NULL END) AS jan_stock,
           AVG(CASE WHEN month = 02 THEN close ELSE NULL END) AS feb_stock,
           AVG(CASE WHEN month = 03 THEN close ELSE NULL END) AS mar_stock,
           AVG(CASE WHEN month = 04 THEN close ELSE NULL END) AS apr_stock,
           AVG(CASE WHEN month = 05 THEN close ELSE NULL END) AS may_stock,
           AVG(CASE WHEN month = 06 THEN close ELSE NULL END) AS jun_stock,
           AVG(CASE WHEN month = 07 THEN close ELSE NULL END) AS jul_stock,
           AVG(CASE WHEN month = 08 THEN close ELSE NULL END) AS aug_stock,
           AVG(CASE WHEN month = 09 THEN close ELSE NULL END) AS sep_stock,
           AVG(CASE WHEN month = 10 THEN close ELSE NULL END) AS oct_stock,
           AVG(CASE WHEN month = 11 THEN close ELSE NULL END) AS nov_stock,
           AVG(CASE WHEN month = 12 THEN close ELSE NULL END) AS dec_stock
    FROM filter_2_stocks_sdf
    GROUP BY org, year
    HAVING COUNT(DISTINCT CASE WHEN month BETWEEN 1 AND 12 THEN month END) =12
          AND COUNT(close) = COUNT(DISTINCT month)
    ORDER BY org, year

    '''
filter_3_stocks_sdf = spark.sql(filter_3_stocks_sdf)
filter_3_stocks_sdf.show()

+---------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|            org|year|jan_stock|feb_stock|mar_stock|apr_stock|may_stock|jun_stock|jul_stock|aug_stock|sep_stock|oct_stock|nov_stock|dec_stock|
+---------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|      Accenture|2006|   26.098|   27.378|   26.787|   25.187|   24.343|   22.946|   24.066|   24.245|   24.995|   27.073|   28.732|   29.898|
|      Accenture|2007|   31.454|   32.581|   30.713|   32.814|   33.465|   35.021|   36.012|   34.198|   33.552|   33.995|    30.32|   30.331|
|      Accenture|2008|   28.881|   29.558|   29.349|   31.939|   32.483|   33.654|   34.099|   34.725|     32.2|   26.978|   25.106|   25.652|
|      Accenture|2009|   27.857|   26.419|   24.927|   23.716|   25.384|   26.733|   28.908|   30.164|   30.327|    32.45|   33.678|   35.515|

### 3.3: Direction of Change

The final element in our training set is the binary output for each case, i.e. the `y` label.

Create an sdf `stocks_train_sdf` from `filter_3_stocks_sdf` with an additional column `direction`: the direction of the percentage change in the average closing price from the first month of a given year (January) to the last month of that year (December), i.e. `1` for positive or `-1` for negative. Make this an **integer**. The format of the sdf is shown below:

```
+----+-----+----------+---------+----------+-------------+
|org |year |jan_stock |   ...   |dec_stock |direction    |
+----+-----+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |1            |
|IBM |2009 |...       |   ...   |...       |-1           |
|... |...  |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+-------------+
```
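
Because stock prices are positive, the percentage change `(dec_stock - jan_stock) / jan_stock * 100` has the same sign as `dec_stock - jan_stock`, so the direction can be read directly off the difference. A small sketch with hypothetical helpers `pct_change` and `direction`; the sample values are the Accenture 2006 and 2007 January and December averages from the Step 3.2 output. A year with no change maps to `-1` here, matching the `CASE WHEN (dec_stock - jan_stock) > 0 THEN 1 ELSE -1 END` expression used in this step.

```python
def pct_change(jan_stock, dec_stock):
    """Percentage change from the January average to the December average."""
    return (dec_stock - jan_stock) / jan_stock * 100


def direction(jan_stock, dec_stock):
    """1 if December is above January, else -1 (an unchanged year maps to -1)."""
    return 1 if dec_stock - jan_stock > 0 else -1


# January and December averages for Accenture 2006 and 2007 (Step 3.2 output)
print(direction(26.098, 29.898), round(pct_change(26.098, 29.898), 2))
print(direction(31.454, 30.331), round(pct_change(31.454, 30.331), 2))
```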

In [None]:
%%spark

#ToDo
filter_3_stocks_sdf.createOrReplaceTempView("filter_3_stocks_sdf")
stock_query = '''
SELECT *, CAST((CASE WHEN (dec_stock - jan_stock) >0 THEN 1 ELSE -1 END) AS INTEGER) AS direction
FROM filter_3_stocks_sdf
'''

stocks_train_sdf = spark.sql(stock_query)
stocks_train_sdf.show()

+---------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|            org|year|jan_stock|feb_stock|mar_stock|apr_stock|may_stock|jun_stock|jul_stock|aug_stock|sep_stock|oct_stock|nov_stock|dec_stock|direction|
+---------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|      Accenture|2006|   26.098|   27.378|   26.787|   25.187|   24.343|   22.946|   24.066|   24.245|   24.995|   27.073|   28.732|   29.898|        1|
|      Accenture|2007|   31.454|   32.581|   30.713|   32.814|   33.465|   35.021|   36.012|   34.198|   33.552|   33.995|    30.32|   30.331|       -1|
|      Accenture|2008|   28.881|   29.558|   29.349|   31.939|   32.483|   33.654|   34.099|   34.725|     32.2|   26.978|   25.106|   25.652|       -1|
|      Accenture|2009|   27.857|   26.419|   24.927|   23.716|   25.384|   26.733|

## Step 4: Combining LinkedIn and Stocks Data

### 4.1: The CRAZY Combination

Now that we have created the two halves of our training data separately, we will merge them into the final combined training sdf.

Create an sdf called `training_sdf` in the format of the one shown at the beginning of Step 3. Note that, by our definition, the `stock_result` value for a particular year is the direction of the stock's percentage change in the **following** year. For example, `stock_result` in the `2008` row for `IBM` contains the direction of IBM's stock in 2009. The final training dataframe should only include entries for companies where both hiring and stock data are available for the given year.
The format of the sdf is shown below:
```
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|org |year |jan_hired |   ...   |dec_hired |jan_stock |   ...   |dec_stock |stock_result |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
|IBM |2008 |...       |   ...   |...       |...       |   ...   |...       |-1           |
|IBM |2009 |...       |   ...   |...       |...       |   ...   |...       |1            |
|... |...  |...       |   ...   |...       |...       |   ...   |...       |...          |
+----+-----+----------+---------+----------+----------+---------+----------+-------------+
```
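
Each training row therefore needs three things: hiring data for year Y, stock data for year Y, and stock data for year Y + 1 (whose `direction` becomes `stock_result`). A plain-Python sketch of that inner-join logic, using dictionaries keyed by `(org, year)` as a hypothetical stand-in for `hire_train_sdf` and `stocks_train_sdf`; `build_training_rows` and the toy values are illustrative only. One consequence is that the last year with stock data for each company never appears in the training set, because its following year is missing.

```python
def build_training_rows(hire_rows, stock_rows):
    """hire_rows:  {(org, year): {'jan_hired': ..., ...}}
    stock_rows: {(org, year): {'jan_stock': ..., ..., 'direction': 1 or -1}}
    Returns {(org, year): hires + same-year stocks + stock_result}, where
    stock_result is the direction in year + 1 (inner-join semantics)."""
    training = {}
    for (org, year), hires in hire_rows.items():
        same_year = stock_rows.get((org, year))
        next_year = stock_rows.get((org, year + 1))
        if same_year is None or next_year is None:
            continue  # an inner join drops this (org, year)
        row = dict(hires)
        row.update({k: v for k, v in same_year.items() if k != 'direction'})
        row['stock_result'] = next_year['direction']
        training[(org, year)] = row
    return dict(sorted(training.items()))  # ORDER BY org, year


hires = {('IBM', 2008): {'jan_hired': 3}, ('IBM', 2009): {'jan_hired': 4},
         ('IBM', 2010): {'jan_hired': 1}}
stocks = {('IBM', 2008): {'jan_stock': 100.0, 'direction': 1},
          ('IBM', 2009): {'jan_stock': 90.0, 'direction': -1},
          ('IBM', 2010): {'jan_stock': 95.0, 'direction': 1}}
for key, row in build_training_rows(hires, stocks).items():
    print(key, row)
```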

In [None]:
%%spark
hire_train_sdf.createOrReplaceTempView("hire_train_sdf")
stocks_train_sdf.createOrReplaceTempView("stocks_train_sdf")

# h: hiring data for year Y; s: stock data for the same year Y;
# l: stock data for year Y + 1, whose direction becomes stock_result
training_sdf = '''
SELECT s.org, s.year,
       h.jan_hired, h.feb_hired, h.mar_hired, h.apr_hired, h.may_hired, h.jun_hired,
       h.jul_hired, h.aug_hired, h.sep_hired, h.oct_hired, h.nov_hired, h.dec_hired,
       s.jan_stock, s.feb_stock, s.mar_stock, s.apr_stock, s.may_stock, s.jun_stock,
       s.jul_stock, s.aug_stock, s.sep_stock, s.oct_stock, s.nov_stock, s.dec_stock,
       l.direction AS stock_result
FROM hire_train_sdf AS h
JOIN stocks_train_sdf AS s
  ON h.org = s.org AND h.year = s.year
JOIN stocks_train_sdf AS l
  ON h.org = l.org AND h.year = l.year - 1
ORDER BY s.org, s.year
'''
training_sdf = spark.sql(training_sdf)
training_sdf.show()

+----------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------+
|             org|year|jan_hired|feb_hired|mar_hired|apr_hired|may_hired|jun_hired|jul_hired|aug_hired|sep_hired|oct_hired|nov_hired|dec_hired|jan_stock|feb_stock|mar_stock|apr_stock|may_stock|jun_stock|jul_stock|aug_stock|sep_stock|oct_stock|nov_stock|dec_stock|stock_result|
+----------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------+
|       Accenture|2006|        7|        5|        3|        3|        4|       11|        7|        6|       10|        9|        5|        3|   26.098|   27.378|   26.

## Step 5: Predicting Stock Performance Using LinkedIn and Stocks Data



Now let's use the generated dataset to predict the direction of stock price movements from historical hiring data and stock performance. By combining insights from employment trends and past stock performance, the model aims to help investors, financial analysts, and companies make informed decisions about stock investments.


**Data Preparation:**

Data Loading: On AWS, data was loaded from S3 buckets directly into Spark DataFrames. Locally, data was loaded from local storage or through mock data setups.

Feature Engineering: Used VectorAssembler to combine the hiring and stock metrics into a single feature vector for each row.

Feature Scaling: Applied StandardScaler to standardize the features so that no single feature dominates training because of its scale.
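
For reference on the scaling step: Spark ML's `StandardScaler` defaults to `withStd=True` and `withMean=False`, so by default it divides each feature by its corrected (n - 1) sample standard deviation without centering it. A plain-Python sketch of that computation; `standard_scale` is hypothetical, its `with_mean` flag mirrors Spark's `withMean` option, and mapping a zero-variance column to `0.0` reflects our understanding of Spark's behavior rather than a confirmed detail of this project.

```python
import statistics


def standard_scale(rows, with_mean=False):
    """Scale each feature column by its corrected (n - 1) sample standard deviation,
    optionally centering it first. A zero-variance column becomes 0.0."""
    columns = list(zip(*rows))
    means = [statistics.mean(c) for c in columns]
    stds = [statistics.stdev(c) for c in columns]  # requires at least two rows
    scaled = []
    for row in rows:
        new_row = []
        for x, mu, sd in zip(row, means, stds):
            value = x - mu if with_mean else x
            new_row.append(value / sd if sd > 0 else 0.0)
        scaled.append(new_row)
    return scaled


features = [[1.0, 5.0], [3.0, 5.0]]  # toy data: the second column is constant
print(standard_scale(features))
print(standard_scale(features, with_mean=True))
```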

**Model Development:**

Model Choices: Developed models using Logistic Regression, Random Forest, and Gradient Boosted Trees to compare different approaches.

Pipeline Configuration: Each model was set up within a Spark ML pipeline that included stages for assembling features, scaling, and model training.

**Model Training and Evaluation:**

Data Splitting: The dataset was divided into training (70%) and test (30%) sets to evaluate model effectiveness.

Training and Predictions: Models were trained on the training set and evaluated on the test set.

Accuracy Assessment: Model performance was measured with accuracy, i.e. the fraction of test rows whose stock direction was predicted correctly.
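
Two details behind the split and the accuracy metric, sketched in plain Python with hypothetical helpers `random_split` and `accuracy`: a seeded random split assigns each row independently, so split sizes only approximate the requested fractions (Spark's `randomSplit` is likewise approximate), and accuracy is the fraction of test rows whose predicted direction matches the actual one.

```python
import random


def random_split(rows, weights, seed=42):
    """Assign each row to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b or i == len(bounds) - 1:  # last split absorbs rounding error
                splits[i].append(row)
                break
    return splits


def accuracy(predicted, actual):
    """Fraction of positions where the predicted label equals the actual label."""
    if len(predicted) != len(actual) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    return sum(p == a for p, a in zip(predicted, actual)) / len(actual)


train, test = random_split(list(range(1000)), [0.8, 0.2])
print(len(train), len(test))
print(accuracy([1, -1, 1, 1], [1, 1, 1, -1]))
```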

**Visualization:**

Implementation: Used Matplotlib to plot actual vs. predicted results, giving a visual check of each model's predictive accuracy. On AWS, plots were either saved to S3 or displayed via Jupyter notebooks hosted on EMR. Locally, plots were rendered directly within the development environment.

In [None]:
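%%spark
from pyspark.sql import SparkSession

# On the EMR Livy session a SparkSession named `spark` already exists;
# getOrCreate() returns it instead of starting a new one.
spark = SparkSession.builder.appName("StockPricePrediction").getOrCreate()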


In [None]:
%%spark
# training_sdf was built on the cluster in Step 4.1
training_sdf.show()


In [None]:
%%spark
from pyspark.sql.functions import col, when

# training_sdf is already wide: one row per (org, year) with twelve monthly hiring
# columns, twelve monthly stock columns, and stock_result (next year's direction),
# so the monthly columns already carry the within-year history and no lag
# features over a month column are needed.

# Spark ML classifiers expect labels 0.0, 1.0, ...: map -1 -> 0.0 and 1 -> 1.0
training_sdf = training_sdf.withColumn(
    'label', when(col('stock_result') == 1, 1.0).otherwise(0.0))

# Remove rows with any null values
training_sdf = training_sdf.dropna()
training_sdf.show()


In [None]:
%%spark
from pyspark.ml.feature import VectorAssembler

# Input columns: the twelve monthly hiring counts and the twelve monthly stock averages
months = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
          'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
input_features = [f'{m}_hired' for m in months] + [f'{m}_stock' for m in months]

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=input_features, outputCol="features")


In [None]:
%%spark
# Split data into training (80%) and test (20%) sets
train_data, test_data = training_sdf.randomSplit([0.8, 0.2], seed=42)


In [None]:
%%spark
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Random forest classifier predicting next year's stock direction
# (maxBins=32 is Spark's default binning for continuous features)
rf = RandomForestClassifier(featuresCol='features', labelCol='label', maxBins=32, seed=42)

# Create a Pipeline: assemble features, then train the forest
pipeline = Pipeline(stages=[assembler, rf])

# Define a parameter grid over tree depth and forest size
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5, 10, 20]) \
    .addGrid(rf.numTrees, [20, 50, 100]) \
    .build()

# Evaluate models by classification accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Set up the cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # Use 5-fold cross-validation

# Run cross-validation and keep the best pipeline
cvModel = crossval.fit(train_data)
bestModel = cvModel.bestModel

# Make predictions on the test data
predictions = bestModel.transform(test_data)
predictions.select("org", "year", "prediction", "label").show()


In [None]:
%%spark
# Classification accuracy of the best model on the held-out test data
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy on test data = {accuracy:.3f}")
