<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#BigData---Final-Project" data-toc-modified-id="BigData---Final-Project-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>BigData - Final Project</a></span><ul class="toc-item"><li><span><a href="#Loading-The-Data" data-toc-modified-id="Loading-The-Data-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Loading The Data</a></span></li><li><span><a href="#Exploring-The-Data" data-toc-modified-id="Exploring-The-Data-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Exploring The Data</a></span></li></ul></li></ul></div>

# BigData - Final Project

__AUTHORS__:
  - Théo Perinet (22172 - theo.perinet)
  - Mathieu Rivier (23553 - mathieu.rivier)
  - Marc Monteil (23742 - marc.monteil)

###### To Use when you are on google collab
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz  
!tar xf spark-3.2.1-bin-hadoop2.7.tgz
!pip install -q findspark

###### TO USE WHEN YOU ARE ON GOOGLE COLLAB
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"
import findspark
findspark.init()

from google.colab import drive
drive.mount('/content/drive')

## Loading The Data

In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
spark_application_name = "WannaFlop_Project"

In [3]:
spark = (SparkSession.builder.appName(spark_application_name).getOrCreate())

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/20 08:11:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Exploring The Data

In [4]:
from pyspark.sql.functions import col,isnan,when,count
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType, StructType,StructField
from pyspark.sql.functions import desc
import pyspark.sql.functions as func

In [41]:
class read_info(object):
    def __init__(self, file_path, header=False, delimiter=';', schema=None):
        self.file_path = file_path
        self.header = header
        self.delimiter = delimiter
        self.schema = schema

        self.df = self._load_df()

        #self.df_abstract = self._get_df_abstract()

    def __repr__(self):
        return f"{self._nb_rows()} \n{self.df.printSchema()} \n{self.get_df_abstract()}\n {self.show_missing()}\n{self._get_stats()}"

    def show_missing(self):
        print("Missing Data per column:")
        self._count_missing().show()

    def _get_num_cols(self):
        num_cols = [
            f.name for f in self.df.schema.fields
            if isinstance(f.dataType, DoubleType) or
            isinstance(f.dataType, IntegerType)
        ]
        
        return num_cols
    def _get_rounded_df(self):
        rounded_df = self.df
        dbl_cols = self._get_num_cols()
        for col in dbl_cols:
            rounded_df = rounded_df.withColumn(col, func.round('high'))

        return rounded_df

    def get_df_abstract(self):
        rounded_df = self._get_rounded_df()

        # First 40 rows
        print("First 40 rows:")
        rounded_df.show(40)

        # Last 40 rows
        print("Last 40 rows:")
        rounded_df = rounded_df.withColumn(
            "index", monotonically_increasing_id()
        )
        rounded_df.orderBy(desc("index")).drop("index").show(40)

    def _get_periodicity(self):
        self.df['data'][0]

    def _nb_rows(self):
        # Number of total rows
        print("Number of rows: " + str(self.df.count()) + "\n")

    def _handle_csv(self):
        '''
        @description: Read the csv file and return a Spark DataFrame

        @arg csv_file_path: Path to the csv file
        @arg header: boolean whether to load a header or not
        @arg delimiter: which delimiter to use by default
        '''
        return spark.read.option("inferSchema", "true").option("nullValue", "null").csv(
            self.file_path,
            sep=self.delimiter,
            schema=self.schema,
            header=self.header,
        )
    
    def _handle_json(self):
        return spark.read.json(self.file_path)

    def _load_df(self):
        ####### ADD TRY CATCH #####
        extension = self.file_path.split(".")[-1]

        df = None
        if extension == 'json':
            df = self._handle_json()
        elif extension == 'csv':
            df = self._handle_csv()

        return df

    def _count_missing(self):
        return self.df.select(
            [
                count(when(isnan(c) | col(c).isNull(), c)).alias(c)
                for c in self.df.columns
            ]
        )
        #.show()
        
    def _get_stats(self):
        self.df.summary().show()

