In [1]:
!pip install kafka-python
!pip install pyspark
!pip install plotly bokeh ipywidgets
!pip3 install influxdb-client
!pip install yfinance

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [3]:
from kafka import KafkaProducer
import json
import datetime
import pandas as pd
import plotly.express as px
import yfinance as yf


producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'), api_version=(3, 8, 0))

# Function to fetch real-time stock data
def fetch_four_years_data(stock_symbol):
    # Get today's date and the first day of the year (YTD)
    today = datetime.date.today()
    four_years_ago = today - datetime.timedelta(days=4 * 365)
    
    # Download stock data using yfinance
    df = yf.download(stock_symbol, start=four_years_ago, end=today)
    df.reset_index(inplace=True)
    df = df[['Date', 'Open', 'High', 'Low', 'Close', 'Volume']]
    df.columns = ['date', 'open', 'high', 'low', 'close', 'volume']

    return df

# Function to produce data to Kafka topic
def produce_stock_data_to_kafka(df, stock_symbol):
    for _, row in df.iterrows():
        # Create a dictionary for each row
        stock_data = {
            'symbol': stock_symbol,
            'date': row['date'].strftime('%Y-%m-%d'),  # Convert datetime to string for JSON
            'open': row['open'],
            'high': row['high'],
            'low': row['low'],
            'close': row['close'],
            'volume': int(row['volume'])
        }
        # Send data to Kafka
        producer.send('stock_prices', stock_data)

    # Ensure all messages are sent before continuing
    producer.flush()

# Function to plot the YTD stock data
def plot_stock_data(df, stock_symbol):
    fig = px.line(df, x='date', y='close', title=f"4-Year Stock Prices for {stock_symbol} (Close Price)",
                  labels={'close': 'Price (USD)', 'date': 'Date'})
    fig.show()

# Example: Fetch and plot data for a stock
user_input = input("Enter stock symbol: ")  # e.g., 'AAPL'
stock_symbol = user_input.strip().upper()

# Fetch YTD data
stock_data = fetch_four_years_data(stock_symbol)

# Produce the fetched stock data to Kafka
produce_stock_data_to_kafka(stock_data, stock_symbol)

# Plot the YTD stock data
plot_stock_data(stock_data, stock_symbol)

[*********************100%***********************]  1 of 1 completed


In [4]:
from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.functions import from_json, col, to_date
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType, DoubleType

In [5]:
# Initializing Spark Session:
spark = SparkSession.builder.config("spark.driver.host", "localhost").appName("Stock_Prediction").config('spark.jars.packages', "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3").getOrCreate()

24/11/24 22:53:11 WARN Utils: Your hostname, Zangetsu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/11/24 22:53:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/karan-kumawat17/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/karan-kumawat17/.ivy2/cache
The jars for the packages stored in: /home/karan-kumawat17/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d9e99d5c-7927-442b-b0ca-0f0778029900;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.3 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 731ms :: art

In [6]:
spark.sparkContext.setLogLevel("ERROR")

In [7]:
# Defining Kafka stream from starting:
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "stock_prices").option("startingOffsets", "earliest").load()

In [8]:
schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("date", StringType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("close", FloatType(), True),
    StructField("volume", DoubleType(), True),
])

In [9]:
stock_data_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*").withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

In [10]:
def process_batch_stock_data(batch_df, batch_id):
    print(f"Batch ID: {batch_id}")

    symbols = batch_df.select("symbol").distinct().collect()
    for row in symbols:
        symbol = row["symbol"]
        print(f"\nShowing data for stock: {symbol}\n")

        batch_df.filter(col("symbol") == symbol).show(5, truncate=False)

In [11]:
import time

In [12]:
try:
    query = stock_data_df.writeStream.format("console").outputMode("append").foreachBatch(process_batch_stock_data).start()
    time.sleep(40)
finally:
    query.stop()

Batch ID: 0


                                                                                


Showing data for stock: AAPL

+------+----------+------+------+------+------+----------+
|symbol|date      |open  |high  |low   |close |volume    |
+------+----------+------+------+------+------+----------+
|AAPL  |2020-11-25|115.55|116.75|115.17|116.03|7.64992E7 |
|AAPL  |2020-11-27|116.57|117.49|116.22|116.59|4.66913E7 |
|AAPL  |2020-11-30|116.97|120.97|116.81|119.05|1.694102E8|
|AAPL  |2020-12-01|121.01|123.47|120.01|122.72|1.277282E8|
|AAPL  |2020-12-02|122.02|123.37|120.89|123.08|8.90042E7 |
+------+----------+------+------+------+------+----------+
only showing top 5 rows


Showing data for stock: GOOGL

+------+----------+-------+-------+-------+-------+--------+
|symbol|date      |open   |high   |low    |close  |volume  |
+------+----------+-------+-------+-------+-------+--------+
|GOOGL |2020-11-25|88.3905|88.519 |87.418 |88.2065|1.96E7  |
|GOOGL |2020-11-27|88.227 |89.8505|88.227 |89.351 |1.479E7 |
|GOOGL |2020-11-30|88.7825|89.017 |87.392 |87.72  |3.2418E7|
|GOOGL |2020-12

