**Student: Thomas Balsalobre Lucas Arriesse Groupe: DATA&IA 3**

# Big Data Analysis
# Nasdaq tech stockst

Import libraries needed

In [1]:
%pip install -r requirments.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:
import yfinance as yf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.functions import date_trunc, avg, round
from pyspark.sql.window import Window

from enum import Enum
from datetime import datetime, timedelta
import calendar

import requests_cache
import logging
from requests import Session
from requests_cache import CacheMixin, SQLiteCache
from requests_ratelimiter import LimiterMixin, MemoryQueueBucket
from pyrate_limiter import Duration, RequestRate, Limiter

import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
class ColumnNames(Enum):
    DATE = "Date"
    TICKER = "Ticker"
    OPEN = "Open"
    HIGH = "High"
    LOW = "Low"
    CLOSE = "Close"
    ADJ_CLOSE = "Adj Close"
    VOLUME = "Volume"
    COMPANY_NAME = "Company"
    INDUSTRY = "Industry"
    SECTOR = "Sector"

In [4]:
class EnumPeriod(Enum):
    YEAR = "year"        
    QUARTER = "quarter"  
    MONTH = "month"      
    WEEK = "week"        
    DAY = "day"

    def get_format(self):
        formats = {
            "year": "yyyy",
            "quarter": "yyyy-'Q'Q",
            "month": "yyyy-MM",
            "week": "yyyy-'W'ww",
            "day": "yyyy-MM-dd"
        }
        return formats[self.value]

In [5]:
def format_period_column(period: EnumPeriod, date_column: str):
    if period == EnumPeriod.WEEK:
        year_col = F.year(date_column).cast("string")
        week_col = F.format_string("%02d", F.weekofyear(date_column))
        return F.concat(year_col, F.lit("-W"), week_col)
    else:
        truncated_col = F.date_trunc(period.value, date_column)
        return F.date_format(truncated_col, period.get_format())

In [6]:
def add_period(start_date: str, period: EnumPeriod, amount: int) -> str:
    date_obj = datetime.strptime(start_date, '%Y-%m-%d')
    
    if period == EnumPeriod.DAY:
        new_date = date_obj + timedelta(days=amount)
    elif period == EnumPeriod.WEEK:
        new_date = date_obj + timedelta(weeks=amount)
    elif period == EnumPeriod.MONTH:
        new_month = (date_obj.month + amount - 1) % 12 + 1
        new_year = date_obj.year + (date_obj.month + amount - 1) // 12
        last_day_of_new_month = calendar.monthrange(new_year, new_month)[1]
        new_day = min(date_obj.day, last_day_of_new_month) 
        new_date = date_obj.replace(year=new_year, month=new_month, day=new_day)
    elif period == EnumPeriod.QUARTER:
        new_month = (date_obj.month + amount * 3 - 1) % 12 + 1
        new_year = date_obj.year + (date_obj.month + amount * 3 - 1) // 12
        last_day_of_new_month = calendar.monthrange(new_year, new_month)[1]
        new_day = min(date_obj.day, last_day_of_new_month)
        new_date = date_obj.replace(year=new_year, month=new_month, day=new_day)
    elif period == EnumPeriod.YEAR:
        new_date = date_obj.replace(year=date_obj.year + amount)
    else:
        raise ValueError(f"Unknown period: {period}")

    return new_date.strftime('%Y-%m-%d')

In [7]:
class CachedLimiterSession(CacheMixin, LimiterMixin, Session):
    pass

class App:
    _instance = None
    _app_name = "MyStockApp"
    _app_version = "1.0"

    def __init__(self):
        if App._instance is not None:
            raise Exception("App is a singleton! Use App.get_instance() instead.")
        self.logger = self._setup_logger()
        self.logger.info(f"Starting {App._app_name} version {App._app_version}")
        self.spark = SparkSession.builder.appName(App._app_name).getOrCreate()
        self.session = self._setup_session()
        App._instance = self

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = App()
        return cls._instance

    def _setup_logger(self):
        logger = logging.getLogger(f"{App._app_name} Logger")
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            logger.addHandler(ch)
    
        return logger


    def _setup_session(self):
        session = CachedLimiterSession(
                    limiter=Limiter(RequestRate(10, Duration.SECOND*5)),
                    bucket_class=MemoryQueueBucket,
                    backend=SQLiteCache("yfinance.cache"),
                )

        session.headers['User-agent'] = f"{App._app_name}/{App._app_version} (Windows NT 10.0; Win64; x64)"

        return session

    def get_logger(self):
        return self.logger

    def get_session(self):
        return self.session

    def get_spark_session(self):
        return self.spark

