In [31]:
import findspark
findspark.init()

In [32]:
from pyspark.conf import SparkConf
config = SparkConf()
config.setMaster("local").setAppName("DataFrameIntradayAsg03")

from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=config).getOrCreate()

sc= spark.sparkContext

In [33]:
from pyspark.sql.types import StructType, LongType, StringType, IntegerType, DoubleType, DateType, TimestampType
# // TimestampType with , yyyyMMdd

IntradaySchema = StructType()\
                      .add("Symbol", StringType(), True)\
                      .add("Date", StringType(), True)\
                      .add("Time", StringType(), True)\
                      .add("Open", DoubleType(), True)\
                      .add("High", DoubleType(), True)\
                      .add("Low", DoubleType(), True)\
                      .add("Close", DoubleType(), True)\
                      .add("Volume", LongType(), True)\
                      .add("OI", LongType(), True)

In [34]:
intradayDf= spark.read.format("parquet").option("header", False).schema(IntradaySchema).load("hdfs://localhost:9000/silver/")
intradayDf.printSchema()

intradayDf.show(5)

root
 |-- Symbol: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- OI: long (nullable = true)

+------------+--------+-----+-------+-------+-------+-------+------+-----+
|      Symbol|    Date| Time|   Open|   High|    Low|  Close|Volume|   OI|
+------------+--------+-----+-------+-------+-------+-------+------+-----+
|SILVERMIC_F1|20211201|09:01|62588.0|62707.0|62550.0|62659.0|   101|98752|
|SILVERMIC_F1|20211201|09:02|62662.0|62679.0|62658.0|62670.0|    98|98818|
|SILVERMIC_F1|20211201|09:03|62670.0|62670.0|62662.0|62669.0|    73|98852|
|SILVERMIC_F1|20211201|09:04|62667.0|62675.0|62650.0|62670.0|    71|98884|
|SILVERMIC_F1|20211201|09:05|62675.0|62678.0|62660.0|62670.0|    63|98895|
+------------+--------+-----+-------+-------+-------+-------+------+----

In [35]:
from pyspark.sql.functions import col, concat, lit, to_timestamp, date_format,date_trunc

#.withColumn("DateTimeStr", concat( col("Date"), lit(" "), col("Time")))
#dateDf= intradayDf.withColumn("DateTimeStr", concat( col("Date"), lit(" "), col("Time")))\
#                  .withColumn("DATEFORMAT", to_timestamp(col("DateTimeStr"), "yyyyMMdd HH:mm" ))\
#                  .withColumn("Min",date_format(("DATEFORMAT"),"mm"))\
#                  .withColumn("DATETO",date_format(("DATEFORMAT"),"MMM dd, yyyy hh:mm a"))

dateDf= intradayDf.withColumn("DateTimeStr", concat( col("Date"), lit(" "), col("Time")))\
                  .withColumn("DATEFORMAT", to_timestamp(col("DateTimeStr"), "yyyyMMdd HH:mm" ))\
                  .withColumn("DateinHour", date_trunc("hour", col("DATEFORMAT")))\
                  .drop("OI","DateTimeStr","Date","Time","DATEFORMAT")
                      
        
dateDf.show(5)
dateDf.printSchema()


+------------+-------+-------+-------+-------+------+-------------------+
|      Symbol|   Open|   High|    Low|  Close|Volume|         DateinHour|
+------------+-------+-------+-------+-------+------+-------------------+
|SILVERMIC_F1|62588.0|62707.0|62550.0|62659.0|   101|2021-12-01 09:00:00|
|SILVERMIC_F1|62662.0|62679.0|62658.0|62670.0|    98|2021-12-01 09:00:00|
|SILVERMIC_F1|62670.0|62670.0|62662.0|62669.0|    73|2021-12-01 09:00:00|
|SILVERMIC_F1|62667.0|62675.0|62650.0|62670.0|    71|2021-12-01 09:00:00|
|SILVERMIC_F1|62675.0|62678.0|62660.0|62670.0|    63|2021-12-01 09:00:00|
+------------+-------+-------+-------+-------+------+-------------------+
only showing top 5 rows

root
 |-- Symbol: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- DateinHour: timestamp (nullable = true)



