- Youenn Loïe - Matthieu Schlienger - Nikoloz Chaduneli
- 1234567890 - Les Harengs

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType,  DateType, FloatType

In [2]:
spark = SparkSession.builder\
        .master("local")\
        .appName("myApp")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [45]:
import pathlib

def create_data_frame_with_csv(path):
    schema = StructType([
        StructField("Date", DateType()),
        StructField("High", FloatType()),
        StructField("Low", FloatType()),
        StructField("Open", FloatType()),
        StructField("Close", FloatType()),
        StructField("Volume", FloatType()),
        StructField("Adj", FloatType()),
        StructField("Company", StringType())
    ])
    
    df_with_schema = spark.read.csv(str(path), header=True, schema=schema)
    return df_with_schema

def create_dict_data_frame(dir_path):
    dfs = {}
    for path in pathlib.Path(dir_path).iterdir():
        if path.is_file():
            key = str(path).split('/')[1].split('.')[0]
            df = create_data_frame_with_csv(path)
            dfs[key] = df

    return dfs

In [46]:
dfs = create_dict_data_frame("stocks_data")
dfs

{'FACEBOOK': DataFrame[Date: date, High: float, Low: float, Open: float, Close: float, Volume: float, Adj: float, Company: string],
 'MICROSOFT': DataFrame[Date: date, High: float, Low: float, Open: float, Close: float, Volume: float, Adj: float, Company: string],
 'APPLE': DataFrame[Date: date, High: float, Low: float, Open: float, Close: float, Volume: float, Adj: float, Company: string],
 'GOOGLE': DataFrame[Date: date, High: float, Low: float, Open: float, Close: float, Volume: float, Adj: float, Company: string],
 'AMAZON': DataFrame[Date: date, High: float, Low: float, Open: float, Close: float, Volume: float, Adj: float, Company: string],
 'TESLA': DataFrame[Date: date, High: float, Low: float, Open: float, Close: float, Volume: float, Adj: float, Company: string],
 'ZOOM': DataFrame[Date: date, High: float, Low: float, Open: float, Close: float, Volume: float, Adj: float, Company: string]}

In [47]:
dfs['GOOGLE'].show()

+----------+-------+-------+------+-------+---------+-------+-------+
|      Date|   High|    Low|  Open|  Close|   Volume|    Adj|Company|
+----------+-------+-------+------+-------+---------+-------+-------+
|2017-01-03| 789.63|  775.8|778.81| 786.14|1657300.0| 786.14| GOOGLE|
|2017-01-04| 791.34| 783.16|788.36|  786.9|1073000.0|  786.9| GOOGLE|
|2017-01-05| 794.48| 785.02|786.08| 794.02|1335200.0| 794.02| GOOGLE|
|2017-01-06|  807.9|792.204|795.26| 806.15|1640200.0| 806.15| GOOGLE|
|2017-01-09|809.966| 802.83| 806.4| 806.65|1274600.0| 806.65| GOOGLE|
|2017-01-10| 809.13| 803.51|807.86| 804.79|1176800.0| 804.79| GOOGLE|
|2017-01-11| 808.15| 801.37| 805.0| 807.91|1065900.0| 807.91| GOOGLE|
|2017-01-12| 807.39| 799.17|807.14| 806.36|1353100.0| 806.36| GOOGLE|
|2017-01-13|811.224| 806.69|807.48| 807.88|1099200.0| 807.88| GOOGLE|
|2017-01-17| 807.14| 800.37|807.08| 804.61|1362100.0| 804.61| GOOGLE|
|2017-01-18|806.205| 800.99|805.81| 806.07|1294400.0| 806.07| GOOGLE|
|2017-01-19| 809.48|

- **Show the first and last 40 rows of each stock price**

In [73]:
def print_first_last_rows(df):
    print(df.take(40))
    print(df.tail(40))

for key in dfs:
    print(key)
    print_first_last_rows(dfs[key])