In [8]:
class NasdaqDF:    
    def __init__(self, csv_path, analysis_period):
        self.spark = App.get_instance().get_spark_session()
        self.logger = App.get_instance().get_logger()
        self.session = App.get_instance().get_session()

        self.list_nasdaq_path = csv_path
        self.stock_schema = self._define_stock_schema()
        self.nasdaq_schema = self._define_nasdaq_schema()
        self.analysis_period = analysis_period

    def _define_stock_schema(self):
        """Define the schema for the Spark DataFrame."""
        return StructType([
            StructField(ColumnNames.DATE.value, DateType(), True),
            StructField(ColumnNames.TICKER.value, StringType(), True),
            StructField(ColumnNames.OPEN.value, FloatType(), True),
            StructField(ColumnNames.HIGH.value, FloatType(), True),
            StructField(ColumnNames.LOW.value, FloatType(), True),
            StructField(ColumnNames.CLOSE.value, FloatType(), True),
            StructField(ColumnNames.ADJ_CLOSE.value, FloatType(), True),
            StructField(ColumnNames.VOLUME.value, IntegerType(), True)
        ])

    def _define_nasdaq_schema(self):
        """Define the schema for Nasdaq company information."""
        return StructType([
            StructField(ColumnNames.TICKER.value, StringType(), True),
            StructField(ColumnNames.COMPANY_NAME.value, StringType(), True),
            StructField(ColumnNames.INDUSTRY.value, StringType(), True),
            StructField(ColumnNames.SECTOR.value, StringType(), True)
        ])

    def load_companies_df(self):
        """Load the Nasdaq company data with error handling."""
        self.logger.info("Beginning download of companies Dataframe")
        nasdaq_df = None
        try:
            nasdaq_df = self.spark.read.option("delimiter", ";").csv(self.list_nasdaq_path, header=True, schema=self.nasdaq_schema)
            tickers_df = nasdaq_df.select(ColumnNames.TICKER.value).distinct()
            self.tickers = [row[ColumnNames.TICKER.value] for row in tickers_df.collect()]
            nasdaq_df = nasdaq_df.select(ColumnNames.COMPANY_NAME.value, ColumnNames.TICKER.value,
                                    ColumnNames.INDUSTRY.value, ColumnNames.SECTOR.value)
        except Exception as e:
            self.logger.error(f"Failed to load companies or tickers in DataFrame: {e}")
            return None

        finally:
            if nasdaq_df is None:
                self.logger.warning("The nasdaq_df is None. No data loaded.")
            else:
                ticker_count = nasdaq_df.select(ColumnNames.TICKER.value).distinct().count()
                total_rows = nasdaq_df.count()
    
                if total_rows == 0:
                    self.logger.warning("The nasdaq_df is empty after loading. No companies found.")
                elif ticker_count == 0:
                    self.logger.warning("No distinct tickers found in nasdaq_df.")
                elif total_rows < len(self.tickers):
                    self.logger.warning(f"Loaded DataFrame has fewer rows ({total_rows}) than expected tickers ({len(self.tickers)}).")
                self.logger.info(f"Companies DataFrame loaded successfully for {ticker_count} distinct tickers")
            return nasdaq_df

    def _download_single_ticker(self, ticker):
        stock_data = None
        try:
            stock_data = yf.download(ticker, period=self.analysis_period, rounding=True, session=self.session, progress=False)
            
            if stock_data.empty:
                self.logger.warning(f"No data returned for ticker: {ticker}")
                return None
            
            stock_data.columns = stock_data.columns.get_level_values(0)
            stock_data.columns.name = None
            stock_data.index = stock_data.index.tz_convert('UTC')
            stock_data['Date'] = stock_data.index.date
            stock_data['Ticker'] = ticker

            stock_data = stock_data[['Date', 'Ticker', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']].reset_index(drop=True)

        except Exception as e:
            self.logger.error(f"Download failed for {ticker}: {e}")
            return None

        finally:
            nan_count = stock_data.isnull().sum().sum()
            if nan_count > 0:
                self.logger.warning(f"Found {nan_count} NaN values in the data for ticker: {ticker} at date: {stock_data['Date']}")
            return stock_data  

    def load_stock_df(self):
        """Download data for all tickers with error handling and timeout."""
        self.logger.info("Beginning download of stock Dataframe")
        stock_df = None
        combined_rdd = self.spark.sparkContext.emptyRDD()
        try:
            for ticker in self.tickers:
                data = self._download_single_ticker(ticker)
                if data is not None:
                    rows = [tuple(row) for row in data.to_numpy()]
                    rdd = self.spark.sparkContext.parallelize(rows)
                    combined_rdd = combined_rdd.union(rdd)
            stock_df = self.spark.createDataFrame(combined_rdd, schema=self.stock_schema)
            stock_df = stock_df.repartition(len(self.tickers), F.col(ColumnNames.TICKER.value))

        except Exception as e:
            self.logger.error(f"Failed to load stock DataFrame: {e}")
            return None

        finally:
            if stock_df is None:
                self.logger.warning("The stock_df is None. No data loaded.")
            else:
                if stock_df.count() < len(self.tickers):
                    self.logger.warning(f"Downloaded data contains fewer rows ({stock_df.count()}) than tickers ({len(self.tickers)})")
                self.logger.info("Stock DataFrame loaded successfully.")
            return stock_df

    def merge_dataframes(self, stock_df, companies_df):
        """Merge stock_df and companies_df with error handling."""
        self.logger.info("Merging stock and companies Dataframe")
        merged_df = None
        try:
            if stock_df is None or companies_df is None:
                self.logger.warning("DataFrames stock and/or companies not loaded.")
            merged_df = stock_df.join(companies_df, on=ColumnNames.TICKER.value, how='inner')
            
        except Exception as e:
            self.logger.error(f"Failed to merge DataFrames: {e}")
            return None

        finally:
            if merged_df is not None:
                self.logger.info("Merged stock and companies DataFrame successfully.")  
            return merged_df

In [9]:
app = App.get_instance()
nasdaq_data = NasdaqDF(csv_path="nasdaq_100_list.csv", analysis_period="1y")
companies_df = nasdaq_data.load_companies_df()
stock_df = nasdaq_data.load_stock_df()
merged_df = nasdaq_data.merge_dataframes(stock_df, companies_df)

2024-11-02 15:40:42,516 - MyStockApp Logger - INFO - Starting MyStockApp version 1.0
2024-11-02 15:40:48,275 - MyStockApp Logger - INFO - Beginning download of companies Dataframe
2024-11-02 15:40:57,099 - MyStockApp Logger - INFO - Companies DataFrame loaded successfully for 101 distinct tickers
2024-11-02 15:40:57,101 - MyStockApp Logger - INFO - Beginning download of stock Dataframe
2024-11-02 15:42:12,501 - MyStockApp Logger - INFO - Stock DataFrame loaded successfully.
2024-11-02 15:42:12,502 - MyStockApp Logger - INFO - Merging stock and companies Dataframe
2024-11-02 15:42:12,594 - MyStockApp Logger - INFO - Merged stock and companies DataFrame successfully.


In [10]:
class NasdaqAnalysis:
    
    def __init__(self, stock_df):
        self.stock_df = stock_df
        self.logger = App.get_instance().get_logger()

    def display_stock_bounds(self, ticker: str, num_rows: int = 40):
        """
        Display the first and last `num_rows` rows for the specified ticker in the DataFrame.
        
        Args:
            ticker (str): The stock ticker symbol to filter data.
            num_rows (int): Number of rows to display at the start and end of the data. Default is 40.
        
        Returns:
            None
        """
        try:
            self.logger.info(f"Displaying first and last {num_rows} rows for ticker: {ticker}")
            df_filtered = self.stock_df.filter(F.col(ColumnNames.TICKER.value) == ticker)
            
            print(f"First {num_rows} rows for ticker: {ticker}")
            df_filtered.orderBy(F.asc(ColumnNames.DATE.value)).show(num_rows)
            
            print(f"Last {num_rows} rows for ticker: {ticker}")
            df_filtered.orderBy(F.desc(ColumnNames.DATE.value)).show(num_rows)
        
        except Exception as e:
            self.logger.error(f"Error displaying bounds for ticker {ticker}: {e}")

    def count_observations(self):
        """
        Count the total number of observations (rows) in the DataFrame.
        
        Returns:
            int: Total number of observations.
        """
        try:
            total_count = self.stock_df.count()
            self.logger.info(f"Total observations: {total_count}")
            return total_count
        except Exception as e:
            self.logger.error(f"Error counting observations: {e}")
            return None

    def deduce_data_period(self):
        """
        Calculate both the average and most common data period in days based on date differences.
        
        Returns:
            dict: A dictionary with "average_period" as a float and "most_common_period" as an integer.
        """
        df = self.stock_df
        try:
            logger = App.get_instance().get_logger()
            logger.info("Inferring data period based on date differences.")
    
            window_spec = Window.partitionBy("Ticker").orderBy("Date")
    
            df_with_diff = df.withColumn("date_diff", F.datediff(F.col("Date"), F.lag("Date", 1).over(window_spec)))
    
            avg_diff = df_with_diff.agg(F.mean("date_diff")).first()[0]
    
            most_common_diff = (
                df_with_diff.groupBy("date_diff")
                .count()
                .orderBy(F.desc("count"))
                .first()
            )
            most_common_period = most_common_diff["date_diff"]
    
            logger.info(f"Inferred average data period: {avg_diff} days")
            logger.info(f"Inferred most common data period: {most_common_period} days")
            
            return {"average_period": round(avg_diff, 2), "most_common_period": most_common_period}
            
        except Exception as e:
            logger.error(f"Error inferring data period: {e}")
            return None

    def descriptive_statistics(self):
        """
        Display basic descriptive statistics (min, max, mean, standard deviation) for numeric columns.

        Returns:
            None
        """
        try:
            self.logger.info("Calculating descriptive statistics for numeric columns.")
            self.stock_df.describe()
        except Exception as e:
            self.logger.error(f"Error calculating descriptive statistics: {e}")

    def count_missing_values(self):
        """
        Count the number of missing values for each column in the DataFrame.

        Returns:
            DataFrame: A DataFrame containing counts of missing values per column.
        """
        try:
            self.logger.info("Counting missing values for each column.")
            missing_df = self.stock_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in self.stock_df.columns])
            return missing_df
        except Exception as e:
            self.logger.error(f"Error counting missing values: {e}")
            return None

    def calculate_correlation(self):
        """
        Calculate correlations between numeric columns in the DataFrame.

        Returns:
            dict: A dictionary with tuple keys (column1, column2) and correlation values.
        """
        try:
            self.logger.info("Calculating correlations between numeric columns.")
            numeric_columns = [field.name for field in self.stock_df.schema.fields if isinstance(field.dataType, (FloatType, IntegerType))]
            correlations = {}
            for i in range(len(numeric_columns)):
                for j in range(i + 1, len(numeric_columns)):
                    col1, col2 = numeric_columns[i], numeric_columns[j]
                    corr_value = self.stock_df.corr(col1, col2)
                    correlations[(col1, col2)] = corr_value
            return correlations
        except Exception as e:
            self.logger.error(f"Error calculating correlations: {e}")
            return {}

