In [None]:
# Mount Google Drive at /content/drive so later cells can read files from it.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#######################################
###!@0 START INIT ENVIRONMENT
# Mount Google Drive; the Spark archive used below lives on a shared drive.
from google.colab import drive
drive.mount('/content/drive')
# List the Spark archive so a missing/unreadable file is visible in the cell output.
!ls -al /content/drive/Shareddrives/DA231-2023/assignments/spark-3.0.3-bin-hadoop2.7.tgz
# Install the headless OpenJDK 8 package; -qq and the redirect silence apt output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Extract the Spark archive into the current working directory
# (presumably /content, which is what SPARK_HOME below points at -- confirm).
!tar xf /content/drive/Shareddrives/DA231-2023/assignments/spark-3.0.3-bin-hadoop2.7.tgz
# findspark is used by the next cell to make the extracted Spark importable.
!pip install -q findspark
import os
# Point the environment at the Java and Spark installations set up above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
###!@0 END INIT ENVIRONMENT

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
-r-------- 1 root root 220400553 Sep  4 09:38 /content/drive/Shareddrives/DA231-2023/assignments/spark-3.0.3-bin-hadoop2.7.tgz


In [None]:
#######################################
###!@1 START OF PYSPARK INIT
import findspark
# Make pyspark importable (presumably located via the SPARK_HOME set in the
# previous cell -- confirm against findspark's documentation).
findspark.init()
# The return value is discarded here; the call has no visible effect in this cell.
findspark.find()
from pyspark.sql import SparkSession
# NOTE(review): input_type is not read by any cell visible in this notebook.
input_type = 'sample'
# Create (or reuse) a Spark session with master "local", app name
# "StockPrediction" and the Spark UI port configured as 4050.
spark = SparkSession.builder\
         .master("local")\
         .appName("StockPrediction")\
         .config('spark.ui.port', '4050')\
         .getOrCreate()
# Last expression of the cell: displays the session summary as the cell output.
spark
# Spark is ready to go within Colab!
###!@1 END OF PYSPARK INIT

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql import functions, Row
from pyspark.sql.window import Window
from pyspark.sql.functions import input_file_name, substring, regexp_extract, \
                                  col, regexp_replace, udf,date_format, \
                                  first, last, quarter, year, lag, sum

from glob import glob

import pandas as pd
import ast
import re
from datetime import datetime

## Master instrument data file which is used to capture data from indices

In [None]:
finance_data_path = "/content/drive/MyDrive/DataEngineeringAtScale/finance-data-2/*.csv"
finance_data_file_list = glob(finance_data_path)

# Fail fast with a clear message: without this, an empty glob (Drive not
# mounted, folder moved) only surfaces later as an IndexError on [0].
if not finance_data_file_list:
  raise FileNotFoundError("No finance data files match: " + finance_data_path)


def read_finance_csv(path):
  """Read one tab-delimited finance CSV (first row is the header) as a Spark DataFrame."""
  return spark.read.option("delimiter", "\t") \
                  .option("header", "true") \
                  .csv(path)


# Every distinct quarter timestamp that appears as a column header in any file.
quarters = set()

for finance_data_file in finance_data_file_list:
  finance_data = read_finance_csv(finance_data_file)
  if (not finance_data.columns):
    # A file with no columns at all: report its path and move on.
    print("name: ", finance_data_file)
  else:
    colnames = finance_data.columns
    # Skip the first and last columns; in the sample shown below these are the
    # metric label (_c0) and tradingsymbol, everything in between is a quarter.
    for colname in colnames[1:-1]:
      quarters.add(datetime.strptime(colname, '%Y-%m-%d %H:%M:%S'))

# Preview the first file so the layout of the finance data is visible.
finance_data = read_finance_csv(finance_data_file_list[0])
finance_data.show()

print(quarters)

+--------------------+-------------------+-------------------+-------------------+-------------------+-------------+
|                 _c0|2022-09-30 00:00:00|2022-12-31 00:00:00|2023-03-31 00:00:00|2023-06-30 00:00:00|tradingsymbol|
+--------------------+-------------------+-------------------+-------------------+-------------------+-------------+
|Tax Effect Of Unu...|                0.0|        -42760000.0|        -32000000.0|                0.0|        GEPIL|
|  Tax Rate For Calcs|                0.0|                0.4|                0.4|                0.0|        GEPIL|
|   Normalized EBITDA|       -856900000.0|           500000.0|       -940200000.0|      -1148500000.0|        GEPIL|
| Total Unusual Items|                0.0|       -106900000.0|        -80000000.0|                0.0|        GEPIL|
|Total Unusual Ite...|                0.0|       -106900000.0|        -80000000.0|                0.0|        GEPIL|
|Net Income From C...|      -1125700000.0|      -1392900000.0|  

