In [1]:
# Import necessary libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_date, col, mean as F_mean, last
import pandas as pd
import os
import re
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("StockPriceAndTweetProcessor").getOrCreate()

# Root folder that holds the input data and receives the processed output.
# It defaults to the location used when this notebook was written; set the
# STOCK_DATA_DIR environment variable to run it elsewhere without editing code.
data_root = os.environ.get(
    "STOCK_DATA_DIR", "/Users/moatasimfarooque/Desktop/CATIA/Latest"
)

# Define paths for stock price and tweet data
stockprice_path = os.path.join(data_root, "stockprice")  # folder read as <Ticker>.csv
stocktweet_path = os.path.join(data_root, "stocktweet", "stocktweet.csv")
# The trailing "" component keeps the trailing separator on the output folder.
output_path = os.path.join(data_root, "processed_stocks", "")

# Ensure the output directory exists
os.makedirs(output_path, exist_ok=True)

# List of desired stock Tickers
Tickers = ["TSLA", "AAPL", "BA", "DIS", "AMZN"]



24/11/07 15:10:32 WARN Utils: Your hostname, Moatasims-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.2.1 instead (on interface bridge0)
24/11/07 15:10:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/07 15:10:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/11/07 15:10:50 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
# Step 1: Load and process stock price data
# Each ticker is read from its own "<Ticker>.csv". Tickers without a file are
# skipped, but reported: the later date/ticker grid still contains them and
# would fill their rows with default values only.
stock_dfs = []
missing_tickers = []
for Ticker in Tickers:
    filepath = os.path.join(stockprice_path, f"{Ticker}.csv")
    if not os.path.exists(filepath):
        missing_tickers.append(Ticker)
        continue
    df = (
        spark.read.csv(filepath, header=True, inferSchema=True)
        # Keep only the columns used downstream, so differences in the other
        # columns of individual files cannot affect the union below.
        .select("Date", "Close", "Volume")
        .withColumn("Ticker", lit(Ticker))
    )
    stock_dfs.append(df)

if missing_tickers:
    print(f"Warning: no stock price file found for: {', '.join(missing_tickers)}")

# Combine all stock data into a single DataFrame
if not stock_dfs:
    raise ValueError("No stock price data found for selected Tickers.")
stockprice_df = stock_dfs[0]
for df in stock_dfs[1:]:
    # unionByName aligns columns by name; union() aligns purely by position and
    # would silently mix columns if two files ordered them differently.
    stockprice_df = stockprice_df.unionByName(df)

# Standardize Date format (columns are already Date, Close, Volume, Ticker)
stockprice_df = stockprice_df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))

# Fill missing stock prices: carry the last non-null value forward within each
# ticker, ordered by date.
window_spec = (
    Window.partitionBy("Ticker")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, 0)
)
for column_name in ("Close", "Volume"):
    stockprice_df = stockprice_df.withColumn(
        column_name, last(column_name, ignorenulls=True).over(window_spec)
    )

# Values that are still null have no earlier observation for their ticker.
# They fall back to the approximate median Close over all rows, and to 0 volume.
median_price = stockprice_df.approxQuantile("Close", [0.5], 0.01)[0]  # Median estimate
stockprice_df = stockprice_df.na.fill({"Close": median_price, "Volume": 0})



In [3]:
# Step 2: Create a continuous daily date range
# Compute both bounds in one aggregation so the stock data is scanned once
# instead of once per bound.
date_bounds = stockprice_df.agg(
    F.min("Date").alias("min_date"), F.max("Date").alias("max_date")
).first()
min_date, max_date = date_bounds["min_date"], date_bounds["max_date"]

# Both bounds are None when there are no rows or no Date could be parsed.
# Fail here with a clear message rather than building an empty range.
if min_date is None or max_date is None:
    raise ValueError("Cannot build a date range: stock price data has no valid dates.")

# One row per calendar day from min_date to max_date (inclusive), built with the
# DataFrame API so the dates are passed as values rather than pasted into SQL text.
date_range_df = spark.range(1).select(
    F.explode(
        F.sequence(F.lit(min_date), F.lit(max_date), F.expr("interval 1 day"))
    ).alias("Date")
)



In [4]:
# Step 3: Ensure each Ticker has daily values by joining with date range
# Build the full (Date, Ticker) grid: every calendar day paired with every ticker.
ticker_rows = [(symbol,) for symbol in Tickers]
Tickers_df = spark.createDataFrame(ticker_rows, ["Ticker"])
full_date_Ticker_df = date_range_df.crossJoin(Tickers_df)

# Left-join the observed prices onto the grid; days without an observation
# come out with null Close/Volume.
stockprice_df = full_date_Ticker_df.join(
    stockprice_df, on=["Date", "Ticker"], how="left"
)

