# Pre-Preprocessing Data using Spark

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz -O spark.tgz
!tar -xvzf spark.tgz
!pip install -q findspark

spark-3.4.1-bin-hadoop3/
spark-3.4.1-bin-hadoop3/R/
spark-3.4.1-bin-hadoop3/R/lib/
spark-3.4.1-bin-hadoop3/R/lib/sparkr.zip
spark-3.4.1-bin-hadoop3/R/lib/SparkR/
spark-3.4.1-bin-hadoop3/R/lib/SparkR/html/
spark-3.4.1-bin-hadoop3/R/lib/SparkR/html/R.css
spark-3.4.1-bin-hadoop3/R/lib/SparkR/html/00Index.html
spark-3.4.1-bin-hadoop3/R/lib/SparkR/INDEX
spark-3.4.1-bin-hadoop3/R/lib/SparkR/help/
spark-3.4.1-bin-hadoop3/R/lib/SparkR/help/aliases.rds
spark-3.4.1-bin-hadoop3/R/lib/SparkR/help/AnIndex
spark-3.4.1-bin-hadoop3/R/lib/SparkR/help/SparkR.rdx
spark-3.4.1-bin-hadoop3/R/lib/SparkR/help/SparkR.rdb
spark-3.4.1-bin-hadoop3/R/lib/SparkR/help/paths.rds
spark-3.4.1-bin-hadoop3/R/lib/SparkR/worker/
spark-3.4.1-bin-hadoop3/R/lib/SparkR/worker/worker.R
spark-3.4.1-bin-hadoop3/R/lib/SparkR/worker/daemon.R
spark-3.4.1-bin-hadoop3/R/lib/SparkR/tests/
spark-3.4.1-bin-hadoop3/R/lib/SparkR/tests/testthat/
spark-3.4.1-bin-hadoop3/R/lib/SparkR/tests/testthat/test_basic.R
spark-3.4.1-bin-hadoop3/R/lib/S

### Starting the Spark environment

In [None]:
import os
import findspark

# Point the process at the JVM and at the Spark distribution directory
# before findspark is initialised.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3",
    }
)

# Called with no arguments, as in the original cell.
# NOTE(review): presumably this makes pyspark importable via SPARK_HOME — confirm.
findspark.init()

### Mounting Google Drive to access the data

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the CSV inputs read later in this
# notebook live under /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


## Loading data in and verifying it looks correct

In [None]:
from pyspark.sql import SparkSession

# Get the active session, or create one named "DataFiltering".
spark = (
    SparkSession.builder
    .appName("DataFiltering")
    .getOrCreate()
)

# Weather CSV: first row is the header, column types are inferred, and
# fields are separated by ";". A fresh `spark.read` is used for each file
# so reader options never carry over from one load to the next.
weather = (
    spark.read
    .options(header=True, inferSchema=True, sep=";")
    .csv("/content/drive/MyDrive/405 Final Project/noaa-daily-weather-data@public.csv")
)

# Prices CSV: same header/schema handling, default separator.
nasdaq_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/drive/MyDrive/405 Final Project/WIKI_PRICES_212b326a081eacca455e13140d7bb9db.csv")
)

### Filtering for US records only

In [None]:
# Keep only the rows whose COUNTRY_CODE equals "US".
weather_us = weather.where(
    weather["COUNTRY_CODE"] == "US"
)

### Filtering for records from 2017

The filter below keeps rows whose `DATE` falls between 2017-01-01 and 2017-12-31, inclusive.



In [None]:
from pyspark.sql.functions import col

# `between` is inclusive on both ends, so this keeps DATE values from
# 2017-01-01 through 2017-12-31.
weather_us_filtered = weather_us.where(
    col("DATE").between("2017-01-01", "2017-12-31")
)

# Preview the first five rows of the filtered frame.
weather_us_filtered.show(5)

