# EDGAR project - PySpark edition 

This notebook contains all of the code, plus the Windows command prompt/Linux terminal commands, needed to deploy this project on a Hadoop server.

**N.B.** Please read **ALL CELLS**. Some of the commands must be run from the Windows command prompt or Linux terminal, not from Python.

---

# The non-Python bit 

## 1. Connect to your server hosting Hadoop

I am using an AWS EMR Linux server and connect to it remotely from the Windows command prompt using `ssh`.

From **Windows command prompt**: <br>
> `ssh -i <pem file location> <username>@<server_address>` <br>

e.g.
`ssh -i ~/.ssh/datalab_emr3.pem hadoop@ec2-12-345-678-900.eu-west-2.compute.amazonaws.com`      

--- 

## 2. Download raw files

To follow along, download the raw files directly into Linux storage using the following commands:

From **Linux terminal**: <br>
> Make sure you are in your home directory: <br>
> `cd` <br><br>
> For the **10-K reports** of the S&P 100 companies from the last 12 years, download and unzip: <br>
> `wget "https://gitlab.com/pardeep7/edgar_public_v2/-/archive/master/edgar_public_v2-master.zip?path=10k_filings_raw" -O 10k_raw_full.zip`<br>
>> **N.B.** This is exactly what's in the GitHub repo, but zipped up. I couldn't get it to upload to GitHub properly, which is why the link points to GitLab. Promise this is the only time. <br>

> `mkdir raw_full` <br>
> `unzip 10k_raw_full.zip -d raw_full` <br><br>
> For Loughran-McDonald **sentiment word list**: <br>
> `wget https://raw.githubusercontent.com/pardeep729/edgar_pyspark/main/sentiment_words.json -O sentiment_words.json`

--- 

## 3. Move raw files into HDFS

Next, we put all of the files we just downloaded to Linux storage into HDFS so that Spark can read them.

From **Linux terminal**: <br>
> Make directories in HDFS: <br>
> `hdfs dfs -mkdir /edgar` <br>
> `hdfs dfs -mkdir /edgar/raw_full` <br><br>
> Copy the files from our Linux directory into our HDFS directory: <br>
> `hdfs dfs -put raw_full/edgar_public_v2-master-10k_filings_raw/10k_filings_raw/* /edgar/raw_full` <br>
> `hdfs dfs -put sentiment_words.json /edgar` <br>

---

## 4. Make a directory in HDFS for results

Once our Spark program produces results, we will write them to this directory in HDFS.

From **Linux terminal**: <br>
> `hdfs dfs -mkdir /edgar/sentiment_counts` <br>

---

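## 5. Install extra Python libraries

Our program needs **pandas** and **bs4** (BeautifulSoup). If they are not installed, these commands will install them; if they already are, `pip` will tell you so. On a multi-node cluster, **bs4** also needs to be installed on every node that runs Spark executors, because the cleaning UDF runs there.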

From **Linux terminal**: <br>
> `sudo python3 -m pip install pandas` <br>
> `sudo python3 -m pip install beautifulsoup4` <br>
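
If you would rather check from Python whether the libraries are importable (for example, before starting the Spark job), here is a minimal sketch using the standard library's `importlib.util.find_spec`. `check_modules` is a hypothetical helper name. Note that it only checks the machine it runs on (the driver), not the executor nodes. The pip package `beautifulsoup4` is imported as `bs4`.

```python
import importlib.util

def check_modules(names):
    '''Hypothetical helper: return {module_name: True/False} for whether each top-level module can be found.'''
    return {name: importlib.util.find_spec(name) is not None for name in names}

print(check_modules(['pandas', 'bs4']))
```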

---

# The Python bit

## 6. Run this .ipynb script from within PySpark

You have a couple of options:
1. Load this .ipynb file into **JupyterLab** installed on your Hadoop server, then run it from there. The code assumes a SparkSession named `spark` already exists, as it does in the PySpark CLI <br><br>
2. Use the **PySpark CLI**: copy and paste each **cell** into it individually and hit Enter each time (**what I did**) <br><br>
From **Linux terminal**: <br>
> `pyspark` <br>

### Import libraries 

In [None]:
import re
from bs4 import BeautifulSoup
from time import time
from datetime import timedelta

from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import lower, col, split, explode, count

### Load raw data from HDFS into Spark DataFrames 

Whatever data you're using, get it into Spark DataFrames wherever possible. Operations on DataFrames go through Spark's query optimiser, so they are generally the most efficient thing to use in Spark, no matter how trivially small or excessively large your data is.

In [None]:
# Raw 10-k reports
df = spark.read.text('/edgar/raw_full/*.html', wholetext=True).withColumn('raw file name', input_file_name())

# Sentiment words -> dict of {sentiment category: list of words}
sentiment_words = spark.read.json('/edgar/sentiment_words.json').toPandas().transpose().to_dict()[0]
# One single-column DataFrame per sentiment category, with the column named after the category (for joining later)
dfs = [spark.createDataFrame(data = [(i, ) for i in sentiment_words[k]], schema=[k]) for k in sentiment_words.keys()]
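
To see what the `toPandas().transpose().to_dict()[0]` chain does, here is a small sketch in plain pandas. It assumes, as the code above implies, that the JSON is read as a single row with one array column per sentiment category. The category names and words are made-up stand-ins, not the real Loughran-McDonald list.

```python
import pandas as pd

# Hypothetical stand-in for the one-row table Spark reads from sentiment_words.json:
# one column per sentiment category, each cell holding a list of words
pdf = pd.DataFrame([{'negative': ['loss', 'decline'], 'positive': ['gain', 'strong']}])

# transpose() moves the categories into the index, leaving a single column labelled 0;
# to_dict() then gives {0: {category: words}}, and [0] unwraps it
sentiment_words = pdf.transpose().to_dict()[0]

# Same shape as the data passed to spark.createDataFrame: one 1-tuple per word
rows = {k: [(w,) for w in sentiment_words[k]] for k in sentiment_words}
print(rows['negative'])  # [('loss',), ('decline',)]
```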

### Setup transformations on `df`

These lay out the plan of action but do **not** perform it, which is why they run relatively quickly. Spark only does the work when an *action* (such as writing the results) is called.
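
A Python generator behaves in a similar way, as a loose analogy only (Spark's planner does far more than this). Building the generator sets up the work, and nothing runs until something consumes it. The `clean` function and word list here are hypothetical.

```python
calls = []

def clean(word):
    # Record every call so we can see when work actually happens
    calls.append(word)
    return word.lower()

words = ['Loss', 'Gain', 'Decline']

# "Transformation": creating the generator sets up the plan but calls clean() zero times
plan = (clean(w) for w in words)
print(len(calls))  # 0

# "Action": consuming the generator forces the work to happen
result = list(plan)
print(len(calls))  # 3
```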

#### Cleaning

We will register a user-defined function (`udf`) that encapsulates the cleaning logic.

In [None]:
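# Python function
def clean_line(raw_html_text):
    '''
    Take raw HTML text and return a cleaned version: tags stripped, whitespace
    collapsed to single spaces, and anything other than letters, digits and spaces removed
    '''
    soup = BeautifulSoup(raw_html_text.strip(), "html.parser")
    # separator=' ' stops text from neighbouring tags being glued together
    html_text = soup.get_text(separator=' ')
    # Turn newlines/tabs/non-breaking spaces into plain spaces first; otherwise the regex below deletes them and merges words
    html_text = re.sub(r'\s+', ' ', html_text)
    cleaned_html_text = re.sub(r'[^a-zA-Z0-9 ]', '', html_text)
    return cleaned_html_text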
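
The regex `[^a-zA-Z0-9 ]` keeps only ASCII letters, digits and plain spaces. Any other whitespace (newlines, tabs, the non-breaking spaces that `&nbsp;` turns into) is therefore deleted rather than turned into a space, which can glue neighbouring words together. The sketch below shows this on a made-up string. It also shows how collapsing whitespace with `re.sub(r'\s+', ' ', ...)` first keeps the words apart.

```python
import re

# Hypothetical text, as it might look after BeautifulSoup has stripped the tags
# (it contains a newline, a tab and a non-breaking space)
html_text = 'Net loss\nincreased 5%,\tdue to\u00a0lower sales.'

# The cleaning regex on its own: other whitespace is deleted, gluing words together
naive = re.sub(r'[^a-zA-Z0-9 ]', '', html_text)

# Collapse every run of whitespace (including \u00a0) to one space first, then clean
normalised = re.sub(r'[^a-zA-Z0-9 ]', '', re.sub(r'\s+', ' ', html_text))

print(naive)       # Net lossincreased 5due tolower sales
print(normalised)  # Net loss increased 5 due to lower sales
```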

In [None]:
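# Wrap the Python function as a Spark UDF for use with the DataFrame API
clean_line_UDF = udf(clean_line, StringType())
# Also register it by name; this is only needed if you want to call it from Spark SQL
spark.udf.register("clean_line_UDF", clean_line, StringType())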

In [None]:
# Apply UDF to df, returning a "cleaned" df
df_cleaned = df.withColumn('cleaned', clean_line_UDF(df['value'])).drop('value')

#### Sentiment extraction 

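We will be using **joins**, a surprisingly efficient way of achieving this. Each document is split into words and exploded into one row per word. That table is then inner-joined against each sentiment word list, so only the matching words remain.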

In [None]:
# Split each document (row) into a list of words (for explosion later)
df_with_split = df_cleaned.select('*', split(lower(col('cleaned')), ' ').alias('cleaned_split'))

# Explode this table based on these lists (should get millions of rows)
df_exploded = df_with_split.withColumn('exploded', explode('cleaned_split'))

# Join each sentiment word list to the exploded dataframe (inner join keeps only matching words)
dfs_joined = [df_exploded.join(d, df_exploded['exploded'] == d[d.columns[0]], 'inner') for d in dfs]

# Count number of matches for each document and sentiment (count column is named after the sentiment)
dfs_counts = [d.groupBy('raw file name').agg(count('*').alias(d.columns[-1])) for d in dfs_joined]

# Join all sentiment counts into one dataframe (left join keeps documents with no matches)
df_counts = df_with_split.select('raw file name')
for sentiment in dfs_counts:
    df_counts = df_counts.join(sentiment, 'raw file name', 'left')

In [None]:
# Replace all NULLS with 0
df_counts = df_counts.na.fill(0)
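
The explode, join, count and fill steps above can be mimicked in plain Python to show the logic on a toy example. This is a sketch under simplifying assumptions, not the Spark code itself. The documents and word lists are made up, each word list is assumed to contain no duplicates, and the result is keyed by sentiment and then file, rather than laid out as one row per file.

```python
from collections import Counter

# Hypothetical cleaned documents, keyed by file name (like the 'raw file name' column)
docs = {
    'a.html': 'net loss increased due to lower demand',
    'b.html': 'strong growth and record gain',
}
# Hypothetical sentiment word lists (same shape as sentiment_words)
sentiment_words = {'negative': ['loss', 'lower'], 'positive': ['gain', 'strong', 'growth']}

# lower + split + explode: one (file, word) pair per word
exploded = [(name, w) for name, text in docs.items() for w in text.lower().split(' ')]

counts = {}
for sentiment, words in sentiment_words.items():
    word_set = set(words)  # assumes no duplicate words in the list
    # Inner join + groupBy/count: number of matching words per file
    matches = Counter(name for name, w in exploded if w in word_set)
    # Left join + na.fill(0): every file gets a value, even with no matches
    counts[sentiment] = {name: matches.get(name, 0) for name in docs}

print(counts)
# {'negative': {'a.html': 2, 'b.html': 0}, 'positive': {'a.html': 0, 'b.html': 3}}
```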

### Setup timer 

This lets us track how long the execution actually takes. <br>
***N.B.*** *You could use `%time`, but I found it doesn't always work. It is an IPython magic, so, for example, it isn't available in a plain Python `pyspark` shell. The method below always works.*

In [6]:
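class T():
    '''Context manager that prints how long the code inside its `with` block took, in H:MM:SS format'''
    def __enter__(self):
        self.start = time()
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.end = time()
        elapsed = self.end - self.start
        # Printed even if the block raised; returning None lets any exception propagate
        print(str(timedelta(seconds=elapsed)))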
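
If you prefer a function-based context manager, here is a minimal alternative sketch. It uses `contextlib.contextmanager` with `time.perf_counter`, a monotonic clock intended for measuring intervals. The name `timer` is hypothetical, and it prints in the same format as `T()`.

```python
from contextlib import contextmanager
from datetime import timedelta
from time import perf_counter

@contextmanager
def timer():
    '''Print the elapsed time of the with-block, like T(), but using a monotonic clock.'''
    start = perf_counter()
    try:
        yield
    finally:
        # Runs whether or not the block raised, mirroring T().__exit__
        print(str(timedelta(seconds=perf_counter() - start)))

# Usage mirrors T(); in this notebook the body would be the df_counts.write... action
with timer():
    total = sum(range(1_000_000))
```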

### Perform action

An action actually runs the execution plan, so this is the beefy part.

In [None]:
with T():
    df_counts.write.option("header","true").option("sep",",").mode("overwrite").csv('/edgar/sentiment_counts/')

### (Optional) Check results 

I like to read my results back into a Spark DataFrame, just to eyeball them.

In [None]:
test = spark.read.option("header","true").csv('/edgar/sentiment_counts')
test.show(20, truncate=False)
test.count() 

---

## (Optional) non-Python bit - Getting results out of HDFS

You will probably want access to your results file, so this is how to move it from HDFS to Linux storage.

From **Linux terminal**: <br>
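> Merge the files in the HDFS results directory and write the merged output back to HDFS: <br>
> `hdfs dfs -cat /edgar/sentiment_counts/* | hdfs dfs -put - /edgar/sentiment_counts_merged/` <br><br>
>> **N.B.** Our results are likely split across many files in the HDFS results directory (Spark writes one file per partition), so we combine them into a single file. `-put -` reads from standard input, and because only a directory is given as the destination, the merged file is named `-`. You could give a full file path instead (e.g. `/edgar/sentiment_counts_merged.csv`), but the steps below assume the `-` name. Also, because we wrote with `header` set to `true`, each part file starts with its own header row, so the merged file will contain repeated header lines. <br>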

> Get the file from HDFS into Linux (your present working directory): <br>
> `hdfs dfs -get /edgar/sentiment_counts_merged/- .` <br><br>
> Rename it by copying it to a new name: <br>
> `cp - final.csv`<br><br>
> Remove the old file: <br>
> `rm -` <br>
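
Because the results were written with `header` set to `true`, each part file has its own header row, so simply concatenating the files repeats the header. One alternative is to skip the HDFS merge and copy the whole results directory to Linux with `hdfs dfs -get /edgar/sentiment_counts .`. You can then merge the part files in Python, keeping only the first header. The sketch below makes a few assumptions: every part file was written with a header and the same columns, and the `part-*.csv` pattern matches Spark's default naming for uncompressed CSV output. `merge_csv_parts` is a hypothetical helper.

```python
import csv
import glob
import os

def merge_csv_parts(part_dir, out_path):
    '''
    Hypothetical helper: concatenate Spark's part-*.csv files into out_path,
    keeping only the first header row. Returns the number of part files found.
    '''
    # Spark names its output files like part-00000-<id>-c000.csv; _SUCCESS is skipped
    parts = sorted(glob.glob(os.path.join(part_dir, 'part-*.csv')))
    header_written = False
    with open(out_path, 'w', newline='') as out:
        writer = csv.writer(out, lineterminator='\n')
        for part in parts:
            with open(part, newline='') as f:
                reader = csv.reader(f)
                header = next(reader, None)
                if header is None:
                    continue  # empty part file
                if not header_written:
                    writer.writerow(header)
                    header_written = True
                writer.writerows(reader)  # data rows only
    return len(parts)
```

For example, `merge_csv_parts('sentiment_counts', 'final.csv')` run from the directory you copied the results into.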


---

## (Optional) non-Python bit - Getting results out of Linux

Assuming you're on a Windows machine, use `scp` to get the results file onto your local system. The path below assumes `final.csv` is in `/home/<username>/edgar`; adjust it to wherever you ran `hdfs dfs -get`.

From **Windows command prompt**: <br>
> ```scp -i <pem file location> <username>@<server_address>:/home/<username>/edgar/final.csv "<Windows folder path>" ```<br>

e.g.
`scp -i ~/.ssh/datalab_emr3.pem hadoop@ec2-12-345-678-900.eu-west-2.compute.amazonaws.com:/home/hadoop/edgar/final.csv .\results\`


--- 