In [2]:
from pyspark.sql import SparkSession


In [3]:
spark = SparkSession.builder.getOrCreate()


In [4]:
import os
import pandas as pd
from datetime import datetime, timedelta


In [28]:
parent_dir = os.path.dirname(os.getcwd())
metv_path = os.path.join(parent_dir, "data", "raw", "metv_monthly.json")

df_metv = pd.read_json(metv_path)
df_metv['Date'] = pd.to_datetime(df_metv['Date'])
df_metv.head()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2023-05-11,9.14,9.18,9.09,9.15,9.15,293900
1,2023-05-12,9.18,9.18,8.985,9.06,9.06,322100
2,2023-05-15,9.12,9.22,9.04,9.22,9.22,460300
3,2023-05-16,9.14,9.2,9.115,9.15,9.15,169100
4,2023-05-17,9.18,9.38,9.18,9.36,9.36,267800


**Processing news data**

In [52]:
news_path = os.path.join(parent_dir, "data", "raw", "news.json")
df_news = pd.read_json(news_path)
df_news['publish_date'] = pd.to_datetime(df_news['publish_date'])
df_news['Date'] = df_news['publish_date'].dt.strftime('%Y-%m-%d')


In [53]:
# Convert the 'Date' column in both dataframes to datetime
df_news['Date'] = pd.to_datetime(df_news['Date'])
df_metv['Date'] = pd.to_datetime(df_metv['Date'])

combined_df = pd.merge(df_news, df_metv, on='Date', how='outer')

# Sort the dataframe by 'Date' column
combined_df = combined_df.sort_values('Date')

# Reset the index of the combined dataframe
combined_df = combined_df.reset_index(drop=True)
combined_df.dropna(inplace=True)

#Create the final text for implementing NLP
combined_df["summary"] = combined_df["title"] + " " + combined_df["description"] + " " + combined_df["content"]

In [58]:
df_news = combined_df.copy()

Applying the classifier model

In [59]:
import flair

# Load the sentiment classifier
classifier = flair.models.TextClassifier.load('en-sentiment')

# Define a function for sentiment analysis
def sentiment_analysis(text):
    sentence = flair.data.Sentence(text)
    classifier.predict(sentence)
    label = sentence.labels[0].value
    score = sentence.labels[0].score
    return label, score


In [60]:
# Convert DataFrame to dictionary
data_dict = df_news['summary'].to_dict()

In [61]:
# Apply sentiment analysis to the dictionary
sentiment_results = {key: sentiment_analysis(value) for key, value in data_dict.items()}

In [62]:
# Convert sentiment results dictionary to DataFrame
sentiment_df = pd.DataFrame.from_dict(sentiment_results, orient='index', columns=['label', 'score'])

# Merge sentiment results DataFrame with the original DataFrame
df_news = pd.concat([df_news, sentiment_df], axis=1)

In [70]:
columns_to_drop = ["title", "description", "content", "publish_date", "Open", "High", "Low", "Close", "summary"]
df_news.drop(columns=columns_to_drop, inplace=True)
df_news['score'] = df_news.apply(lambda row: row['score'] * -1 if row['label'] == 'NEGATIVE' else row['score'], axis=1)
df_news.head()

Unnamed: 0,Date,Adj Close,Volume,label,score
0,2023-05-11,9.15,293900.0,NEGATIVE,-0.999829
1,2023-05-11,9.15,293900.0,POSITIVE,0.975447
2,2023-05-11,9.15,293900.0,POSITIVE,0.994877
3,2023-05-11,9.15,293900.0,NEGATIVE,-0.777512
4,2023-05-11,9.15,293900.0,NEGATIVE,-0.841667


In [74]:
# Calculate average score for each date in df_news
df_news_avg_scores = df_news.groupby('Date')['score'].mean().reset_index()

# Merge df_news_avg_scores with df_metv based on 'Date'
merged_df = df_news_avg_scores.merge(df_metv[['Date', 'Adj Close', 'Volume']], on='Date', how='inner')

# Display the resulting DataFrame with selected columns
result_df = merged_df[['Date', 'Adj Close', 'Volume', 'score']]
result_df['score'] = result_df['score'] + 10

In [81]:
current_file_path = os.getcwd()
parent_folder_path = os.path.dirname(os.path.dirname(current_file_path))
output_directory = os.path.join(parent_folder_path, 'big_data_project', "data", "processed")

monthly_data_output = os.path.join(output_directory, "processed_monthly_data.csv")
result_df.to_csv(monthly_data_output, index=False)

In [71]:
# fig2 = go.Figure(data=go.Scatter(x=df_moving_avg_news['date'], y=df_moving_avg_news['moving_avg_score'], mode='lines'))

# fig2.update_layout(
#     xaxis_title='Date',
#     yaxis_title='Score',
#     title='Score vs. Date',
#     xaxis=dict(
#         tickmode='linear',
#         dtick='D1',  # Set the tick frequency to one day (D1)
#     )
# )

# fig2.show()

In [72]:
# # Create the first scatter trace
# trace1 = go.Scatter(
#     x=df_moving_avg_stock['date'],
#     y=df_moving_avg_stock['moving_avg_price'],
#     mode='lines',
#     name='Adj Close'
# )

# # Create the second scatter trace with a secondary y-axis
# trace2 = go.Scatter(
#     x=df_moving_avg_news['date'],
#     y=df_moving_avg_news['moving_avg_score'],
#     mode='lines',
#     name='Score',
#     yaxis='y2'
# )

# # Define the layout
# layout = go.Layout(
#     title='Adj Close and Score Comparison',
#     xaxis=dict(title='Date'),
#     yaxis=dict(title='Adj Close'),
#     yaxis2=dict(
#         title='Score',
#         overlaying='y',
#         side='right'
#     )
# )

# # Create the figure
# fig = go.Figure(data=[trace1, trace2], layout=layout)

# # Display the figure
# fig.show()