In [None]:
# Parse the instruments dump line by line and keep BSE equity instruments.
instrument_data_path = "/content/drive/MyDrive/DataEngineeringAtScale/instruments-data.txt"

# NOTE(review): `data` starts as a list here but is rebound to a dict inside the loop below.
data = []
# Literal fragments / regex used to classify each line of the dump.
eq_search_pattern = r"'instrument_type': 'EQ'"
bse_search_pattern = r"'exchange': 'BSE'"
# Captures the single-quoted value that follows a key ending in name'.
name_search_pattern = r"name':\s*'([^']+)'"

import json

# NOTE(review): this empty pandas frame is not used by any code visible in this notebook.
instrument_data = pd.DataFrame(columns=["instrument_token", "exchange_token",
                                        "tradingsymbol"])
# One dict per matching instrument; turned into a Spark DataFrame in a later step.
eq_data = []

with open(instrument_data_path, 'r') as instruments_data_f:
    for data_str in instruments_data_f:
      # Keep only lines that are BSE, then equity ('EQ'), then have a name value.
      bse_match= re.search(bse_search_pattern, data_str)
      if bse_match:
          equity_match = re.search(eq_search_pattern, data_str)
          if equity_match:
              name_match = re.search(name_search_pattern, data_str)
              if name_match:
                  name = name_match.group(1)

                  # Cut the name entry out of the line: from the character just
                  # before the first "name" up to and including the "'," that
                  # follows it. The name was captured by the regex above, so it
                  # is not needed in the text parsed below (presumably removed
                  # because its value could break the quote replacement -- confirm).
                  datetime_index = data_str.find("name")
                  comma_after_datetime_index = data_str.find("',", datetime_index)
                  data_str = data_str[:datetime_index-1] + data_str[comma_after_datetime_index+2:]

                  # Swap single quotes for double quotes so the remainder can be parsed as JSON
                  data_str = data_str.replace("'", "\"")

                  # Parse the remaining text into a dictionary
                  data = json.loads(data_str)

                  eq_data.append({'name': name, \
                                   'instrument_token': data['instrument_token'], \
                                   'exchange_token': data['exchange_token'], \
                                   'tradingsymbol': data['tradingsymbol'], \
                                  })

def get_finance_data_file_name(symbolname, file_list=None) -> str:
    """Return the path of the finance CSV named ``<symbolname>_finance.csv``.

    Args:
        symbolname: Prefix of the file name to look for.
        file_list: Paths to search. Defaults to the notebook-level
            ``finance_data_file_list`` when omitted.

    Returns:
        The first path whose file name is exactly ``<symbolname>_finance.csv``,
        or the string "NA" when no such path exists.
    """
    import os  # local import so the function does not depend on another cell's imports

    if file_list is None:
        file_list = finance_data_file_list

    filename = symbolname + "_finance.csv"

    # Compare the whole file name, not a substring of the path: a substring
    # test would let "TCS_finance.csv" match ".../XTCS_finance.csv".
    for file_n in file_list:
        if os.path.basename(file_n) == filename:
            return file_n

    return "NA"

# Row-level helper: extend one joined row with the path of its finance file.
def process_row(a2):
    """Return the fields of ``a2`` followed by one extra element: the finance
    file path looked up from ``a2["name"]`` ("NA" when no file is found)."""
    return a2 + (get_finance_data_file_name(a2["name"]), )

process_row_udf = udf(process_row, StringType())

# --- Company master data: keep rows whose `delisted` value is false (and not null),
# --- and drop its `name` column so only the instrument name survives the join.
company_csv_path = "/content/drive/MyDrive/DataEngineeringAtScale/trading-industry-2.csv"
company_df = (
    spark.read.csv(company_csv_path, header=True, inferSchema=True)
         .filter((col("delisted") == False) & (col("delisted").isNotNull()))
         .drop("name")
)
company_df.printSchema()

# --- Instrument data collected from the instruments dump.
instrument_data_df = spark.createDataFrame(eq_data)
instrument_data_df.show()