In [11]:
analysis = NasdaqAnalysis(stock_df)
analysis.display_stock_bounds('AAPL')
nb_count = analysis.count_observations()
dict_period = analysis.deduce_data_period()
df_stats = analysis.descriptive_statistics()
missing_df = analysis.count_missing_values()
correlations = analysis.calculate_correlation()

2024-11-02 15:42:12,893 - MyStockApp Logger - INFO - Displaying first and last 40 rows for ticker: AAPL


First 40 rows for ticker: AAPL
+----------+------+------+------+------+------+---------+---------+
|      Date|Ticker|  Open|  High|   Low| Close|Adj Close|   Volume|
+----------+------+------+------+------+------+---------+---------+
|2023-11-02|  AAPL|175.52|177.78|175.46|177.57|   176.67| 77334800|
|2023-11-03|  AAPL|174.24|176.82|173.35|176.65|   175.75| 79763700|
|2023-11-06|  AAPL|176.38|179.43|176.21|179.23|   178.32| 63841300|
|2023-11-07|  AAPL|179.18|182.44|178.97|181.82|   180.89| 70530000|
|2023-11-08|  AAPL|182.35|183.45|181.59|182.89|   181.96| 49340300|
|2023-11-09|  AAPL|182.96|184.12|181.81|182.41|   181.48| 53763500|
|2023-11-10|  AAPL|183.97|186.57|183.53| 186.4|    185.7| 66133400|
|2023-11-13|  AAPL|185.82|186.03|184.21| 184.8|    184.1| 43627500|
|2023-11-14|  AAPL| 187.7|188.11| 186.3|187.44|   186.73| 60108400|
|2023-11-15|  AAPL|187.85| 189.5|187.78|188.01|    187.3| 53790500|
|2023-11-16|  AAPL|189.57|190.96|188.65|189.71|   188.99| 54412900|
|2023-11-17|  AAP

