In [1]:
import sys

# Make modules in the parent directory importable from this notebook.
# Guarded so that re-running the cell does not stack duplicate '..' entries
# at the front of sys.path.
if not sys.path or sys.path[0] != '..':
    sys.path.insert(0, '..')

In [2]:
import findspark
# Make the local Spark installation's `pyspark` importable before the pyspark
# imports in later cells. NOTE(review): with no arguments findspark is expected to
# locate Spark via SPARK_HOME / common install paths — confirm for your environment.
findspark.init()

In [3]:
from pyspark.sql import SparkSession

# Pin the session time zone to UTC. Later cells shift instants to IST with
# from_utc_timestamp(...) and then render them (date_format / to_date / show) in
# the session time zone; the shifted value only reads as IST wall-clock time when
# that zone is UTC. Without this setting the results depend on the default JVM
# time zone of whichever machine runs the notebook.
spark = (
    SparkSession.builder
    .appName("pyspark-1")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

In [6]:
# 5-minute candle files available for each index (set DATA_PATH accordingly):
#   NIFTY     - NIFTY_5min.csv
#   BANKNIFTY - BN_5min.csv
#   FINNIFTY  - FINNIFTY_5min.csv
# Expected CSV layout (timestamps carry an explicit +05:30 offset):
#   datetime,open,high,low,close
#   2022-05-23T09:15:00+05:30,15830.95,15910.6,15809.4,15825.9
DATA_PATH = "/dataset/FINNIFTY_5min.csv"

# Explicit schema instead of inferSchema: avoids the extra pass over the file that
# inference needs and pins the column types rather than letting them depend on data.
CANDLE_SCHEMA = "datetime TIMESTAMP, open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE"

df = spark.read.csv(DATA_PATH, header=True, schema=CANDLE_SCHEMA)
df.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)



In [7]:
import pyspark.sql.functions as f
from pyspark.sql import Window

# Width of one candle in the source files, in minutes.
CANDLE_MINUTES = 5
# Region-based zone ID for India. 3-letter IDs such as "IST" are ambiguous
# (India / Israel / Irish Standard Time) and discouraged by Spark.
MARKET_TZ = "Asia/Kolkata"

# Shift each instant by the IST offset. The shifted value reads as IST wall-clock
# time only when rendered in a UTC session (spark.sql.session.timeZone == "UTC"),
# which this notebook assumes.
ist_ts = f.from_utc_timestamp(f.col("datetime"), MARKET_TZ)

# Epoch seconds of each candle, and of the earliest candle of the same trading date.
candle_epoch = f.unix_timestamp(f.col("datetime"))
day_open_epoch = f.min(candle_epoch).over(Window.partitionBy("date"))

df_extended = (
    df
    # Trading date taken from the IST timestamp so it always agrees with `time`.
    .withColumn("date", f.to_date(ist_ts))
    .withColumn("IST", ist_ts)
    .withColumn("time", f.date_format("IST", "HH:mm:ss"))
    .withColumn("day", f.date_format("date", "EEEE"))
    # Same numbering as row_number()*CANDLE_MINUTES (first candle of the day ->
    # CANDLE_MINUTES, next -> 2*CANDLE_MINUTES, ...), but computed from elapsed time
    # so a missing candle does not shift every later value.
    .withColumn(
        "duration",
        ((candle_epoch - day_open_epoch) / 60 + CANDLE_MINUTES).cast("int"),
    )
)
df_extended.show()

+-------------------+--------+--------+--------+--------+----------+-------------------+--------+---------+--------+
|           datetime|    open|    high|     low|   close|      date|                IST|    time|      day|duration|
+-------------------+--------+--------+--------+--------+----------+-------------------+--------+---------+--------+
|2022-07-27 03:45:00| 16727.1|16755.95|16694.55| 16741.6|2022-07-27|2022-07-27 09:15:00|09:15:00|Wednesday|       5|
|2022-07-27 03:50:00| 16743.3|16746.55| 16678.0| 16679.2|2022-07-27|2022-07-27 09:20:00|09:20:00|Wednesday|      10|
|2022-07-27 03:55:00| 16678.3| 16688.5| 16666.7| 16667.9|2022-07-27|2022-07-27 09:25:00|09:25:00|Wednesday|      15|
|2022-07-27 04:00:00|16666.25| 16675.7| 16661.7|16670.05|2022-07-27|2022-07-27 09:30:00|09:30:00|Wednesday|      20|
|2022-07-27 04:05:00|16669.45| 16698.2| 16669.1|16685.05|2022-07-27|2022-07-27 09:35:00|09:35:00|Wednesday|      25|
|2022-07-27 04:10:00| 16685.4|16686.65|16661.55| 16678.4|2022-07

