**Libraries Installation**

In [None]:
!pip install kafka-python
!pip install confluent_kafka
!pip install pyspark
!pip install sweetviz



**Importing Libraries**

In [None]:
import os
from datetime import datetime
import time
import threading
import json

from kafka import KafkaProducer
from kafka.errors import KafkaError
import pandas as pd
import yfinance as yf
import sweetviz as sv
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col
from datetime import date
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler, StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns


**Crawling and appending data to a DataFrame from Yahoo Finance**

In [None]:
import pandas as pd
import yfinance as yf

def get_stock_data(ticker, start_date, end_date):
    """Download price history for a single ticker and tag it with its symbol.

    Parameters
    ----------
    ticker : str
        Ticker symbol passed to ``yf.download``.
    start_date, end_date : str
        Date bounds forwarded to ``yf.download`` as ``start`` / ``end``.

    Returns
    -------
    pandas.DataFrame
        The frame returned by ``yf.download`` with single-level column
        labels and an extra ``'Symbol'`` column holding ``ticker``.
    """
    # auto_adjust is pinned to False so the result keeps an "Adj Close"
    # column, which later cells rename and rely on.
    # NOTE(review): recent yfinance releases reportedly default this to True.
    data = yf.download(ticker, start=start_date, end=end_date, auto_adjust=False)

    # If the download comes back with two-level column labels, keep only the
    # first level so that adding 'Symbol' yields an ordinary flat column.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)

    data['Symbol'] = ticker
    return data

def get_multiple_stocks_data(tickers, start_date, end_date):
    """Download price history for several tickers and stack them in one frame.

    Parameters
    ----------
    tickers : iterable of str
        Ticker symbols to fetch, in the order they should appear.
    start_date, end_date : str
        Date bounds forwarded unchanged to ``get_stock_data``.

    Returns
    -------
    pandas.DataFrame
        Row-wise concatenation of the per-ticker frames (original indexes are
        kept). An empty DataFrame is returned when nothing was downloaded.
    """
    frames = []
    for ticker in tickers:
        print(ticker)
        ticker_data = get_stock_data(ticker, start_date, end_date)
        # Skip tickers that produced no rows so they don't affect the
        # dtypes of the combined frame.
        if not ticker_data.empty:
            frames.append(ticker_data)

    # pd.concat raises on an empty list; keep the "empty frame" contract.
    if not frames:
        return pd.DataFrame()

    # One concat at the end replaces DataFrame.append, which was removed in
    # pandas 2.0 and re-copied the accumulated frame on every iteration.
    return pd.concat(frames)

# Date window handed to the downloader
start_date = "2023-01-01"
end_date = "2024-01-01"

# Universe of ticker symbols to fetch
all_tickers = [
    'AAPL', 'T', 'GOOG', 'MO', 'AA', 'AXP', 'BABA', 'ABT',
    'AMAT', 'AMGN', 'AIG', 'ALL', 'ADBE', 'GOOGL', 'ACN', 'ABBV',
    'MT', 'LLY', 'APA', 'ADP', 'AKAM', 'NLY', 'ADSK', 'ADM',
    'WBA', 'PANW', 'AMD', 'AVGO', 'EA', 'AEM', 'APD', 'AMBA',
    'NVS', 'LULU', 'ARRY', 'A', 'ORLY', 'AZO', 'AN', 'AZN',
    'BUD', 'BDX', 'AB', 'AFL', 'ADI', 'ACIW', 'AMP', 'AMTD',
    'AEO', 'NVO', 'ALTR', 'PAA', 'AAP', 'FNMA', 'UBS', 'ARLP',
    'ATI', 'ADT', 'AVB', 'LH', 'AVY', 'AON', 'ADC', 'AYI',
    'ASML', 'AMT', 'ACM', 'ARI', 'AR', 'AAN', 'BAH', 'ALB',
    'AIZ', 'SAIC', 'CAR', 'AU', 'APH', 'AMX', 'JKHY', 'AMKR',
    'AEIS', 'VRSK', 'APO', 'RBA', 'MAA', 'ASX', 'ARCO', 'ANET',
    'AIR', 'WAB', 'RS', 'PKG', 'AMG',
]

