In [1]:
import sys
import os
# Get the absolute path of the project root
project_root = os.path.abspath("..")  # Adjust if necessary

# Add the src directory to Python's path
sys.path.append(os.path.join(project_root, "src"))

if project_root not in sys.path:
    sys.path.append(project_root)

import pyspark as ps
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, BooleanType
from src.models.fine_tuning import * 
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc.addPyFile("../src.zip")
from src.models.inference import apply_inference_to_dataframe

from src.data.historic_price_collector import collect_historical_data
import pandas as pd

2025-04-11 14:48:02.846384: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-11 14:48:02.854769: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1744375682.863804   36834 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1744375682.866435   36834 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1744375682.873249   36834 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking 

In [2]:
spark = (
    SparkSession.builder
        .master("local")
        .appName("Stock Analysis")
        .getOrCreate()
)

In [3]:
analyst_nlp = pd.read_parquet("../data/processed/processed_news_analyst.parquet")

In [4]:
analyst_nlp.describe()

In [3]:
nlp_analyst = spark.read.parquet("../data/processed/processed_news_analyst.parquet")

In [4]:
nlp_analyst.describe()

DataFrame[summary: string, id: string, raw_text: string, date: string, stock: string, final_text: string]

In [5]:
nlp_analyst = nlp_analyst.withColumn("date", col("date").cast("date"))

In [6]:
nlp_analyst.describe()

DataFrame[summary: string, id: string, raw_text: string, stock: string, final_text: string]

In [7]:
nlp_analyst.show(10)

+---+--------------------+----------+-----+--------------------+--------------------+
| id|            raw_text|      date|stock|     finished_tokens|          final_text|
+---+--------------------+----------+-----+--------------------+--------------------+
|  0|Stocks That Hit 5...|2020-06-05|    A|[stock, hit, 52we...|stock hit 52week ...|
|  1|Stocks That Hit 5...|2020-06-03|    A|[stock, hit, 52we...|stock hit 52week ...|
|  2|71 Biggest Movers...|2020-05-26|    A|[71, big, mover, ...| 71 big mover friday|
|  3|46 Stocks Moving ...|2020-05-22|    A|[46, stock, move,...|46 stock move fri...|
|  4|B of A Securities...|2020-05-22|    A|[b, security, mai...|b security mainta...|
|  5|CFRA Maintains Ho...|2020-05-22|    A|[cfra, maintain, ...|cfra maintain hol...|
|  6|UBS Maintains Neu...|2020-05-22|    A|[ubs, maintain, n...|ubs maintain neut...|
|  7|Agilent Technolog...|2020-05-22|    A|[agilent, technol...|agilent technolog...|
|  8|Wells Fargo Maint...|2020-05-22|    A|[well, farg

In [9]:
ticker_ranges = nlp_analyst.groupBy("stock").agg(
    min("date").alias("start_date"),
    max("date").alias("end_date")
)

In [10]:
ticker_ranges.show(truncate=False)

+-----+----------+----------+
|stock|start_date|end_date  |
+-----+----------+----------+
|ACFN |2009-11-09|2015-11-16|
|ADEP |2010-04-19|2015-10-09|
|ALXN |2016-03-03|2020-06-08|
|AWAY |2011-05-31|2020-02-14|
|CCK  |2009-11-18|2020-05-16|
|CRS  |2009-08-13|2020-06-04|
|CYNI |2013-05-09|2015-06-16|
|EFR  |2011-04-05|2020-03-19|
|EIO  |2018-09-24|2018-10-24|
|EMCR |2012-10-17|2017-05-10|
|ESSX |2010-03-30|2015-12-08|
|FAV  |2011-07-12|2014-07-10|
|FINU |2012-07-12|2016-07-26|
|FMY  |2018-08-02|2020-03-09|
|GIS  |2009-08-07|2020-06-11|
|HBNC |2010-04-08|2020-05-12|
|HWAY |2009-10-14|2017-01-06|
|K    |2009-07-16|2020-06-11|
|LEN  |2009-09-21|2020-06-11|
|MHF  |2011-05-05|2020-03-12|
+-----+----------+----------+
only showing top 20 rows



In [11]:
ticker_df = ticker_ranges.toPandas()

In [13]:
output_path = "../data/raw/price_data" 

### Collection all the historical price movement for the tickers present in the news df within the dates of news also present in the df

In [15]:
for _, row in ticker_df.iterrows():
    ticker = row["stock"]
    start_date = row["start_date"].strftime('%Y-%m-%d')
    end_date = row["end_date"].strftime('%Y-%m-%d')

    # Call the collector directly for one ticker
    collect_historical_data(
        tickers=[ticker],
        start_date=start_date,
        end_date=end_date,
        output_path=output_path,
        interval="1d"  # Or another, like "1wk"
    )

2025-04-11 15:16:52 INFO: Downloading data for ACFN from 2009-11-09 to 2015-11-16...


YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed
2025-04-11 15:16:54 INFO: Successfully downloaded data for ACFN.
2025-04-11 15:16:54 INFO: Data for ACFN saved to ../data/raw/price_data/ACFN_historical.csv.
2025-04-11 15:16:54 INFO: Downloading data for ADEP from 2010-04-19 to 2015-10-09...
[*********************100%***********************]  1 of 1 completed
2025-04-11 15:16:54 ERROR: 
1 Failed download:
2025-04-11 15:16:54 ERROR: ['ADEP']: YFPricesMissingError('possibly delisted; no price data found  (1d 2010-04-19 -> 2015-10-09)')
2025-04-11 15:16:54 INFO: Downloading data for ALXN from 2016-03-03 to 2020-06-08...
[*********************100%***********************]  1 of 1 completed
2025-04-11 15:16:57 ERROR: 
1 Failed download:
2025-04-11 15:16:57 ERROR: ['ALXN']: YFTzMissingError('possibly delisted; no timezone found')
2025-04-11 15:16:57 INFO: Downloading data for AWAY from 2011-05-31 to 2020-02-14...
[*********************100%***********************]  1 of 1 co