In [14]:
# Trying InfluxDB
import os
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS, WriteOptions

In [15]:
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "2Jy-FrQ32jdIcJI811xcCx589hsI_XiIEpbI4KuYy8ub2JM2UvfgyMGjwxaNTEurWmFQUXohPyLtqWHPw_JpAg=="
INFLUXDB_ORG = "DA331"
INFLUXDB_BUCKET = "Stock_Data"

influx_client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = influx_client.write_api(write_options=WriteOptions(
    batch_size = 500,
    flush_interval=10000,
    jitter_interval=2000,
    retry_interval=5000,
    max_retries=5,
    max_retry_delay=5000,
    exponential_base=2
))

In [16]:
def process_batch_and_write_to_influx(batch_df, batch_id):
    batch_df_pd = batch_df.toPandas()

    points = []
    for _, row in batch_df_pd.iterrows():
        point = Point("stocks") \
            .tag("symbol", row["symbol"]) \
            .field("open", row["open"]) \
            .field("high", row["high"]) \
            .field("low", row["low"]) \
            .field("close", row["close"]) \
            .field("volume", row["volume"]) \
            .time(row["date"].strftime("%Y-%m-%d"))
        points.append(point)
    
    write_api.write(bucket=INFLUXDB_BUCKET, record=points)
    print(f"Batch {batch_id} written to InfluxDB.")

In [17]:
try:
    query = stock_data_df.writeStream.outputMode("append").foreachBatch(process_batch_and_write_to_influx).start()
    time.sleep(20)
finally:
    query.stop()

                                                                                

Batch 0 written to InfluxDB.
Processing batch ID: 0
Batch ID: 0

Showing data for stock: AAPL

+------+----------+------+------+------+------+----------+-------------------+----------+----------+----------+----------+----------+---------+----------+---------+------------+---------+------------+-------------+-------------+--------------+
|symbol|date      |open  |high  |low   |close |volume    |Timestamp          |SMA_20    |SMA_50    |SMA_100   |SMA_200   |EMA_20    |EMA_50   |EMA_100   |EMA_200  |daily_volume|log_close|daily_return|volatility_30|volatility_60|volatility_120|
+------+----------+------+------+------+------+----------+-------------------+----------+----------+----------+----------+----------+---------+----------+---------+------------+---------+------------+-------------+-------------+--------------+
|AAPL  |2020-11-25|115.55|116.75|115.17|116.03|7.64992E7 |2020-11-25 00:00:00|116.03    |116.03    |116.03    |116.03    |116.03    |116.03   |116.03    |116.03   |7.64992E7

In [18]:
stock_data_df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- date: date (nullable = true)
 |-- open: float (nullable = true)
 |-- high: float (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- volume: double (nullable = true)



**SMA(Simple Moving Average)**

In [19]:
from pyspark.sql import Window
from pyspark.sql.functions import avg, window, to_timestamp, to_json, struct, sum, log, max, min, stddev, lag

In [20]:
def process_and_wirte_batch_stock_data(batch_df, batch_id):
    print(f"Processing batch ID: {batch_id}")

    batch_df = batch_df.withColumn("Timestamp", to_timestamp(col("date")))

    window_20 = Window.partitionBy("symbol").orderBy("Timestamp").rowsBetween(-19, 0)
    window_50 = Window.partitionBy("symbol").orderBy("Timestamp").rowsBetween(-49, 0)
    window_100 = Window.partitionBy("symbol").orderBy("Timestamp").rowsBetween(-99, 0)
    window_200 = Window.partitionBy("symbol").orderBy("Timestamp").rowsBetween(-199, 0)

    batch_df = batch_df.withColumn("SMA_20", avg("close").over(window_20)) \
                        .withColumn("SMA_50", avg("close").over(window_50)) \
                        .withColumn("SMA_100", avg("close").over(window_100)) \
                        .withColumn("SMA_200", avg("close").over(window_200))
    
    alpha_20, alpha_50, alpha_100, alpha_200 = 2 / 21, 2 / 51, 2 / 101, 2 / 201

    batch_df = batch_df.withColumn("EMA_50", col("close") * alpha_50 + col("SMA_50") * (1 - alpha_50))
    batch_df = batch_df.withColumn("EMA_20", col("close") * alpha_20 + col("SMA_20") * (1 - alpha_20))
    batch_df = batch_df.withColumn("EMA_100", col("close") * alpha_100 + col("SMA_100") * (1 - alpha_100))
    batch_df = batch_df.withColumn("EMA_200", col("close") * alpha_200 + col("SMA_200") * (1 - alpha_200))

    volume_window = Window.partitionBy("symbol").orderBy("Timestamp").rowsBetween(-1, 0)
    batch_df = batch_df.withColumn("daily_volume", sum("volume").over(volume_window))

    batch_df = batch_df.withColumn("log_close", log(col("close")))

    daily_return_window = Window.partitionBy("symbol").orderBy("Timestamp")
    batch_df = batch_df.withColumn("daily_return", col("log_close") - lag("log_close", 1).over(daily_return_window))

    volatility_window_30 = Window.partitionBy("symbol").orderBy("Timestamp").rowsBetween(-29, 0)
    volatility_window_60 = Window.partitionBy("symbol").orderBy("Timestamp").rowsBetween(-59, 0)
    volatility_window_120 = Window.partitionBy("symbol").orderBy("Timestamp").rowsBetween(-119, 0)

    batch_df = batch_df.withColumn("volatility_30", stddev("log_close").over(volatility_window_30)) \
                        .withColumn("volatility_60", stddev("log_close").over(volatility_window_60)) \
                        .withColumn("volatility_120", stddev("log_close").over(volatility_window_120))

    kafka_output_df = batch_df.select(to_json(struct(*batch_df.columns)).alias("value"))

    kafka_output_df.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "stock_prices_transformed").option("checkpointLocation", "file:///tmp/kafka_checkpoints2").save()


