<a href="https://colab.research.google.com/github/limas-lab/stock_prices_study/blob/master/stock_prices_study.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing the PySpark library

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=81fa83fff214803ac1a44d18dc831fd3033eeea36010eaed24a4a0ecc10a0523
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## Importing the libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

from google.colab import drive

# Mount Google Drive: the parameters notebook executed with %run below lives
# under /content/drive.
drive.mount('/content/drive')

# getOrCreate() reuses an already-running session, so this cell can be re-run
# safely; constructing SparkContext('local') a second time raises
# "Cannot run multiple SparkContexts at once". It also avoids relying on
# SparkContext leaking in through the wildcard import above.
spark = (
    SparkSession.builder
    .master('local')
    .appName('stock_prices_study')
    .getOrCreate()
)
sc = spark.sparkContext  # kept so any later cell using `sc` still works

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Data engineering part

### Run the parameters notebook

In [None]:
# Execute the shared parameters notebook from the mounted Drive so its
# definitions become available in this kernel. Presumably it defines
# get_parameters(), which later cells call but this notebook never defines — TODO confirm.
%run '/content/drive/MyDrive/Colab Notebooks/parameters.ipynb'

### Variable assignment

In [None]:
# Resolve the location of the input CSV (the "transient" layer) by its key.
# get_parameters is assumed to come from the parameters notebook run above — TODO confirm.
path_transient = get_parameters('stock_prices_transient')

In [None]:
# Load the stock-price CSV: the first line is a header, fields are
# comma-separated, and Spark infers each column's type from the data.
df = spark.read.options(header=True, inferSchema=True, sep=",").csv(path_transient)

In [None]:
# Preview the first ten rows exactly as loaded from the CSV.
df.show(n=10)

+------+----------+-------+-------+-------+-------+--------+
|symbol|      date|   open|   high|    low|  close|  volume|
+------+----------+-------+-------+-------+-------+--------+
|   AAL|2014-01-02|  25.07|  25.82|  25.06|  25.36| 8998943|
|  AAPL|2014-01-02|79.3828|79.5756|78.8601|79.0185|58791957|
|   AAP|2014-01-02| 110.36| 111.88| 109.29| 109.74|  542711|
|  ABBV|2014-01-02|  52.12|  52.33|  51.52|  51.98| 4569061|
|   ABC|2014-01-02|  70.11|  70.23|  69.48|  69.89| 1148391|
|   ABT|2014-01-02|  38.09|   38.4|   38.0|  38.23| 4967472|
|   ACN|2014-01-02|   81.5|  81.92|  81.09|  81.13| 2405384|
|  ADBE|2014-01-02|  59.06|  59.53|  58.94|  59.29| 2746370|
|   ADI|2014-01-02|  49.52|  49.75|  49.04|  49.28| 2799092|
|   ADM|2014-01-02|  43.22|  43.29|  42.79|  42.99| 2753765|
+------+----------+-------+-------+-------+-------+--------+
only showing top 10 rows



In [None]:
# Inspect the types Spark inferred from the CSV (inferSchema=True) before any casting.
df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)



In [None]:
# Columns of the stock-price data grouped by the Spark type they are cast to below.
string_cols, date_cols = ['symbol'], ['date']
double_cols = ['open', 'high', 'low', 'close']  # price fields
int_cols = ['volume']                           # traded-share count

In [None]:
# Cast every configured column to its target Spark type in a single projection.
# Chaining withColumn() inside loops adds one projection per column to the
# logical plan; PySpark recommends select() with all columns at once instead.
# Column order and any unlisted columns are preserved.
target_types = {}
for names, spark_type in ((string_cols, StringType()),
                          (date_cols, TimestampType()),
                          (double_cols, DoubleType()),
                          (int_cols, IntegerType())):
  for colname in names:
    # If a column is listed in two groups, the later group wins, matching the
    # final type produced by the original sequential loops.
    target_types[colname] = spark_type