2024-11-02 15:43:07,389 - MyStockApp Logger - INFO - Total observations: 25452
2024-11-02 15:43:07,392 - MyStockApp Logger - INFO - Inferring data period based on date differences.
2024-11-02 15:43:46,115 - MyStockApp Logger - INFO - Inferred average data period: 1.454183266932271 days
2024-11-02 15:43:46,116 - MyStockApp Logger - INFO - Inferred most common data period: 1 days
2024-11-02 15:43:46,117 - MyStockApp Logger - ERROR - Error inferring data period: Invalid argument, not a string or column: 1.454183266932271 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
2024-11-02 15:43:46,118 - MyStockApp Logger - INFO - Calculating descriptive statistics for numeric columns.
2024-11-02 15:43:46,398 - MyStockApp Logger - INFO - Counting missing values for each column.
2024-11-02 15:43:46,505 - MyStockApp Logger - INFO - Calculating correlations between numeric columns.


In [12]:
class DataFrameOperations:
    def __init__(self, logger: logging.Logger, stock_df: DataFrame):
        self.logger = logger
        self.stock_df = stock_df

    def df_periodic_change(self, period: EnumPeriod, column: ColumnNames) -> DataFrame:
        self.logger.info("Calculating periodic change for column: %s with period: %s", column.value, period.value)

        if period == EnumPeriod.DAY:
            window_spec = Window.partitionBy(ColumnNames.TICKER.value).orderBy(ColumnNames.DATE.value)
            daily_change_df = self.stock_df.withColumn(f"daily_change_{column.value}", 
                round(F.col(column.value) - F.lag(column.value, 1).over(window_spec), 2))
            return daily_change_df.select(ColumnNames.TICKER.value, ColumnNames.DATE.value, f"daily_change_{column.value}")

        period_col = format_period_column(period, ColumnNames.DATE.value)

        return (
            self.stock_df
            .groupBy(ColumnNames.TICKER.value, period_col.alias(f"{period.value}_period"))
            .agg(
                F.first(column.value).alias(f"first_{column.value}"),
                F.last(column.value).alias(f"last_{column.value}")
            )
            .withColumn(f"{period.value}_change_{column.value}", 
                        round(F.col(f"last_{column.value}") - F.col(f"first_{column.value}"), 2))
            .select(ColumnNames.TICKER.value, f"{period.value}_period", f"{period.value}_change_{column.value}")
        )

    def avg_open_close_by_period(self, period: EnumPeriod) -> DataFrame:
        self.logger.info("Calculating average Open and Close by period: %s", period.value)

        period_col = format_period_column(period, ColumnNames.DATE.value)

        return (
            self.stock_df
            .groupBy(ColumnNames.TICKER.value, period_col.alias(f"{period.value}_period"))
            .agg(
                round(F.avg(ColumnNames.OPEN.value), 2).alias("avg_open"), 
                round(F.avg(ColumnNames.CLOSE.value), 2).alias("avg_close")
            )
        )

    def calculate_return_rate(self, period: EnumPeriod, column: str) -> DataFrame:
        self.logger.info("Calculating return rate for column: %s with period: %s", column, period.value)
    
        if period == EnumPeriod.DAY:
            window_spec = Window.partitionBy(ColumnNames.TICKER.value).orderBy(ColumnNames.Date.value)
            daily_return_df = self.stock_df.withColumn(
                f"{period.value}_return_rate",
                F.round(((F.col(column.value) - F.lag(column.value, 1).over(window_spec)) / F.lag(column.value, 1).over(window_spec)) * 100, 2)
            )
            return daily_return_df.select(ColumnNames.TICKER.value, ColumnNames.DATE.value, f"{period.value}_return_rate")
        
        period_col = format_period_column(period, ColumnNames.DATE.value)
        
        return_rate_df = (
            self.stock_df
            .groupBy(ColumnNames.TICKER.value, period_col.alias(f"{period.value}_period"))
            .agg(
                F.first(column.value).alias(f"first_{column.value}"),
                F.last(column.value).alias(f"last_{column.value}")
            )
            .withColumn(
                f"{period.value}_return_rate",
                F.round(((F.col(f"last_{column.value}") - F.col(f"first_{column.value}")) / F.col(f"first_{column.value}")) * 100, 2)
            )
            .select(ColumnNames.TICKER.value, f"{period.value}_period", f"{period.value}_return_rate")
        )
        
        return return_rate_df

    def calculate_daily_return(self) -> DataFrame:
        self.logger.info("Calculating daily return for each stock.")
        
        new_stock_df = self.stock_df.withColumn("daily_return", 
            round(100 * ((F.col(ColumnNames.CLOSE.value) - F.col(ColumnNames.OPEN.value)) / F.col(ColumnNames.OPEN.value)), 4)
        )

        self.stock_df = new_stock_df
        
        return new_stock_df

    def avg_daily_return_by_period(self, period: EnumPeriod):
        self.logger.info("Calculating average daily return for each stock within the period.")
        
        if "daily_return" not in self.stock_df.columns:
            self.logger.warning("daily_return column not found. Calculating daily returns first.")
            self.calculate_daily_return()

        period_col = format_period_column(period, ColumnNames.DATE.value)

        return (self.stock_df
                .groupBy(ColumnNames.TICKER.value, period_col.alias(f"{period.value}_period"))
                .agg(round(F.avg("daily_return"), 4).alias("avg_daily_return"))
               )

    def stocks_with_highest_daily_return(self, daily_return_df: DataFrame, top_n: int = 5) -> DataFrame:
        self.logger.info("Finding stocks with the highest daily return.")

        if "daily_return" not in self.stock_df.columns:
            self.logger.warning("daily_return column not found. Calculating daily returns first.")
            self.calculate_daily_return()

        return daily_return_df.orderBy(F.desc("daily_return")).limit(top_n)

    def calculate_moving_average(self, column: str, num_days: int) -> DataFrame:
        self.logger.info(f"Calculating moving average on {num_days} days period.")

        moving_avg_window = Window.partitionBy(ColumnNames.TICKER.value).orderBy(ColumnNames.DATE.value).rowsBetween(-num_days + 1, 0)
        count_col = F.count(F.col(column)).over(moving_avg_window)
    
        return (self.stock_df
                .withColumn(f"{column}_moving_avg_{num_days}_days",
                F.when(count_col == num_days, F.avg(F.col(column)).over(moving_avg_window)).otherwise(None))
               )

    def calculate_correlation_pairs(self) -> DataFrame:
        self.logger.info("Calculating correlations between all possible ticker pairs.")

        if "daily_return" not in self.stock_df.columns:
            self.logger.warning("daily_return column not found. Calculating daily returns first.")
            self.calculate_daily_return()

        daily_return_df_a = self.stock_df.alias("a")
        daily_return_df_b = self.stock_df.alias("b")

        joined_df = (
            daily_return_df_a.join(daily_return_df_b, (F.col("a.Date") == F.col("b.Date")) & 
                                                     (F.col("a.Ticker") < F.col("b.Ticker")))
        )

        correlation_df = (
            joined_df.groupBy("a.Ticker", "b.Ticker")
                     .agg(F.corr("a.daily_return", "b.daily_return").alias("correlation"))
                     .filter(F.col("correlation").isNotNull())
                     .orderBy(F.desc("correlation"))
        )

        return correlation_df

    def calculate_periodic_return(self, period: str, price_column: str = "Close") -> DataFrame:
        period_col = format_period_column(period, ColumnNames.DATE.value)
        return_df = (stock_df
                     .groupBy(ColumnNames.TICKER.value, period_col.alias(f"{period.value}_period"))
                     .agg(
                         F.first(price_column).alias("first_price"),
                         F.last(price_column).alias("last_price")
                     )
                     .withColumn(f"{period}_return_rate", 
                                 (F.col("last_price") - F.col("first_price")) / F.col("first_price") * 100)
                     .select("Ticker", period_col, f"{period}_return_rate")
                    )
    
        return return_df

    def calculate_best_return_rate(self, start_date: str, period: EnumPeriod, column: ColumnNames):
        self.logger.info("Calculating best return rate from %s for period: %s", start_date, period.value)
    
        end_date = add_period(start_date, period, 1)
    
        filtered_df = self.stock_df.filter((F.col(ColumnNames.DATE.value) >= start_date) & (F.col(ColumnNames.DATE.value) <= end_date))
    
        return_rate_df = filtered_df.groupBy(ColumnNames.TICKER.value).agg(
            F.first(column.value).alias("first_price"),
            F.last(column.value).alias("last_price")
        ).withColumn(
            "return_rate",
            F.round(((F.col("last_price") - F.col("first_price")) / F.col("first_price")) * 100, 2)
        )
    
        return return_rate_df.orderBy(F.col("return_rate").desc())

