In [0]:
df=spark.read.csv('/FileStore/tables/stocks_df.csv',header=True,inferSchema=True)
#display(df)

In [0]:
df1=df.select('Date','Stock','Open','High','Low','Close')
#df1.cache()

In [0]:
from pyspark.sql.functions import *
df2=df1.withColumn('year',year(col('Date')))\
       .withColumn('month',month(col('Date')))\
       .withColumn('day',dayofmonth(col('Date')))


In [0]:
df2.cache()

Out[29]: DataFrame[Date: date, Stock: string, Open: double, High: double, Low: double, Close: double, year: int, month: int, day: int]

In [0]:
from pyspark.sql.window import Window
df_rank=df2.withColumn("rank",dense_rank().over(Window.partitionBy(col('year'),col('stock'),col('month')).orderBy(col('date'))))
#display(df_rank)

In [0]:
df_rank.cache()

Out[7]: DataFrame[Date: date, Stock: string, Open: double, High: double, Low: double, Close: double, year: int, month: int, day: int, rank: int]

In [0]:
df_open = df_rank.select('Open', col('year').alias('year_open'), col('month').alias('month_open')) \
                .filter((col('Stock') == 'ABFRL') & (col('rank') == 1)).orderBy(col('year'),col('month'))

#display(df_open)


In [0]:
df_max_rank=df_rank.filter(col('stock')=='ABFRL').groupBy(col('year').alias('max_year'),col('month').alias('max_month')).agg(max(col('rank')).alias('maxrank')).orderBy(col('year'),col('month'))
#display(df_max_rank)


In [0]:
df_close = df_rank.join(broadcast(df_max_rank),
                        (df_rank['year'] == df_max_rank['max_year']) &
                        (df_rank['month'] == df_max_rank['max_month']) &
                        (df_rank['rank'] == df_max_rank['maxrank']),
                        how='inner')
#display(df_close)

In [0]:
df_close_filter=df_close.select('Date','Stock','Close','year','month').filter(col('Stock')=='ABFRL').orderBy(col('year'),col('month'))
#display(df_close_filter)

In [0]:
df_final=df_close_filter.join(df_open,(df_close['year']==df_open['year_open']) &(df_close['month']==df_open['month_open']),how='inner')
df_final_1=df_final.select('Date','Stock','Open','Close','year','month').orderBy(col('year'),col('month'))
#display(df_final_1)

In [0]:
df_final_returns_monthly=df_final_1.withColumn('returns_monthly',round((col('Close')-col('Open'))/col('Open')*100,2))
display(df_final_returns_monthly)

Date,Stock,Open,Close,year,month,returns_monthly
2013-07-31,ABFRL,157.7,154.2,2013,7,-2.22
2013-08-30,ABFRL,158.8,127.0,2013,8,-20.03
2013-09-30,ABFRL,125.4,111.75,2013,9,-10.89
2013-10-31,ABFRL,111.25,101.25,2013,10,-8.99
2013-11-29,ABFRL,101.0,87.1,2013,11,-13.76
2013-12-31,ABFRL,86.5,89.65,2013,12,3.64
2014-01-31,ABFRL,89.9,84.05,2014,1,-6.51
2014-02-28,ABFRL,84.05,85.1,2014,2,1.25
2014-03-31,ABFRL,85.0,82.6,2014,3,-2.82
2014-04-30,ABFRL,81.8,122.65,2014,4,49.94


In [0]:
df_final_returns_yearly_open=df_final_returns_monthly.filter(col('month')==1).select('Stock','Open','year','month').orderBy(col('year'))
display(df_final_returns_yearly_open)

Stock,Open,year,month
ABFRL,89.9,2014,1
ABFRL,119.0,2015,1
ABFRL,233.0,2016,1
ABFRL,138.5,2017,1
ABFRL,170.1,2018,1
ABFRL,199.56,2019,1
ABFRL,229.57,2020,1
ABFRL,166.65,2021,1
ABFRL,272.0,2022,1


In [0]:
df_final_returns_yearly_close=df_final_returns_monthly.filter(col('month')==12).select(col('Stock').alias('Stock_close'),'Close',col('year').alias('year_close'),'month').orderBy(col('year_close'))
display(df_final_returns_yearly_close)

Stock_close,Close,year_close,month
ABFRL,89.65,2013,12
ABFRL,120.15,2014,12
ABFRL,231.3,2015,12
ABFRL,138.05,2016,12
ABFRL,172.45,2017,12
ABFRL,199.61,2018,12
ABFRL,228.39,2019,12
ABFRL,165.6,2020,12
ABFRL,272.25,2021,12


In [0]:
df_yearly_join=df_final_returns_yearly_open.join(df_final_returns_yearly_close,df_final_returns_yearly_open['year']==df_final_returns_yearly_close['year_close'],how='inner')
display(df_yearly_join)

Stock,Open,year,month,Stock_close,Close,year_close,month.1
ABFRL,89.9,2014,1,ABFRL,120.15,2014,12
ABFRL,119.0,2015,1,ABFRL,231.3,2015,12
ABFRL,233.0,2016,1,ABFRL,138.05,2016,12
ABFRL,138.5,2017,1,ABFRL,172.45,2017,12
ABFRL,170.1,2018,1,ABFRL,199.61,2018,12
ABFRL,199.56,2019,1,ABFRL,228.39,2019,12
ABFRL,229.57,2020,1,ABFRL,165.6,2020,12
ABFRL,166.65,2021,1,ABFRL,272.25,2021,12


In [0]:
df_yearly_final=df_yearly_join.withColumn('Yearly_returns',round((col('Close')-col('Open'))/col('Open')*100,2))
df_yearly_final.show(truncate=0)

+-----+------+----+-----+-----------+------+----------+-----+--------------+
|Stock|Open  |year|month|Stock_close|Close |year_close|month|Yearly_returns|
+-----+------+----+-----+-----------+------+----------+-----+--------------+
|ABFRL|89.9  |2014|1    |ABFRL      |120.15|2014      |12   |33.65         |
|ABFRL|119.0 |2015|1    |ABFRL      |231.3 |2015      |12   |94.37         |
|ABFRL|233.0 |2016|1    |ABFRL      |138.05|2016      |12   |-40.75        |
|ABFRL|138.5 |2017|1    |ABFRL      |172.45|2017      |12   |24.51         |
|ABFRL|170.1 |2018|1    |ABFRL      |199.61|2018      |12   |17.35         |
|ABFRL|199.56|2019|1    |ABFRL      |228.39|2019      |12   |14.45         |
|ABFRL|229.57|2020|1    |ABFRL      |165.6 |2020      |12   |-27.87        |
|ABFRL|166.65|2021|1    |ABFRL      |272.25|2021      |12   |63.37         |
+-----+------+----+-----+-----------+------+----------+-----+--------------+



In [0]:
df_yearly_final_1=df_yearly_final.select('Stock','year','Yearly_returns')
df_yearly_final_1.show(truncate=0)

+-----+----+--------------+
|Stock|year|Yearly_returns|
+-----+----+--------------+
|ABFRL|2014|33.65         |
|ABFRL|2015|94.37         |
|ABFRL|2016|-40.75        |
|ABFRL|2017|24.51         |
|ABFRL|2018|17.35         |
|ABFRL|2019|14.45         |
|ABFRL|2020|-27.87        |
|ABFRL|2021|63.37         |
+-----+----+--------------+