# --- Attach industry information to each instrument via its trading symbol,
# --- then append the finance-file path to every row.
symbol_matches = company_df.tradingindex == instrument_data_df.tradingsymbol
joined_df = instrument_data_df.join(company_df, symbol_matches, how='inner') \
                              .drop("tradingsymbol", "exchange_token", "instrument_token")
industry_data_df = joined_df.rdd.map(process_row).toDF(joined_df.columns + ["finance-data-path"])
#industry_data_df.show()

root
 |-- macro: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- basicIndustry: string (nullable = true)
 |-- delisted: string (nullable = true)
 |-- isETFSec: string (nullable = true)
 |-- tradingindex: string (nullable = true)





+--------------+----------------+--------------------+-------------+
|exchange_token|instrument_token|                name|tradingsymbol|
+--------------+----------------+--------------------+-------------+
|        543151|       139046660|NIPPON INDIA MUTU...|        08ABB|
|        543170|       139051524|NIPPON INDIA MUTU...|        08ADD|
|        543145|       139045124|NIPPON INDIA MUTU...|        08ADR|
|        543153|       139047172|NIPPON INDIA MUTU...|        08AGG|
|        543147|       139045636|NIPPON INDIA MUTU...|        08AMD|
|        543148|       139045892|NIPPON INDIA MUTU...|        08AMR|
|        543155|       139047684|NIPPON INDIA MUTU...|        08AQD|
|        543168|       139051012|NIPPON INDIA MUTU...|        08AQR|
|        543149|       139046148|NIPPON INDIA MUTU...|        08BPB|
|        543156|       139047940|NIPPON INDIA MUTU...|        08DPD|
|        543169|       139051268|NIPPON INDIA MUTU...|        08DPR|
|        543150|       139046404|N

## Stock price data (Hourly)

In [None]:
Stock_DataPath = "/content/drive/MyDrive/DataEngineeringAtScale/hourly-data/*.txt"
file_list = glob(Stock_DataPath)

# Explicit schema for the hourly records: a timestamp plus five nullable doubles.
schema = StructType(
    [StructField("date", TimestampType(), True)]
    + [StructField(price_field, DoubleType(), True)
       for price_field in ("open", "high", "low", "close", "volume")]
)

# The stock name is recovered from the source file name: take the last path
# segment, strip the "_hourly_data_bse.txt" suffix and decode "%20" to a space.
source_file_name = regexp_extract(input_file_name(), "/([^/]+)$", 1)
stock_name = regexp_replace(
    regexp_replace(source_file_name, r"_hourly_data_bse\.txt", ""),
    "%20", " ")

hourly_data_df = spark.read \
    .option("header", "false") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .json(Stock_DataPath) \
    .withColumn("name", stock_name)

In [None]:
# Preview the loaded hourly rows, including the `name` derived from the file name.
hourly_data_df.show()

+-------------------+------+------+------+------+-------+--------------------+
|               date|  open|  high|   low| close| volume|                name|
+-------------------+------+------+------+------+-------+--------------------+
|2022-10-06 09:15:00| 323.3| 326.6| 321.8|323.05|89026.0|CHAMBAL FERTILISE...|
|2022-10-06 10:15:00|323.05| 324.8|322.85| 324.8| 6803.0|CHAMBAL FERTILISE...|
|2022-10-06 11:15:00|324.45| 325.0| 323.5| 323.7| 4044.0|CHAMBAL FERTILISE...|
|2022-10-06 12:15:00| 323.7|324.55|323.25| 323.6| 3927.0|CHAMBAL FERTILISE...|
|2022-10-06 13:15:00| 323.6|323.75| 320.6|320.95| 8709.0|CHAMBAL FERTILISE...|
|2022-10-06 14:15:00|320.95|323.05| 320.1|321.65| 9556.0|CHAMBAL FERTILISE...|
|2022-10-06 15:15:00|321.65|322.55| 320.9|322.55| 5936.0|CHAMBAL FERTILISE...|
|2022-10-07 09:15:00|324.15|324.15| 316.4|320.05|21319.0|CHAMBAL FERTILISE...|
|2022-10-07 10:15:00|320.05|320.05| 318.1| 318.5| 4467.0|CHAMBAL FERTILISE...|
|2022-10-07 11:15:00| 318.5| 319.2|317.75| 318.6| 87

## Identify the quarterly gain per stock

