In [None]:
!pip install pytrends yfinance

Collecting pytrends
  Downloading pytrends-4.9.2-py3-none-any.whl.metadata (13 kB)
Downloading pytrends-4.9.2-py3-none-any.whl (15 kB)
Installing collected packages: pytrends
Successfully installed pytrends-4.9.2


In [None]:
import yfinance as yf
from pytrends.request import TrendReq
import pandas as pd
import os, time, json
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, round, lit

In [None]:
def fetch_stock_price():
    try:
        # Fetch stock data for Netflix (NFLX)
        stock_df = yf.download("NFLX", period="5d", interval="1d")
        # Flatten the column index if multi-level
        if stock_df.columns.nlevels > 1:
            stock_df.columns = stock_df.columns.get_level_values(0)
        print("Stock DataFrame:")
        print(stock_df)
    except Exception as e:
        print(f"Error fetching stock data: {e}")

fetch_stock_price()

[*********************100%***********************]  1 of 1 completed

Stock DataFrame:
Price             Close         High          Low         Open   Volume
Date                                                                   
2025-04-28  1110.380005  1114.000000  1082.619995  1100.000000  3831100
2025-04-29  1125.640015  1127.810059  1095.479980  1103.920044  3777300
2025-04-30  1131.719971  1133.199951  1101.109985  1112.650024  4266700
2025-05-01  1133.469971  1142.420044  1111.829956  1122.520020  3499500
2025-05-02  1156.489990  1159.439941  1133.319946  1136.660034  3767900





In [None]:
spark = SparkSession.builder.master("local").appName("NetflixStockGenreAnalysis").getOrCreate()

def fetch_stock_price():
    try:
        # Fetch stock data for Netflix (NFLX)
        stock_df = yf.download("NFLX", period="5d", interval="1d")

        if stock_df.columns.nlevels > 1:
            stock_df.columns = stock_df.columns.get_level_values(0)
        # print("Stock DataFrame:")
        # print(stock_df)

        # Access the 'Close' column correctly (multi-level index)
        close_prices = stock_df['Close']

        # Calculate the average close price
        avg_close_price = close_prices.mean()
        return stock_df, avg_close_price

    except Exception as e:
        print(f"Error fetching stock data: {e}")
        return None, None



# ✅ Fetch Google Trends Interest
def fetch_trend_interest():
    try:
        pytrends = TrendReq()
        pytrends.build_payload(kw_list=["Netflix"], timeframe="now 7-d")
        trend_df = pytrends.interest_over_time()
        if trend_df.empty:
            print("⚠️ Trend data is empty!")
            return None, 0
        print("\ntrend data:\n", trend_df)
        trend_df = trend_df.reset_index()
        interest_scores = trend_df['Netflix']
        avg_trend_interest = interest_scores.mean()
        return trend_df, avg_trend_interest
    except Exception as e:
        print(f"⚠️ Error fetching trend data: {e}")
        return None, 0



base_df_data = [
    ("Comedy", "Youth"),
    ("Thriller", "Youth"),
    ("Drama", "Adult"),
    ("Action", "Adult")
]
base_df = spark.createDataFrame(base_df_data, ["genre", "audience"])

# Fetch stock and trend data
stock_df, avg_price = fetch_stock_price()
trend_df, avg_interest = fetch_trend_interest()

# Check if data is valid before proceeding
if avg_price is None or avg_price == 0.0 or avg_interest is None or avg_interest == 0:
    print("⚠️ Invalid data, please check the data fetching functions.")
else:
    # Add static values as columns
    df = base_df.withColumn("avg_price", lit(avg_price))  # Use lit() for scalar values
    df = df.withColumn("avg_interest", lit(avg_interest))  # Use lit() for scalar values

    # Add interest_level
    df = df.withColumn(
        "interest_level",
        when(col("avg_interest") >= 81, "High")
        .when(col("avg_interest") >= 70, "Moderate")
        .otherwise("Low")
    )

    # Add price_level
    df = df.withColumn(
        "price_level",
        when(col("avg_price") >= 440, "High")
        .when(col("avg_price") >= 435, "Moderate")
        .otherwise("Low")
    )

    # Show Result in Colab
    print("\nFinal PySpark DataFrame:")
    df.show(truncate=False)

    # Save to CSV
    df_pd = df.toPandas()
    df_pd.to_csv("netflix_genre_analysis.csv", index=False)
    print("✅ CSV saved as 'netflix_genre_analysis.csv'")

# Stop the Spark session
spark.stop()


# stock_df, avg_close_price = fetch_stock_price()
# trend_df, avg_trend_interest = fetch_trend_interest()

# if stock_df is not None and trend_df is not None:
#     # Add avg_trend_interest as a column to stock_df
#     stock_df['avg_trend_interest'] = avg_trend_interest

#     # Print results
#     print("Stock DataFrame with avg_trend_interest:")
#     print(stock_df)
#     print(f"\nCalculated Average Close Price: {avg_close_price}")
#     print(f"Calculated Average Trend Interest: {avg_trend_interest}")

[*********************100%***********************]  1 of 1 completed
  df = df.fillna(False)



trend data:
                      Netflix  isPartial
date                                   
2025-04-27 18:00:00       82      False
2025-04-27 19:00:00       86      False
2025-04-27 20:00:00       80      False
2025-04-27 21:00:00       81      False
2025-04-27 22:00:00       76      False
...                      ...        ...
2025-05-04 14:00:00       59      False
2025-05-04 15:00:00       63      False
2025-05-04 16:00:00       68      False
2025-05-04 17:00:00       73      False
2025-05-04 18:00:00       88       True

[169 rows x 2 columns]

Final PySpark DataFrame:
+--------+--------+-----------------+-----------------+--------------+-----------+
|genre   |audience|avg_price        |avg_interest     |interest_level|price_level|
+--------+--------+-----------------+-----------------+--------------+-----------+
|Comedy  |Youth   |1131.539990234375|57.79289940828402|Low           |High       |
|Thriller|Youth   |1131.539990234375|57.79289940828402|Low           |High       |
|

In [None]:
output_dir = "/content/stream_input"
os.makedirs(output_dir, exist_ok=True)

In [None]:
genre_data = [
    {"genre": "Comedy", "audience": "Youth"},
    {"genre": "Thriller", "audience": "Youth"},
    {"genre": "Documentary", "audience": "Adults"},
    {"genre": "Romance", "audience": "Youth"},
    {"genre": "Action", "audience": "All"},
    {"genre": "Horror", "audience": "Adults"}
]

base_df = spark.createDataFrame(genre_data)

# ✅ Fetch Netflix Stock Price
def fetch_stock_price():
    try:
        stock_df = yf.download("NFLX", period="5d", interval="1d")
        if stock_df.empty:
            print("⚠️ Stock data is empty!")
            return 0.0
        print("\nstock data:\n", stock_df)
        return round(stock_df["Close"].mean(), 2)  # calculate the average Close price
    except Exception as e:
        print(f"⚠️ Error fetching stock data: {e}")
        return 0.0

# ✅ Fetch Google Trends Interest
def fetch_trend_interest():
    try:
        pytrends = TrendReq()
        pytrends.build_payload(kw_list=["Netflix"], timeframe="now 7-d")
        trend_df = pytrends.interest_over_time()
        if trend_df.empty:
            print("⚠️ Trend data is empty!")
            return 0
        print("\ntrend data:\n", trend_df)
        return round(trend_df["Netflix"].mean(), 0)  # calculate the average interest
    except Exception as e:
        print(f"⚠️ Error fetching trend data: {e}")
        return 0

# ✅ Fetch data dynamically
avg_price = fetch_stock_price()
avg_interest = fetch_trend_interest()

# ✅ Check if data is valid before proceeding
if avg_price == 0.0 or avg_interest == 0:
    print("⚠️ Invalid data, please check the data fetching functions.")

# ✅ Add static values as columns
df = base_df.withColumn("avg_price", lit(avg_price))  # Use lit() for scalar values
df = df.withColumn("avg_interest", lit(avg_interest))  # Use lit() for scalar values

# ✅ Add interest_level
df = df.withColumn(
    "interest_level",
    when(col("avg_interest") >= 81, "High")
    .when(col("avg_interest") >= 70, "Moderate")
    .otherwise("Low")
)

# ✅ Add price_level
df = df.withColumn(
    "price_level",
    when(col("avg_price") >= 440, "High")
    .when(col("avg_price") >= 435, "Moderate")
    .otherwise("Low")
)

# ✅ Show Result in Colab
df.show(truncate=False)

# ✅ Save to CSV
df_pd = df.toPandas()
df_pd.to_csv("netflix_genre_analysis.csv", index=False)
print("✅ CSV saved as 'netflix_genre_analysis.csv'")

[*********************100%***********************]  1 of 1 completed
  df = df.fillna(False)



stock data:
 Price             Close         High          Low         Open   Volume
Ticker             NFLX         NFLX         NFLX         NFLX     NFLX
Date                                                                   
2025-04-28  1110.380005  1114.000000  1082.619995  1100.000000  3831100
2025-04-29  1125.640015  1127.810059  1095.479980  1103.920044  3777300
2025-04-30  1131.719971  1133.199951  1101.109985  1112.650024  4266700
2025-05-01  1133.469971  1142.420044  1111.829956  1122.520020  3499500
2025-05-02  1156.489990  1159.439941  1133.319946  1136.660034  3767900
⚠️ Error fetching stock data: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got Series.

trend data:
                      Netflix  isPartial
date                                   
2025-04-27 17:00:00       66      False
2025-04-27 18:00:00       77      False
2025-04-27 19:00:00       84      False
2025-04-27 20:00:00       83      False
2025-04-27 21:00:00       78      False
...         

In [None]:
spark = SparkSession.builder.appName("NetflixStreamingAnalysis").getOrCreate()

# Read streaming data from folder
df = spark.readStream.schema("""
    timestamp STRING,
    genre STRING,
    audience STRING,
    avg_price DOUBLE,
    avg_interest DOUBLE
""").json("/content/stream_input")

# Add column
df = df.withColumn(
    "interest_level",
    when(col("avg_interest") >= 81, "High")
    .when(col("avg_interest") >= 70, "Moderate")
    .otherwise("Low")
)

df = df.withColumn(
    "price_level",
    when(col("avg_price") >= 440, "High")
    .when(col("avg_price") >= 435, "Moderate")
    .otherwise("Low")
)

# Output to console log
query = df.writeStream.outputMode("append").format("console").option("truncate", False).start()

In [None]:
# Install required libraries
# !pip install yfinance pytrends pyspark

import yfinance as yf
from pytrends.request import TrendReq
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, sum as sum_
import time

# Initialize Spark session
spark = SparkSession.builder.appName("NetflixGenreAnalysis").getOrCreate()

# Function to fetch stock price
def fetch_stock_price():
    try:
        stock_df = yf.download("NFLX", period="5d", interval="1d")
        print("stock data:\n", stock_df)
        if stock_df.columns.nlevels > 1:
            stock_df.columns = stock_df.columns.get_level_values(0)
        close_prices = stock_df['Close']
        avg_close_price = close_prices.mean()
        return stock_df, avg_close_price
    except Exception as e:
        print(f"Error fetching stock data: {e}")
        return None, None

# Function to fetch trend interest for multiple genres
def fetch_trend_interest(genres):
    try:
        pytrends = TrendReq(hl='en-US', tz=360)
        trend_data = []
        for genre in genres:
            keyword = f"Netflix {genre}"
            pytrends.build_payload(kw_list=[keyword], timeframe="now 7-d")
            trend_df = pytrends.interest_over_time()
            time.sleep(2)  # Avoid rate limits
            if trend_df.empty:
                print(f"⚠️ Trend data for {keyword} is empty!")
                avg_interest = 10.0  # Fallback value
            else:
                avg_interest = trend_df[keyword].mean()
            trend_data.append({"genre": genre, "avg_interest": avg_interest})
        return pd.DataFrame(trend_data)
    except Exception as e:
        print(f"⚠️ Error fetching trend data: {e}")
        return pd.DataFrame()

# Define genres and audience mapping
genres = ["Comedy", "Thriller", "Drama", "Action"]
genre_audience_map = {
    "Comedy": "Youth",
    "Thriller": "Youth",
    "Drama": "Adult",
    "Action": "Adult"
}

# Fetch stock price
stock_df, avg_price = fetch_stock_price()
print(f"\nAverage Stock Price: {avg_price}")

# Fetch trend interest for each genre
trend_df_pandas = fetch_trend_interest(genres)

# Create base_df with genre and audience
base_df_data = [(genre, genre_audience_map[genre]) for genre in genres]
base_df = spark.createDataFrame(base_df_data, ["genre", "audience"])

# Convert trend_df_pandas to PySpark DataFrame
if not trend_df_pandas.empty:
    trend_df = spark.createDataFrame(trend_df_pandas)
else:
    print("⚠️ No trend data available!")
    trend_df = spark.createDataFrame([(genre, 10.0) for genre in genres], ["genre", "avg_interest"])

# Join base_df with trend_df
df = base_df.join(trend_df, "genre", "left")

# Check if data is valid
if avg_price is None or avg_price == 0.0 or df.filter(col("avg_interest") > 0).count() == 0:
    print("⚠️ Invalid data, please check the data fetching functions.")