In [21]:
try:
    query = stock_data_df.writeStream.format("console").foreachBatch(process_and_wirte_batch_stock_data).start()
    time.sleep(40)
finally:
    query.stop()

                                                                                

In [22]:
# Defining Kafka stream from starting:
df_transformed = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "stock_prices_transformed").option("startingOffsets", "earliest").option("failOnDataLoss", "false").load()

In [23]:
schema_transformed = StructType([
    StructField("symbol", StringType(), True),
    StructField("date", StringType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("close", FloatType(), True),
    StructField("volume", DoubleType(), True),
    StructField("Timestamp", StringType(), True),
    StructField("SMA_20", FloatType(), True),
    StructField("SMA_50", FloatType(), True),
    StructField("SMA_100", FloatType(), True),
    StructField("SMA_200", FloatType(), True),
    StructField("EMA_20", FloatType(), True),
    StructField("EMA_50", FloatType(), True),
    StructField("EMA_100", FloatType(), True),
    StructField("EMA_200", FloatType(), True),
    StructField("daily_volume", FloatType(), True),
    StructField("log_close", FloatType(), True),
    StructField("daily_return", FloatType(), True),
    StructField("volatility_30", FloatType(), True),
    StructField("volatility_60", FloatType(), True),
    StructField("volatility_120", FloatType(), True),
])

In [24]:
stock_data_df_transformed = df_transformed.select(from_json(col("value").cast("string"), schema_transformed).alias("data")).select("data.*").withColumn("date", to_date(col("date"), "yyyy-MM-dd")).withColumn("Timestamp", to_timestamp(col("date")))

In [25]:
try:
    query = stock_data_df_transformed.writeStream.format("console").outputMode("append").foreachBatch(process_batch_stock_data).start()
    time.sleep(15)
finally:
    query.stop()

                                                                                

In [26]:
def get_sp500():
    sp500 = yf.download('^GSPC', start=datetime.date.today() - datetime.timedelta(days=4 * 365), end=datetime.date.today())
    sp500.reset_index(inplace=True)
    sp500 = sp500[['Date', 'Open', 'High', 'Low', 'Close', 'Volume']]
    sp500.columns = ['date', 'open', 'high', 'low', 'close', 'volume']

    return sp500

In [27]:
sp500_data = get_sp500()

[*********************100%***********************]  1 of 1 completed


In [28]:
sp500_data.columns

Index(['date', 'open', 'high', 'low', 'close', 'volume'], dtype='object')

In [29]:
def produce_sp500(df):
    for _, row in df.iterrows():
        data = row.to_dict()
        data['date'] = data['date'].strftime('%Y-%m-%d')
        producer.send("SP_500", value=data)
    producer.flush()

In [30]:
produce_sp500(sp500_data)

In [31]:
# Defining Kafka stream from starting:
sp500_stream_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "SP_500").option("startingOffsets", "earliest").load()