In [None]:
# Take the first 10 characters of the timestamp (yyyy-MM-dd) as the grouping day
hourly_data_day_df = hourly_data_df.withColumn("day", substring("date", 1, 10))

# Collapse the hourly rows into one row per (day, name).
# first()/last() depend on row order, which is not guaranteed after the
# shuffle done by groupBy, so the open/close are picked explicitly by
# timestamp: open of the earliest hour, close of the latest hour.
daily_data_aggregated_vol_df = hourly_data_day_df.groupBy("day", "name") \
    .agg(functions.expr("min_by(`open`, `date`)").alias("open"),
         functions.expr("max_by(`close`, `date`)").alias("close"),
         functions.sum("volume").alias("total_volume"))

daily_data_aggregated_vol_df.show(10)

# Extract year and quarter from the 'day' column
quarterly_data = daily_data_aggregated_vol_df.withColumn("year", year("day")).withColumn("quarter", quarter("day"))

In [None]:
# One row per (name, year, quarter): opening price of the earliest day, closing
# price of the latest day, and the summed volume.
# first()/last() are order-dependent and not deterministic after a shuffle, so
# the rows are selected by `day` instead (yyyy-MM-dd strings sort chronologically).
quarterly_aggregated_df = quarterly_data.groupBy("name", "year", "quarter") \
    .agg(functions.expr("min_by(`open`, `day`)").alias("quarterly_open"),
         functions.expr("max_by(`close`, `day`)").alias("quarterly_close"),
         functions.sum("total_volume").alias("total_volume"))

# Show the quarterly open/close/volume per stock
quarterly_aggregated_df.show(10)

## Find the hour of the day with maximum gain and maximum volume

In [None]:
from pyspark.sql import functions as F

def get_hour_day_from_df(hourly_df):
  """Return ``hourly_df`` with ``date`` converted to a timestamp and two
  derived columns: ``day`` (the date part) and ``hour`` (the hour part)."""
  return (
      hourly_df
      .withColumn("date", F.to_timestamp("date"))  # make sure 'date' is a timestamp
      .withColumn("day", F.to_date("date"))
      .withColumn("hour", F.hour("date"))
  )

def get_hour_max_volume(hourly_df=None):
  """Return, for every (day, name), the hourly row(s) with the highest volume.

  Args:
    hourly_df: Hourly price DataFrame with at least ``date``, ``name`` and
      ``volume`` columns. Defaults to the notebook-level ``hourly_data_df``
      so existing zero-argument calls keep working.

  Returns:
    The input rows, with ``day`` and ``hour`` columns added, filtered to the
    top-volume hour of each day and stock. ``rank`` is used, so hours tied
    for the maximum volume are all returned.
  """
  if hourly_df is None:
    hourly_df = hourly_data_df

  hourly_day_hour_df = get_hour_day_from_df(hourly_df)

  # Rank the hours of each (day, name) from highest to lowest volume
  window_max_volume = Window.partitionBy("day", "name").orderBy(F.desc("volume"))

  # Keep only the rank-1 rows and drop the helper column
  max_volume_df = hourly_day_hour_df.withColumn("max_volume_rank", F.rank().over(window_max_volume)) \
      .filter(F.col("max_volume_rank") == 1) \
      .drop("max_volume_rank")

  return max_volume_df

def get_hour_max_price_gain(hourly_data_df):
  """Return, for every (day, name), the single hourly row with the widest
  high-to-low price range.

  Args:
    hourly_data_df: Hourly price DataFrame with ``date``, ``name``, ``high``
      and ``low`` columns.

  Returns:
    One row per (day, name) carrying the original columns plus ``day``,
    ``hour``, ``max_gain`` (max high minus min low within that hour) and
    ``max_gain_percent`` (``max_gain`` relative to the row's ``low``, in %).
  """
  hourly_day_hour_df = get_hour_day_from_df(hourly_data_df)

  # Price range inside each (day, name, hour) bucket
  window_max_gain = Window.partitionBy("day", "name", "hour")
  max_gain_price_df = hourly_day_hour_df.withColumn(
      "max_gain",
      F.max("high").over(window_max_gain) - F.min("low").over(window_max_gain))

  # The percentage was previously computed into a separate DataFrame that was
  # never used; it is kept here as an extra column on the returned rows.
  max_gain_price_df = max_gain_price_df.withColumn(
      "max_gain_percent", F.col("max_gain") / F.col("low") * 100)

  # Pick the best hour per (day, name). Ordering by hour as a tie-break makes
  # the chosen row reproducible when two hours have the same range.
  best_hour_window = Window.partitionBy("day", "name") \
      .orderBy(F.desc("max_gain"), F.asc("hour"))
  max_gain_hour_df = max_gain_price_df.withColumn("rn", F.row_number().over(best_hour_window)) \
      .filter(F.col("rn") == 1) \
      .drop("rn")

  return max_gain_hour_df