else:
    # Calculate total interest for normalization
    total_interest = df.select(sum_(col("avg_interest")).alias("total_interest")).collect()[0]["total_interest"]

    # Add weight based on avg_interest
    df = df.withColumn("weight", col("avg_interest") / total_interest)

    # Adjust avg_price based on weight
    df = df.withColumn("avg_price", lit(avg_price) * col("weight"))

    # Add interest_level
    df = df.withColumn(
        "interest_level",
        when(col("avg_interest") >= 60, "High")
        .when(col("avg_interest") >= 50, "Moderate")
        .otherwise("Low")
    )

    # Add price_level
    df = df.withColumn(
        "price_level",
        when(col("avg_price") >= 400, "High")
        .when(col("avg_price") >= 300, "Moderate")
        .otherwise("Low")
    )

    # Select final columns
    df = df.select("genre", "audience", "avg_price", "avg_interest", "interest_level", "price_level")

    # Show Result
    print("Final PySpark DataFrame:")
    df.show(truncate=False)

    # Save to CSV
    df_pd = df.toPandas()
    df_pd.to_csv("netflix_genre_analysis.csv", index=False)
    print("✅ CSV saved as 'netflix_genre_analysis.csv'")

# Stop the Spark session
spark.stop()

[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')


stock data:
 Empty DataFrame
Columns: [(Adj Close, NFLX), (Close, NFLX), (High, NFLX), (Low, NFLX), (Open, NFLX), (Volume, NFLX)]
Index: []

Average Stock Price: nan


  df = df.fillna(False)
  df = df.fillna(False)
  df = df.fillna(False)
  df = df.fillna(False)


Final PySpark DataFrame:
+--------+--------+---------+-----------------+--------------+-----------+
|genre   |audience|avg_price|avg_interest     |interest_level|price_level|
+--------+--------+---------+-----------------+--------------+-----------+
|Thriller|Youth   |NaN      |36.55029585798817|Low           |High       |
|Comedy  |Youth   |NaN      |32.0414201183432 |Low           |High       |
|Drama   |Adult   |NaN      |64.16568047337279|High          |High       |
|Action  |Adult   |NaN      |53.82248520710059|Moderate      |High       |
+--------+--------+---------+-----------------+--------------+-----------+

✅ CSV saved as 'netflix_genre_analysis.csv'


# 2nd approach

In [None]:
# Install required libraries
# !pip install yfinance pytrends pyspark

import yfinance as yf
from pytrends.request import TrendReq
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, sum as sum_
import time
import random
import requests

# Initialize Spark session
spark = SparkSession.builder.appName("NetflixGenreAnalysis").getOrCreate()

# Function to fetch stock price with retry and User-Agent rotation
def fetch_stock_price(ticker="NFLX", retries=3, initial_delay=5):
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
    ]
    for attempt in range(retries):
        try:
            # Set random User-Agent
            yf.pdr_override()  # Ensure compatibility
            session = requests.Session()
            session.headers.update({'User-Agent': random.choice(user_agents)})
            stock_df = yf.download(ticker, period="5d", interval="1d", session=session)
            if stock_df.empty:
                print(f"⚠️ Empty stock data for {ticker}!")
                raise ValueError("Empty DataFrame")
            if stock_df.columns.nlevels > 1:
                stock_df.columns = stock_df.columns.get_level_values(0)
            close_prices = stock_df['Close']
            avg_close_price = close_prices.mean()
            return stock_df, avg_close_price
        except Exception as e:
            print(f"Attempt {attempt + 1}/{retries} failed for {ticker}: {e}")
            if attempt < retries - 1:
                delay = initial_delay * (2 ** attempt)  # Exponential backoff
                print(f"Retrying after {delay} seconds...")
                time.sleep(delay)
            else:
                print(f"⚠️ Failed to fetch stock data for {ticker} after {retries} attempts.")
                return None, None

# Function to fetch trend interest for multiple genres
def fetch_trend_interest(genres):
    try:
        pytrends = TrendReq(hl='en-US', tz=360)
        trend_data = []
        for genre in genres:
            keyword = f"Netflix {genre}"
            pytrends.build_payload(kw_list=[keyword], timeframe="now 7-d")
            time.sleep(2)  # Avoid PyTrends rate limits
            trend_df = pytrends.interest_over_time()
            if trend_df.empty:
                print(f"⚠️ Trend data for {keyword} is empty!")
                avg_interest = 10.0  # Fallback value
            else:
                avg_interest = trend_df[keyword].mean()
            trend_data.append({"genre": genre, "avg_interest": avg_interest})
        return pd.DataFrame(trend_data)
    except Exception as e:
        print(f"⚠️ Error fetching trend data: {e}")
        return pd.DataFrame()

# Define genres and audience mapping
genres = ["Comedy", "Thriller", "Drama", "Action"]
genre_audience_map = {
    "Comedy": "Youth",
    "Thriller": "Youth",
    "Drama": "Adult",
    "Action": "Adult"
}

# Fetch stock price
stock_df, avg_price = fetch_stock_price()
if avg_price is None:
    print("⚠️ Using fallback avg_price due to fetch failure.")
    avg_price = 436.22  # Fallback to align with your example

# Fetch trend interest for each genre
trend_df_pandas = fetch_trend_interest(genres)

# Create base_df with genre and audience
base_df_data = [(genre, genre_audience_map[genre]) for genre in genres]
base_df = spark.createDataFrame(base_df_data, ["genre", "audience"])

# Convert trend_df_pandas to PySpark DataFrame
if not trend_df_pandas.empty:
    trend_df = spark.createDataFrame(trend_df_pandas)
else:
    print("⚠️ No trend data available!")
    trend_df = spark.createDataFrame([(genre, 10.0) for genre in genres], ["genre", "avg_interest"])

# Join base_df with trend_df
df = base_df.join(trend_df, "genre", "left")

# Check if data is valid
if avg_price == 0.0 or df.filter(col("avg_interest") > 0).count() == 0:
    print("⚠️ Invalid data, please check the data fetching functions.")
else:
    # Calculate total interest for normalization
    total_interest = df.select(sum_(col("avg_interest")).alias("total_interest")).collect()[0]["total_interest"]

    # Add weight based on avg_interest
    df = df.withColumn("weight", col("avg_interest") / total_interest)

    # Adjust avg_price based on weight
    df = df.withColumn("avg_price", lit(avg_price) * col("weight"))

    # Add interest_level
    df = df.withColumn(
        "interest_level",
        when(col("avg_interest") >= 60, "High")
        .when(col("avg_interest") >= 50, "Moderate")
        .otherwise("Low")
    )

    # Add price_level
    df = df.withColumn(
        "price_level",
        when(col("avg_price") >= 400, "High")
        .when(col("avg_price") >= 300, "Moderate")
        .otherwise("Low")
    )

    # Select final columns
    df = df.select("genre", "audience", "avg_price", "avg_interest", "interest_level", "price_level")

    # Show Result
    print("Final PySpark DataFrame:")
    df.show(truncate=False)

    # Save to CSV
    df_pd = df.toPandas()
    df_pd.to_csv("netflix_genre_analysis.csv", index=False)
    print("✅ CSV saved as 'netflix_genre_analysis.csv'")

# Stop the Spark session
spark.stop()

Attempt 1/3 failed for NFLX: module 'yfinance' has no attribute 'pdr_override'
Retrying after 5 seconds...
Attempt 2/3 failed for NFLX: module 'yfinance' has no attribute 'pdr_override'
Retrying after 10 seconds...
Attempt 3/3 failed for NFLX: module 'yfinance' has no attribute 'pdr_override'
⚠️ Failed to fetch stock data for NFLX after 3 attempts.
⚠️ Using fallback avg_price due to fetch failure.


  df = df.fillna(False)
  df = df.fillna(False)
  df = df.fillna(False)
  df = df.fillna(False)


Final PySpark DataFrame:
+--------+--------+------------------+------------------+--------------+-----------+
|genre   |audience|avg_price         |avg_interest      |interest_level|price_level|
+--------+--------+------------------+------------------+--------------+-----------+
|Thriller|Youth   |85.3903935309973  |36.52662721893491 |Low           |Low        |
|Comedy  |Youth   |74.97423180592993 |32.071005917159766|Low           |Low        |
|Drama   |Adult   |150.00379514824797|64.16568047337279 |High          |Low        |
|Action  |Adult   |125.8515795148248 |53.83431952662722 |Moderate      |Low        |
+--------+--------+------------------+------------------+--------------+-----------+

✅ CSV saved as 'netflix_genre_analysis.csv'


In [None]:
# Install required libraries
# !pip install yfinance pytrends pyspark

import yfinance as yf
from pytrends.request import TrendReq
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, sum as sum_
import time
import random
import requests

# Initialize Spark session
spark = SparkSession.builder.appName("NetflixGenreAnalysis").getOrCreate()

# Function to fetch stock price with retry and User-Agent rotation
def fetch_stock_price(ticker="NFLX", retries=3, initial_delay=5):
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
    ]
    for attempt in range(retries):
        try:
            # Create a session with random User-Agent
            session = requests.Session()
            session.headers.update({'User-Agent': random.choice(user_agents)})
            # Fetch stock data using yfinance with custom session
            stock_df = yf.download(ticker, period="5d", interval="1d", session=session)
            print("\nstock dataframe:\n", stock_df)
            if stock_df.empty:
                print(f"⚠️ Empty stock data for {ticker}!")
                raise ValueError("Empty DataFrame")
            if stock_df.columns.nlevels > 1:
                stock_df.columns = stock_df.columns.get_level_values(0)
            close_prices = stock_df['Close']
            avg_close_price = close_prices.mean()
            return stock_df, avg_close_price
        except Exception as e:
            print(f"Attempt {attempt + 1}/{retries} failed for {ticker}: {e}")
            if attempt < retries - 1:
                delay = initial_delay * (2 ** attempt)  # Exponential backoff
                print(f"Retrying after {delay} seconds...")
                time.sleep(delay)
            else:
                print(f"⚠️ Failed to fetch stock data for {ticker} after {retries} attempts.")
                return None, None

# Function to fetch trend interest for multiple genres
def fetch_trend_interest(genres):
    try:
        pytrends = TrendReq(hl='en-US', tz=360)
        trend_data = []
        for genre in genres:
            keyword = f"Netflix {genre}"
            pytrends.build_payload(kw_list=[keyword], timeframe="now 7-d")
            time.sleep(2)  # Avoid PyTrends rate limits
            trend_df = pytrends.interest_over_time()
            if trend_df.empty:
                print(f"⚠️ Trend data for {keyword} is empty!")
                avg_interest = 10.0  # Fallback value
            else:
                avg_interest = trend_df[keyword].mean()
            trend_data.append({"genre": genre, "avg_interest": avg_interest})
        return pd.DataFrame(trend_data)
    except Exception as e:
        print(f"⚠️ Error fetching trend data: {e}")
        return pd.DataFrame()

# Define genres and audience mapping
genres = ["Comedy", "Thriller", "Drama", "Action"]
genre_audience_map = {
    "Comedy": "Youth",
    "Thriller": "Youth",
    "Drama": "Adult",
    "Action": "Adult"
}

# Fetch stock price
stock_df, avg_price = fetch_stock_price()
print(f"\nbefore avg price: {avg_price}")
if avg_price is None:
    print("⚠️ Using fallback avg_price due to fetch failure.")
    avg_price = 436.22  # Fallback to align with your example

print(f"\nafter avg price: {avg_price}")

# Fetch trend interest for each genre
trend_df_pandas = fetch_trend_interest(genres)

# Create base_df with genre and audience
base_df_data = [(genre, genre_audience_map[genre]) for genre in genres]
base_df = spark.createDataFrame(base_df_data, ["genre", "audience"])

# Convert trend_df_pandas to PySpark DataFrame
if not trend_df_pandas.empty:
    trend_df = spark.createDataFrame(trend_df_pandas)
else:
    print("⚠️ No trend data available!")
    trend_df = spark.createDataFrame([(genre, 10.0) for genre in genres], ["genre", "avg_interest"])

# Join base_df with trend_df
df = base_df.join(trend_df, "genre", "left")

# Check if data is valid
if avg_price == 0.0 or df.filter(col("avg_interest") > 0).count() == 0:
    print("⚠️ Invalid data, please check the data fetching functions.")
else:
    # Calculate total interest for normalization
    total_interest = df.select(sum_(col("avg_interest")).alias("total_interest")).collect()[0]["total_interest"]

    # Add weight based on avg_interest
    df = df.withColumn("weight", col("avg_interest") / total_interest)

    # Adjust avg_price based on weight
    df = df.withColumn("avg_price", lit(avg_price) * col("weight"))

    # Add interest_level
    df = df.withColumn(
        "interest_level",
        when(col("avg_interest") >= 60, "High")
        .when(col("avg_interest") >= 50, "Moderate")
        .otherwise("Low")
    )

    # Add price_level
    df = df.withColumn(
        "price_level",
        when(col("avg_price") >= 400, "High")
        .when(col("avg_price") >= 300, "Moderate")
        .otherwise("Low")
    )

    # Select final columns
    df = df.select("genre", "audience", "avg_price", "avg_interest", "interest_level", "price_level")

    # Show Result
    print("Final PySpark DataFrame:")
    df.show(truncate=False)

    # Save to CSV
    df_pd = df.toPandas()
    df_pd.to_csv("netflix_genre_analysis.csv", index=False)
    print("✅ CSV saved as 'netflix_genre_analysis.csv'")

# Stop the Spark session
spark.stop()