+-----------+----------+-----+----+----+----+---------+-------------+----------------+------------+
|   GHCN_DIN|      DATE| PRCP|SNOW|TMAX|TMIN|ELEVATION|         NAME|           COORD|COUNTRY_CODE|
+-----------+----------+-----+----+----+----+---------+-------------+----------------+------------+
|US10adam001|2017-05-16|218.0|null|null|null|    598.0|JUNIATA 1.5 S|40.568, -98.5069|          US|
|US10adam001|2017-05-17|259.0|null|null|null|    598.0|JUNIATA 1.5 S|40.568, -98.5069|          US|
|US10adam001|2017-06-14|183.0|null|null|null|    598.0|JUNIATA 1.5 S|40.568, -98.5069|          US|
|US10adam001|2017-08-09| 25.0|null|null|null|    598.0|JUNIATA 1.5 S|40.568, -98.5069|          US|
|US10adam001|2017-08-16|503.0|null|null|null|    598.0|JUNIATA 1.5 S|40.568, -98.5069|          US|
+-----------+----------+-----+----+----+----+---------+-------------+----------------+------------+
only showing top 5 rows



### Sorting by date

In [None]:
# Order the filtered weather rows by DATE, earliest first.
weather_us_sorted = weather_us_filtered.sort(col("DATE"), ascending=True)

### Writing filtered data to Parquet file

In [None]:
# Collapse to a single partition, then write Parquet output to
# /content/weather_data.parquet, replacing any existing output at that path.
(
    weather_us_sorted
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("/content/weather_data.parquet")
)

# Working with Nasdaq data

### Filtering Nasdaq data for 2017

In [None]:
from pyspark.sql.functions import col

# `between` is inclusive on both ends, so this keeps rows whose `date`
# falls from 2017-01-01 through 2017-12-31.
nasdaq_filtered = nasdaq_df.where(
    col("date").between("2017-01-01", "2017-12-31")
)

# Preview the first five rows of the filtered frame.
nasdaq_filtered.show(5)


+------+----------+-----+------+-------+-----+---------+-----------+-----------+---------------+---------------+---------------+---------------+----------+
|ticker|      date| open|  high|    low|close|   volume|ex-dividend|split_ratio|       adj_open|       adj_high|        adj_low|      adj_close|adj_volume|
+------+----------+-----+------+-------+-----+---------+-----------+-----------+---------------+---------------+---------------+---------------+----------+
|     A|2017-01-03|45.93| 46.75|  45.74|46.49|1739726.0|        0.0|        1.0|45.620161936279|46.434630318333|45.431443652632|46.176384245975| 1739726.0|
|     A|2017-01-04|46.93| 47.38|46.8162| 47.1|1821264.0|        0.0|        1.0|46.613416060735|47.060380416741|46.500383741372|46.782269261893| 1821264.0|
|     A|2017-01-05|47.05| 47.07| 46.355|46.54|1503763.0|        0.0|        1.0| 46.73260655567|46.752471638159|46.042294939173|46.226046952197| 1503763.0|
|     A|2017-01-06|46.63| 48.07|  46.56|47.99|2883483.0|        

### Sorting by date

In [None]:
# Order the filtered price rows by `date`, earliest first.
nasdaq_sorted = nasdaq_filtered.sort(col("date"), ascending=True)

# Preview the first five rows after sorting.
nasdaq_sorted.show(5)


+------+----------+------+------+------+------+-----------+-----------+-----------+---------------+---------------+---------------+---------------+-----------+
|ticker|      date|  open|  high|   low| close|     volume|ex-dividend|split_ratio|       adj_open|       adj_high|        adj_low|      adj_close| adj_volume|
+------+----------+------+------+------+------+-----------+-----------+-----------+---------------+---------------+---------------+---------------+-----------+
|   ASH|2017-01-03| 110.4|110.43|108.15|109.12|   502956.0|        0.0|        1.0|109.26970921137|109.29940206714|107.04274502907|108.00281403211|   502956.0|
|  TSCO|2017-01-03|  76.9| 77.25| 75.73|  75.9|  1253422.0|        0.0|        1.0|75.595973459544|75.940038358255|74.445813655283|  74.6129308918|  1253422.0|
|   WMT|2017-01-03| 69.24| 69.24| 68.05| 68.66|1.0473162E7|        0.0|        1.0|67.859652686086|67.859652686086|66.693376159563|67.291215387444|1.0473162E7|
|  TSLA|2017-01-03|214.86|220.33|210.96|

### Saving filtered Nasdaq data to Parquet file

In [None]:
# Collapse to a single partition, then write Parquet output to
# /content/nasdaq_data.parquet, replacing any existing output at that path.
(
    nasdaq_sorted
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("/content/nasdaq_data.parquet")
)