# Fetch every ticker, then turn the index into an ordinary column
stock_data = get_multiple_stocks_data(all_tickers, start_date, end_date).reset_index()
stock_data['Date']


In [None]:
# Preview the first five rows of the combined frame
stock_data.head(5)

**Sweetviz report for data analysis**

In [None]:
# Make sure the 'Date' column holds datetime values
stock_data['Date'] = pd.to_datetime(stock_data['Date'])

# Take the first row per (Date, Symbol) and spread the symbols across columns
grouped_data = stock_data.groupby(['Date', 'Symbol']).first().unstack()

# Collapse each multi-level column key into a single "<level>_<level>" label
grouped_data.columns = [
    '_'.join(str(level) for level in column_key).strip()
    for column_key in grouped_data.columns.values
]

# Bring 'Date' back as a regular column
grouped_data = grouped_data.reset_index()
print(grouped_data)

# Profile every column whose label contains 'Close' and write the HTML report
close_columns = grouped_data.filter(like='Close')
advert_report = sv.analyze(close_columns)
advert_report.show_html('Stocks_raw_report.html')


**Transferring pandas data to a PySpark DataFrame and then to an RDD**

In [None]:

# Obtain the Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("StockDataRDD")
    .getOrCreate()
)

# Replace the space in "Adj Close" so the column has an identifier-like name
stock_data = stock_data.rename(columns={"Adj Close": "Adj_Close"})

# pandas DataFrame -> Spark DataFrame -> underlying RDD
stock_data_df = spark.createDataFrame(stock_data)
stock_data_rdd = stock_data_df.rdd
stock_data_df.show()


**Performing feature engineering using RDD map to form Daily_return and Cumulative_Return feature**


In [None]:

# Obtain the Spark session; getOrCreate() hands back the already-running
# session when one exists, so re-running this cell is harmless.
spark = SparkSession.builder.appName("StockDataRDD").getOrCreate()
# Function to calculate returns for each stock
def calculate_returns(partition_index, iterator):
    """Yield each input row extended with DailyReturn and CumulativeReturn.

    Intended for ``RDD.mapPartitionsWithIndex``. Rows must arrive ordered by
    date within each symbol; state is kept only for the rows seen through
    ``iterator``, so all rows of a symbol have to be in the same partition.

    Parameters
    ----------
    partition_index : int
        Partition number supplied by ``mapPartitionsWithIndex`` (unused).
    iterator : iterable
        Rows exposing the fields Date, Open, High, Low, Close, Adj_Close,
        Volume and Symbol. Rows offering ``asDict()`` are read by field name;
        plain 8-tuples are read positionally in that order.

    Yields
    ------
    Row
        The eight input fields plus ``DailyReturn`` (close / previous close
        - 1, ``None`` when it cannot be computed) and ``CumulativeReturn``
        (running sum of the daily returns for that symbol).
    """
    field_names = ("Date", "Open", "High", "Low", "Close", "Adj_Close", "Volume", "Symbol")

    def is_missing(price):
        # NaN is the only value that is not equal to itself.
        return price is None or price != price

    last_close = {}       # symbol -> most recent usable closing price
    running_return = {}   # symbol -> sum of daily returns so far

    for row in iterator:
        # Prefer name-based access so the result does not depend on the
        # column order of the upstream DataFrame.
        record = row.asDict() if hasattr(row, "asDict") else dict(zip(field_names, row))
        symbol = record["Symbol"]
        close = record["Close"]

        # On the first usable row of a symbol the price is compared with
        # itself, giving a daily return of 0.
        previous = last_close.get(symbol)
        if previous is None:
            previous = close

        if is_missing(close) or is_missing(previous) or previous == 0:
            daily_return = None
        else:
            daily_return = close / previous - 1

        running_return[symbol] = running_return.get(symbol, 0.0) + (daily_return or 0.0)

        yield Row(
            Date=record["Date"],
            Open=record["Open"],
            High=record["High"],
            Low=record["Low"],
            Close=close,
            Adj_Close=record["Adj_Close"],
            Volume=record["Volume"],
            Symbol=symbol,
            DailyReturn=daily_return,
            CumulativeReturn=running_return[symbol],
        )

        # A missing close must not become the next row's reference price.
        if not is_missing(close):
            last_close[symbol] = close