FACEBOOK
[Row(Date=datetime.date(2017, 1, 3), High=117.83999633789062, Low=115.51000213623047, Open=116.02999877929688, Close=116.86000061035156, Volume=20663900.0, Adj=116.86000061035156, Company='FACEBOOK'), Row(Date=datetime.date(2017, 1, 4), High=119.66000366210938, Low=117.29000091552734, Open=117.55000305175781, Close=118.69000244140625, Volume=19630900.0, Adj=118.69000244140625, Company='FACEBOOK'), Row(Date=datetime.date(2017, 1, 5), High=120.94999694824219, Low=118.31999969482422, Open=118.86000061035156, Close=120.66999816894531, Volume=19492200.0, Adj=120.66999816894531, Company='FACEBOOK'), Row(Date=datetime.date(2017, 1, 6), High=123.87999725341797, Low=120.02999877929688, Open=120.9800033569336, Close=123.41000366210938, Volume=28545300.0, Adj=123.41000366210938, Company='FACEBOOK'), Row(Date=datetime.date(2017, 1, 9), High=125.43000030517578, Low=123.04000091552734, Open=123.55000305175781, Close=124.9000015258789, Volume=22880400.0, Adj=124.9000015258789, Company='FACEB

- **Get the number of observations**

In [82]:
def num_obbservations(dfs):
    observation_num = 0
    for key in dfs:
        observation_num += dfs[key].count()
    return observation_num

num_obbservations(dfs)

6333

* **Deduce programmatically what is the period you have between the data points**

In [105]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import builtins



def period_data_points(df):
    my_window = Window.partitionBy().orderBy("Date")
    df_temp = df.withColumn("Date2", F.lag(df.Date).over(my_window))
    df_temp = df.orderBy("Date").withColumn("Date2", df.Date)
    df_temp2 = df_temp.withColumn("Period", df_temp.Date - df_temp.Date2)
    return df_temp2.groupby("Period").count()

def period_between_points(df):
    dates = df.select("Date").withColumn("id", monotonically_increasing_id())
    mwindow = Window.partitionBy().orderBy("id")
    dates = dates.withColumn("Previous_Date", F.lag(dates.Date).over(mwindow))
    dates = dates.withColumn("Period_Between", F.when(F.isnull(F.datediff(dates.Date, dates.Previous_Date)), 0)
        .otherwise(F.datediff(dates.Date, dates.Previous_Date)))
    dates = dates.drop("id")
    periods = dates.select('Period_Between').rdd.flatMap(lambda x: x).collect()
    nb_days = builtins.max(set(periods), key=periods.count)
    if nb_days < 28 :
        print(f"The period we have between the data points is {nb_days} day(s)")
    elif nb_days < 365 :
        print(f"The period we have between the data points is {nb_days/28} month(s)")
    elif nb_days > 365 :
        print(f"The period we have between the data points is {nb_days/365} year(s)")


#print(period_data_points(dfs["GOOGLE"]))
period_between_points(dfs["GOOGLE"])

The period we have between the data points is 1 day(s)


* **Descriptive statistics for each dataframe and each column (min, max, standard deviation)**

In [96]:
def df_statistics(df):
    dfs[key].describe().show()

for key in dfs:
    print(key)
    df_statistics(dfs[key])

FACEBOOK
+-------+------------------+-----------------+------------------+------------------+-------------------+------------------+--------+
|summary|              High|              Low|              Open|             Close|             Volume|               Adj| Company|
+-------+------------------+-----------------+------------------+------------------+-------------------+------------------+--------+
|  count|               987|              987|               987|               987|                987|               987|     987|
|   mean| 186.3984500224105|182.0770063931698|184.23939196461964|184.34894618968713|2.085238712259372E7|184.34894618968713|    null|
| stddev|37.254783636139116|35.96107958363421| 36.62328370702378| 36.62017415511841|1.323930068044522E7| 36.62017415511841|    null|
|    min|            117.84|           115.51|            116.03|            116.86|          4113594.0|            116.86|FACEBOOK|
|    max|            304.67|           293.05|            30

* **Number of missing values for each dataframe and column**

In [97]:
from pyspark.sql.functions import isnan, when, count, col

def missing_value_num(df):
    print(key + ':')
    df.select([count(when(col('Date').isNull(), 'Date')).alias('Date')]).show()
    df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns[1:]]).show()

for key in dfs:
    missing_value_num(dfs[key])



FACEBOOK:
+----+
|Date|
+----+
|   0|
+----+

+----+---+----+-----+------+---+-------+
|High|Low|Open|Close|Volume|Adj|Company|
+----+---+----+-----+------+---+-------+
|   0|  0|   0|    0|     0|  0|      0|
+----+---+----+-----+------+---+-------+