In [36]:
from pyspark.sql.functions import first,max,min,last,sum
hourlyDf= dateDf.groupby("symbol","DateinHour")\
                .agg(first("Open").alias("Open"),\
                     max("High").alias("High"),\
                     min("Low").alias("Low"),\
                     last("Close").alias("Close"),\
                     sum("Volume").alias("Volume"),\
                 )\
                 .orderBy("symbol","DateinHour")
hourlyDf.show(6)



+------+-------------------+-------+------+-------+-------+------+
|symbol|         DateinHour|   Open|  High|    Low|  Close|Volume|
+------+-------------------+-------+------+-------+-------+------+
|   ACC|2021-01-01 09:00:00| 1617.2|1632.0|1615.65| 1628.0| 41722|
|   ACC|2021-01-01 10:00:00| 1628.5|1628.5| 1621.0| 1625.0| 25785|
|   ACC|2021-01-01 11:00:00|1624.55|1629.5|1624.35| 1627.0| 18471|
|   ACC|2021-01-01 12:00:00| 1627.0|1631.9| 1623.0|1624.75| 16733|
|   ACC|2021-01-01 13:00:00| 1625.0|1626.6| 1623.4| 1625.0| 11990|
|   ACC|2021-01-01 14:00:00|1624.35|1628.0| 1618.0| 1619.2| 27039|
+------+-------------------+-------+------+-------+-------+------+
only showing top 6 rows



                                                                                

In [43]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag,lead, desc

windowspec=Window.partitionBy("Symbol").orderBy("DateinHour")


hourlylagDf=hourlyDf.withColumn("Closelag",lag("Close",1).over(windowspec))\
                    .withColumn("DATEFROM",lag("DateinHour",1).over(windowspec))\
                    .withColumn("Gain",col("Close")-col("Closelag"))\
                    .withColumn("GainP",col("Gain")/col("Closelag") * 100)\
                    .withColumn("Volumelag",lag("Volume",1).over(windowspec))\
                    .withColumn("VolumeGain",col("Volume")-col("Volumelag"))\
                    .withColumn("VolumeGainP",col("VolumeGain")/col("Volumelag")*100)\
                    .withColumnRenamed("DateinHour","DateTo")
                    
hourlylagDf.show(25)



+------------+-------------------+-------+-------+-------+-------+------+--------+-------------------+--------------------+--------------------+---------+----------+-------------------+
|      symbol|             DateTo|   Open|   High|    Low|  Close|Volume|Closelag|           DATEFROM|                Gain|               GainP|Volumelag|VolumeGain|        VolumeGainP|
+------------+-------------------+-------+-------+-------+-------+------+--------+-------------------+--------------------+--------------------+---------+----------+-------------------+
|GBPINR_JUN21|2021-04-29 17:00:00| 104.19| 104.19| 104.19| 104.19|     1|    null|               null|                null|                null|     null|      null|               null|
|GBPINR_JUN21|2021-04-30 17:00:00| 104.08| 104.08| 104.08| 104.08|     1|  104.19|2021-04-29 17:00:00|-0.10999999999999943|-0.10557635089739845|        1|         0|                0.0|
|     GOLD_F1|2021-01-01 09:00:00|50180.0|50239.0|50128.0|50165.0|   1

                                                                                

In [45]:
finalDf = hourlylagDf.select("Symbol","DATEFROM","DATETO","GainP","VolumeGainP")
finalDf.show(100)



+------------+-------------------+-------------------+--------------------+-------------------+
|      Symbol|           DATEFROM|             DATETO|               GainP|        VolumeGainP|
+------------+-------------------+-------------------+--------------------+-------------------+
|GBPINR_JUN21|               null|2021-04-29 17:00:00|                null|               null|
|GBPINR_JUN21|2021-04-29 17:00:00|2021-04-30 17:00:00|-0.10557635089739845|                0.0|
|     GOLD_F1|               null|2021-01-01 09:00:00|                null|               null|
|     GOLD_F1|2021-01-01 09:00:00|2021-01-01 10:00:00|0.023921060500348847|-50.955414012738856|
|     GOLD_F1|2021-01-01 10:00:00|2021-01-01 11:00:00| 0.01594355979831397|  48.05194805194805|
|     GOLD_F1|2021-01-01 11:00:00|2021-01-01 12:00:00|-0.00996313639533...|  29.82456140350877|
|     GOLD_F1|2021-01-01 12:00:00|2021-01-01 13:00:00| 0.02989238740534077| -86.48648648648648|
|     GOLD_F1|2021-01-01 13:00:00|2021-0

                                                                                