# withColumn(c, col(c)...) failed on an unknown column; keep failing loudly
# rather than silently skipping a typo in the column lists.
missing = [c for c in target_types if c not in df.columns]
if missing:
  raise ValueError(f"Columns not found in DataFrame: {missing}")

df = df.select([
    col(c).cast(target_types[c]).alias(c) if c in target_types else col(c)
    for c in df.columns
])

In [None]:
# Confirm the casts took effect (e.g. `date` is no longer a string).
df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)



In [None]:
# Preview ten rows after casting to check the converted values.
df.show(n=10)

+------+-------------------+-------+-------+-------+-------+--------+
|symbol|               date|   open|   high|    low|  close|  volume|
+------+-------------------+-------+-------+-------+-------+--------+
|   AAL|2014-01-02 00:00:00|  25.07|  25.82|  25.06|  25.36| 8998943|
|  AAPL|2014-01-02 00:00:00|79.3828|79.5756|78.8601|79.0185|58791957|
|   AAP|2014-01-02 00:00:00| 110.36| 111.88| 109.29| 109.74|  542711|
|  ABBV|2014-01-02 00:00:00|  52.12|  52.33|  51.52|  51.98| 4569061|
|   ABC|2014-01-02 00:00:00|  70.11|  70.23|  69.48|  69.89| 1148391|
|   ABT|2014-01-02 00:00:00|  38.09|   38.4|   38.0|  38.23| 4967472|
|   ACN|2014-01-02 00:00:00|   81.5|  81.92|  81.09|  81.13| 2405384|
|  ADBE|2014-01-02 00:00:00|  59.06|  59.53|  58.94|  59.29| 2746370|
|   ADI|2014-01-02 00:00:00|  49.52|  49.75|  49.04|  49.28| 2799092|
|   ADM|2014-01-02 00:00:00|  43.22|  43.29|  42.79|  42.99| 2753765|
+------+-------------------+-------+-------+-------+-------+--------+
only showing top 10 

In [None]:
# Persist the typed DataFrame as Parquet at the "raw" location, replacing any
# output a previous run left there.
path_raw = get_parameters('stock_prices_raw')
df.write.parquet(path_raw, mode='overwrite')

## Data analysis part

In [None]:
# Start the analysis from the persisted Parquet output rather than the
# in-memory `df` built in the engineering section.
path_raw = get_parameters('stock_prices_raw')
df_2 = spark.read.format('parquet').load(path_raw)

In [None]:
# Preview ten rows of the reloaded data to check the Parquet round trip.
df_2.show(n=10)

+------+-------------------+-------+-------+-------+-------+--------+
|symbol|               date|   open|   high|    low|  close|  volume|
+------+-------------------+-------+-------+-------+-------+--------+
|   AAL|2014-01-02 00:00:00|  25.07|  25.82|  25.06|  25.36| 8998943|
|  AAPL|2014-01-02 00:00:00|79.3828|79.5756|78.8601|79.0185|58791957|
|   AAP|2014-01-02 00:00:00| 110.36| 111.88| 109.29| 109.74|  542711|
|  ABBV|2014-01-02 00:00:00|  52.12|  52.33|  51.52|  51.98| 4569061|
|   ABC|2014-01-02 00:00:00|  70.11|  70.23|  69.48|  69.89| 1148391|
|   ABT|2014-01-02 00:00:00|  38.09|   38.4|   38.0|  38.23| 4967472|
|   ACN|2014-01-02 00:00:00|   81.5|  81.92|  81.09|  81.13| 2405384|
|  ADBE|2014-01-02 00:00:00|  59.06|  59.53|  58.94|  59.29| 2746370|
|   ADI|2014-01-02 00:00:00|  49.52|  49.75|  49.04|  49.28| 2799092|
|   ADM|2014-01-02 00:00:00|  43.22|  43.29|  42.79|  42.99| 2753765|
+------+-------------------+-------+-------+-------+-------+--------+
only showing top 10 