## Basic Spark Practise 

### Test Spark Setup

In [4]:
from pyspark.sql import SparkSession 

spark = SparkSession.builder \
    .appName("BasicPySparkExample") \
    .master("local[2]") \
    .config("spark.network.timeout", "300s") \
    .config("spark.executor.heartbeatInterval", "100s") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.port", "6066") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .getOrCreate()

data = ["Hello world", "Welcome to PySpark", "PySpark is fun", "Hello again"]
rdd = spark.sparkContext.parallelize(data)
words = rdd.flatMap(lambda line: line.split(" "))
word_pairs = words.map(lambda word: (word, 1))

word_counts = word_pairs.reduceByKey(lambda a, b: a + b)

print("Word Count Results:")
for word, count in word_counts.collect():
    print(f"{word}: {count}")

spark.stop()

Word Count Results:
world: 1
Welcome: 1
to: 1
PySpark: 2
fun: 1
again: 1
Hello: 2
is: 1


In [5]:
import os 
import pandas as pd
directory = os.getcwd()
print(directory)
NSE_data = pd.read_parquet(f"{directory}/NSE-Data-2024.parquet")
print(NSE_data.size)
print(NSE_data.dtypes)

C:\Learning\DE Projects\project-2-apache-spark\notebooks
12909234
TradDt                  object
BizDt                   object
Sgmt                    object
Src                     object
FinInstrmTp             object
FinInstrmId              int64
ISIN                    object
TckrSymb                object
SctySrs                 object
XpryDt                 float64
FininstrmActlXpryDt    float64
StrkPric               float64
OptnTp                 float64
FinInstrmNm             object
OpnPric                float64
HghPric                float64
LwPric                 float64
ClsPric                float64
LastPric               float64
PrvsClsgPric           float64
UndrlygPric            float64
SttlmPric              float64
OpnIntrst              float64
ChngInOpnIntrst        float64
TtlTradgVol              int64
TtlTrfVal              float64
TtlNbOfTxsExctd          int64
SsnId                   object
NewBrdLotQty             int64
Rmks                   float64
Rsvd

1) Calculated the volatity each day using ((high price - low price)/low price) * 100
2) The take the average value for each stock's average volatility
3) Perform (1) and (2) using python and using Apache spark

In [6]:
import timeit
import time

def calculateAvgVolatility(NSE_data):
    NSE_data["daily_volatility"] = ((NSE_data["HghPric"] - NSE_data["LwPric"])/ NSE_data["LwPric"])*100
    NSE_average_volatility = NSE_data.groupby("FinInstrmNm")["daily_volatility"].mean()
    volatility_df = NSE_average_volatility.reset_index()
    volatility_df.columns = ["StockName", "AverageVolatility"]
    return(volatility_df)
    
def time_decorator(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Execution Time: {end_time - start_time:.6f} seconds")
        return result
    return wrapper

@time_decorator
def test_function(data):
    return calculateAvgVolatility(data)

# Call the decorated function
python_result = test_function(NSE_data)


Execution Time: 0.065623 seconds


In [7]:
from pyspark.sql.functions import col, avg
def calculateAvgVolatilityWithSpark(NSE_data, spark):
    NSE_data_pyspark_df = spark.createDataFrame(NSE_data)
    
    # Calculate daily volatility as a new column
    df_with_volatility = NSE_data_pyspark_df.withColumn(
        "daily_volatility",
        ((col("HghPric") - col("LwPric")) / col("LwPric") * 100)
    )
    
    # Calculate the average daily volatility per stock
    average_volatility_df = df_with_volatility.groupBy("FinInstrmNm").agg(
        avg("daily_volatility").alias("average_volatility")
    )
    return(average_volatility_df.toPandas())

In [8]:
@time_decorator
def test_function(data,spark):
    return calculateAvgVolatilityWithSpark(data,spark)

# Call the decorated function
spark = SparkSession.builder \
    .appName("BasicPySparkExample") \
    .master("local[2]") \
    .config("spark.network.timeout", "300s") \
    .config("spark.executor.heartbeatInterval", "100s") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.port", "6066") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .getOrCreate()
    
result = test_function(NSE_data,spark)

Execution Time: 37.491835 seconds


In [9]:
print(result[result["FinInstrmNm"] == "APOLLO TYRES LTD"])
print(python_result[python_result["StockName"] == "APOLLO TYRES LTD"])
#result["FinInstrmNm"].unique().tolist()

         FinInstrmNm  average_volatility
89  APOLLO TYRES LTD            2.932801
            StockName  AverageVolatility
263  APOLLO TYRES LTD           2.932801


In [13]:
Filtered = NSE_data[NSE_data["FinInstrmNm"] == "APOLLO TYRES LTD"]
Filtered.to_csv("stock_data.csv", index=False)

For each stock perform below 
- Calculate monthly average price volatility - using HghPric and	LwPric 
- Calculate monthly average stock volumn using column - TtlTradgVol
- Aggregate the data based on sector and store the results
- For each sector calcualte which stock has highest volatility and for each month
- then across all secotr find which stock has highest volatility and for each month