In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from google.cloud import storage
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, IndexToString, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder.appName('Nifty50').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '30g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','30g'), ("spark.driver.maxResultSize", "70g")])

gcs_client = storage.Client()

In [3]:
bucket_name = 'bigdata_nifty50'

In [4]:
bucket = gcs_client.bucket(bucket_name)

In [5]:
file_name = 'df_with_indicators.csv'

In [6]:
df = spark.read.csv('gs://{}//{}'.format(bucket_name, file_name), inferSchema=True)

                                                                                

In [7]:
new_columns = ['date',
 'close',
 'high',
 'low',
 'open',
 'volume',
 'sma5',
 'sma10',
 'sma15',
 'sma20',
 'ema5',
 'ema10',
 'ema15',
 'ema20',
 'upperband',
 'middleband',
 'lowerband',
 'HT_TRENDLINE',
 'KAMA10',
 'KAMA20',
 'KAMA30',
 'SAR',
 'TRIMA5',
 'TRIMA10',
 'TRIMA20',
 'ADX5',
 'ADX10',
 'ADX20',
 'APO',
 'CCI5',
 'CCI10',
 'CCI15',
 'macd510',
 'macd520',
 'macd1020',
 'macd1520',
 'macd1226',
 'MFI',
 'MOM10',
 'MOM15',
 'MOM20',
 'ROC5',
 'ROC10',
 'ROC20',
 'PPO',
 'RSI14',
 'RSI8',
 'slowk',
 'slowd',
 'fastk',
 'fastd',
 'fastksr',
 'fastdsr',
 'ULTOSC',
 'WILLR',
 'ATR',
 'Trange',
 'TYPPRICE',
 'HT_DCPERIOD',
 'BETA',
 'sector',
 'company',
 'Inflation Rate ',
 'Balance of Trade',
 'Bank Lending Rate',
 'Car Registrations',
 'Consumer Price Index',
 'Crude Oil Production',
 'Fiscal Expenditure',
 'Industrial Production',
 'Food Inflation',
 'Producer Prices',
 'Reverse Repo Rate',
 'Steel Production',
 'Tourist Arrivals',
 'Corporate Tax Rate ',
 'Export Prices',
 'GDP per Capita PPP',
 'GDP',
 'Gross National Product',
 'Import Prices',
 'Military Expenditure']     

In [8]:
from functools import reduce

old_columns = df.schema.names
df = reduce(lambda data, idx: data.withColumnRenamed(old_columns[idx], new_columns[idx]), range(len(old_columns)), df)

In [9]:
cols = df.columns
cols.remove('company')
cols.remove('sector')

exprs = {x: "avg" for x in cols}
exprs['sector'] = 'first'
date_df = df.groupby(df.company, to_date(df.date).alias('day')).agg(exprs)

In [10]:
w = Window.partitionBy('company').orderBy("day")

date_df = date_df.withColumn('diffOpenClose', col("avg(open)") - col("avg(close)"))
date_df = date_df.withColumn('diffHighLow', col("avg(high)") - col("avg(low)"))
date_df = date_df.withColumn('target', when(lag(col("avg(close)")).over(w) < col("avg(close)"), 1).otherwise(0))

In [11]:
date_df.show()

22/11/26 06:35:23 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+--------+----------+----------------------+------------------+----------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+------------------+------------------+------------------+------------------+------------------+---------------------+------------------+---------------------+---------------------------+------------------+--------------------+-------------------+--------------------+------------------+------------------+-----------------------+------------------+------------------+--------------------+------------------+--------------------+--------------------+------------------+-------------------+-----------------------+------------------+-------------------+-------------------------+------------------+------------------+--------------------+--------------------+--------------------+------------+--------------------+------------------+------------------+--------

                                                                                

In [12]:
indexer1 = StringIndexer(inputCol="company", outputCol="companyIdx").setHandleInvalid("skip")
indexer2 = StringIndexer(inputCol="first(sector)", outputCol="sectorIdx").setHandleInvalid("skip")