In [42]:
amzn_schema = StructType([
    StructField('Date', DateType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Open', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Volume', IntegerType(), True),
    StructField('Adj Close', DoubleType(), True),
    StructField('company_name', StringType(), True)
])

In [43]:
AMZN = read_info('stocks_data/AMAZON.csv', header=True, delimiter=',', schema=amzn_schema)

In [44]:
print(AMZN)

Number of rows: 987

root
 |-- Date: date (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- company_name: string (nullable = true)

First 40 rows:
+----------+-----+-----+-----+-----+------+---------+------------+
|      Date| High|  Low| Open|Close|Volume|Adj Close|company_name|
+----------+-----+-----+-----+-----+------+---------+------------+
|2017-01-03|759.0|759.0|759.0|759.0| 759.0|    759.0|      AMAZON|
|2017-01-04|760.0|760.0|760.0|760.0| 760.0|    760.0|      AMAZON|
|2017-01-05|782.0|782.0|782.0|782.0| 782.0|    782.0|      AMAZON|
|2017-01-06|799.0|799.0|799.0|799.0| 799.0|    799.0|      AMAZON|
|2017-01-09|802.0|802.0|802.0|802.0| 802.0|    802.0|      AMAZON|
|2017-01-10|798.0|798.0|798.0|798.0| 798.0|    798.0|      AMAZON|
|2017-01-11|800.0|800.0|800.0|800.0| 800.0|    800.0|

AnalysisException: cannot resolve 'isnan(Date)' due to data type mismatch: argument 1 requires (double or float) type, however, 'Date' is of date type.;
'Aggregate [count(CASE WHEN (isnan(Date#7310) OR isnull(Date#7310)) THEN Date END) AS Date#7657, count(CASE WHEN (isnan(High#7311) OR isnull(High#7311)) THEN High END) AS High#7659L, count(CASE WHEN (isnan(Low#7312) OR isnull(Low#7312)) THEN Low END) AS Low#7661L, count(CASE WHEN (isnan(Open#7313) OR isnull(Open#7313)) THEN Open END) AS Open#7663L, count(CASE WHEN (isnan(Close#7314) OR isnull(Close#7314)) THEN Close END) AS Close#7665L, count(CASE WHEN (isnan(cast(Volume#7315 as double)) OR isnull(Volume#7315)) THEN Volume END) AS Volume#7667L, count(CASE WHEN (isnan(Adj Close#7316) OR isnull(Adj Close#7316)) THEN Adj Close END) AS Adj Close#7669L, count(CASE WHEN (isnan(cast(company_name#7317 as double)) OR isnull(company_name#7317)) THEN company_name END) AS company_name#7671L]
+- Relation [Date#7310,High#7311,Low#7312,Open#7313,Close#7314,Volume#7315,Adj Close#7316,company_name#7317] csv


In [None]:
##### A FAIRE !!!! UN SCHEMA !!!!!

In [9]:
AMZN.get_df_abstract()

First 40 rows:
+----------+-----+-----+-----+-----+------+---------+------------+
|      Date| High|  Low| Open|Close|Volume|Adj Close|company_name|
+----------+-----+-----+-----+-----+------+---------+------------+
|2017-01-03|759.0|759.0|759.0|759.0| 759.0|    759.0|      AMAZON|
|2017-01-04|760.0|760.0|760.0|760.0| 760.0|    760.0|      AMAZON|
|2017-01-05|782.0|782.0|782.0|782.0| 782.0|    782.0|      AMAZON|
|2017-01-06|799.0|799.0|799.0|799.0| 799.0|    799.0|      AMAZON|
|2017-01-09|802.0|802.0|802.0|802.0| 802.0|    802.0|      AMAZON|
|2017-01-10|798.0|798.0|798.0|798.0| 798.0|    798.0|      AMAZON|
|2017-01-11|800.0|800.0|800.0|800.0| 800.0|    800.0|      AMAZON|
|2017-01-12|814.0|814.0|814.0|814.0| 814.0|    814.0|      AMAZON|
|2017-01-13|822.0|822.0|822.0|822.0| 822.0|    822.0|      AMAZON|
|2017-01-17|816.0|816.0|816.0|816.0| 816.0|    816.0|      AMAZON|
|2017-01-18|812.0|812.0|812.0|812.0| 812.0|    812.0|      AMAZON|
|2017-01-19|814.0|814.0|814.0|814.0| 814.0|    

In [45]:
AMZN.show_missing()

Missing Data per column:


AnalysisException: cannot resolve 'isnan(Date)' due to data type mismatch: argument 1 requires (double or float) type, however, 'Date' is of date type.;
'Aggregate [count(CASE WHEN (isnan(Date#7310) OR isnull(Date#7310)) THEN Date END) AS Date#7673, count(CASE WHEN (isnan(High#7311) OR isnull(High#7311)) THEN High END) AS High#7675L, count(CASE WHEN (isnan(Low#7312) OR isnull(Low#7312)) THEN Low END) AS Low#7677L, count(CASE WHEN (isnan(Open#7313) OR isnull(Open#7313)) THEN Open END) AS Open#7679L, count(CASE WHEN (isnan(Close#7314) OR isnull(Close#7314)) THEN Close END) AS Close#7681L, count(CASE WHEN (isnan(cast(Volume#7315 as double)) OR isnull(Volume#7315)) THEN Volume END) AS Volume#7683L, count(CASE WHEN (isnan(Adj Close#7316) OR isnull(Adj Close#7316)) THEN Adj Close END) AS Adj Close#7685L, count(CASE WHEN (isnan(cast(company_name#7317 as double)) OR isnull(company_name#7317)) THEN company_name END) AS company_name#7687L]
+- Relation [Date#7310,High#7311,Low#7312,Open#7313,Close#7314,Volume#7315,Adj Close#7316,company_name#7317] csv


In [46]:
AMZN._get_stats()

+-------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------+
|summary|              High|               Low|             Open|             Close|           Volume|         Adj Close|company_name|
+-------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------+
|  count|               987|               987|              987|               987|              987|               987|         987|
|   mean|1762.0071216958152|1722.1011452099956|1743.433881363487|1742.9566644206718| 4509728.05775076|1742.9566644206718|        null|
| stddev| 667.2385315752688| 644.7988093382758|657.1153070927137| 655.9576061129322|2179817.628631287| 655.9576061129322|        null|
|    min|  758.760009765625| 747.7000122070312|757.9199829101562| 753.6699829101562|           881300| 753.6699829101562|      AMAZON|
|    25%|            1191.0|            1176.0|1188.300

In [11]:
AMZN.df.withColumn("test", 
              func.datediff(AMZN.df["date"][0], AMZN.df["date"][1])).show()

AnalysisException: Can't extract value from date#0: need struct type but got date

In [12]:
AMZN.df["Date"].getItem(2)

Column<'Date[2]'>

In [13]:
AMZN.df.first()['Date']

datetime.date(2017, 1, 3)

In [14]:
AMZN.df.__get_item(0)

AttributeError: 'DataFrame' object has no attribute '__get_item'

In [15]:
AMZN.df.second()['Date']

AttributeError: 'DataFrame' object has no attribute 'second'

In [17]:
func.getrows(AMZN.df, rownums=[0, 2]).collect()

AttributeError: module 'pyspark.sql.functions' has no attribute 'getrows'

In [18]:
AMZN.df[0].__getitem__("Date").first()

TypeError: 'Column' object is not callable

In [19]:
AMZN.df[0]

Column<'Date'>

In [None]:
AMZN.df.select('Date').show()

In [20]:
AMZN.df


DataFrame[Date: date, High: double, Low: double, Open: double, Close: double, Volume: int, Adj Close: double, company_name: string]

In [21]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window



my_window = Window.partitionBy().orderBy("Date")

df = AMZN.df.withColumn("prev_value", F.lag(AMZN.df.Date).over(my_window))
df = df.withColumn("diff", F.when(F.isnull(F.datediff(df.Date, df.prev_value)), 0)
                              .otherwise(F.datediff(df.Date, df.prev_value)))

In [22]:
df.select("diff").show()

22/05/20 08:12:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/20 08:12:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/20 08:12:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----+
|diff|
+----+
|   0|
|   1|
|   1|
|   1|
|   3|
|   1|
|   1|
|   1|
|   1|
|   4|
|   1|
|   1|
|   1|
|   3|
|   1|
|   1|
|   1|
|   1|
|   3|
|   1|
+----+
only showing top 20 rows



In [23]:
from pyspark.sql.functions import mean

In [24]:
df.select(mean('diff')).first()[0]

22/05/20 08:12:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/20 08:12:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


1.447821681864235

In [48]:
AMZN.df.stat.corr('High', 'Low')

0.999196080434689

TODO: Create function to compute per month week year

In [27]:
AMZN.df.select(mean ("Close")).first()[0]

1742.9566644206718

In [28]:
def get_col_mean(df, col):
    return df.select(mean (col)).first()[0]

In [29]:
get_col_mean(AMZN.df, "Close")

1742.9566644206718

In [30]:
AMZN.df.groupBy(func.weekofyear("day").alias("date_by_week")).agg(sum("Date"))

TypeError: unsupported operand type(s) for +: 'int' and 'str'

In [31]:
AMZN.df.groupBy(func.weekofyear("day").alias("date_by_week")).agg(sum("Close")).orderBy("date_by_week").show()

TypeError: unsupported operand type(s) for +: 'int' and 'str'

In [32]:
AMZN.df.withColumn("Date",func.date_sub(func.next_day(col("Date"),"sunday"),7)).groupBy("Date").agg(sum("Close").cast("int").alias("Close_total")).orderBy("week_strt_day").show()


TypeError: unsupported operand type(s) for +: 'int' and 'str'

In [33]:
AMZN.df.groupBy("Date").select("Close").show()

AttributeError: 'GroupedData' object has no attribute 'select'

In [34]:
AMZN.df.groupBy(func.month("Date").alias("hour")).agg(mean("Close").alias("close_mean")).show()

+----+------------------+
|hour|        close_mean|
+----+------------------+
|  12| 1563.564484627016|
|   1|1417.4578305899379|
|   6|1788.4167071174172|
|   3|1486.8781597970546|
|   5|1689.0211671341297|
|   9|1988.6682525634765|
|   4| 1634.024632151534|
|   8|1951.5419085213307|
|   7|1977.4292940027574|
|  10| 1937.555771891276|
|  11|1907.2337825123857|
|   2|1492.8052617123253|
+----+------------------+



In [35]:
AMZN.df.groupBy(func.year("Date").alias("hour")).agg(mean("Close").alias("close_mean")).show()

+----+------------------+
|hour|        close_mean|
+----+------------------+
|2018|1641.7261758629545|
|2019| 1789.189206077939|
|2020| 2636.649604240712|
|2017| 968.1670116409363|
+----+------------------+



In [36]:
def 

In [37]:
get_avg(AMZN.df, "Close", func.year)

+--------+------------------+
|new_time|        Close_mean|
+--------+------------------+
|    2018|1641.7261758629545|
|    2019| 1789.189206077939|
|    2020| 2636.649604240712|
|    2017| 968.1670116409363|
+--------+------------------+



In [38]:
get_avg(AMZN.df, "Open", func.year)

+--------+------------------+
|new_time|         Open_mean|
+--------+------------------+
|    2018|1644.0727091633466|
|    2019|1788.7461896623884|
|    2020|2636.5054538710433|
|    2017|  968.275618959708|
+--------+------------------+



In [123]:
class Exploration(object):
    def __init__(self, df):
        self.df = df

    def get_oc_avg(self, fun):
        close = self._compute_avg(self.df, "Close", fun)
        opening = self._compute_avg(self.df, "Open", fun)

        return close.join(
            opening, opening.Open_new_time == close.Close_new_time, "inner"
        ).orderBy("Close_new_time").select(
            close.Close_new_time, close.Close_mean, opening.Open_mean
        )

    def _compute_avg(self, df, col, fun):
        return df.groupBy(fun("Date").alias(col + "_new_time")).agg(
            mean(col).alias(col + "_mean")
        )

In [124]:
exAMZN = Exploration(AMZN.df)

In [126]:
exAMZN.get_oc_avg(func.month).show()

+--------------+------------------+------------------+
|Close_new_time|        Close_mean|         Open_mean|
+--------------+------------------+------------------+
|             1|1417.4578305899379|1413.4312052899097|
|             2|1492.8052617123253|1493.1706575092517|
|             3|1486.8781597970546|1484.0394181876347|
|             4| 1634.024632151534|1632.4401230230565|
|             5|1689.0211671341297|1687.3904660247092|
|             6|1788.4167071174172|1788.4576480641085|
|             7|1977.4292940027574|1977.4004746380974|
|             8|1951.5419085213307|1950.0660695279582|
|             9|1988.6682525634765| 1996.003623199463|
|            10| 1937.555771891276| 1943.571671549479|
|            11|1907.2337825123857|1907.4091514029153|
|            12| 1563.564484627016|1568.1172583795362|
+--------------+------------------+------------------+



In [127]:
exAMZN.get_oc_avg(func.year).show()

+--------------+------------------+------------------+
|Close_new_time|        Close_mean|         Open_mean|
+--------------+------------------+------------------+
|          2017| 968.1670116409363|  968.275618959708|
|          2018|1641.7261758629545|1644.0727091633466|
|          2019| 1789.189206077939|1788.7461896623884|
|          2020| 2636.649604240712|2636.5054538710433|
+--------------+------------------+------------------+



In [148]:
def get_price_change(period=None):
    df = AMZN.df
    if period:
        df= exAMZN.get_oc_avg(period)
   
    return  df.withColumn('diff', ( df['Close_mean'] - df['Open_mean'] ))

In [149]:
get_price_change(func.month).show()

+--------------+------------------+------------------+--------------------+
|Close_new_time|        Close_mean|         Open_mean|                diff|
+--------------+------------------+------------------+--------------------+
|             1|1417.4578305899379|1413.4312052899097|   4.026625300028172|
|             2|1492.8052617123253|1493.1706575092517|-0.36539579692635016|
|             3|1486.8781597970546|1484.0394181876347|  2.8387416094199125|
|             4| 1634.024632151534|1632.4401230230565|  1.5845091284775208|
|             5|1689.0211671341297|1687.3904660247092|  1.6307011094204427|
|             6|1788.4167071174172|1788.4576480641085|-0.04094094669130...|
|             7|1977.4292940027574|1977.4004746380974|0.028819364659966595|
|             8|1951.5419085213307|1950.0660695279582|   1.475838993372463|
|             9|1988.6682525634765| 1996.003623199463| -7.3353706359864645|
|            10| 1937.555771891276| 1943.571671549479|  -6.015899658203125|
|           

In [150]:
get_price_change(func.year).show()

+--------------+------------------+------------------+--------------------+
|Close_new_time|        Close_mean|         Open_mean|                diff|
+--------------+------------------+------------------+--------------------+
|          2017| 968.1670116409363|  968.275618959708|-0.10860731877176022|
|          2018|1641.7261758629545|1644.0727091633466| -2.3465333003921387|
|          2019| 1789.189206077939|1788.7461896623884|  0.4430164155505736|
|          2020| 2636.649604240712|2636.5054538710433|  0.1441503696687505|
+--------------+------------------+------------------+--------------------+