# Carry the most recent non-null value forward inside each ticker.
window_spec = (
    Window.partitionBy("Ticker")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
for column_name in ("Close", "Volume"):
    stockprice_df = stockprice_df.withColumn(
        column_name, F.last(column_name, ignorenulls=True).over(window_spec)
    )

# Whatever is still null gets the approximate median Close (taken over all
# rows) and a volume of 0.
median_price = stockprice_df.approxQuantile("Close", [0.5], 0.01)[0]  # Median estimate
stockprice_df = stockprice_df.na.fill({"Close": median_price, "Volume": 0})



                                                                                

In [5]:
# Step 4: Convert to Pandas to continue with sentiment processing
# Collect the daily Spark frame to the driver and make Date a pandas datetime
# column so it can be used as a merge key with the tweet data.
final_daily_df = stockprice_df.toPandas().assign(
    Date=lambda frame: pd.to_datetime(frame["Date"], dayfirst=True)
)



In [9]:
# Step 5: Load and process tweet data
# Read the tweet CSV and keep only tweets whose stock_name is one of the
# selected tickers.
stocktweet = spark.read.csv(stocktweet_path, header=True, inferSchema=True).filter(
    col("stock_name").isin(Tickers)
)
# Collect to pandas; the sentiment scoring further down runs row by row in Python.
stocktweet_pd = stocktweet.toPandas()

# Sentiment analysis setup
sia = SentimentIntensityAnalyzer()


# Clean and analyze tweets
def clean_text(text):
    """Normalize a tweet for sentiment scoring.

    Lower-cases the text and removes every character that is not an ASCII
    letter, a digit, or whitespace.

    Args:
        text: The raw tweet. Any non-string value (e.g. None or NaN) is
            treated as missing.

    Returns:
        The cleaned string, or "" when ``text`` is not a string.

    NOTE(review): capitalization and punctuation are removed before scoring;
    confirm that the sentiment analyzer does not rely on them.
    """
    if not isinstance(text, str):
        return ""
    lowered = text.lower()
    return re.sub(r"[^a-zA-Z0-9\s]", "", lowered)


# Score every tweet: clean the text, parse the date, and take VADER's
# "compound" score as the tweet's sentiment.
stocktweet_pd["cleaned_tweet"] = stocktweet_pd["tweet"].apply(clean_text)
# NOTE(review): dayfirst=True assumes day-first dates in the tweet file, and the
# recorded output shows pandas emitted a warning for this line; confirm the
# date format in the CSV.
stocktweet_pd["date"] = pd.to_datetime(stocktweet_pd["date"], dayfirst=True)
stocktweet_pd["sentiment"] = stocktweet_pd["cleaned_tweet"].apply(
    lambda x: sia.polarity_scores(x)["compound"]
)

# Prepare tweet data for merging
stocktweet_pd = stocktweet_pd[["date", "stock_name", "sentiment"]]
stocktweet_df = stocktweet_pd.rename(columns={"date": "Date", "stock_name": "Ticker"})

# Merge with stock data. The left merge keeps every (Date, Ticker) stock row and
# repeats it once per matching tweet.
merged_pd = pd.merge(final_daily_df, stocktweet_df, on=["Date", "Ticker"], how="left")
# Neutral sentiment if missing. Assign the result back instead of calling
# fillna(inplace=True) on the column selection: pandas warns that the chained
# in-place form will stop updating the frame in pandas 3.0.
merged_pd["sentiment"] = merged_pd["sentiment"].fillna(0)

# Convert merged data back to Spark DataFrame
merged_df = spark.createDataFrame(merged_pd)

# Aggregate results: collapse the per-tweet rows to one row per (Date, Ticker).
# Close and Volume are repeated on every tweet row, so their mean leaves them
# unchanged; sentiment becomes the day's average score.
final_aggregated_df = merged_df.groupBy("Date", "Ticker").agg(
    F_mean("Close").alias("Close"),
    F_mean("Volume").alias("Volume"),
    F_mean("sentiment").alias("sentiment"),
)

# Save individual CSVs for each Ticker
for Ticker in Tickers:
    Ticker_df = final_aggregated_df.filter(
        final_aggregated_df.Ticker == Ticker
    ).toPandas()
    # groupBy output has no guaranteed order, so sort by date before writing.
    Ticker_df = Ticker_df.sort_values("Date")
    output_filepath = os.path.join(output_path, f"{Ticker}_processed.csv")
    Ticker_df.to_csv(output_filepath, index=False)

print(
    "Processed stock data with single row per date saved in 'processed_stocks' folder."
)

  stocktweet_pd["date"] = pd.to_datetime(stocktweet_pd["date"], dayfirst=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  merged_df["sentiment"].fillna(0, inplace=True)  # Neutral sentiment if missing


Processed stock data with single row per date saved in 'processed_stocks' folder.


In [8]:
# Show the per-ticker frame by date. `Ticker_df` is whatever the export loop
# bound last, so that cell must have been run before this one.
Ticker_df.sort_values(by="Date")

Unnamed: 0,Date,Ticker,Close,Volume,sentiment
85,2019-12-31,AMZN,92.391998,50130000.0,0.0000
135,2020-01-01,AMZN,92.391998,50130000.0,0.0000
65,2020-01-02,AMZN,94.900497,80580000.0,0.0000
90,2020-01-03,AMZN,93.748497,75288000.0,0.0000
88,2020-01-04,AMZN,93.748497,75288000.0,0.0000
...,...,...,...,...,...
217,2020-12-27,AMZN,158.634506,29038000.0,0.0000
288,2020-12-28,AMZN,164.197998,113736000.0,0.0000
310,2020-12-29,AMZN,166.100006,97458000.0,0.0097
218,2020-12-30,AMZN,164.292496,64186000.0,0.0000