inputs = [indexer1.getOutputCol(), indexer2.getOutputCol()]

encoder = OneHotEncoder(inputCols=inputs,  \
                                 outputCols=["companyVec", "sectorVec"])

#run it through a pipeline
pipeline = Pipeline(stages=[indexer1, indexer2, encoder])
date_df1 = pipeline.fit(date_df).transform(date_df)

date_df1.show(5)



+--------+----------+----------------------+------------------+----------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+------------------+------------------+-----------------+-----------------+---------------------+------------------+---------------------+---------------------------+------------------+--------------------+-------------------+-------------------+------------------+------------------+-----------------------+------------------+------------------+--------------------+------------------+--------------------+--------------------+------------------+-------------------+-----------------------+------------------+-------------------+-------------------------+------------------+------------------+-------------------+--------------------+-------------------+------------+-------------------+------------------+------------------+---------------

                                                                                

In [13]:
date_df1 = date_df1.withColumn('day', unix_timestamp(date_df1.day))

In [14]:
cols = date_df1.columns
for col in ['company', 'first(sector)', 'target']:
    cols.remove(col)

cols.remove('companyIdx')
cols.remove('sectorIdx')
    
cols

['day',
 'avg(Reverse Repo Rate)',
 'avg(RSI14)',
 'avg(Car Registrations)',
 'avg(CCI10)',
 'avg(TRIMA10)',
 'avg(SAR)',
 'avg(fastdsr)',
 'avg(ADX20)',
 'avg(WILLR)',
 'avg(sma10)',
 'avg(macd1226)',
 'avg(sma20)',
 'avg(ema15)',
 'avg(ema10)',
 'avg(KAMA10)',
 'avg(TRIMA20)',
 'avg(Tourist Arrivals)',
 'avg(fastksr)',
 'avg(Steel Production)',
 'avg(Gross National Product)',
 'avg(TRIMA5)',
 'avg(macd1020)',
 'avg(CCI5)',
 'avg(MOM15)',
 'avg(Export Prices)',
 'avg(TYPPRICE)',
 'avg(GDP per Capita PPP)',
 'avg(HT_DCPERIOD)',
 'avg(ema5)',
 'avg(open)',
 'avg(Producer Prices)',
 'avg(PPO)',
 'avg(ADX10)',
 'avg(CCI15)',
 'avg(Fiscal Expenditure)',
 'avg(Import Prices)',
 'avg(ATR)',
 'avg(Crude Oil Production)',
 'avg(upperband)',
 'avg(low)',
 'avg(BETA)',
 'avg(ROC20)',
 'avg(macd520)',
 'avg(date)',
 'avg(MOM10)',
 'avg(fastk)',
 'avg(MFI)',
 'avg(Corporate Tax Rate )',
 'avg(APO)',
 'avg(sma5)',
 'avg(KAMA20)',
 'avg(RSI8)',
 'avg(macd510)',
 'avg(lowerband)',
 'avg(Military Expe

In [15]:
assembler = VectorAssembler(inputCols=cols, outputCol="features")

In [16]:
date_df1 = assembler.transform(date_df1)

In [17]:
train_df, test_df =  date_df1.randomSplit([0.7, 0.3],0.0)

In [18]:
train_df.select('features').show(5, truncate=False)

[Stage 32:>                                                         (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [19]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'target', numTrees=10)

In [22]:
rfModel = rf.fit(train_df)



In [24]:
rfModel.save('gs://{}//models//rfModel'.format(bucket_name))

                                                                                

In [26]:
predictions = rfModel.transform(test_df)

In [31]:
e = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction")
print(e.evaluate(predictions, {e.metricName: "areaUnderROC"}))
print(e.evaluate(predictions, {e.metricName: "areaUnderPR"}))

                                                                                

0.7782533868094436


                                                                                

0.7544525732439485