[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')



stock dataframe:
 Empty DataFrame
Columns: [(Adj Close, NFLX), (Close, NFLX), (High, NFLX), (Low, NFLX), (Open, NFLX), (Volume, NFLX)]
Index: []
⚠️ Empty stock data for NFLX!
Attempt 1/3 failed for NFLX: Empty DataFrame
Retrying after 5 seconds...


[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')



stock dataframe:
 Empty DataFrame
Columns: [(Adj Close, NFLX), (Close, NFLX), (High, NFLX), (Low, NFLX), (Open, NFLX), (Volume, NFLX)]
Index: []
⚠️ Empty stock data for NFLX!
Attempt 2/3 failed for NFLX: Empty DataFrame
Retrying after 10 seconds...


[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')



stock dataframe:
 Empty DataFrame
Columns: [(Adj Close, NFLX), (Close, NFLX), (High, NFLX), (Low, NFLX), (Open, NFLX), (Volume, NFLX)]
Index: []
⚠️ Empty stock data for NFLX!
Attempt 3/3 failed for NFLX: Empty DataFrame
⚠️ Failed to fetch stock data for NFLX after 3 attempts.

before avg price: None
⚠️ Using fallback avg_price due to fetch failure.

after avg price: 436.22


  df = df.fillna(False)
  df = df.fillna(False)
  df = df.fillna(False)
  df = df.fillna(False)


Final PySpark DataFrame:
+--------+--------+------------------+------------------+--------------+-----------+
|genre   |audience|avg_price         |avg_interest      |interest_level|price_level|
+--------+--------+------------------+------------------+--------------+-----------+
|Thriller|Youth   |87.45045505420357 |37.094674556213015|Low           |Low        |
|Comedy  |Youth   |77.51829298711266 |32.88165680473373 |Low           |Low        |
|Drama   |Adult   |145.89955485913467|61.887573964497044|High          |Low        |
|Action  |Adult   |125.35169709954911|53.171597633136095|Moderate      |Low        |
+--------+--------+------------------+------------------+--------------+-----------+

✅ CSV saved as 'netflix_genre_analysis.csv'


In [None]:
# Install required libraries
# !pip install yfinance pytrends pyspark

import yfinance as yf
from pytrends.request import TrendReq
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, sum as sum_
import time
import random
import requests

# Initialize Spark session
spark = SparkSession.builder.appName("NetflixGenreAnalysis").getOrCreate()

# Function to fetch stock price with retry and User-Agent rotation
def fetch_stock_price(ticker="NFLX", retries=5, initial_delay=10):
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
    ]
    for attempt in range(retries):
        try:
            # Create a session with random User-Agent
            session = requests.Session()
            session.headers.update({'User-Agent': random.choice(user_agents)})
            # Fetch stock data
            stock_df = yf.download(ticker, period="5d", interval="1d", session=session)
            if stock_df.empty:
                print(f"⚠️ Empty stock data for {ticker}!")
                raise ValueError("Empty DataFrame")
            if stock_df.columns.nlevels > 1:
                stock_df.columns = stock_df.columns.get_level_values(0)
            close_prices = stock_df['Close']
            avg_close_price = close_prices.mean()
            return stock_df, avg_close_price
        except Exception as e:
            print(f"Attempt {attempt + 1}/{retries} failed for {ticker}: {e}")
            if attempt < retries - 1:
                delay = initial_delay * (2 ** attempt)  # Exponential backoff: 10s, 20s, 40s, 80s, 160s
                print(f"Retrying after {delay} seconds...")
                time.sleep(delay)
            else:
                print(f"⚠️ Failed to fetch stock data for {ticker} after {retries} attempts.")
                return None, None

# Function to fetch trend interest for multiple genres
def fetch_trend_interest(genres):
    try:
        pytrends = TrendReq(hl='en-US', tz=360)
        trend_data = []
        for genre in genres:
            keyword = f"Netflix {genre}"
            pytrends.build_payload(kw_list=[keyword], timeframe="now 7-d")
            time.sleep(2)  # Avoid PyTrends rate limits
            trend_df = pytrends.interest_over_time()
            if trend_df.empty:
                print(f"⚠️ Trend data for {keyword} is empty!")
                avg_interest = 10.0  # Fallback value
            else:
                avg_interest = trend_df[keyword].mean()
            trend_data.append({"genre": genre, "avg_interest": avg_interest})
        return pd.DataFrame(trend_data)
    except Exception as e:
        print(f"⚠️ Error fetching trend data: {e}")
        return pd.DataFrame()

# Define genres and audience mapping
genres = ["Comedy", "Thriller", "Drama", "Action"]
genre_audience_map = {
    "Comedy": "Youth",
    "Thriller": "Youth",
    "Drama": "Adult",
    "Action": "Adult"
}

# Pause to avoid cumulative rate limits
print("Pausing for 60 seconds to avoid Yahoo Finance rate limits...")
time.sleep(60)

# Fetch stock price
stock_df, avg_price = fetch_stock_price()
if avg_price is None:
    print("⚠️ Using fallback avg_price due to fetch failure.")
    avg_price = 436.22  # Fallback to align with your example

# Fetch trend interest for each genre
trend_df_pandas = fetch_trend_interest(genres)

# Create base_df with genre and audience
base_df_data = [(genre, genre_audience_map[genre]) for genre in genres]
base_df = spark.createDataFrame(base_df_data, ["genre", "audience"])

# Convert trend_df_pandas to PySpark DataFrame
if not trend_df_pandas.empty:
    trend_df = spark.createDataFrame(trend_df_pandas)
else:
    print("⚠️ No trend data available!")
    trend_df = spark.createDataFrame([(genre, 10.0) for genre in genres], ["genre", "avg_interest"])

# Join base_df with trend_df
df = base_df.join(trend_df, "genre", "left")

# Check if data is valid
if avg_price == 0.0 or df.filter(col("avg_interest") > 0).count() == 0:
    print("⚠️ Invalid data, please check the data fetching functions.")
else:
    # Calculate total interest for normalization
    total_interest = df.select(sum_(col("avg_interest")).alias("total_interest")).collect()[0]["total_interest"]

    # Add weight based on avg_interest
    df = df.withColumn("weight", col("avg_interest") / total_interest)

    # Adjust avg_price based on weight
    df = df.withColumn("avg_price", lit(avg_price) * col("weight"))

    # Add interest_level
    df = df.withColumn(
        "interest_level",
        when(col("avg_interest") >= 60, "High")
        .when(col("avg_interest") >= 50, "Moderate")
        .otherwise("Low")
    )

    # Add price_level
    df = df.withColumn(
        "price_level",
        when(col("avg_price") >= 400, "High")
        .when(col("avg_price") >= 300, "Moderate")
        .otherwise("Low")
    )

    # Select final columns
    df = df.select("genre", "audience", "avg_price", "avg_interest", "interest_level", "price_level")

    # Show Result
    print("Final PySpark DataFrame:")
    df.show(truncate=False)

    # Save to CSV
    df_pd = df.toPandas()
    df_pd.to_csv("netflix_genre_analysis.csv", index=False)
    print("✅ CSV saved as 'netflix_genre_analysis.csv'")

# Stop the Spark session
spark.stop()

Pausing for 60 seconds to avoid Yahoo Finance rate limits...


[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')


⚠️ Empty stock data for NFLX!
Attempt 1/5 failed for NFLX: Empty DataFrame
Retrying after 10 seconds...


[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')


⚠️ Empty stock data for NFLX!
Attempt 2/5 failed for NFLX: Empty DataFrame
Retrying after 20 seconds...


[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')


⚠️ Empty stock data for NFLX!
Attempt 3/5 failed for NFLX: Empty DataFrame
Retrying after 40 seconds...


[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')


⚠️ Empty stock data for NFLX!
Attempt 4/5 failed for NFLX: Empty DataFrame
Retrying after 80 seconds...


[*********************100%***********************]  1 of 1 completed
ERROR:yfinance:
1 Failed download:
ERROR:yfinance:['NFLX']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')


⚠️ Empty stock data for NFLX!
Attempt 5/5 failed for NFLX: Empty DataFrame
⚠️ Failed to fetch stock data for NFLX after 5 attempts.
⚠️ Using fallback avg_price due to fetch failure.


  df = df.fillna(False)
  df = df.fillna(False)
  df = df.fillna(False)
  df = df.fillna(False)


Final PySpark DataFrame:
+--------+--------+-----------------+-----------------+--------------+-----------+
|genre   |audience|avg_price        |avg_interest     |interest_level|price_level|
+--------+--------+-----------------+-----------------+--------------+-----------+
|Thriller|Youth   |87.7450001604055 |37.10059171597633|Low           |Low        |
|Comedy  |Youth   |77.5990471912996 |32.81065088757396|Low           |Low        |
|Drama   |Adult   |145.6818902184723|61.59763313609467|High          |Low        |
|Action  |Adult   |125.1940624298226|52.93491124260355|Moderate      |Low        |
+--------+--------+-----------------+-----------------+--------------+-----------+

✅ CSV saved as 'netflix_genre_analysis.csv'


# Project

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, DateType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from textblob import TextBlob
import requests
import yfinance as yf
from datetime import datetime, date, timedelta
import random

# --- Initialize Spark ---
spark = SparkSession.builder \
    .appName("NewsStockAnalysisPrediction") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

# --- Analysis Functions ---

def get_sentiment(text):
    if not text or text == 'No headline':
        return 'Unknown'
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.2:
        return 'Positive'
    elif polarity < -0.2:
        return 'Negative'
    else:
        return 'Neutral'

def infer_genre(text):
    if not text or text == 'No headline':
        return 'Unknown'
    if 'thriller' in text.lower(): return 'Thriller'
    elif 'drama' in text.lower(): return 'Drama'
    elif 'comedy' in text.lower(): return 'Comedy'
    elif 'romance' in text.lower(): return 'Romance'
    else: return 'Other'

def infer_audience(text):
    if not text or text == 'No headline':
        return 'Unknown'
    if any(x in text.lower() for x in ['teen', 'gen z', 'youth']): return 'Youth'
    elif 'adult' in text.lower(): return 'Adults'
    else: return 'General'

def calculate_impact(sentiment, open_price, close_price):
    if sentiment == 'Unknown' or open_price is None or close_price is None:
        return 'Unknown'
    change = (close_price - open_price) / open_price * 100
    if sentiment == 'Positive' and change > 0:
        return 'Positive'
    elif sentiment == 'Negative' and change < 0:
        return 'Negative'
    else:
        return 'Neutral'

# Register UDFs
sentiment_udf = udf(get_sentiment, StringType())
genre_udf = udf(infer_genre, StringType())
audience_udf = udf(infer_audience, StringType())
impact_udf = udf(calculate_impact, StringType())

# --- Data Fetching Functions ---

def is_weekend(date_obj):
    return date_obj.weekday() >= 5

def fetch_news_by_date(api_key, date_str):
    url = f"https://newsapi.org/v2/everything?q=Netflix&from={date_str}&to={date_str}&language=en&sortBy=publishedAt&apiKey={api_key}"
    try:
        print(f"Fetching news for {date_str}")
        response = requests.get(url)
        response.raise_for_status()
        articles = response.json().get('articles', [])
        if not articles:
            print(f"No articles found for {date_str}")
            return None, None
        article = articles[0]
        return article['title'] or 'No headline', article['publishedAt']
    except requests.RequestException as e:
        print(f"Error fetching news for {date_str}: {e}")
        return None, None

# def mock_stock_price(date_str):
#     print(f"Using mock stock data for {date_str}")
#     open_price = 600.00 + random.uniform(-10, 10)
#     close_price = open_price + random.uniform(-5, 5)
#     return round(open_price, 2), round(close_price, 2), datetime.strptime(date_str, "%Y-%m-%d").date()

def fetch_stock_price_by_date(ticker='NFLX', date_str='2025-05-01', use_mock=False):
    target_date = datetime.strptime(date_str, "%Y-%m-%d").date()
    # if use_mock:
    #     return mock_stock_price(date_str)
    try:
        stock = yf.Ticker(ticker)
        df = stock.history(start=date_str, end=(target_date + timedelta(days=1)).strftime("%Y-%m-%d"))
        if df.empty:
            print(f"No stock data for {ticker} on {date_str}")
            return None, None, None
        stock_date = df.index[0].date()
        if stock_date != target_date:
            print(f"Stock data date {stock_date} does not match {target_date}")
            return None, None, None
        return round(df['Open'].iloc[0], 2), round(df['Close'].iloc[0], 2), stock_date
    except Exception as e:
        print(f"Error fetching stock data for {date_str}: {e}")
        return None, None, None

# --- Prediction Function ---

def train_stock_prediction_model(historical_data):
    # Prepare features: sentiment score, previous day's close, open
    data = historical_data.withColumn("sentiment_score",
        when(col("sentiment") == "Positive", 1.0)
        .when(col("sentiment") == "Negative", -1.0)
        .otherwise(0.0))

    # Create feature vector
    assembler = VectorAssembler(
        inputCols=["sentiment_score", "price_close", "price_open"],
        outputCol="features",
        handleInvalid="skip"
    )

    # Shift data to predict next day's close
    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag
    w = Window.orderBy("date")
    data = data.withColumn("next_close", lag("price_close", -1).over(w))

    # Drop rows with null values
    data = data.dropna(subset=["next_close", "price_close", "price_open", "sentiment_score"])

    # Assemble features
    data = assembler.transform(data)

    # Train Linear Regression
    lr = LinearRegression(featuresCol="features", labelCol="next_close")
    model = lr.fit(data)
    return model, assembler

# --- Main Analysis ---

def analyze_news_stock_range(api_key, start_date, end_date, use_mock_stock=False):
    # Generate date range
    start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
    end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
    if start_date_obj > end_date_obj:
        print("Error: Start date must be before end date")
        return None

    # Collect data
    data = []
    current_date = start_date_obj
    while current_date <= end_date_obj:
        date_str = current_date.strftime("%Y-%m-%d")
        print(f"\nProcessing date: {date_str}")

        # Check future or weekend
        is_future = current_date > datetime.now().date()
        is_weekend_day = is_weekend(current_date)

        if is_future and not use_mock_stock:
            data.append((date_str, "Future date", None, None, None, None, "Future date"))
            current_date += timedelta(days=1)
            continue
        if is_weekend_day and not use_mock_stock:
            data.append((date_str, "Non-trading day", None, None, None, None, "Weekend date"))
            current_date += timedelta(days=1)
            continue

        # Fetch news
        headline, published_at = fetch_news_by_date(api_key, date_str)
        if not headline:
            data.append((date_str, "No news found", None, None, None, None, "No news found"))
            current_date += timedelta(days=1)
            continue

        # Parse news date
        try:
            news_dt = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
            news_date = news_dt.date()
            if news_date != current_date:
                print(f"News date {news_date} does not match {current_date}")
                data.append((date_str, headline, published_at, None, None, None, "News date mismatch"))
                current_date += timedelta(days=1)
                continue
        except Exception as e:
            print(f"Error parsing publishedAt: {e}")
            data.append((date_str, headline, published_at, None, None, None, "Date parsing error"))
            current_date += timedelta(days=1)
            continue

        # Fetch stock
        open_price, close_price, stock_date = fetch_stock_price_by_date('NFLX', date_str, use_mock=use_mock_stock)
        if stock_date is None or stock_date != current_date:
            print(f"Stock date {stock_date} does not match {current_date}")
            data.append((date_str, headline, published_at, None, None, None, "Stock date mismatch or no stock data"))
            current_date += timedelta(days=1)
            continue

        data.append((date_str, headline, published_at, open_price, close_price, stock_date, None))
        current_date += timedelta(days=1)

    # Log data for debugging
    # print("Collected data:", data)

    # Define explicit schema
    schema = StructType([
        StructField("date", StringType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("impact", StringType(), True)
    ])

    # Create Spark DataFrame
    try:
        df = spark.createDataFrame(data, schema)
    except Exception as e:
        print(f"Error creating DataFrame: {e}")
        return None

    # Apply analysis
    df = df.withColumn("sentiment", sentiment_udf(col("headline"))) \
           .withColumn("genre", genre_udf(col("headline"))) \
           .withColumn("audience", audience_udf(col("headline"))) \
           .withColumn("impact",
                       when(col("impact").isNull(),
                            impact_udf(col("sentiment"), col("price_open"), col("price_close"))
                           ).otherwise(col("impact")))

    # Select final columns
    df = df.select("date", "headline", "publishedAt", "genre", "audience", "sentiment", "price_close", "impact")

    return df

# --- Fetch Historical Data for Training ---

def fetch_historical_data(ticker='NFLX', start_date="2024-04-01", end_date="2024-04-30", news_api_key=None):
    # Fetch stock data
    stock = yf.Ticker(ticker)
    df = stock.history(start=start_date, end=end_date)
    stock_data = [
        (row.name.strftime("%Y-%m-%d"), row.Open, row.Close, row.name.date(), "No headline", None)
        for row in df.itertuples()
    ]

    # Fetch news for sentiment
    current_date = datetime.strptime(start_date, "%Y-%m-%d").date()
    end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
    while current_date <= end_date_obj:
        date_str = current_date.strftime("%Y-%m-%d")
        headline, published_at = fetch_news_by_date(news_api_key, date_str)
        if headline:
            stock_data = [
                (d, o, c, sd, headline, published_at) if d == date_str else (d, o, c, sd, h, p)
                for d, o, c, sd, h, p in stock_data
            ]
        current_date += timedelta(days=1)

    # Define schema for historical data
    schema = StructType([
        StructField("date", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True)
    ])

    # Create Spark DataFrame
    stock_df = spark.createDataFrame(stock_data, schema)

    # Apply sentiment analysis
    stock_df = stock_df.withColumn("sentiment", sentiment_udf(col("headline")))

    return stock_df

# --- Main Execution ---

API_KEY = "4c5f88c03cba44bf95380e7ee2cb7002"  # Replace with your NewsAPI key
start_date = "2025-05-01"
end_date = "2025-05-10"

# Analyze range
analysis_df = analyze_news_stock_range(API_KEY, start_date, end_date, use_mock_stock=True)
if analysis_df is not None:
    analysis_df.show(truncate=False)
else:
    print("Failed to create analysis DataFrame")

# Train prediction model
historical_df = fetch_historical_data('NFLX', "2024-04-01", "2024-04-30", API_KEY)
model, assembler = train_stock_prediction_model(historical_df)

# Predict for 2025-05-11
latest_data = analysis_df.filter(col("date") == "2025-05-10") \
    .select("price_close", "price_open", "sentiment") \
    .withColumn("sentiment_score",
                when(col("sentiment") == "Positive", 1.0)
                .when(col("sentiment") == "Negative", -1.0)
                .otherwise(0.0)) \
    .select("price_close", "price_open", "sentiment_score")

if latest_data.count() > 0:
    latest_features = assembler.transform(latest_data)
    prediction = model.transform(latest_features)
    predicted_price = prediction.select("prediction").collect()[0][0]
    print(f"\nPredicted NFLX closing price for 2025-05-11: ${predicted_price:.2f}")
else:
    print("\nNo valid data for 2025-05-10 to make a prediction")


Processing date: 2025-05-01
Fetching news for 2025-05-01

Processing date: 2025-05-02
Fetching news for 2025-05-02

Processing date: 2025-05-03
Fetching news for 2025-05-03


ERROR:yfinance:$NFLX: possibly delisted; no price data found  (1d 2025-05-03 -> 2025-05-04)


No stock data for NFLX on 2025-05-03
Stock date None does not match 2025-05-03

Processing date: 2025-05-04
Fetching news for 2025-05-04


ERROR:yfinance:$NFLX: possibly delisted; no price data found  (1d 2025-05-04 -> 2025-05-05)


No stock data for NFLX on 2025-05-04
Stock date None does not match 2025-05-04

Processing date: 2025-05-05
Fetching news for 2025-05-05

Processing date: 2025-05-06
Fetching news for 2025-05-06

Processing date: 2025-05-07
Fetching news for 2025-05-07

Processing date: 2025-05-08
Fetching news for 2025-05-08

Processing date: 2025-05-09
Fetching news for 2025-05-09

Processing date: 2025-05-10
Fetching news for 2025-05-10


ERROR:yfinance:$NFLX: possibly delisted; no price data found  (1d 2025-05-10 -> 2025-05-11)


No stock data for NFLX on 2025-05-10
Stock date None does not match 2025-05-10
Error creating DataFrame: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `DoubleType()` can not accept object `1122.52` in type `float64`.
Failed to create analysis DataFrame


AttributeError: 'Pandas' object has no attribute 'name'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from textblob import TextBlob
import pandas as pd
from datetime import datetime, timedelta
import random
import os
import requests
import time

# --- Initialize Spark ---
spark = SparkSession.builder \
    .appName("NewsStockPrediction") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

# --- Analysis Functions ---

def get_sentiment(text):
    if not text or text == 'No headline':
        return 'Unknown'
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.2:
        return 'Positive'
    elif polarity < -0.2:
        return 'Negative'
    else:
        return 'Neutral'

# Register UDF
sentiment_udf = udf(get_sentiment, StringType())

# --- Data Preparation Functions ---

def is_weekend(date_obj):
    return date_obj.weekday() >= 5

def mock_news_by_date(date_str):
    """Simulate news headlines."""
    headlines = [
        "Netflix Releases Blockbuster Thriller Series",
        "Netflix Announces New Comedy Special",
        "Netflix Stock Surges on Strong Earnings",
        "Netflix Faces Criticism Over Price Hike",
        "Netflix Partners with Major Studio",
        "Netflix Expands Youth Programming"
    ]
    published_at = f"{date_str}T{random.randint(8, 18):02d}:00:00Z"
    headline = random.choice(headlines)
    print(f"Mock news for {date_str}: {headline}")
    return headline, published_at

def fetch_news_by_date(api_key, date_str, max_retries=3):
    """Fetch real news from NewsAPI."""
    url = f"https://newsapi.org/v2/everything?q=Netflix&from={date_str}&to={date_str}&language=en&sortBy=publishedAt&apiKey={api_key}"
    for attempt in range(max_retries):
        try:
            print(f"Fetching news for {date_str}, attempt {attempt + 1}")
            response = requests.get(url)
            response.raise_for_status()
            articles = response.json().get('articles', [])
            if not articles:
                print(f"No articles found for {date_str}")
                return None, None
            article = articles[0]
            return article['title'] or 'No headline', article['publishedAt']
        except requests.RequestException as e:
            print(f"Error fetching news for {date_str}: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                return None, None

# --- Cache Functions ---

def cache_data(data, filename):
    if not data:
        print(f"No data to cache in {filename}")
        return
    try:
        pd.DataFrame(data).to_csv(filename, index=False)
        print(f"Cached data to {filename}")
    except Exception as e:
        print(f"Error caching data to {filename}: {e}")

def load_cached_data(filename):
    if not os.path.exists(filename):
        print(f"No cache file found: {filename}")
        return None
    try:
        df = pd.read_csv(filename)
        if df.empty or df.columns.empty:
            print(f"Cache file {filename} is empty or invalid")
            return None
        data = df.to_dict('records')
        for row in data:
            if pd.notnull(row.get('stock_date')):
                row['stock_date'] = pd.to_datetime(row['stock_date']).date()
        print(f"Loaded cached data from {filename}")
        return data
    except Exception as e:
        print(f"Error loading cache file {filename}: {e}")
        return None

# --- Fetch Historical Data ---

def fetch_historical_data(start_date="2025-04-01", end_date="2025-04-29", news_api_key=None):
    cache_file = "historical_cache.csv"
    cached_data = load_cached_data(cache_file)
    if cached_data:
        stock_data = cached_data
    else:
        # Provided historical stock data
        stock_data_raw = [
            {"Date": "2025-04-01", "Open": 927.50, "Close": 928.38},
            {"Date": "2025-04-02", "Open": 923.00, "Close": 935.52},
            {"Date": "2025-04-03", "Open": 901.80, "Close": 917.05},
            {"Date": "2025-04-04", "Open": 896.50, "Close": 855.86},
            {"Date": "2025-04-07", "Open": 827.85, "Close": 867.83},
            {"Date": "2025-04-08", "Open": 912.44, "Close": 870.40},
            {"Date": "2025-04-09", "Open": 855.93, "Close": 945.47},
            {"Date": "2025-04-10", "Open": 931.94, "Close": 921.17},
            {"Date": "2025-04-11", "Open": 920.00, "Close": 918.29},
            {"Date": "2025-04-14", "Open": 932.70, "Close": 931.28},
            {"Date": "2025-04-15", "Open": 950.00, "Close": 976.28},
            {"Date": "2025-04-16", "Open": 976.28, "Close": 961.63},
            {"Date": "2025-04-17", "Open": 969.00, "Close": 973.03},
            {"Date": "2025-04-21", "Open": 984.40, "Close": 987.91},
            {"Date": "2025-04-22", "Open": 1005.30, "Close": 1040.34},
            {"Date": "2025-04-23", "Open": 1047.22, "Close": 1049.59},
            {"Date": "2025-04-24", "Open": 1048.00, "Close": 1096.87},
            {"Date": "2025-04-25", "Open": 1097.04, "Close": 1101.53},
            {"Date": "2025-04-28", "Open": 1100.00, "Close": 1110.38},
            {"Date": "2025-04-29", "Open": 1103.92, "Close": 1125.64}
        ]
        stock_data = []
        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
        current_date = start_date_obj
        while current_date <= end_date_obj:
            date_str = current_date.strftime("%Y-%m-%d")
            if is_weekend(current_date):
                stock_data.append((date_str, None, None, None, "Non-trading day", None))
                current_date += timedelta(days=1)
                continue
            # Find stock data for the date
            stock_entry = next((entry for entry in stock_data_raw if entry["Date"] == date_str), None)
            if stock_entry:
                open_price = round(float(stock_entry["Open"]), 2)
                close_price = round(float(stock_entry["Close"]), 2)
                stock_date = datetime.strptime(date_str, "%Y-%m-%d").date()
                # Try NewsAPI for April 12, else use mock news
                if date_str == "2025-04-12" and news_api_key:
                    headline, published_at = fetch_news_by_date(news_api_key, date_str)
                    if not headline:
                        print(f"Falling back to mock news for {date_str}")
                        headline, published_at = mock_news_by_date(date_str)
                else:
                    headline, published_at = mock_news_by_date(date_str)
                stock_data.append((date_str, open_price, close_price, stock_date, headline or "No headline", published_at))
            else:
                stock_data.append((date_str, None, None, None, "No data", None))
            current_date += timedelta(days=1)
        # Cache data
        cache_data(stock_data, cache_file)
    # Define schema
    schema = StructType([
        StructField("date", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True)
    ])
    # Create Spark DataFrame
    try:
        stock_df = spark.createDataFrame(stock_data, schema)
    except Exception as e:
        print(f"Error creating historical DataFrame: {e}")
        return None
    # Apply sentiment analysis
    stock_df = stock_df.withColumn("sentiment", sentiment_udf(col("headline")))
    return stock_df

# --- Prediction Function ---

def train_stock_prediction_model(historical_data):
    if historical_data is None:
        print("No historical data to train model")
        return None, None
    # Prepare features: sentiment score, previous day's close, open
    data = historical_data.withColumn("sentiment_score",
        when(col("sentiment") == "Positive", 1.0)
        .when(col("sentiment") == "Negative", -1.0)
        .otherwise(0.0))
    # Create feature vector
    assembler = VectorAssembler(
        inputCols=["sentiment_score", "price_close", "price_open"],
        outputCol="features",
        handleInvalid="skip"
    )
    # Shift data to predict next day's close
    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag
    w = Window.orderBy("date")
    data = data.withColumn("next_close", lag("price_close", -1).over(w))
    # Drop rows with null values
    data = data.dropna(subset=["next_close", "price_close", "price_open", "sentiment_score"])
    # Check if data is empty
    if data.count() == 0:
        print("No valid data for training after cleaning")
        return None, None
    # Assemble features
    data = assembler.transform(data)
    # Train Linear Regression
    lr = LinearRegression(featuresCol="features", labelCol="next_close")
    model = lr.fit(data)
    return model, assembler

# --- Analyze Patterns ---

def analyze_sentiment_patterns(historical_df):
    """Analyze correlation between sentiment and price changes."""
    if historical_df is None:
        return None
    # Calculate daily price change
    df = historical_df.withColumn("price_change",
        (col("price_close") - col("price_open")) / col("price_open") * 100)
    # Filter trading days
    df = df.filter(~col("headline").isin(["Non-trading day", "No data"]))
    # Group by sentiment
    sentiment_stats = df.groupBy("sentiment").agg(
        {"price_change": "avg", "date": "count"}
    ).withColumnRenamed("avg(price_change)", "avg_price_change") \
     .withColumnRenamed("count(date)", "num_days")
    # Calculate correlation: Positive sentiment → price increase
    positive_up = df.filter((col("sentiment") == "Positive") & (col("price_change") > 0)).count()
    positive_total = df.filter(col("sentiment") == "Positive").count()
    negative_down = df.filter((col("sentiment") == "Negative") & (col("price_change") < 0)).count()
    negative_total = df.filter(col("sentiment") == "Negative").count()
    patterns = {
        "sentiment_stats": sentiment_stats.collect(),
        "positive_up_pct": (positive_up / positive_total * 100) if positive_total > 0 else 0,
        "negative_down_pct": (negative_down / negative_total * 100) if negative_total > 0 else 0
    }
    return patterns

# --- Predict Next Trading Day ---

def predict_next_trading_day(historical_df, model, assembler, news_api_key, predict_date="2025-04-12"):
    """Predict closing price for the next trading day after predict_date."""
    predict_date_obj = datetime.strptime(predict_date, "%Y-%m-%d").date()
    # Find next trading day
    next_trading_date = predict_date_obj
    while is_weekend(next_trading_date) or next_trading_date > datetime.strptime("2025-04-29", "%Y-%m-%d").date():
        next_trading_date += timedelta(days=1)
    next_trading_date_str = next_trading_date.strftime("%Y-%m-%d")
    print(f"Predicting for next trading day: {next_trading_date_str}")
    # Get news for predict_date
    headline, published_at = fetch_news_by_date(news_api_key, predict_date) if news_api_key else (None, None)
    if not headline:
        print(f"Falling back to mock news for {predict_date}")
        headline, published_at = mock_news_by_date(predict_date)
    sentiment = get_sentiment(headline)
    # Get last trading day's data (before predict_date)
    last_trading_date = predict_date_obj - timedelta(days=1)
    while is_weekend(last_trading_date) or last_trading_date < datetime.strptime("2025-04-01", "%Y-%m-%d").date():
        last_trading_date -= timedelta(days=1)
    last_trading_date_str = last_trading_date.strftime("%Y-%m-%d")
    last_day = historical_df.filter(col("date") == last_trading_date_str) \
        .select("price_close", "price_open", "sentiment") \
        .collect()
    if not last_day:
        print(f"No data for {last_trading_date_str} to make prediction")
        return None, None, None
    last_row = last_day[0]
    # Create prediction DataFrame
    prediction_data = [{
        "price_close": last_row["price_close"],
        "price_open": last_row["price_open"],
        "sentiment": sentiment
    }]
    schema = StructType([
        StructField("price_close", DoubleType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("sentiment", StringType(), True)
    ])
    pred_df = spark.createDataFrame(prediction_data, schema)
    # Add sentiment score
    pred_df = pred_df.withColumn("sentiment_score",
        when(col("sentiment") == "Positive", 1.0)
        .when(col("sentiment") == "Negative", -1.0)
        .otherwise(0.0)) \
        .select("price_close", "price_open", "sentiment_score")
    # Transform and predict
    pred_features = assembler.transform(pred_df)
    prediction = model.transform(pred_features)
    predicted_price = prediction.select("prediction").collect()[0][0]
    return predicted_price, headline, sentiment

# --- Main Execution ---

# Set NewsAPI key (replace with your actual key)
API_KEY = "4c5f88c03cba44bf95380e7ee2cb7002"

# Fetch historical data
historical_df = fetch_historical_data("2025-04-01", "2025-04-29", API_KEY)
if historical_df is None:
    print("Failed to create historical DataFrame")
else:
    # print(highest_data = spark.createDataFrame(stock_data, schema))
    print("dfdfdfdfdf")
    # except Exception as e:
    #     print(f"Error creating historical DataFrame: {e}")
    #     return None
    # Apply sentiment analysis
    stock_df = stock_df.withColumn("sentiment", sentiment_udf(col("headline")))
    # return stock_df

# --- Prediction Function ---

def train_stock_prediction_model(historical_data):
    if historical_data is None:
        print("No historical data to train model")
        return None, None
    # Prepare features: sentiment score, previous day's close, open
    data = historical_data.withColumn("sentiment_score",
        when(col("sentiment") == "Positive", 1.0)
        .when(col("sentiment") == "Negative", -1.0)
        .otherwise(0.0))
    # Create feature vector
    assembler = VectorAssembler(
        inputCols=["sentiment_score", "price_close", "price_open"],
        outputCol="features",
        handleInvalid="skip"
    )
    # Shift data to predict next day's close
    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag
    w = Window.orderBy("date")
    data = data.withColumn("next_close", lag("price_close", -1).over(w))
    # Drop rows with null values
    data = data.dropna(subset=["next_close", "price_close", "price_open", "sentiment_score"])
    # Check if data is empty
    if data.count() == 0:
        print("No valid data for training after cleaning")
        return None, None
    # Assemble features
    data = assembler.transform(data)
    # Train Linear Regression
    lr = LinearRegression(featuresCol="features", labelCol="next_close")
    model = lr.fit(data)
    return model, assembler

# --- Analyze Patterns ---

def analyze_sentiment_patterns(historical_df):
    """Analyze correlation between sentiment and price changes."""
    if historical_df is None:
        return None
    # Calculate daily price change
    df = historical_df.withColumn("price_change",
        (col("price_close") - col("price_open")) / col("price_open") * 100)
    # Filter trading days
    df = df.filter(~col("headline").isin(["Non-trading day", "No data"]))
    # Group by sentiment
    sentiment_stats = df.groupBy("sentiment").agg(
        {"price_change": "avg", "date": "count"}
    ).withColumnRenamed("avg(price_change)", "avg_price_change") \
     .withColumnRenamed("count(date)", "num_days")
    # Calculate correlation: Positive sentiment → price increase
    positive_up = df.filter((col("sentiment") == "Positive") & (col("price_change") > 0)).count()
    positive_total = df.filter(col("sentiment") == "Positive").count()
    negative_down = df.filter((col("sentiment") == "Negative") & (col("price_change") < 0)).count()
    negative_total = df.filter(col("sentiment") == "Negative").count()
    patterns = {
        "sentiment_stats": sentiment_stats.collect(),
        "positive_up_pct": (positive_up / positive_total * 100) if positive_total > 0 else 0,
        "negative_down_pct": (negative_down / negative_total * 100) if negative_total > 0 else 0
    }
    return patterns

# --- Predict Next Trading Day ---

def predict_next_trading_day(historical_df, model, assembler, news_api_key, predict_date="2025-04-12"):
    """Predict closing price for the next trading day after predict_date."""
    predict_date_obj = datetime.strptime(predict_date, "%Y-%m-%d").date()
    # Find next trading day
    next_trading_date = predict_date_obj
    while is_weekend(next_trading_date) or next_trading_date > datetime.strptime("2025-04-29", "%Y-%m-%d").date():
        next_trading_date += timedelta(days=1)
    next_trading_date_str = next_trading_date.strftime("%Y-%m-%d")
    print(f"Predicting for next trading day: {next_trading_date_str}")
    # Get news for predict_date
    headline, published_at = fetch_news_by_date(news_api_key, predict_date) if news_api_key else (None, None)
    if not headline:
        print(f"Falling back to mock news for {predict_date}")
        headline, published_at = mock_news_by_date(predict_date)
    sentiment = get_sentiment(headline)
    # Get last trading day's data (before predict_date)
    last_trading_date = predict_date_obj - timedelta(days=1)
    while is_weekend(last_trading_date) or last_trading_date < datetime.strptime("2025-04-01", "%Y-%m-%d").date():
        last_trading_date -= timedelta(days=1)
    last_trading_date_str = last_trading_date.strftime("%Y-%m-%d")
    last_day = historical_df.filter(col("date") == last_trading_date_str) \
        .select("price_close", "price_open", "sentiment") \
        .collect()
    if not last_day:
        print(f"No data for {last_trading_date_str} to make prediction")
        return None, None, None
    last_row = last_day[0]
    # Create prediction DataFrame
    prediction_data = [{
        "price_close": last_row["price_close"],
        "price_open": last_row["price_open"],
        "sentiment": sentiment
    }]
    schema = StructType([
        StructField("price_close", DoubleType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("sentiment", StringType(), True)
    ])
    pred_df = spark.createDataFrame(prediction_data, schema)
    # Add sentiment score
    pred_df = pred_df.withColumn("sentiment_score",
        when(col("sentiment") == "Positive", 1.0)
        .when(col("sentiment") == "Negative", -1.0)
        .otherwise(0.0)) \
        .select("price_close", "price_open", "sentiment_score")
    # Transform and predict
    pred_features = assembler.transform(pred_df)
    prediction = model.transform(pred_features)
    predicted_price = prediction.select("prediction").collect()[0][0]
    return predicted_price, headline, sentiment

# --- Main Execution ---

# Set NewsAPI key (replace with your actual key)
API_KEY = "4c5f88c03cba44bf95380e7ee2cb7002"

# Fetch historical data
historical_df = fetch_historical_data("2025-04-01", "2025-04-29", API_KEY)
if historical_df is None:
    print("Failed to create historical DataFrame")
else:
    print("\nHistorical Data:")
    historical_df.select("date", "headline", "sentiment", "price_open", "price_close").show(truncate=False)

    # Train model
    model, assembler = train_stock_prediction_model(historical_df)

    # Analyze sentiment patterns
    patterns = analyze_sentiment_patterns(historical_df)
    if patterns:
        print("\nSentiment Patterns:")
        for row in patterns["sentiment_stats"]:
            print(f"Sentiment: {row['sentiment']}, Days: {row['num_days']}, Avg Price Change: {row['avg_price_change']:.2f}%")
        print(f"Positive sentiment → Price increase: {patterns['positive_up_pct']:.2f}% of days")
        print(f"Negative sentiment → Price decrease: {patterns['negative_down_pct']:.2f}% of days")

    # Predict for April 14, 2025 (next trading day after April 12)
    if model is not None and assembler is not None:
        predicted_price, headline, sentiment = predict_next_trading_day(historical_df, model, assembler, API_KEY, "2025-04-12")
        if predicted_price:
            print(f"\nPrediction for 2025-04-14:")
            print(f"News headline (2025-04-12): {headline}")
            print(f"Sentiment: {sentiment}")
            print(f"Predicted NFLX closing price: ${predicted_price:.2f}")
            # Compare with actual (if available)
            actual = historical_df.filter(col("date") == "2025-04-14").select("price_close").collect()
            if actual:
                print(f"Actual closing price: ${actual[0]['price_close']:.2f}")
        else:
            print("Cannot predict due to missing data")
    else:
        print("Failed to train prediction model")

Loaded cached data from historical_cache.csv
dfdfdfdfdf


NameError: name 'stock_df' is not defined

## GPT Output

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from textblob import TextBlob
import pandas as pd
from datetime import datetime

# --- Initialize Spark Session ---
spark = SparkSession.builder \
    .appName("NewsStockAnalysisPrediction") \
    .getOrCreate()

# --- Sample Stock Data for April 2025 ---
data = [
    ("2025-04-01", 927.50, 932.29, 911.50, 928.38, 3520000),
    ("2025-04-02", 923.00, 949.55, 916.11, 935.52, 3256900),
    ("2025-04-03", 901.80, 946.59, 900.47, 917.05, 5864600),
    ("2025-04-04", 896.50, 906.63, 853.87, 855.86, 6798800),
    ("2025-04-07", 827.85, 906.74, 821.10, 867.83, 6656800),
    ("2025-04-08", 912.44, 922.42, 857.70, 870.40, 5625400),
    ("2025-04-09", 855.93, 951.43, 854.40, 945.47, 7498000),
    ("2025-04-10", 931.94, 941.24, 894.00, 921.17, 5129800),
    ("2025-04-11", 920.00, 944.86, 906.68, 918.29, 4073600),
    ("2025-04-14", 932.70, 948.98, 919.50, 931.28, 4035900),
    ("2025-04-15", 950.00, 993.45, 948.00, 976.28, 7712000),
    ("2025-04-16", 976.28, 981.21, 949.17, 961.63, 6066100),
    ("2025-04-17", 969.00, 984.70, 956.00, 973.03, 8763200),
    ("2025-04-21", 984.40, 1019.00, 973.05, 987.91, 9775700),
    ("2025-04-22", 1005.30, 1064.97, 1004.52, 1040.34, 9368900),
    ("2025-04-23", 1047.22, 1061.25, 1032.00, 1049.59, 6471900),
    ("2025-04-24", 1048.00, 1101.00, 1047.02, 1096.87, 6381900),
    ("2025-04-25", 1097.04, 1106.80, 1091.00, 1101.53, 3950000),
    ("2025-04-28", 1100.00, 1114.00, 1082.62, 1110.38, 3831100),
    ("2025-04-29", 1103.92, 1127.81, 1095.48, 1125.64, 3777300),
]


headlines = [
    "Netflix Releases New Thriller Series",
    "Netflix Stock Surges on New Drama Release",
    "Netflix Faces Criticism Over Content",
    "Netflix Announces New Comedy Special",
    "Netflix Expands Youth Programming",
    "Netflix Signs Deal with Disney",
    "Netflix Reports Record Profits",
    "Netflix Delays Major Release",
    "Netflix Drops Surprise Documentary",
    "Netflix Gets Mixed Reviews on New Show",
    "Netflix Enters Gaming Market",
    "Netflix Launches Gen Z Content",
    "Netflix Cancels Popular Series",
    "Netflix Renews Fan Favorite",
    "Netflix Faces User Backlash",
    "Netflix Adds Interactive Show",
    "Netflix Partners with HBO",
    "Netflix Cuts Subscription Price",
    "Netflix Faces Ad Boycott"
]

# --- Convert to Pandas ---
columns = ["Date", "Open", "High", "Low", "Close", "Volume"]
pdf = pd.DataFrame(data, columns=columns)
pdf["Date"] = pd.to_datetime(pdf["Date"])
mock_headlines = (headlines * ((len(pdf) // len(headlines)) + 1))[:len(pdf)]
pdf["headline"] = mock_headlines

# --- Sentiment Calculation ---
def get_sentiment(text):
    return TextBlob(text).sentiment.polarity if text else 0.0

pdf["sentiment"] = pdf["headline"].apply(get_sentiment)

# --- Use Previous Day's Sentiment ---
pdf["prev_sentiment"] = pdf["sentiment"].shift(1)
pdf = pdf.dropna().reset_index(drop=True)  # Remove first row (no prev sentiment)

# --- Convert to Spark DataFrame ---
sdf = spark.createDataFrame(pdf)

# --- Feature Vector ---
assembler = VectorAssembler(
    inputCols=["Open", "High", "Low", "Volume", "prev_sentiment"],
    outputCol="features"
)
assembled_df = assembler.transform(sdf)

# --- Train Model ---
train_data = assembled_df.select("features", "Close")
lr = LinearRegression(featuresCol="features", labelCol="Close")
model = lr.fit(train_data)

# --- Predict for Next Day using last available day's stock + sentiment ---
last_row = pdf.iloc[-1]

features_dict = {
    "Open": float(last_row["Open"]),
    "High": float(last_row["High"]),
    "Low": float(last_row["Low"]),
    "Volume": float(last_row["Volume"]),
    "prev_sentiment": float(last_row["sentiment"])
}

predict_df = spark.createDataFrame([features_dict])
predict_vector = assembler.transform(predict_df)
prediction = model.transform(predict_vector).collect()[0]["prediction"]

# --- Output ---
tomorrow_date = last_row["Date"].date() + timedelta(days=1)
print(f"\n📅 Prediction for {tomorrow_date} based on headline of {last_row['Date'].date()}:")
print(f"📰 Headline: {last_row['headline']}")
print(f"🧠 Sentiment Score: {last_row['sentiment']:.2f}")
print(f"📈 Predicted Close Price: {prediction:.2f}")


📅 Prediction for 2025-04-30 based on headline of 2025-04-29:
📰 Headline: Netflix Releases New Thriller Series
🧠 Sentiment Score: 0.14
📈 Predicted Close Price: 1121.64


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from textblob import TextBlob
import pandas as pd
from datetime import datetime, date, timedelta
import random
import os

# --- Initialize Spark ---
spark = SparkSession.builder \
    .appName("NewsStockAnalysisPrediction") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

# --- Analysis Functions ---

def get_sentiment(text):
    if not text or text == 'No headline':
        return 'Unknown'
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.2:
        return 'Positive'
    elif polarity < -0.2:
        return 'Negative'
    else:
        return 'Neutral'

def infer_genre(text):
    if not text or text == 'No headline':
        return 'Unknown'
    if 'thriller' in text.lower(): return 'Thriller'
    elif 'drama' in text.lower(): return 'Drama'
    elif 'comedy' in text.lower(): return 'Comedy'
    elif 'romance' in text.lower(): return 'Romance'
    else: return 'Other'

def infer_audience(text):
    if not text or text == 'No headline':
        return 'Unknown'
    if any(x in text.lower() for x in ['teen', 'gen z', 'youth']): return 'Youth'
    elif 'adult' in text.lower(): return 'Adults'
    else: return 'General'

def calculate_impact(sentiment, open_price, close_price):
    if sentiment == 'Unknown' or open_price is None or close_price is None:
        return 'Unknown'
    change = (close_price - open_price) / open_price * 100
    if sentiment == 'Positive' and change > 0:
        return 'Positive'
    elif sentiment == 'Negative' and change < 0:
        return 'Negative'
    else:
        return 'Neutral'

# Register UDFs
sentiment_udf = udf(get_sentiment, StringType())
genre_udf = udf(infer_genre, StringType())
audience_udf = udf(infer_audience, StringType())
impact_udf = udf(calculate_impact, StringType())

# --- Data Fetching Functions ---

def is_weekend(date_obj):
    return date_obj.weekday() >= 5

def mock_news_by_date(date_str):
    """Simulate news headlines."""
    headlines = [
        "Netflix Releases New Thriller Series",
        "Netflix Announces New Comedy Special",
        "Netflix Stock Surges on New Drama Release",
        "Netflix Faces Criticism Over Content",
        "Netflix Partners for New Series",
        "Netflix Expands Youth Programming"
    ]
    published_at = f"{date_str}T{random.randint(8, 18):02d}:00:00Z"
    if random.random() < 0.3:  # 30% chance of no news
        print(f"No mock news for {date_str}")
        return None, None
    headline = random.choice(headlines)
    print(f"Mock news for {date_str}: {headline}")
    return headline, published_at

def mock_stock_price(date_str, sentiment=None):
    """Simulate stock prices, tied to sentiment."""
    open_price = 600.00 + random.uniform(-10, 10)
    if sentiment == "Positive":
        close_price = open_price + random.uniform(0, 5)
    elif sentiment == "Negative":
        close_price = open_price + random.uniform(-5, 0)
    else:
        close_price = open_price + random.uniform(-5, 5)
    return round(open_price, 2), round(close_price, 2), datetime.strptime(date_str, "%Y-%m-%d").date()

# --- Cache Functions ---

def cache_data(data, filename):
    if not data:
        print(f"No data to cache in {filename}")
        return
    try:
        pd.DataFrame(data).to_csv(filename, index=False)
        print(f"Cached data to {filename}")
    except Exception as e:
        print(f"Error caching data to {filename}: {e}")

def load_cached_data(filename):
    if not os.path.exists(filename):
        print(f"No cache file found: {filename}")
        return None
    try:
        df = pd.read_csv(filename)
        if df.empty or df.columns.empty:
            print(f"Cache file {filename} is empty or invalid")
            return None
        data = df.to_dict('records')
        for row in data:
            if pd.notnull(row.get('stock_date')):
                row['stock_date'] = pd.to_datetime(row['stock_date']).date()
        print(f"Loaded cached data from {filename}")
        return data
    except Exception as e:
        print(f"Error loading cache file {filename}: {e}")
        return None

# --- Main Analysis ---

def analyze_news_stock_range(start_date, end_date):
    cache_file = "news_stock_cache.csv"
    cached_data = load_cached_data(cache_file)
    if cached_data:
        data = cached_data
    else:
        # Generate date range
        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
        if start_date_obj > end_date_obj:
            print("Error: Start date must be before end date")
            return None

        # Collect data
        data = []
        current_date = start_date_obj
        while current_date <= end_date_obj:
            date_str = current_date.strftime("%Y-%m-%d")
            print(f"\nProcessing date: {date_str}")

            # Check weekend
            if is_weekend(current_date):
                data.append((date_str, "Non-trading day", None, None, None, None, "Weekend date"))
                current_date += timedelta(days=1)
                continue

            # Fetch mock news
            headline, published_at = mock_news_by_date(date_str)
            sentiment = get_sentiment(headline) if headline else "Unknown"
            if not headline:
                data.append((date_str, "No news found", None, None, None, None, "No news found"))
                current_date += timedelta(days=1)
                continue

            # Parse news date
            try:
                news_dt = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
                news_date = news_dt.date()
                if news_date != current_date:
                    print(f"News date {news_date} does not match {current_date}")
                    data.append((date_str, headline, published_at, None, None, None, "News date mismatch"))
                    current_date += timedelta(days=1)
                    continue
            except Exception as e:
                print(f"Error parsing publishedAt: {e}")
                data.append((date_str, headline, published_at, None, None, None, "Date parsing error"))
                current_date += timedelta(days=1)
                continue

            # Fetch mock stock
            open_price, close_price, stock_date = mock_stock_price(date_str, sentiment)
            if stock_date is None or stock_date != current_date:
                print(f"Stock date {stock_date} does not match {current_date}")
                data.append((date_str, headline, published_at, None, None, None, "Stock date mismatch"))
                current_date += timedelta(days=1)
                continue

            data.append((date_str, headline, published_at, open_price, close_price, stock_date, None))
            current_date += timedelta(days=1)

        # Cache data
        cache_data(data, cache_file)

    # Define explicit schema
    schema = StructType([
        StructField("date", StringType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("impact", StringType(), True)
    ])

    # Create Spark DataFrame
    try:
        df = spark.createDataFrame(data, schema)
    except Exception as e:
        print(f"Error creating DataFrame: {e}")
        return None

    # Apply analysis
    df = df.withColumn("sentiment", sentiment_udf(col("headline"))) \
           .withColumn("genre", genre_udf(col("headline"))) \
           .withColumn("audience", audience_udf(col("headline"))) \
           .withColumn("impact",
                       when(col("impact").isNull(),
                            impact_udf(col("sentiment"), col("price_open"), col("price_close"))
                           ).otherwise(col("impact")))

    # Select final columns
    df = df.select("date", "headline", "publishedAt", "genre", "audience", "sentiment", "price_close", "impact")

    return df

# --- Fetch Historical Data for Training ---

def fetch_historical_data(ticker='NFLX', start_date="2024-04-01", end_date="2024-04-30"):
    cache_file = "historical_cache.csv"
    cached_data = load_cached_data(cache_file)
    if cached_data:
        stock_data = cached_data
    else:
        # Generate date range
        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
        stock_data = []
        current_date = start_date_obj
        while current_date <= end_date_obj:
            date_str = current_date.strftime("%Y-%m-%d")
            if is_weekend(current_date):
                stock_data.append((date_str, None, None, None, "Non-trading day", None))
                current_date += timedelta(days=1)
                continue
            # Fetch mock news
            headline, published_at = mock_news_by_date(date_str)
            sentiment = get_sentiment(headline) if headline else "Unknown"
            # Fetch mock stock
            open_price, close_price, stock_date = mock_stock_price(date_str, sentiment)
            stock_data.append((date_str, open_price, close_price, stock_date, headline or "No headline", published_at))
            current_date += timedelta(days=1)

        # Cache data
        cache_data(stock_data, cache_file)

    # Define schema for historical data
    schema = StructType([
        StructField("date", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True)
    ])

    # Create Spark DataFrame
    try:
        stock_df = spark.createDataFrame(stock_data, schema)
    except Exception as e:
        print(f"Error creating historical DataFrame: {e}")
        return None

    # Apply sentiment analysis
    stock_df = stock_df.withColumn("sentiment", sentiment_udf(col("headline")))

    return stock_df

# --- Prediction Function ---

def train_stock_prediction_model(historical_data):
    if historical_data is None:
        print("No historical data to train model")
        return None, None

    # Prepare features: sentiment score, previous day's close, open
    data = historical_data.withColumn("sentiment_score",
        when(col("sentiment") == "Positive", 1.0)
        .when(col("sentiment") == "Negative", -1.0)
        .otherwise(0.0))

    # Create feature vector
    assembler = VectorAssembler(
        inputCols=["sentiment_score", "price_close", "price_open"],
        outputCol="features",
        handleInvalid="skip"
    )

    # Shift data to predict next day's close
    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag
    w = Window.orderBy("date")
    data = data.withColumn("next_close", lag("price_close", -1).over(w))

    # Drop rows with null values
    data = data.dropna(subset=["next_close", "price_close", "price_open", "sentiment_score"])

    # Check if data is empty
    if data.count() == 0:
        print("No valid data for training after cleaning")
        return None, None

    # Assemble features
    data = assembler.transform(data)

    # Train Linear Regression
    lr = LinearRegression(featuresCol="features", labelCol="next_close")
    model = lr.fit(data)
    return model, assembler

# --- Main Execution ---

# Analyze range
start_date = "2025-05-01"
end_date = "2025-05-14"  # Extended to include current date (2025-05-14)
analysis_df = analyze_news_stock_range(start_date, end_date)
if analysis_df is not None:
    analysis_df.show(truncate=False)
else:
    print("Failed to create analysis DataFrame")

# Train prediction model
historical_df = fetch_historical_data('NFLX', "2024-04-01", "2024-04-30")
model, assembler = train_stock_prediction_model(historical_df)

# Dynamic prediction based on current date
if model is not None and assembler is not None:
    # Get current date and previous day
    current_date = datetime.now().date()
    previous_date = current_date - timedelta(days=1)
    previous_date_str = previous_date.strftime("%Y-%m-%d")
    tomorrow_date = current_date + timedelta(days=1)

    # Fetch previous day's data
    previous_data = analysis_df.filter(col("date") == previous_date_str) \
        .select("date", "headline", "sentiment", "price_close", "price_open") \
        .withColumn("sentiment_score",
                    when(col("sentiment") == "Positive", 1.0)
                    .when(col("sentiment") == "Negative", -1.0)
                    .otherwise(0.0)) \
        .select("date", "headline", "sentiment", "price_close", "price_open", "sentiment_score")

    if previous_data.count() > 0:
        # Convert to pandas for easier access
        previous_row = previous_data.toPandas().iloc[0]

        # Print previous day's data
        print(f"\n📅 Data for {previous_date}:")
        print(f"📰 Headline: {previous_row['headline']}")
        print(f"🧠 Sentiment: {previous_row['sentiment']}")
        print(f"📈 Close Price: ${previous_row['price_close']:.2f}")

        # Prepare features for prediction
        latest_features = assembler.transform(previous_data)
        prediction = model.transform(latest_features)
        predicted_price = prediction.select("prediction").collect()[0][0]

        # Print prediction for tomorrow
        print(f"\n📅 Prediction for {tomorrow_date}:")
        print(f"📰 Based on Headline from {previous_date}: {previous_row['headline']}")
        print(f"🧠 Sentiment: {previous_row['sentiment']}")
        print(f"📈 Predicted Close Price: ${predicted_price:.2f}")
    else:
        print(f"No valid data for {previous_date_str} to make a prediction")
else:
    print("Failed to train prediction model")

No cache file found: news_stock_cache.csv

Processing date: 2025-05-01
No mock news for 2025-05-01

Processing date: 2025-05-02
Mock news for 2025-05-02: Netflix Announces New Comedy Special

Processing date: 2025-05-03

Processing date: 2025-05-04

Processing date: 2025-05-05
Mock news for 2025-05-05: Netflix Releases New Thriller Series

Processing date: 2025-05-06
Mock news for 2025-05-06: Netflix Partners for New Series

Processing date: 2025-05-07
Mock news for 2025-05-07: Netflix Stock Surges on New Drama Release

Processing date: 2025-05-08
No mock news for 2025-05-08

Processing date: 2025-05-09
Mock news for 2025-05-09: Netflix Expands Youth Programming

Processing date: 2025-05-10

Processing date: 2025-05-11

Processing date: 2025-05-12
Mock news for 2025-05-12: Netflix Stock Surges on New Drama Release

Processing date: 2025-05-13
Mock news for 2025-05-13: Netflix Partners for New Series

Processing date: 2025-05-14
Mock news for 2025-05-14: Netflix Announces New Comedy Spe

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `price_open` cannot be resolved. Did you mean one of the following? [`price_close`, `genre`, `audience`, `date`, `headline`].;
'Project [date#0, headline#1, sentiment#15, price_close#4, 'price_open]
+- Filter (date#0 = 2025-05-13)
   +- Project [date#0, headline#1, publishedAt#2, genre#25, audience#36, sentiment#15, price_close#4, impact#48]
      +- Project [date#0, headline#1, publishedAt#2, price_open#3, price_close#4, stock_date#5, CASE WHEN isnull(impact#6) THEN calculate_impact(sentiment#15, price_open#3, price_close#4)#47 ELSE impact#6 END AS impact#48, sentiment#15, genre#25, audience#36]
         +- Project [date#0, headline#1, publishedAt#2, price_open#3, price_close#4, stock_date#5, impact#6, sentiment#15, genre#25, infer_audience(headline#1)#35 AS audience#36]
            +- Project [date#0, headline#1, publishedAt#2, price_open#3, price_close#4, stock_date#5, impact#6, sentiment#15, infer_genre(headline#1)#24 AS genre#25]
               +- Project [date#0, headline#1, publishedAt#2, price_open#3, price_close#4, stock_date#5, impact#6, get_sentiment(headline#1)#14 AS sentiment#15]
                  +- LogicalRDD [date#0, headline#1, publishedAt#2, price_open#3, price_close#4, stock_date#5, impact#6], false


## Working Code

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from textblob import TextBlob
import pandas as pd
from datetime import datetime, date, timedelta
import random
import os

# --- Initialize Spark ---
spark = SparkSession.builder \
    .appName("NewsStockAnalysisPrediction") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()


def get_sentiment(text):
    if not text or text == 'No headline':
        return 'Unknown'
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.2:
        return 'Positive'
    elif polarity < -0.2:
        return 'Negative'
    else:
        return 'Neutral'

def infer_genre(text):
    if not text or text == 'No headline':
        return 'Unknown'
    if 'thriller' in text.lower(): return 'Thriller'
    elif 'drama' in text.lower(): return 'Drama'
    elif 'comedy' in text.lower(): return 'Comedy'
    elif 'romance' in text.lower(): return 'Romance'
    else: return 'Other'

def infer_audience(text):
    if not text or text == 'No headline':
        return 'Unknown'
    if any(x in text.lower() for x in ['teen', 'gen z', 'youth']): return 'Youth'
    elif 'adult' in text.lower(): return 'Adults'
    else: return 'General'

def calculate_impact(sentiment, open_price, close_price):
    if sentiment == 'Unknown' or open_price is None or close_price is None:
        return 'Unknown'
    change = (close_price - open_price) / open_price * 100
    if sentiment == 'Positive' and change > 0:
        return 'Positive'
    elif sentiment == 'Negative' and change < 0:
        return 'Negative'
    else:
        return 'Neutral'

sentiment_udf = udf(get_sentiment, StringType())
genre_udf = udf(infer_genre, StringType())
audience_udf = udf(infer_audience, StringType())
impact_udf = udf(calculate_impact, StringType())


def is_weekend(date_obj):
    return date_obj.weekday() >= 5

def mock_news_by_date(date_str):
    """Simulate news headlines."""
    headlines = [
        "Netflix Releases New Thriller Series",
        "Netflix Announces New Comedy Special",
        "Netflix Stock Surges on New Drama Release",
        "Netflix Faces Criticism Over Content",
        "Netflix Partners for New Series",
        "Netflix Expands Youth Programming"
    ]
    published_at = f"{date_str}T{random.randint(8, 18):02d}:00:00Z"
    if random.random() < 0.3:
        # print(f"No news for {date_str}")
        return None, None
    headline = random.choice(headlines)
    # print(f"News for {date_str}: {headline}")
    return headline, published_at

def mock_stock_price(date_str, sentiment=None):
    """Simulate stock prices, tied to sentiment."""
    open_price = 600.00 + random.uniform(-10, 10)
    if sentiment == "Positive":
        close_price = open_price + random.uniform(0, 5)
    elif sentiment == "Negative":
        close_price = open_price + random.uniform(-5, 0)
    else:
        close_price = open_price + random.uniform(-5, 5)
    return round(open_price, 2), round(close_price, 2), datetime.strptime(date_str, "%Y-%m-%d").date()

def cache_data(data, filename):
    if not data:
        # print(f"No data to cache in {filename}")
        return
    try:
        pd.DataFrame(data).to_csv(filename, index=False)
        # print(f"Cached data to {filename}")
    except Exception as e:
        print(f"Error caching data to {filename}: {e}")

def load_cached_data(filename):
    if not os.path.exists(filename):
        # print(f"No cache file found: {filename}")
        return None
    try:
        df = pd.read_csv(filename)
        if df.empty or df.columns.empty:
            # print(f"Cache file {filename} is empty or invalid")
            return None
        data = df.to_dict('records')
        for row in data:
            if pd.notnull(row.get('stock_date')):
                row['stock_date'] = pd.to_datetime(row['stock_date']).date()
        # print(f"Loaded cached data from {filename}")
        return data
    except Exception as e:
        print(f"Error loading cache file {filename}: {e}")
        return None

def analyze_news_stock_range(start_date, end_date):
    cache_file = "news_stock_cache.csv"
    cached_data = load_cached_data(cache_file)
    if cached_data:
        data = cached_data
    else:
        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
        if start_date_obj > end_date_obj:
            print("Error: Start date must be before end date")
            return None

        data = []
        current_date = start_date_obj
        while current_date <= end_date_obj:
            date_str = current_date.strftime("%Y-%m-%d")
            # print(f"\nProcessing date: {date_str}")

            if is_weekend(current_date):
                data.append((date_str, "Non-trading day", None, None, None, None, "Weekend date"))
                current_date += timedelta(days=1)
                continue

            headline, published_at = mock_news_by_date(date_str)
            sentiment = get_sentiment(headline) if headline else "Unknown"
            if not headline:
                data.append((date_str, "No news found", None, None, None, None, "No news found"))
                current_date += timedelta(days=1)
                continue

            try:
                news_dt = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
                news_date = news_dt.date()
                if news_date != current_date:
                    # print(f"News date {news_date} does not match {current_date}")
                    data.append((date_str, headline, published_at, None, None, None, "News date mismatch"))
                    current_date += timedelta(days=1)
                    continue
            except Exception as e:
                print(f"Error parsing publishedAt: {e}")
                data.append((date_str, headline, published_at, None, None, None, "Date parsing error"))
                current_date += timedelta(days=1)
                continue

            open_price, close_price, stock_date = mock_stock_price(date_str, sentiment)
            if stock_date is None or stock_date != current_date:
                # print(f"Stock date {stock_date} does not match {current_date}")
                data.append((date_str, headline, published_at, None, None, None, "Stock date mismatch"))
                current_date += timedelta(days=1)
                continue

            data.append((date_str, headline, published_at, open_price, close_price, stock_date, None))
            current_date += timedelta(days=1)

        cache_data(data, cache_file)

    schema = StructType([
        StructField("date", StringType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("impact", StringType(), True)
    ])

    try:
        df = spark.createDataFrame(data, schema)
    except Exception as e:
        print(f"Error creating DataFrame: {e}")
        return None

    df = df.withColumn("sentiment", sentiment_udf(col("headline"))) \
           .withColumn("genre", genre_udf(col("headline"))) \
           .withColumn("audience", audience_udf(col("headline"))) \
           .withColumn("impact",
                       when(col("impact").isNull(),
                            impact_udf(col("sentiment"), col("price_open"), col("price_close"))
                           ).otherwise(col("impact")))

    df = df.select("date", "headline", "publishedAt", "genre", "audience", "sentiment", "price_open", "price_close", "impact")
    return df

def fetch_historical_data(ticker='NFLX', start_date="2024-04-01", end_date="2024-04-30"):
    cache_file = "historical_cache.csv"
    cached_data = load_cached_data(cache_file)
    if cached_data:
        stock_data = cached_data
    else:
        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
        stock_data = []
        current_date = start_date_obj
        while current_date <= end_date_obj:
            date_str = current_date.strftime("%Y-%m-%d")
            if is_weekend(current_date):
                stock_data.append((date_str, None, None, None, "Non-trading day", None))
                current_date += timedelta(days=1)
                continue
            # Fetch mock news
            headline, published_at = mock_news_by_date(date_str)
            sentiment = get_sentiment(headline) if headline else "Unknown"
            # Fetch mock stock
            open_price, close_price, stock_date = mock_stock_price(date_str, sentiment)
            stock_data.append((date_str, open_price, close_price, stock_date, headline or "No headline", published_at))
            current_date += timedelta(days=1)

        cache_data(stock_data, cache_file)

    schema = StructType([
        StructField("date", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True)
    ])

    try:
        stock_df = spark.createDataFrame(stock_data, schema)
    except Exception as e:
        print(f"Error creating historical DataFrame: {e}")
        return None

    # Apply sentiment analysis
    stock_df = stock_df.withColumn("sentiment", sentiment_udf(col("headline")))

    return stock_df


def train_stock_prediction_model(historical_data):
    if historical_data is None:
        print("No historical data to train model")
        return None, None

    # Prepare features: sentiment score, previous day's close, open
    data = historical_data.withColumn("sentiment_score",
        when(col("sentiment") == "Positive", 1.0)
        .when(col("sentiment") == "Negative", -1.0)
        .otherwise(0.0))

    # Create feature vector
    assembler = VectorAssembler(
        inputCols=["sentiment_score", "price_close", "price_open"],
        outputCol="features",
        handleInvalid="skip"
    )

    # Shift data to predict next day's close
    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag
    w = Window.orderBy("date")
    data = data.withColumn("next_close", lag("price_close", -1).over(w))

    data = data.dropna(subset=["next_close", "price_close", "price_open", "sentiment_score"])

    if data.count() == 0:
        print("No valid data for training after cleaning")
        return None, None

    data = assembler.transform(data)

    # Train Linear Regression
    lr = LinearRegression(featuresCol="features", labelCol="next_close")
    model = lr.fit(data)
    return model, assembler


start_date = "2025-05-01"
end_date = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
# end_date = "2025-05-14"  # Includes current date (2025-05-14)
analysis_df = analyze_news_stock_range(start_date, end_date)
if analysis_df is not None:
    analysis_df.show(truncate=False)
else:
    print("Failed to create analysis DataFrame")

# Train prediction model
historical_df = fetch_historical_data('NFLX', "2024-04-01", "2024-04-30")
model, assembler = train_stock_prediction_model(historical_df)

# Dynamic prediction based on current date
if model is not None and assembler is not None:
    current_date = datetime.now().date()
    previous_date = current_date - timedelta(days=1)
    previous_date_str = previous_date.strftime("%Y-%m-%d")
    tomorrow_date = current_date + timedelta(days=1)

    # Fetch previous day's data
    previous_data = analysis_df.filter(col("date") == previous_date_str) \
        .select("date", "headline", "sentiment", "price_close", "price_open") \
        .withColumn("sentiment_score",
                    when(col("sentiment") == "Positive", 1.0)
                    .when(col("sentiment") == "Negative", -1.0)
                    .otherwise(0.0)) \
        .select("date", "headline", "sentiment", "price_close", "price_open", "sentiment_score")

    if previous_data.count() > 0:
        # Convert to pandas for easier access
        previous_row = previous_data.toPandas().iloc[0]

        # Print previous day's data
        print(f"\n📅 Data for {previous_date}:")
        print(f"📰 Headline: {previous_row['headline']}")
        print(f"🧠 Sentiment: {previous_row['sentiment']}")
        print(f"📈 Close Price: ${previous_row['price_close']:.2f}")

        # Prepare features for prediction
        latest_features = assembler.transform(previous_data)
        prediction = model.transform(latest_features)
        predicted_price = prediction.select("prediction").collect()[0][0]

        # Print prediction for tomorrow
        print(f"\n📅 Prediction for {tomorrow_date}:")
        print(f"📰 Based on Headline from {previous_date}: {previous_row['headline']}")
        print(f"🧠 Sentiment: {previous_row['sentiment']}")
        print(f"📈 Predicted Close Price: ${predicted_price:.2f}")
    else:
        print(f"No valid data for {previous_date_str} to make a prediction")
else:
    print("Failed to train prediction model")

+----------+-----------------------------------------+--------------------+--------+--------+---------+----------+-----------+-------------+
|date      |headline                                 |publishedAt         |genre   |audience|sentiment|price_open|price_close|impact       |
+----------+-----------------------------------------+--------------------+--------+--------+---------+----------+-----------+-------------+
|2025-05-01|No news found                            |NULL                |Other   |General |Neutral  |NULL      |NULL       |No news found|
|2025-05-02|Netflix Releases New Thriller Series     |2025-05-02T12:00:00Z|Thriller|General |Neutral  |591.91    |590.06     |Neutral      |
|2025-05-03|Non-trading day                          |NULL                |Other   |General |Neutral  |NULL      |NULL       |Weekend date |
|2025-05-04|Non-trading day                          |NULL                |Other   |General |Neutral  |NULL      |NULL       |Weekend date |
|2025-05-05|N

## py file code

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from textblob import TextBlob
import pandas as pd
from datetime import datetime, date, timedelta
import random
import os

# --- Initialize Spark ---
spark = SparkSession.builder \
    .appName("NewsStockAnalysisPrediction") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()


def get_sentiment(text):
    if not text or text == 'No headline':
        return 'Unknown'
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.2:
        return 'Positive'
    elif polarity < -0.2:
        return 'Negative'
    else:
        return 'Neutral'

def infer_genre(text):
    if not text or text == 'No headline':
        return 'Unknown'
    if 'thriller' in text.lower(): return 'Thriller'
    elif 'drama' in text.lower(): return 'Drama'
    elif 'comedy' in text.lower(): return 'Comedy'
    elif 'romance' in text.lower(): return 'Romance'
    else: return 'Other'

def infer_audience(text):
    if not text or text == 'No headline':
        return 'Unknown'
    if any(x in text.lower() for x in ['teen', 'gen z', 'youth']): return 'Youth'
    elif 'adult' in text.lower(): return 'Adults'
    else: return 'General'

def calculate_impact(sentiment, open_price, close_price):
    if sentiment == 'Unknown' or open_price is None or close_price is None:
        return 'Unknown'
    change = (close_price - open_price) / open_price * 100
    if sentiment == 'Positive' and change > 0:
        return 'Positive'
    elif sentiment == 'Negative' and change < 0:
        return 'Negative'
    else:
        return 'Neutral'

sentiment_udf = udf(get_sentiment, StringType())
genre_udf = udf(infer_genre, StringType())
audience_udf = udf(infer_audience, StringType())
impact_udf = udf(calculate_impact, StringType())


def is_weekend(date_obj):
    return date_obj.weekday() >= 5

def mock_news_by_date(date_str):
    """Simulate news headlines."""
    headlines = [
        "Netflix Releases New Thriller Series",
        "Netflix Announces New Comedy Special",
        "Netflix Stock Surges on New Drama Release",
        "Netflix Faces Criticism Over Content",
        "Netflix Partners for New Series",
        "Netflix Expands Youth Programming"
    ]
    published_at = f"{date_str}T{random.randint(8, 18):02d}:00:00Z"
    if random.random() < 0.3:
        # print(f"No news for {date_str}")
        return None, None
    headline = random.choice(headlines)
    # print(f"News for {date_str}: {headline}")
    return headline, published_at

def mock_stock_price(date_str, sentiment=None):
    """Simulate stock prices, tied to sentiment."""
    open_price = 600.00 + random.uniform(-10, 10)
    if sentiment == "Positive":
        close_price = open_price + random.uniform(0, 5)
    elif sentiment == "Negative":
        close_price = open_price + random.uniform(-5, 0)
    else:
        close_price = open_price + random.uniform(-5, 5)
    return round(open_price, 2), round(close_price, 2), datetime.strptime(date_str, "%Y-%m-%d").date()

def cache_data(data, filename):
    if not data:
        # print(f"No data to cache in {filename}")
        return
    try:
        pd.DataFrame(data).to_csv(filename, index=False)
        # print(f"Cached data to {filename}")
    except Exception as e:
        print(f"Error caching data to {filename}: {e}")

def load_cached_data(filename):
    if not os.path.exists(filename):
        # print(f"No cache file found: {filename}")
        return None
    try:
        df = pd.read_csv(filename)
        if df.empty or df.columns.empty:
            # print(f"Cache file {filename} is empty or invalid")
            return None
        data = df.to_dict('records')
        for row in data:
            if pd.notnull(row.get('stock_date')):
                row['stock_date'] = pd.to_datetime(row['stock_date']).date()
        # print(f"Loaded cached data from {filename}")
        return data
    except Exception as e:
        print(f"Error loading cache file {filename}: {e}")
        return None

def analyze_news_stock_range(start_date, end_date):
    cache_file = "news_stock_cache.csv"
    cached_data = load_cached_data(cache_file)
    if cached_data:
        data = cached_data
    else:
        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
        if start_date_obj > end_date_obj:
            print("Error: Start date must be before end date")
            return None

        data = []
        current_date = start_date_obj
        while current_date <= end_date_obj:
            date_str = current_date.strftime("%Y-%m-%d")
            # print(f"\nProcessing date: {date_str}")

            if is_weekend(current_date):
                data.append((date_str, "Non-trading day", None, None, None, None, "Weekend date"))
                current_date += timedelta(days=1)
                continue

            headline, published_at = mock_news_by_date(date_str)
            sentiment = get_sentiment(headline) if headline else "Unknown"
            if not headline:
                data.append((date_str, "No news found", None, None, None, None, "No news found"))
                current_date += timedelta(days=1)
                continue

            try:
                news_dt = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
                news_date = news_dt.date()
                if news_date != current_date:
                    # print(f"News date {news_date} does not match {current_date}")
                    data.append((date_str, headline, published_at, None, None, None, "News date mismatch"))
                    current_date += timedelta(days=1)
                    continue
            except Exception as e:
                print(f"Error parsing publishedAt: {e}")
                data.append((date_str, headline, published_at, None, None, None, "Date parsing error"))
                current_date += timedelta(days=1)
                continue

            open_price, close_price, stock_date = mock_stock_price(date_str, sentiment)
            if stock_date is None or stock_date != current_date:
                # print(f"Stock date {stock_date} does not match {current_date}")
                data.append((date_str, headline, published_at, None, None, None, "Stock date mismatch"))
                current_date += timedelta(days=1)
                continue

            data.append((date_str, headline, published_at, open_price, close_price, stock_date, None))
            current_date += timedelta(days=1)

        cache_data(data, cache_file)

    schema = StructType([
        StructField("date", StringType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("impact", StringType(), True)
    ])

    try:
        df = spark.createDataFrame(data, schema)
    except Exception as e:
        print(f"Error creating DataFrame: {e}")
        return None

    df = df.withColumn("sentiment", sentiment_udf(col("headline"))) \
           .withColumn("genre", genre_udf(col("headline"))) \
           .withColumn("audience", audience_udf(col("headline"))) \
           .withColumn("impact",
                       when(col("impact").isNull(),
                            impact_udf(col("sentiment"), col("price_open"), col("price_close"))
                           ).otherwise(col("impact")))

    df = df.select("date", "headline", "publishedAt", "genre", "audience", "sentiment", "price_open", "price_close", "impact")
    return df

def fetch_historical_data(ticker='NFLX', start_date="2024-04-01", end_date="2024-04-30"):
    cache_file = "historical_cache.csv"
    cached_data = load_cached_data(cache_file)
    if cached_data:
        stock_data = cached_data
    else:
        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d").date()
        stock_data = []
        current_date = start_date_obj
        while current_date <= end_date_obj:
            date_str = current_date.strftime("%Y-%m-%d")
            if is_weekend(current_date):
                stock_data.append((date_str, None, None, None, "Non-trading day", None))
                current_date += timedelta(days=1)
                continue
            # Fetch mock news
            headline, published_at = mock_news_by_date(date_str)
            sentiment = get_sentiment(headline) if headline else "Unknown"
            # Fetch mock stock
            open_price, close_price, stock_date = mock_stock_price(date_str, sentiment)
            stock_data.append((date_str, open_price, close_price, stock_date, headline or "No headline", published_at))
            current_date += timedelta(days=1)

        cache_data(stock_data, cache_file)

    schema = StructType([
        StructField("date", StringType(), True),
        StructField("price_open", DoubleType(), True),
        StructField("price_close", DoubleType(), True),
        StructField("stock_date", DateType(), True),
        StructField("headline", StringType(), True),
        StructField("publishedAt", StringType(), True)
    ])

    try:
        stock_df = spark.createDataFrame(stock_data, schema)
    except Exception as e:
        print(f"Error creating historical DataFrame: {e}")
        return None

    # Apply sentiment analysis
    stock_df = stock_df.withColumn("sentiment", sentiment_udf(col("headline")))

    return stock_df


def train_stock_prediction_model(historical_data):
    if historical_data is None:
        print("No historical data to train model")
        return None, None

    # Prepare features: sentiment score, previous day's close, open
    data = historical_data.withColumn("sentiment_score",
        when(col("sentiment") == "Positive", 1.0)
        .when(col("sentiment") == "Negative", -1.0)
        .otherwise(0.0))

    # Create feature vector
    assembler = VectorAssembler(
        inputCols=["sentiment_score", "price_close", "price_open"],
        outputCol="features",
        handleInvalid="skip"
    )

    # Shift data to predict next day's close
    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag
    w = Window.orderBy("date")
    data = data.withColumn("next_close", lag("price_close", -1).over(w))

    data = data.dropna(subset=["next_close", "price_close", "price_open", "sentiment_score"])

    if data.count() == 0:
        print("No valid data for training after cleaning")
        return None, None

    data = assembler.transform(data)

    # Train Linear Regression
    lr = LinearRegression(featuresCol="features", labelCol="next_close")
    model = lr.fit(data)
    return model, assembler


start_date = "2025-05-01"
end_date = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
# end_date = "2025-05-14"  # Includes current date (2025-05-14)
analysis_df = analyze_news_stock_range(start_date, end_date)
if analysis_df is not None:
    analysis_df.show(truncate=False)
    finall_df = analysis_df.toPandas()
else:
    print("Failed to create analysis DataFrame")

# Train prediction model
historical_df = fetch_historical_data('NFLX', "2024-04-01", "2024-04-30")
model, assembler = train_stock_prediction_model(historical_df)

# Dynamic prediction based on current date
if model is not None and assembler is not None:
    current_date = datetime.now().date()
    previous_date = current_date - timedelta(days=1)
    previous_date_str = previous_date.strftime("%Y-%m-%d")
    tomorrow_date = current_date + timedelta(days=1)

    # Fetch previous day's data
    previous_data = analysis_df.filter(col("date") == previous_date_str) \
        .select("date", "headline", "sentiment", "price_close", "price_open") \
        .withColumn("sentiment_score",
                    when(col("sentiment") == "Positive", 1.0)
                    .when(col("sentiment") == "Negative", -1.0)
                    .otherwise(0.0)) \
        .select("date", "headline", "sentiment", "price_close", "price_open", "sentiment_score")

    if previous_data.count() > 0:
        # Convert to pandas for easier access
        previous_row = previous_data.toPandas().iloc[0]

        # Print previous day's data
        print(f"\n📅 Data for {previous_date}:")
        print(f"📰 Headline: {previous_row['headline']}")
        print(f"🧠 Sentiment: {previous_row['sentiment']}")
        print(f"📈 Close Price: ${previous_row['price_close']:.2f}")

        # Prepare features for prediction
        latest_features = assembler.transform(previous_data)
        prediction = model.transform(latest_features)
        predicted_price = prediction.select("prediction").collect()[0][0]

        # Print prediction for tomorrow
        print(f"\n📅 Prediction for {tomorrow_date}:")
        print(f"📰 Based on Headline from {previous_date}: {previous_row['headline']}")
        print(f"🧠 Sentiment: {previous_row['sentiment']}")
        print(f"📈 Predicted Close Price: ${predicted_price:.2f}")
    else:
        print(f"No valid data for {previous_date_str} to make a prediction")
else:
    print("Failed to train prediction model")

+----------+-----------------------------------------+--------------------+--------+--------+---------+----------+-----------+-------------+
|date      |headline                                 |publishedAt         |genre   |audience|sentiment|price_open|price_close|impact       |
+----------+-----------------------------------------+--------------------+--------+--------+---------+----------+-----------+-------------+
|2025-05-01|Netflix Stock Surges on New Drama Release|2025-05-01T08:00:00Z|Drama   |General |Neutral  |595.98    |597.23     |Neutral      |
|2025-05-02|No news found                            |NULL                |Other   |General |Neutral  |NULL      |NULL       |No news found|
|2025-05-03|Non-trading day                          |NULL                |Other   |General |Neutral  |NULL      |NULL       |Weekend date |
|2025-05-04|Non-trading day                          |NULL                |Other   |General |Neutral  |NULL      |NULL       |Weekend date |
|2025-05-05|N

In [None]:
finall_df.head()

Unnamed: 0,date,headline,publishedAt,genre,audience,sentiment,price_open,price_close,impact
0,2025-05-01,Netflix Stock Surges on New Drama Release,2025-05-01T08:00:00Z,Drama,General,Neutral,595.98,597.23,Neutral
1,2025-05-02,No news found,,Other,General,Neutral,,,No news found
2,2025-05-03,Non-trading day,,Other,General,Neutral,,,Weekend date
3,2025-05-04,Non-trading day,,Other,General,Neutral,,,Weekend date
4,2025-05-05,Netflix Partners for New Series,2025-05-05T12:00:00Z,Other,General,Neutral,596.3,592.78,Neutral


In [None]:
finall_df.to_csv('analysis_data.csv', index=False)