In [13]:
df_operations = DataFrameOperations(App.get_instance().get_logger(), stock_df)
period = EnumPeriod.MONTH
column = ColumnNames.CLOSE
df_periodic_change = df_operations.df_periodic_change(period, column)
df_avg_close = df_operations.avg_open_close_by_period(period)
df_month_return = df_operations.calculate_return_rate(period, column)
df_daily_return = df_operations.calculate_daily_return()
df_highest_daily_return = df_operations.stocks_with_highest_daily_return(df_daily_return)
df_avg_daily_return = df_operations.avg_daily_return_by_period(period)
df_mvg_avg = df_operations.calculate_moving_average(ColumnNames.OPEN.value, 5)
df_correlations = df_operations.calculate_correlation_pairs()
df_best_return_rate = df_operations.calculate_best_return_rate("2024-01-03", EnumPeriod.MONTH, ColumnNames.CLOSE)

2024-11-02 15:47:51,027 - MyStockApp Logger - INFO - Calculating periodic change for column: Close with period: month
2024-11-02 15:47:51,056 - MyStockApp Logger - INFO - Calculating average Open and Close by period: month
2024-11-02 15:47:51,069 - MyStockApp Logger - INFO - Calculating return rate for column: ColumnNames.CLOSE with period: month
2024-11-02 15:47:51,097 - MyStockApp Logger - INFO - Calculating daily return for each stock.
2024-11-02 15:47:51,106 - MyStockApp Logger - INFO - Finding stocks with the highest daily return.
2024-11-02 15:47:51,112 - MyStockApp Logger - INFO - Calculating average daily return for each stock within the period.
2024-11-02 15:47:51,124 - MyStockApp Logger - INFO - Calculating moving average on 5 days period.
2024-11-02 15:47:51,196 - MyStockApp Logger - INFO - Calculating correlations between all possible ticker pairs.
2024-11-02 15:47:51,294 - MyStockApp Logger - INFO - Calculating best return rate from 2024-01-03 for period: month