In [32]:
schema_sp500 = StructType([
    StructField("date", StringType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("close", FloatType(), True),
    StructField("volume", DoubleType(), True),
])

In [33]:
sp500_data_df = sp500_stream_df.select(from_json(col("value").cast("string"), schema_sp500).alias("data")).select("data.*").withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

In [34]:
try:
    query = sp500_data_df.writeStream.format("console").outputMode("append").option("truncate", "false").start()
    time.sleep(30)
finally:
    query.stop()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+-------+-------+-------+-------+---------+
|date      |open   |high   |low    |close  |volume   |
+----------+-------+-------+-------+-------+---------+
|2020-11-25|3635.5 |3635.5 |3617.76|3629.65|4.91044E9|
|2020-11-27|3638.55|3644.31|3629.33|3638.35|2.77839E9|
|2020-11-30|3634.18|3634.18|3594.39|3621.63|6.30841E9|
|2020-12-01|3645.87|3678.45|3645.87|3662.45|5.41848E9|
|2020-12-02|3653.78|3670.96|3644.84|3669.01|5.04125E9|
|2020-12-03|3668.28|3682.73|3657.17|3666.72|5.06534E9|
|2020-12-04|3670.94|3699.2 |3670.94|3699.12|5.09962E9|
|2020-12-07|3694.73|3697.41|3678.88|3691.96|4.8045E9 |
|2020-12-08|3683.05|3708.45|3678.83|3702.25|4.58439E9|
|2020-12-09|3705.98|3712.39|3660.54|3672.82|5.2328E9 |
|2020-12-10|3659.13|3678.49|3645.18|3668.1 |4.65848E9|
|2020-12-11|3656.08|3665.91|3633.4 |3663.46|4.37547E9|
|2020-12-14|3675.27|3697.61|3645.84|3647.49|4.62385E9|
|2020-12-15|3666.41|369

In [35]:
sp500_data_df.printSchema()

root
 |-- date: date (nullable = true)
 |-- open: float (nullable = true)
 |-- high: float (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- volume: double (nullable = true)



In [36]:
def load_and_preprocess(batch_df):
    batch_df = batch_df.withColumn("Timestamp", to_timestamp(col("date")))
    
    batch_df = batch_df.withColumn("sp500_log_close", log(col("close")))

    window_spec = Window.orderBy("Timestamp")

    batch_df = batch_df.withColumn("sp500_prev_log_close", lag("sp500_log_close", 1).over(window_spec))

    batch_df = batch_df.withColumn("sp500_daily_return", col("sp500_log_close") - col("sp500_prev_log_close"))

    return batch_df

In [37]:
from pyspark.sql.functions import to_json, struct, from_json

In [38]:
processed_df = load_and_preprocess(sp500_data_df)

In [39]:
def write_back_to_new_topic(df, topic_name):
    kafka_output_df = df.select(to_json(struct(*df.columns)).alias("value"))
    kafka_output_df.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", topic_name).option("checkpointLocation", "file:///tmp/kafka_checkpoints").save()
    

In [40]:
def process_batch_sp500(batch_df, batch_id):
    print(f"Processing batch ID: {batch_id}")

    processed_df = load_and_preprocess(batch_df)
    processed_df.show(truncate=False)
    write_back_to_new_topic(processed_df, "SP_500_transformed")

In [41]:
try:
    query = sp500_data_df.writeStream.foreachBatch(process_batch_sp500).trigger(once=True).start()
    time.sleep(30)
finally:
    query.stop()

                                                                                

In [42]:
# Defining Kafka stream from starting:
sp500_transformed_stream_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "SP_500_transformed").option("startingOffsets", "earliest").option("failOnDataLoss", "false").load()

In [43]:
schema_sp500_transformed = StructType([
    StructField("date", StringType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("close", FloatType(), True),
    StructField("volume", DoubleType(), True),
    StructField("Timestamp", StringType(), True),
    StructField("sp500_log_close", FloatType(), True),
    StructField("sp500_prev_log_close", FloatType(), True),
    StructField("sp500_daily_return", FloatType(), True),
])

In [44]:
sp500_transformed_data_df = sp500_transformed_stream_df.select(from_json(col("value").cast("string"), schema_sp500_transformed).alias("data")).select("data.*").withColumn("date", to_date(col("date"), "yyyy-MM-dd")).withColumn("Timestamp", to_timestamp(col("date")))

In [45]:
try:
    query = sp500_transformed_data_df.writeStream.format("console").outputMode("append").option("truncate", "false").start()
    time.sleep(15)
finally:
    query.stop()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+-------+-------+-------+-------+---------+-------------------+---------------+--------------------+------------------+
|date      |open   |high   |low    |close  |volume   |Timestamp          |sp500_log_close|sp500_prev_log_close|sp500_daily_return|
+----------+-------+-------+-------+-------+---------+-------------------+---------------+--------------------+------------------+
|2020-11-25|3635.5 |3635.5 |3617.76|3629.65|4.91044E9|2020-11-25 00:00:00|8.196892       |NULL                |NULL              |
|2020-11-27|3638.55|3644.31|3629.33|3638.35|2.77839E9|2020-11-27 00:00:00|8.1992855      |8.196892            |0.0023941111      |
|2020-11-30|3634.18|3634.18|3594.39|3621.63|6.30841E9|2020-11-30 00:00:00|8.194679       |8.1992855           |-0.004606141      |
|2020-12-01|3645.87|3678.45|3645.87|3662.45|5.41848E9|2020-12-01 00:00:00|8.205888       |8.194679            |0.0112

In [46]:
import os
import pandas as pd

In [47]:
try:
    query = sp500_transformed_data_df.writeStream.format("parquet").option("path", "file:///tmp/sp500_data").option("checkpointLocation", "file:///tmp/sp500_checkpoint").trigger(once=True).start()
    time.sleep(30)
finally:
    query.stop()

                                                                                

In [48]:
sp500_data_static = spark.read.parquet("file:///tmp/sp500_data").select("date", "sp500_daily_return").toPandas()

In [49]:
sp500_data_static.head()

Unnamed: 0,date,sp500_daily_return
0,2020-11-25,
1,2020-11-27,0.002394
2,2020-11-30,-0.004606
3,2020-12-01,0.011208
4,2020-12-02,0.00179


In [50]:
try:
    query = stock_data_df_transformed.writeStream.format("parquet").option("path", "file:///tmp/stock_data_df").option("checkpointLocation", "file:///tmp/stock_checkpoint").trigger(once=True).start()
    time.sleep(30)
finally:
    query.stop()

                                                                                

In [51]:
stock_static_df = spark.read.parquet("file:///tmp/stock_data_df").select("symbol", "date", "daily_return").toPandas()

In [52]:
stock_static_df.head()

Unnamed: 0,symbol,date,daily_return
0,AAPL,2020-11-25,
1,GOOGL,2020-11-25,
2,GOOGL,2020-11-27,0.012892
3,GOOGL,2020-11-30,-0.018422
4,GOOGL,2020-12-01,0.023079


In [53]:
def find_corr_by_symbol(stock_df, sp500_df):
    symbols = stock_df["symbol"].unique().tolist()

    correlations = []
    for symbol in symbols:
        symbol_data = stock_df[stock_df["symbol"] == symbol]
        merged_df = symbol_data.merge(sp500_df, on="date", how="inner")

        if len(merged_df) > 1:
            corr_value = merged_df["daily_return"].corr(merged_df["sp500_daily_return"])
            print(f"Correlation for {symbol} with S&P 500 is: {corr_value}")
            correlations.append((symbol, corr_value))
    
    corr_df = pd.DataFrame(correlations, columns=["Symbol", "Correlation"])
    corr_df.to_csv("/home/karan-kumawat17/Downloads/correlation_results.csv", mode="a", index=False, header=True)
    

In [54]:
find_corr_by_symbol(stock_static_df, sp500_data_static)

Correlation for AAPL with S&P 500 is: 0.7583874687836138
Correlation for GOOGL with S&P 500 is: 0.7145523187480187


In [55]:
import plotly.express as px

In [56]:
def data_for_plottings():
    df_pandas = spark.read.parquet("file:///tmp/stock_data_df").select("symbol", "date", "close", "SMA_20", "SMA_50", "SMA_100", "SMA_200", "EMA_20", "EMA_50", "EMA_100", "EMA_200", 
                          "volume", "daily_return", "volatility_30", "volatility_60", "volatility_120").toPandas()
    df_pandas['date'] = pd.to_datetime(df_pandas['date'])
    return df_pandas

In [57]:
df_pandas = data_for_plottings()

In [58]:
def SMA_plot(df):
    unique_symbols = df["symbol"].unique()
    
    for symbol in unique_symbols:
        df_symbol = df[df["symbol"] == symbol]
        fig = px.line(df_symbol, x="date", y=["close", "SMA_20", "SMA_50", "SMA_100", "SMA_200"],
              labels={"value": "Price ($)", "date": "Date"},
              title=f"Closing Price with SMAs for {symbol}")
        fig.update_layout(yaxis_title="Price ($)", xaxis_title="Date", xaxis_rangeslider_visible=True)
        fig.show() 

In [59]:
SMA_plot(df_pandas)

In [60]:
def EMA_plot(df):
    unique_symbols = df["symbol"].unique()
    
    for symbol in unique_symbols:
        df_symbol = df[df["symbol"] == symbol]
        fig = px.line(df_symbol, x="date", y=["close", "EMA_20", "EMA_50", "EMA_100", "EMA_200"],
                labels={"value": "Price ($)", "date": "Date"},
                title=f"Closing Price with EMAs for {symbol}")
        fig.update_layout(yaxis_title="Price ($)", xaxis_title="Date", xaxis_rangeslider_visible=True)
        fig.show()

In [61]:
EMA_plot(df_pandas)

In [62]:
def trading_volume_plot(df):
    unique_symbols = df["symbol"].unique()
    
    for symbol in unique_symbols:
        df_symbol = df[df["symbol"] == symbol]
        fig = px.bar(df_symbol, x='date', y='volume', 
                title=f"Trading Volume Over Time for {symbol}",
                labels={'date': 'Date', 'volume': 'Volume'})  # Optional: use a dark theme

        # Customize layout for better readability
        fig.update_layout(
            xaxis_title="Date",
            yaxis_title="Trading Volume",
            xaxis_rangeslider_visible=True  # Add a range slider for easy zooming
        )

        fig.show()

In [63]:
trading_volume_plot(df_pandas)

* High Volume: Indicates strong investor interest and can confirm price movements.
* Low Volume: Suggests weaker interest and may not confirm the trend.

In [64]:
def stock_daily_return(df):
    unique_symbols = df["symbol"].unique()
    
    for symbol in unique_symbols:
        df_symbol = df[df["symbol"] == symbol]
        fig = px.line(df_symbol, x="date", y="daily_return",
                labels={"value": "Daily return ($)", "date": "Date"},
                title=f"Daily returns of {symbol}")
        fig.update_layout(yaxis_title="Daily return ($)", xaxis_title="Date", xaxis_rangeslider_visible=True)
        fig.show()

In [65]:
stock_daily_return(df_pandas)

In [66]:
def daily_returns_distribution(df):
    unique_symbols = df["symbol"].unique()
    
    for symbol in unique_symbols:
        df_symbol = df[df["symbol"] == symbol]
        fig = px.histogram(df_symbol, x='daily_return', 
                    title=f"{symbol}'s Distribution of Daily Returns",
                    labels={'daily_return': 'Daily Returns'},
                    nbins=100,  # Adjust the number of bins for better granularity
                    template='plotly_dark')  # Optional: use a dark theme

        # Customize layout for better readability
        fig.update_layout(
            xaxis_title="Daily Returns",
            yaxis_title="Frequency",
            bargap=0.1  # Adjust the gap between bars
        )

        fig.show()

In [67]:
daily_returns_distribution(df_pandas)

* Normal Distribution: Many stocks exhibit returns that roughly follow a normal distribution, but with fat tails.
* Skewness and Kurtosis: Indicate asymmetry and the presence of outliers.

In [68]:
def rolling_volatility(df):
    unique_symbols = df["symbol"].unique()
    
    for symbol in unique_symbols:
        df_symbol = df[df["symbol"] == symbol]
        fig = px.line(df_symbol, x="date", y=["volatility_30", "volatility_60", "volatility_120"],
                labels={"value": "Volatility ($)", "date": "Date"},
                title=f"{symbol}'s Rolling Volatility")
        fig.update_layout(yaxis_title="Volatility ($)", xaxis_title="Date")
        fig.show()

In [69]:
rolling_volatility(df_pandas)

In [70]:
def corr_plot(stock_df, sp500_df):
    symbols = stock_df["symbol"].unique().tolist()

    for symbol in symbols:
        symbol_data = stock_df[stock_df["symbol"] == symbol]
        merged_df = symbol_data.merge(sp500_df, on="date", how="inner")

        fig = px.scatter(merged_df, x='sp500_daily_return', y='daily_return', 
                    title=f"Scatter Plot of {symbol}'s Daily Returns vs. S&P 500 Daily Returns",
                    labels={'sp500_daily_return': 'S&P 500 Daily Returns', 'daily_return': f"{symbol}'s Daily Returns"},
                    template='plotly_dark')  # Optional: use a dark theme

        # Customize layout for better readability
        fig.update_layout(
            xaxis_title="S&P 500 Daily Returns",
            yaxis_title="Stock Daily Returns"
        )

        fig.show()

In [71]:
corr_plot(stock_static_df, sp500_data_static)

** AR **

In [72]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import numpy as np

In [73]:
def prepare_ar_features(df, target_col='close', p=3):
    window_spec = Window.partitionBy("symbol").orderBy("Timestamp")
    for i in range(1, p+1):
        df = df.withColumn(f"{target_col}_lag_{i}", lag(target_col, i).over(window_spec))
    df = df.na.drop(subset=[f"{target_col}_lag_{i}" for i in range(1, p+1)])
    return df

In [74]:
def vectorize_features(df, target_col='close', p=3):
    lag_cols = [f"{target_col}_lag_{i}" for i in range(1, p+1)]
    assembler = VectorAssembler(inputCols=lag_cols, outputCol="features")
    df = assembler.transform(df)
    return df

In [75]:
def train_AR_model(df, target_col='close'):
    lr = LinearRegression(featuresCol="features", labelCol=target_col, predictionCol="prediction")
    model = lr.fit(df)
    return model

In [76]:
def tune_ar_model(df, target_col='close', p_values=[1,2,3,4,5]):
    best_p = None
    best_mse = float("inf")
    best_model = None

    evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="prediction", metricName="mse")

    for p in p_values:
        print(f"Training AR model with p={p}")

        df_p = prepare_ar_features(df, target_col, p)
        df_p = vectorize_features(df_p, target_col, p)

        model = train_AR_model(df_p, target_col)

        predictions = model.transform(df_p)
        mse = evaluator.evaluate(predictions)
        print(f"MSE for p={p}: {mse}")

        if mse < best_mse:
            best_mse = mse
            best_p = p
            best_model = model
        
    print(f"Best model found for p={best_p} with MSE={best_mse}")
    return best_model, best_p, best_mse

In [77]:
historical_stock_data = spark.read.parquet("file:///tmp/stock_data_df")

In [78]:
symbols = [row["symbol"] for row in historical_stock_data.select("symbol").distinct().collect()]
symbol_models = {}

for symbol in symbols:
    stock = historical_stock_data.filter(historical_stock_data['symbol'] == symbol)
    p_values_range = np.arange(1, 11)
    best_model, best_p, best_mse = tune_ar_model(stock, target_col='close', p_values=p_values_range)

    symbol_models[symbol] = {"model": best_model, "best_p": best_p, "best_mse": best_mse}
    print(f"Trained AR model for {symbol} with best_p = {best_p} and MSE = {best_mse}")

Training AR model with p=1


                                                                                

MSE for p=1: 7.375117447646528
Training AR model with p=2
MSE for p=2: 7.380141096182102
Training AR model with p=3
MSE for p=3: 7.366672397642692
Training AR model with p=4
MSE for p=4: 7.357578723424759
Training AR model with p=5
MSE for p=5: 7.362688755265936
Training AR model with p=6
MSE for p=6: 7.370016414783699
Training AR model with p=7
MSE for p=7: 7.3764384345596286
Training AR model with p=8
MSE for p=8: 7.380881170002731
Training AR model with p=9
MSE for p=9: 7.383629521969338
Training AR model with p=10
MSE for p=10: 7.3789555659478205
Best model found for p=4 with MSE=7.357578723424759
Trained AR model for AAPL with best_p = 4 and MSE = 7.357578723424759
Training AR model with p=1
MSE for p=1: 5.7394228597616435
Training AR model with p=2
MSE for p=2: 5.7443756673667155
Training AR model with p=3
MSE for p=3: 5.74026107610786
Training AR model with p=4
MSE for p=4: 5.720525495806652
Training AR model with p=5
MSE for p=5: 5.72412791862783
Training AR model with p=6
MSE 

In [79]:
def forecast_ar_model(symbol_data, model, best_p, target_col, forecast_period):
    last_p_values = symbol_data.select(target_col).orderBy(col("Timestamp").desc()).limit(best_p).toPandas()
    if last_p_values.empty or len(last_p_values) < best_p:
        print(f"Not enough data for symbol. Neede {best_p}, got {len(last_p_values)}.")
        return None
    
    last_p_values = last_p_values[target_col].tolist()
    forecast_values = []

    assembler = VectorAssembler(inputCols=[f"lag_{i}" for i in range(1, best_p+1)], outputCol="features")

    for _ in range(forecast_period):
        df_forecast = pd.DataFrame([last_p_values], columns=[f"lag_{i}" for i in range(1, best_p+1)])
        spark_forecast_df = spark.createDataFrame(df_forecast)
        spark_forecast_df = assembler.transform(spark_forecast_df)

        next_prediction = model.transform(spark_forecast_df).select("prediction").collect()[0][0]

        forecast_values.append(next_prediction)
        last_p_values = last_p_values[1:] + [next_prediction]

    return forecast_values

In [80]:
def add_forecasts_to_data(historical_data, forecast_results, forecast_period=28):
    historical_data_pd = historical_data.toPandas()
    historical_data_pd["date"] = pd.to_datetime(historical_data_pd["date"])

    all_forecasts = []

    for symbol, forecasts in forecast_results.items():
        df_symbol = historical_data_pd[historical_data_pd["symbol"] == symbol].copy()
        last_date = df_symbol["date"].max()

        forecast_dates = [last_date + datetime.timedelta(days = i) for i in range(1, forecast_period+1)]

        forecast_df = pd.DataFrame({
            "date": forecast_dates,
            "symbol": symbol,
            "best_AR_predicted": forecasts
        })

        df_symbol = pd.concat([df_symbol, forecast_df], ignore_index=True)
        all_forecasts.append(df_symbol)

    combined_data = pd.concat(all_forecasts, ignore_index=True)
    return combined_data

In [81]:
forecasts_results_AR = {}

for symbol, best_model in symbol_models.items():
    symbol_data = historical_stock_data.filter(historical_stock_data['symbol'] == symbol)
    best_model = symbol_models[symbol]["model"]
    best_p = symbol_models[symbol]["best_p"]
    forecast_values = forecast_ar_model(symbol_data, best_model, best_p, target_col="close", forecast_period=28)

    if forecast_values:
        forecasts_results_AR[symbol] = forecast_values
        print(f"Forecast for {symbol}: {forecast_values}")

                                                                                

Forecast for AAPL: [229.82668028388102, 228.45497865583906, 228.98254622395416, 228.14248874618212, 229.78294168633548, 228.38621848705995, 228.9707193213444, 228.0042347504865, 229.73910434655144, 228.31330971919385, 228.9645263501255, 227.86529254255646, 229.69552463647253, 228.23584308184473, 228.96396786659886, 227.7256878975283, 229.65258946153747, 228.15341470692854, 228.96904048358886, 227.5854143567305, 229.61071539030152, 228.0656265336919, 228.97973959302695, 227.44443107381133, 229.57034774930668, 227.97208648130223, 228.99606224175605, 227.3026607681554]
Forecast for GOOGL: [165.11172064225167, 166.81489904053524, 175.52004312801546, 178.5761727828187, 165.52999844177683, 166.03290775256235, 174.9868306198942, 179.00458730831113, 166.01368419435622, 165.2911492337851, 174.3823627503053, 179.3976344837284, 166.5608723959242, 164.59673039573752, 173.70938329338196, 179.7477872518747, 167.16888438741702, 163.9566718580355, 172.97138493689363, 180.04767019652135, 167.8342639299

In [82]:
combined_df = add_forecasts_to_data(historical_stock_data, forecasts_results_AR, forecast_period=28)

In [83]:
def plot_forecasted_data(df):
    unique_symbols = df['symbol'].unique()

    for symbol in unique_symbols:
        df_symbol = df[df["symbol"] == symbol]
        fig = px.line(df_symbol, x='date', y=['close', 'best_AR_predicted'], 
                    title=f"Closing Price and AR Forecast for {symbol}",
                    labels={'value': 'Price ($)', 'date': "Date"})  # Optional: use a dark theme

        # Customize layout for better readability
        fig.update_layout(
            xaxis_title="Date",
            yaxis_title="Price ($)",
            xaxis_rangeslider_visible=True
        )

        fig.show()

In [84]:
plot_forecasted_data(combined_df)

In [85]:
!pip install prophet

Defaulting to user installation because normal site-packages is not writeable


In [86]:
from prophet import Prophet
from prophet.plot import plot_plotly
import plotly.offline as py
import plotly.graph_objects as go

def prepare_data_for_prophets(stock_data_df, symbol):
    symbol_data = stock_data_df.filter(col("symbol") == symbol)
    prophet_df = symbol_data.select(
        col("date").alias("ds"),
        col("close").alias("y")
    ).toPandas()
    return prophet_df

In [87]:
def forecast_with_prophet(prophet_df, forecast_period=28):
    model = Prophet(daily_seasonality=True, yearly_seasonality=True)
    model.fit(prophet_df)
    future = model.make_future_dataframe(periods=forecast_period)
    forecast = model.predict(future)
    return model, forecast

In [88]:
def create_forecast_plots(model, forecast, prophet_df, symbol, forecast_period=28):
    fig = plot_plotly(model, forecast, trend=True)

    fig.add_trace(
        go.Scatter(
            x = prophet_df['ds'],
            y = prophet_df['y'],
            mode = 'lines+markers',
            name = "Actual Close Price"
        )
    )

    fig.add_trace(
        go.Scatter(
            x = forecast['ds'],
            y = forecast['yhat'],
            mode = 'lines',
            line = dict(color="royalblue", width=2),
            name = "Forecast"
        )
    )

    fig.add_trace(
        go.Scatter(
            x = forecast['ds'],
            y = forecast['yhat_upper'],
            fill = None,
            mode = 'lines',
            line = dict(color="lightblue", width=0.5),
            name = "Upper Confidence Interval"
        )
    )

    fig.add_trace(
        go.Scatter(
            x = forecast['ds'],
            y = forecast['yhat_lower'],
            fill = 'tonexty',
            mode = 'lines',
            line = dict(color="lightblue", width=0.5),
            name = "Lower Confidence Interval"
        )
    )

    fig.update_layout(
        title = f"Stock forecast for {symbol} - {forecast_period} Days",
        xaxis_title = "Date",
        yaxis_title = "Price",
        legend_title = "Legend",
        template = "plotly_white",
        annotations = [
            go.layout.Annotation(
                text = f"Forecasting {forecast_period} Days Ahead for {symbol}",
                xref="paper",
                yref="paper",
                x=0.5,
                y=-0.15,
                showarrow=False,
                font=dict(size=12)
            )
        ]
    )

    py.iplot(fig)

In [89]:
for symbol in symbols:
    prophet_df = prepare_data_for_prophets(historical_stock_data, symbol)

    model, forecast = forecast_with_prophet(prophet_df, forecast_period=28)

    create_forecast_plots(model, forecast, prophet_df, symbol, forecast_period=28)

DEBUG:cmdstanpy:input tempfile: /tmp/tmpvq8slirp/62hwnfwb.json
DEBUG:cmdstanpy:input tempfile: /tmp/tmpvq8slirp/3sv2ok7b.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/home/karan-kumawat17/.local/lib/python3.10/site-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=92755', 'data', 'file=/tmp/tmpvq8slirp/62hwnfwb.json', 'init=/tmp/tmpvq8slirp/3sv2ok7b.json', 'output', 'file=/tmp/tmpvq8slirp/prophet_model35em1che/prophet_model-20241124232256.csv', 'method=optimize', 'algorithm=lbfgs', 'iter=10000']
23:22:56 - cmdstanpy - INFO - Chain [1] start processing
INFO:cmdstanpy:Chain [1] start processing
23:22:57 - cmdstanpy - INFO - Chain [1] done processing
INFO:cmdstanpy:Chain [1] done processing


DEBUG:cmdstanpy:input tempfile: /tmp/tmpvq8slirp/fojb2uuh.json
DEBUG:cmdstanpy:input tempfile: /tmp/tmpvq8slirp/0eu0_te1.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/home/karan-kumawat17/.local/lib/python3.10/site-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=19981', 'data', 'file=/tmp/tmpvq8slirp/fojb2uuh.json', 'init=/tmp/tmpvq8slirp/0eu0_te1.json', 'output', 'file=/tmp/tmpvq8slirp/prophet_modelcdcyij9s/prophet_model-20241124232257.csv', 'method=optimize', 'algorithm=lbfgs', 'iter=10000']
23:22:57 - cmdstanpy - INFO - Chain [1] start processing
INFO:cmdstanpy:Chain [1] start processing
23:22:58 - cmdstanpy - INFO - Chain [1] done processing
INFO:cmdstanpy:Chain [1] done processing