In [9]:
# Analysis window in IST, as zero-padded "HH:mm:ss" strings (both ends inclusive).
# Volatility is measured only over candles that fall inside this window.
start_time, end_time = "10:00:00", "12:00:00"

In [13]:
# Reference price per trading day: midpoint (high+low)/2 of the candle whose
# IST time equals start_time. Days with no candle at exactly start_time get no
# reference and therefore drop out of the inner join below.
avg_df = (
    df_extended
    .filter(f.col("time") == start_time)
    .selectExpr("date", "duration as dur_start", "round((high+low)/2,2) as avg")
)

# Candles inside [start_time, end_time] (zero-padded "HH:mm:ss" strings compare
# correctly as text), each tagged with its day's reference price.
# Joining on the column name avoids the ambiguous self-join condition
# `df_extended.date == avg_df.date`: avg_df is derived from df_extended, so both
# sides share the same `date` attribute and `.drop(avg_df.date)` could remove
# either column.
joined_df = (
    df_extended
    .filter(f.col("time").between(start_time, end_time))
    .join(avg_df, on="date")
)

In [20]:
# Distance of each candle's high and low from the day's reference midpoint,
# rounded to 2 decimals.
gap_high = f.round(f.abs(f.col("avg") - f.col("high")), 2)
gap_low = f.round(f.abs(f.col("avg") - f.col("low")), 2)

updated_df = (
    joined_df
    # Larger of the two excursions; when they are equal the low side is taken
    # (same value either way).
    .withColumn("max_diff", f.when(gap_high > gap_low, gap_high).otherwise(gap_low))
    # Raw/IST timestamps are no longer needed; `date` and `time` remain.
    .drop("datetime", "IST")
    # Minutes elapsed since the start_time candle of the same day.
    .withColumn("duration_in_min", f.col("duration") - f.col("dur_start"))
)

updated_df.where(f.col("day") == "Tuesday").show()

+--------+--------+--------+--------+--------+-------+--------+----------+---------+-------+--------+---------------+
|    open|    high|     low|   close|    time|    day|duration|      date|dur_start|    avg|max_diff|duration_in_min|
+--------+--------+--------+--------+--------+-------+--------+----------+---------+-------+--------+---------------+
|17423.45| 17435.8| 17414.2| 17435.8|10:00:00|Tuesday|      50|2022-08-02|       50|17425.0|    10.8|              0|
|17434.45| 17444.9|17402.85| 17409.2|10:05:00|Tuesday|      55|2022-08-02|       50|17425.0|   22.15|              5|
| 17409.1| 17409.1|17385.95| 17393.8|10:10:00|Tuesday|      60|2022-08-02|       50|17425.0|   39.05|             10|
| 17392.6| 17413.9| 17392.4|17411.35|10:15:00|Tuesday|      65|2022-08-02|       50|17425.0|    32.6|             15|
|17410.95| 17427.2|17397.25|17417.85|10:20:00|Tuesday|      70|2022-08-02|       50|17425.0|   27.75|             20|
|17416.95| 17421.4| 17404.2|17411.05|10:25:00|Tuesday|  

In [None]:
# Keep only Tuesday records, since Tuesday is the expiry day for FINNIFTY.
# For each such day pick the candle with the largest deviation from the reference
# midpoint. Ties are broken by the earliest candle (smallest duration_in_min) so the
# chosen row is deterministic instead of arbitrary.
peak_window = (
    Window.partitionBy("date")
    .orderBy(f.col("max_diff").desc(), f.col("duration_in_min").asc())
)

final_df = (
    updated_df
    .filter(f.col("day") == "Tuesday")
    .withColumn("rank_in_day", f.row_number().over(peak_window))
    .filter(f.col("rank_in_day") == 1)
    # Serial number across the whole result. This window has no partition, so Spark
    # moves all rows to one partition — acceptable for one row per expiry day.
    .selectExpr(
        "row_number() over(order by date) as Sl_No",
        "day", "date", "time", "max_diff", "duration_in_min",
    )
)
final_df.show(1000)