# Sort by (Symbol, Date) into a single partition before computing returns.
# calculate_returns keeps its running state per partition, so a symbol whose
# rows were split across partitions would have its previous close and
# cumulative return reset at the partition boundary. One partition is fine
# for a dataset of this size; a larger one would need a symbol-based
# partitioner instead.
sorted_rdd = stock_data_rdd.sortBy(
    lambda row: (row["Symbol"], row["Date"]),
    numPartitions=1,
)
indexed_rdd = sorted_rdd.mapPartitionsWithIndex(calculate_returns)

# Peek at a few rows rather than collecting the entire RDD into the output
indexed_rdd.take(5)


**Transferring RDD data to a PySpark DataFrame**

In [None]:
# Build a Spark DataFrame from the RDD of Row objects and show the first 20 rows
stock_data_with_returns_df = indexed_rdd.toDF()
stock_data_with_returns_df.show(20)

**Pivot table on Date with group by on Symbols**

In [None]:
# For every pivoted date, keep the first value of each return metric
return_metrics = [
    F.first(metric).alias(metric)
    for metric in ("DailyReturn", "CumulativeReturn")
]

# One row per Symbol, with the dates spread across the columns
pivot_df = (
    stock_data_with_returns_df
    .groupBy("Symbol")
    .pivot("Date")
    .agg(*return_metrics)
    .orderBy("Symbol")
)
pivot_df.show()


**Analyzing appropriate number of clusters for applying K means**

In [None]:
# Every pivoted column except the leading "Symbol" key is used as a feature
feature_columns = pivot_df.columns[1:]

# Assemble the feature columns into a single vector column
# NOTE(review): VectorAssembler's default handling raises on null inputs;
# confirm every symbol has a value for every pivoted date.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
assembled_data = assembler.transform(pivot_df)

# Standardise the features (zero mean, unit standard deviation)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaled_df = scaler.fit(assembled_data).transform(assembled_data)

# Fit k-means for a range of k on the SCALED features, i.e. the same column
# the final model is trained on, so the elbow curve is comparable with it.
k_values = list(range(2, 11))  # Adjust the range as needed
sse = []  # training cost (sum of squared distances) for each k
for k in k_values:
    kmeans = KMeans(k=k, seed=1, featuresCol="scaled_features")
    model = kmeans.fit(scaled_df)
    sse.append(model.summary.trainingCost)

# Plot the elbow curve
plt.plot(k_values, sse, marker='o')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Sum of Squared Distances (SSE)')
plt.title('Elbow Method for Optimal k')
plt.show()


**Applying K-means clustering**

In [None]:
# Fit the final K-Means model on the scaled features
num_clusters = 6  # Adjust as needed
kmeans = KMeans(k=num_clusters, seed=42, featuresCol="scaled_features", predictionCol="cluster")
model = kmeans.fit(scaled_df)
result_df = model.transform(scaled_df)

# Display the cluster assigned to each symbol. show must be CALLED: a bare
# `.show` only references the bound method and prints no rows.
result_df.select("Symbol", "cluster").show()


**Converting to Pandas from pyspark**

In [None]:
# Convert the clustered Spark DataFrame into a pandas DataFrame
pandas_df = result_df.toPandas()
def calculate_average(lst):
    """Return the arithmetic mean of the values in ``lst``."""
    total = sum(lst)
    return total / len(lst)