MICROSOFT:
+----+
|Date|
+----+
|   0|
+----+

+----+---+----+-----+------+---+-------+
|High|Low|Open|Close|Volume|Adj|Company|
+----+---+----+-----+------+---+-------+
|   0|  0|   0|    0|     0|  0|      0|
+----+---+----+-----+------+---+-------+

APPLE:
+----+
|Date|
+----+
|   0|
+----+

+----+---+----+-----+------+---+-------+
|High|Low|Open|Close|Volume|Adj|Company|
+----+---+----+-----+------+---+-------+
|   0|  0|   0|    0|     0|  0|      0|
+----+---+----+-----+------+---+-------+

GOOGLE:
+----+
|Date|
+----+
|   0|
+----+

+----+---+----+-----+------+---+-------+
|High|Low|Open|Close|Volume|Adj|Company|
+----+---+----+-----+------+---+-------+
|   0|  0|   0|    0|     0|  0|      0|
+----+---+----+-----+------+---+------

* **Correlation between values**

In [98]:
from pyspark.sql.functions import *

def corr_bet_values(df, column1, column2):
    return df.stat.corr(column1, column2)

print(corr_bet_values(dfs["GOOGLE"], "High", "Low"))

0.9979768472196063


**Questions**

- What is the average of the opening and closing prices for each stock price and for 
different time periods (week, month, year)

In [101]:
def average_prices(dataframe, period):
    averages =  dataframe.withColumn('week', F.weekofyear(F.col("Date"))) \
            .withColumn('month', F.month(F.col("Date"))) \
            .withColumn('year', F.year(F.col("Date"))) \
            .groupby('year', period) \
            .agg(F.avg("Open").alias("Average Open"), F.avg("Close").alias("Average Close")) \
            .orderBy('year', period).show()

    return averages


for key in dfs:
    print(key)
    print(average_prices(dfs[key], "week"))

FACEBOOK
+----+----+------------------+------------------+
|year|week|      Average Open|     Average Close|
+----+----+------------------+------------------+
|2017|   1|118.35500144958496|119.90750122070312|
|2017|   2|125.16399993896485|126.05999908447265|
|2017|   3|128.19499969482422|127.59500122070312|
|2017|   4|             130.2| 130.9479949951172|
|2017|   5|131.69200134277344| 131.2699981689453|
|2017|   6| 132.8820037841797|133.28599853515624|
|2017|   7|133.76400146484374|133.74200134277345|
|2017|   8|134.28750228881836| 135.1599998474121|
|2017|   9|  136.447998046875|136.65999755859374|
|2017|  10|137.53800048828126| 137.8940002441406|
|2017|  11|             139.6|139.69400329589843|
|2017|  12| 139.6699981689453|139.58199768066407|
|2017|  13| 141.2220001220703|141.83800048828124|
|2017|  14| 141.8719970703125|141.56199951171874|
|2017|  15| 140.2849998474121| 139.9824981689453|
|2017|  16|141.84599914550782| 142.4260009765625|
|2017|  17| 146.8019989013672|147.2940002

- How do the stock prices change day to day and month to month (may be you can 
create new columns to save those calculations)

In [57]:
def price_changes(df,period):
    if period == 'day':
        window = Window.partitionBy().orderBy('Date')
        return df.withColumn('previous_open', F.lag(F.col("open")).over(window)) \
            .withColumn('previous_close', F.lag(F.col("close")).over(window)) \
            .withColumn('evolution_open', F.when(F.isnull(F.col("open") - F.col("previous_open")), 0).otherwise((F.col("open") - F.col("previous_open")))) \
            .withColumn('evolution_close', F.when(F.isnull(F.col("close") - F.col("previous_close")), 0).otherwise((F.col("close") - F.col("previous_close")))) \
            .select('Date' ,'evolution_open', 'evolution_close')

    elif period == 'month':
        window = Window.partitionBy().orderBy('year', 'month')
        return df.withColumn('month', F.month(F.col("Date"))) \
            .withColumn('year', F.year(F.col("Date"))) \
            .groupby('year', 'month') \
            .agg(F.sum("Open").alias("sum_open"), F.sum("Close").alias("sum_close")) \
            .orderBy('year', 'month') \
            .withColumn('previous_sum_open', F.lag(F.col("sum_open")).over(window)) \
            .withColumn('previous_sum_close', F.lag(F.col("sum_close")).over(window)) \
            .withColumn('evolution_open', F.when(F.isnull(F.col("sum_open") - F.col("previous_sum_open")), 0).otherwise((F.col("sum_open") - F.col("previous_sum_open")))) \
            .withColumn('evolution_close', F.when(F.isnull(F.col("sum_close") - F.col("previous_sum_close")), 0).otherwise((F.col("sum_close") - F.col("previous_sum_close")))) \
            .select('year' ,'month', 'evolution_open', 'evolution_close')