max_gain_price_df = get_hour_max_price_gain(hourly_data_df)

In [None]:
# Preview the per-day, per-stock rows selected by get_hour_max_price_gain.
max_gain_price_df.show(10)

In [None]:
# For each hour of the day, count the distinct stocks that have at least one
# row with that hour in max_gain_price_df.
positive_gains_by_hour = (
    max_gain_price_df
    .groupBy("hour")
    .agg(F.countDistinct("name").alias("count_by_hour"))
)

# Hours ordered from the highest to the lowest stock count.
hour_with_most_positive_gains = positive_gains_by_hour.orderBy(F.col("count_by_hour").desc())

In [None]:
# Display the per-hour stock counts, highest count first.
hour_with_most_positive_gains.show()

In [None]:
def calculate_moving_avg(df, n_days, column, tgt_column):
  """Add a trailing moving average of ``column`` per stock.

  Args:
    df: DataFrame with ``name`` and ``day`` columns plus ``column``.
    n_days: Number of rows (one row per trading day) in the window,
      including the current row. Must be at least 1.
    column: Name of the column to average.
    tgt_column: Name of the output column.

  Returns:
    ``df`` with ``tgt_column`` added. Early rows of each stock average over
    however many rows are available so far.

  Raises:
    ValueError: If ``n_days`` is smaller than 1.
  """
  if n_days < 1:
    raise ValueError("n_days must be >= 1")

  # The window is the current row plus the n_days - 1 rows before it. The
  # previous bound of -n_days spanned n_days + 1 rows, one more than requested.
  window_spec = Window.partitionBy("name") \
      .orderBy(F.col("day").cast("timestamp").cast("long")) \
      .rowsBetween(-(n_days - 1), Window.currentRow)

  # Moving average of `column` over the trailing window
  df_with_ma = df.withColumn(tgt_column, F.avg(column).over(window_spec))

  # Fall back to the averaged column's own value when the average is null
  # (the fallback used to be hard-coded to "close" regardless of `column`).
  df_with_ma = df_with_ma.withColumn(tgt_column, F.coalesce(F.col(tgt_column), F.col(column)))

  return df_with_ma

#daily_data_aggregated_vol_df = calculate_moving_avg(daily_data_aggregated_vol_df, 20, "close", "moving_avg_closing_price_20")
#daily_data_aggregated_vol_df = calculate_moving_avg(daily_data_aggregated_vol_df, 50, "close", "moving_avg_closing_price_50")

In [None]:
# Show the first 50 rows of the daily open/close/volume aggregation.
daily_data_aggregated_vol_df.show(50)

In [None]:
# Companies whose industry is exactly "IT - Software".
it_sw_companies_df = industry_data_df.where(industry_data_df["industry"] == "IT - Software")

In [None]:
# Unique company names in the IT software subset
it_distinct_names = it_sw_companies_df.select('name').dropDuplicates()
# Keep only the daily rows that belong to one of those companies
it_sw_daily_data_df = daily_data_aggregated_vol_df.join(it_distinct_names, on='name', how='inner')
it_sw_daily_data_df.show()

In [None]:
#it_sw_daily_data_df = it_sw_daily_data_df.drop("moving_avg_closing_price_20")
# Add a moving average of "close" to the IT software daily data; the result is
# stored in a column named "moving_average_10_days_new".
it_sw_ma_df = calculate_moving_avg(it_sw_daily_data_df, 10, "close", "moving_average_10_days_new")
it_sw_ma_df.show(20)