In [14]:
import streamlit as st
import seaborn as sns
import matplotlib.pyplot as plt

class NasdaqApp:
    def __init__(self, csv_path):
        self.nasdaq_data = NasdaqDF(csv_path=csv_path, analysis_period="1y")
        self.companies_df = self.nasdaq_data.load_companies_df()
        self.stock_df = self.nasdaq_data.load_stock_df()
        self.merged_df = self.nasdaq_data.merge_dataframes(self.stock_df, self.companies_df)
        self.analysis = NasdaqAnalysis(self.stock_df)

    def run(self):
        st.title("NASDAQ Analysis App")

        # General Statistics
        st.subheader("General Statistics")
        st.write(self.analysis.descriptive_statistics())
        
        # Buttons for different analyses
        if st.button("Count Observations"):
            st.write("Number of Observations:", self.analysis.count_observations())

        if st.button("Count Missing Values"):
            st.write(self.analysis.count_missing_values())

        if st.button("Calculate Correlation"):
            correlations = self.analysis.calculate_correlation()
            st.write(correlations)

        if st.button("Display Stock Bounds (e.g., AAPL)"):
            stock_ticker = st.text_input("Enter Stock Ticker", value='AAPL')
            st.write(self.analysis.display_stock_bounds(stock_ticker))

        # Visualizations with Seaborn or Matplotlib
        if st.button("Show Daily Returns"):
            df_daily_return = self.analysis.calculate_daily_return()
            plt.figure(figsize=(10, 5))
            sns.lineplot(data=df_daily_return, x='date', y='daily_return', label='Daily Return')
            plt.title('Daily Return')
            plt.xlabel('Date')
            plt.ylabel('Return Rate')
            st.pyplot(plt)

        # Add more buttons for other analyses and visualizations as needed