price_changes(dfs["GOOGLE"], "month").show(10)

+----+-----+-------------------+-------------------+
|year|month|     evolution_open|    evolution_close|
+----+-----+-------------------+-------------------+
|2017|    1|                0.0|                0.0|
|2017|    2| -659.5100708007812| -636.6799926757812|
|2017|    3| 3694.4901733398438|   3663.14501953125|
|2017|    4|  -3176.85009765625|-3147.4801025390625|
|2017|    5| 4598.6300048828125|     4627.169921875|
|2017|    6|   437.030029296875|  318.6202392578125|
|2017|    7|           -2210.25|-2125.5699462890625|
|2017|    8|   2364.14990234375| 2349.2298583984375|
|2017|    9|-2580.7899780273438|-2580.5399780273438|
|2017|   10| 2984.8200073242188|             2989.5|
+----+-----+-------------------+-------------------+
only showing top 10 rows



- Based on the opening and closing price, calculate the daily return of each stock

In [58]:
def daily_stocks_price(dataframe):
    return dataframe.withColumn("daily_price", dataframe['Close'] - dataframe['Open'])

daily_stocks_price(dfs["GOOGLE"]).show()

+----------+-------+-------+------+-------+---------+-------+-------+------------+
|      Date|   High|    Low|  Open|  Close|   Volume|    Adj|Company| daily_price|
+----------+-------+-------+------+-------+---------+-------+-------+------------+
|2017-01-03| 789.63|  775.8|778.81| 786.14|1657300.0| 786.14| GOOGLE|    7.330017|
|2017-01-04| 791.34| 783.16|788.36|  786.9|1073000.0|  786.9| GOOGLE|  -1.4599609|
|2017-01-05| 794.48| 785.02|786.08| 794.02|1335200.0| 794.02| GOOGLE|   7.9400024|
|2017-01-06|  807.9|792.204|795.26| 806.15|1640200.0| 806.15| GOOGLE|   10.890015|
|2017-01-09|809.966| 802.83| 806.4| 806.65|1274600.0| 806.65| GOOGLE|        0.25|
|2017-01-10| 809.13| 803.51|807.86| 804.79|1176800.0| 804.79| GOOGLE|  -3.0700073|
|2017-01-11| 808.15| 801.37| 805.0| 807.91|1065900.0| 807.91| GOOGLE|   2.9099731|
|2017-01-12| 807.39| 799.17|807.14| 806.36|1353100.0| 806.36| GOOGLE|  -0.7800293|
|2017-01-13|811.224| 806.69|807.48| 807.88|1099200.0| 807.88| GOOGLE|   0.4000244|
|201

- What are the stocks with the highest daily return

In [108]:
def highest_value_stock(df, start, end):
    df_daily = daily_stocks_price(df)
    data = df_daily.filter(df_daily["Date"] >= lit(start))
    data = data.filter(data["Date"] <= lit(end))
    print(data.agg({"daily_price": "max"}).collect()[0][0])
    
highest_value_stock(dfs["APPLE"],"2017-08-20","2017-08-28")

0.3875007629394531


- Calculate the average daily return for different periods (week, month, and year)

In [103]:
def average_daily(dataframe, period):
    return daily_stocks_price(dataframe).withColumn('week', F.weekofyear(F.col("Date"))) \
            .withColumn('month', F.month(F.col("Date"))) \
            .withColumn('year', F.year(F.col("Date"))) \
            .groupby('year', period) \
            .agg(F.avg("daily_price").alias("Average Daily Return")) \
            .orderBy('year', period)

average_daily(dfs["GOOGLE"], 'week').show(10)