# Mean of each row's feature vector, stored in a new column
pandas_df['average_column'] = pandas_df['features'].apply(calculate_average)


**Visual representation**

In [None]:
# Scatter of per-symbol average feature value, coloured by cluster id
fig, ax = plt.subplots()
scatter = ax.scatter(
    pandas_df['average_column'],
    pandas_df['Symbol'],
    c=pandas_df['cluster'],
    s=50,
)
ax.set(
    title='K-Means Clustering',
    xlabel='Avg_of_daily_and_cumulative_return',
    ylabel='Symbol',
)
plt.colorbar(scatter)


In [None]:
# Categorical plot of the symbols that fall into each cluster
sns.catplot(x="cluster", y="Symbol", data=pandas_df)

**Setting up Kafka**

In [None]:
!curl -sSOL https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
!tar -xzf kafka_2.13-3.3.1.tgz

**Starting ZooKeeper and the Kafka broker**

In [None]:
!./kafka_2.13-3.3.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.3.1/config/zookeeper.properties
!./kafka_2.13-3.3.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.3.1/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10


**Creating Kafka Topics**

In [None]:
import os
import subprocess

KAFKA_TOPICS_SCRIPT = './kafka_2.13-3.3.1/bin/kafka-topics.sh'

# Create one single-partition topic per ticker symbol.
for ticker in all_tickers:
    # An argument list (no shell) keeps the topic name from being interpreted
    # by a shell; --if-not-exists makes re-running this cell a no-op for
    # topics that were already created.
    command = [
        KAFKA_TOPICS_SCRIPT,
        '--create',
        '--if-not-exists',
        '--bootstrap-server', '127.0.0.1:9092',
        '--replication-factor', '1',
        '--partitions', '1',
        '--topic', ticker,
    ]
    print(' '.join(command))

    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.stdout:
        print(completed.stdout.strip())
    # Report failures instead of silently discarding the exit status.
    if completed.returncode != 0:
        print('Topic creation failed for {0} (exit code {1}): {2}'.format(
            ticker, completed.returncode, completed.stderr.strip()))

    # Short pause between consecutive topic creations
    time.sleep(2)


**Inserting Stock name and cluster number to Kafka**

In [None]:
# Build, per ticker, the messages to publish on that ticker's topic:
# message key = symbol, message value = {"Symbol": ..., "cluster": ...}.
cluster_by_symbol = pandas_df.set_index('Symbol')['cluster'].to_dict()

new_data = {}
for ticker in all_tickers:
    if ticker not in cluster_by_symbol:
        # No row for this ticker in pandas_df, so there is nothing to publish.
        print("No cluster assignment for {0}; skipping".format(ticker))
        continue
    new_data[ticker] = {
        ticker: {'Symbol': ticker, 'cluster': int(cluster_by_symbol[ticker])}
    }


def error_callback(exc):
    """Errback attached to every send future; raises with the failure text."""
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))


def write_to_kafka(topic_name, items):
    """Publish ``items`` to ``topic_name`` and report how many were sent.

    Parameters
    ----------
    topic_name : str
        Kafka topic to write to.
    items : mapping or iterable of str
        For a mapping, each key becomes the message key and its value is sent
        JSON-encoded as the message value. For a plain iterable of keys, the
        messages are sent with a key and no value.
    """
    if hasattr(items, 'items'):
        pairs = items.items()
    else:
        pairs = ((key, None) for key in items)

    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    try:
        for key, value in pairs:
            payload = None if value is None else json.dumps(value).encode('utf-8')
            producer.send(
                topic_name,
                key=key.encode('utf-8'),
                value=payload,
            ).add_errback(error_callback)
            count += 1
        producer.flush()
    finally:
        # Release the producer's connections even if a send fails.
        producer.close()
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))


for topic, messages in new_data.items():
    write_to_kafka(topic, messages)