In [None]:
def calculate_days_spent_above_below_ma(df, ma_column):
  """Count, per stock, the days whose close is above / below a moving average.

  Args:
    df: DataFrame with ``name``, ``close`` and ``ma_column`` columns, one row
      per stock and day.
    ma_column: Name of the moving-average column to compare ``close`` with.

  Returns:
    One row per ``name`` with ``total_days_above_ma`` and
    ``total_days_below_ma``. Days where close equals the average count
    towards neither total.
  """
  price_diff = F.col('close') - F.col(ma_column)

  # 1/0 flags per day. The totals are plain grouped sums: the earlier
  # running-sum windows followed by max() produced the same numbers with an
  # extra pass, and one of the two window specs was never used.
  above_flag = F.when(price_diff > 0, 1).otherwise(0)
  below_flag = F.when(price_diff < 0, 1).otherwise(0)

  agg_result = df.groupBy('name').agg(
      F.sum(above_flag).alias('total_days_above_ma'),
      F.sum(below_flag).alias('total_days_below_ma'))
  return agg_result


In [None]:
# The moving-average column on it_sw_ma_df is named "moving_average_10_days_new";
# passing "moving_average_10_days" referenced a column that does not exist.
ma_10days = calculate_days_spent_above_below_ma(it_sw_ma_df, "moving_average_10_days_new")

In [None]:
# Ratio of days above vs. below the moving average (NULL when a company has
# no days below, because Spark returns NULL for division by zero).
ma_10days = ma_10days.withColumn("ratio_above_below_ma",
                             col("total_days_above_ma") / col("total_days_below_ma"))

# Compare the two day counts directly instead of testing ratio > 1: a NULL
# ratio would otherwise drop companies that never traded below the average.
companies_trading_above_ma = ma_10days \
    .filter(col("total_days_above_ma") > col("total_days_below_ma")) \
    .select("name").distinct().count()
print("Out of {0} companies {1} are trading above ma for longer time".format(ma_10days.count(), companies_trading_above_ma))

In [None]:
# Filter the stocks which have fundamental data captured
industry_data_distinct_names = industry_data_df.select('name').distinct()
filtered_data_df = daily_data_aggregated_vol_df.join(industry_data_distinct_names, 'name', 'inner')

#Calculate moving averages
filtered_data_ma_20_days_df = calculate_moving_avg(filtered_data_df, 20, "close", "moving_average_20_days")
filtered_data_ma_20_days_analysis_df = calculate_days_spent_above_below_ma(filtered_data_ma_20_days_df, "moving_average_20_days")

# Get the ratio on days spent above / below moving average
filtered_data_ma_20_days_analysis = filtered_data_ma_20_days_analysis_df.withColumn("ratio_above_below_ma",
                             col("total_days_above_ma") / col("total_days_below_ma"))
companies_trading_above_ma = filtered_data_ma_20_days_analysis.filter(col("ratio_above_below_ma") > 1).select("name").distinct().count()


print("Out of {0} companies {1} are trading above ma for longer time".format(filtered_data_ma_20_days_analysis.count(), companies_trading_above_ma))

In [None]:
# Join on the column name rather than an equality expression so the result has
# a single `name` column; the expression form keeps both copies, which makes
# any later reference to "name" ambiguous.
ma_20_analysis_df = industry_data_df.join(filtered_data_ma_20_days_analysis, on="name", how="inner")
ma_20_analysis_df.show(10)


In [None]:
import matplotlib.pyplot as plt


# Label every row as 'Above MA' or 'Below MA' before counting. The previous
# code grouped by the numeric ratio and then looked up an 'above_below_ma'
# field that was never created, which fails on the first row.
# Rows with more days above than below are 'Above MA'; everything else
# (including ties) is 'Below MA'. Day counts are compared directly because the
# ratio is NULL when there are no days below.
above_below_status = functions.when(
    col('total_days_above_ma') > col('total_days_below_ma'), 'Above MA'
).otherwise('Below MA')

industry_counts = ma_20_analysis_df \
    .withColumn('above_below_ma', above_below_status) \
    .groupBy('above_below_ma').count().collect()

# Extracting counts for above and below MA; default to 0 when a category has no rows
above_ma_count = next((item['count'] for item in industry_counts if item['above_below_ma'] == 'Above MA'), 0)
below_ma_count = next((item['count'] for item in industry_counts if item['above_below_ma'] == 'Below MA'), 0)

# Creating a bar chart
status = ['Above MA', 'Below MA']
counts = [above_ma_count, below_ma_count]

plt.figure(figsize=(8, 6))
plt.bar(status, counts, color=['green', 'red'])
plt.xlabel('Industry Status')
plt.ylabel('Count')
plt.title('Count of Industries Above and Below Moving Average')
plt.show()