# To run the app
if __name__ == "__main__":
    app = NasdaqApp(csv_path="nasdaq_100_list.csv")
    app.run()

2024-11-02 15:47:51,324 - MyStockApp Logger - INFO - Beginning download of companies Dataframe
2024-11-02 15:47:52,017 - MyStockApp Logger - INFO - Companies DataFrame loaded successfully for 101 distinct tickers
2024-11-02 15:47:52,018 - MyStockApp Logger - INFO - Beginning download of stock Dataframe
2024-11-02 15:48:16,301 - MyStockApp Logger - INFO - Stock DataFrame loaded successfully.
2024-11-02 15:48:16,303 - MyStockApp Logger - INFO - Merging stock and companies Dataframe
2024-11-02 15:48:16,309 - MyStockApp Logger - INFO - Merged stock and companies DataFrame successfully.
2024-11-02 15:48:16.582 
  command:

    streamlit run /opt/conda/lib/python3.11/site-packages/ipykernel_launcher.py [ARGUMENTS]
2024-11-02 15:48:16,584 - MyStockApp Logger - INFO - Calculating descriptive statistics for numeric columns.


In [None]:
app_instance = App.get_instance()
logger = app_instance.get_logger()

st.title("Stock Analysis Dashboard")

if st.button("Load Data"):

        if stock_df is not None and companies_df is not None:
            merged_df = nasdaq_df.merge_dataframes(stock_df, companies_df)
            st.success("Data loaded successfully!")

            # Example analysis: Display first few rows of merged data
            st.subheader("Merged Data Preview")
            st.write(merged_df.limit(10).toPandas())

            # Create an instance of NasdaqAnalysis
            analysis = NasdaqAnalysis(merged_df)

            # Display statistics
            if st.button("Show Descriptive Statistics"):
                stats = analysis.descriptive_statistics()
                st.write(stats)

            # Functionality for displaying stock bounds
            ticker = st.selectbox("Select Ticker", options=nazdaq_df.tickers)
            if st.button("Show Stock Bounds"):
                analysis.display_stock_bounds(ticker)

            # Calculate correlation pairs
            if st.button("Calculate Correlation Pairs"):
                correlation_df = analysis.calculate_correlation_pairs()
                st.write(correlation_df.toPandas())

        else:
            st.error("Failed to load stock data.")
    else:
        st.warning("Please provide both CSV path and analysis period.")