+----+----+--------------------+
|year|week|Average Daily Return|
+----+----+--------------------+
|2017|   1|   6.175018310546875|
|2017|   2|       -0.0580078125|
|2017|   3| -1.7612457275390625|
|2017|   4|     0.5239990234375|
|2017|   5|    -2.6329833984375|
|2017|   6|    1.60399169921875|
|2017|   7|     2.7179931640625|
|2017|   8|   1.805023193359375|
|2017|   9|     0.8040283203125|
|2017|  10|    1.96998291015625|
+----+----+--------------------+
only showing top 10 rows



* **Moving Average**

In [61]:
def moving_average(dataframe, column_name, points_num):
    days = lambda i: i * 86400
    w = Window.partitionBy().orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(0, days(points_num-1))
    dataframe.select(col("Date"), col("Company"), avg(column_name).over(w).alias("Moving average of " + column_name)).show(5)
    return

moving_average(dfs["GOOGLE"], "Close", 3)

+----------+-------+-----------------------+
|      Date|Company|Moving average of Close|
+----------+-------+-----------------------+
|2017-01-03| GOOGLE|        789.02001953125|
|2017-01-04| GOOGLE|      795.6900227864584|
|2017-01-05| GOOGLE|      800.0850219726562|
|2017-01-06| GOOGLE|      806.1500244140625|
|2017-01-09| GOOGLE|      806.4499918619791|
+----------+-------+-----------------------+
only showing top 5 rows



* **Correlation**

In [62]:
def correlation(data1,data2):
    L = []
    cols = data1.drop("Date","Company")
    for col in cols.columns :
        L.append(col)
    cor_res = [L,[]]
    company_string = "_" + data2.collect()[0][7]
    df2 = data2.select(*(F.col(c).alias(c+company_string) for c in data2.columns))
    data_join = data1.join(df2,data1["Date"]==df2["Date"+company_string],how="inner")
    for cols in L:
        cor_res[1].append(data_join.corr(cols,cols+company_string))
    return cor_res

res = correlation(dfs["GOOGLE"],dfs["AMAZON"])
print(res[0])
print(res[1])

['High', 'Low', 'Open', 'Close', 'Volume', 'Adj']
[0.9279218467592136, 0.9238630174063733, 0.9262870255493187, 0.9255061932293709, 0.6104197582530982, 0.9255061932293709]


- When investing in stocks, the return rate is very important. Code a function that calculates the return rate of the stock in different periods (week, month and year)

In [106]:
def rate_on_period(df, period):
    return daily_stocks_price(df).withColumn('week', F.weekofyear(F.col("Date"))) \
            .withColumn('month', F.month(F.col("Date"))) \
            .withColumn('year', F.year(F.col("Date"))) \
            .groupby('year', period) \
            .agg(F.sum("daily_price").alias("Return Rate")) \
            .orderBy('year', period)

rate_on_period(dfs["GOOGLE"], "week").show(10)

+----+----+-----------------+
|year|week|      Return Rate|
+----+----+-----------------+
|2017|   1| 24.7000732421875|
|2017|   2|    -0.2900390625|
|2017|   3|-7.04498291015625|
|2017|   4|  2.6199951171875|
|2017|   5|-13.1649169921875|
|2017|   6| 8.01995849609375|
|2017|   7| 13.5899658203125|
|2017|   8|  7.2200927734375|
|2017|   9|  4.0201416015625|
|2017|  10| 9.84991455078125|
+----+----+-----------------+
only showing top 10 rows



- Given a specific month, what is the stock with the best return rate

**Insights**
- The calculation of the stock volatility : volatility is the rate at which the price of a stock increases or decreases over a particular period. Higher stock price volatility often means higher risk and helps an investor to estimate the fluctuations that may happen in the future.
- The Stochastic oscillator : the term stochastic refers to the comparison of the current price of a security to its previous values. The purpose of this oscillator is to determine trend reversals.
- The calculation of the sharpe ratio : it is an indicator of the (marginal) return obtained per unit of risk taken in this management it allows us to answer the following question: does the manager manage to obtain a higher return than the benchmark, but with more risk?
- A graph about the stock evolution.
- The Bollinger Bands : they are used in market finance for technical analysis and allow to evaluate the volatility and the probable evolution of prices or indices
- The valuation of the company : volume * price of a stock exchange action
- The difference between the stock price BEFORE and AFTER the COVID pandemic . It reveals if the company is resilient to crisis and if it made profits are not during this period.
- A machine learning prediction of the evolution of the stock rates depending on past months rates