# It is not stable, tried a lot but don't know why! 
# Spark=3.5.2, pySpark=3.5.2, awssdk=2.24.8
# Don't waste your time anymore!
# Everytime you restart container, you do better close the browser and restart JupyterLab UI again to avoid the issues caused by caching!!!

In [59]:
import os
import yaml
import nbimporter
from datetime import datetime, date
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType,  DateType, TimestampType
from pyspark.sql.functions import to_date, to_timestamp

from lab_schema_manager import SchemaManager
import yfinance as yf
from lab_pg_database_manager import PGDatabaseManager
from lab_spark import create_spark_session
from lab_raw_yfinance import RawYFinance
import time
import random

In [60]:
def fetch_yfinance_record(symbol_date_pair):
    try:
     
        symbol, start_date = symbol_date_pair
        # Fetch stock data using yfinance
        quote = yf.Ticker(symbol)
        current_date = date.today()
        hist = quote.history(start=start_date, end=current_date)

        # Reset index to include Date as a column and format it
        hist.reset_index(inplace=True)
        hist["Date"] = hist["Date"].dt.date
        
        # limit and stablize the fields of hist
        hist = hist[['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits']]
        
        # Add symbol and import_time to each row
        record_list = [
            tuple(row) + (symbol, import_time) for row in hist.itertuples(index=False)
        ]
        
        random_sleep_time = random.uniform(0.1, 0.9)
        time.sleep(random_sleep_time)
     
        return record_list

    except Exception as e:
        print(f"Error fetching data for {symbol}: {e}")
        return []  # Return an empty list on error

In [61]:
def parallel_fetch_yfinance_record(symbol_date_pairs, record_schema):
    try:
        # Distribute (symbol, start_date) pairs across Spark workers
        record_rdd = spark.sparkContext.parallelize(symbol_date_pairs)
        
        # Fetch data in parallel
        mapped_record_rdd = record_rdd.flatMap(fetch_yfinance_record)
        
        # if fetch_yfinance_record is updated to fetch_yfinance_record(pair, equity_type)(pair, equity_type)
        # method 1:
        # mapped_record_rdd = record_rdd.flatMap(lambda pair: fetch_yfinance_record(pair, equity_type))
        # # Partially apply the equity_type to fetch_with_equity_type
        # method 2:
        # fetch_function = partial(fetch_with_equity_type, equity_type=equity_type)
        # # Fetch data in parallel using the wrapper
        # mapped_record_rdd = record_rdd.flatMap(fetch_function)
        
        # Convert RDD to DataFrame
        result_df = spark.createDataFrame(mapped_record_rdd, schema=record_schema)  

        return result_df
    except Exception as e:
        print(f"Error paralleling fetch: {e}")
        return spark.createDataFrame([])

In [62]:
def load_iceberg_raw_eod_yfinance(symbol_date_pairs, iceberg_sink_table, schema_config_file):
    try: 
        schema_manager=SchemaManager(schema_config_file)
        schema_struct_type=schema_manager.get_struct_type("tables", iceberg_sink_table)  
        # print(schema_struct_type)
        
        create_table_script = schema_manager.get_create_table_query("tables", iceberg_sink_table)
        # print(create_table_script)
        spark.sql(create_table_script)

        df_source=parallel_fetch_yfinance_record(symbol_date_pairs, schema_struct_type)        
        # df_source.writeTo(iceberg_sink_table).append()
        df_source.write.mode("overwrite").saveAsTable(iceberg_sink_table)    

        print(f"{iceberg_sink_table} has been loaded")
        
    except Exception as e:
        print(f"Error loading lceberg raw table: {e}")
        

In [63]:
def load_pg_finalytics(iceberg_source_table, pg_url, pg_driver, pg_sink_table, pg_truncate_script, pg_merge_script):   
    try:     
        df_source=spark.read.table(iceberg_source_table)          
        finalytics.execute_sql_script(pg_truncate_script)
        
        # Write DataFrame to PostgreSQL
        df_source.write.jdbc(url=pg_url, table=pg_sink_table, mode="append", properties={"driver": pg_driver})        
        finalytics.execute_sql_script(pg_merge_script)
        
    except Exception as e:
        print(f"Error loading pg finalytics: {e}")        


## Load nessie.raw.stock_eod_yfinance

In [None]:
# symbol_start_date_pairs = [
#     ('TSLA', '2024-11-20'),
#     ('AAPL', '2024-11-20'),
#     ('C', '2024-11-20'),
# ]

# Get finalytics db connetion info
conn_config_file='cfg_connections.yaml'
finalytics=PGDatabaseManager(conn_config_file, 'finalytics')
pg_url=finalytics.jdbc_url
pg_driver=finalytics.driver

# Get symbol_start_date_pairs from finalytics db
# query="select symbol, start_date from fin.vw_etl_stock_eod_start_date limit 2500"
query="select symbol, start_date from fin.vw_etl_stock_eod_start_date"
symbol_start_date_pairs=finalytics.get_sql_script_result_list(query)
# print(symbol_start_date_pairs)


# Get iceberg table config info
table_schema_config_file='cfg_table_schemas.yaml'
iceberg_table='nessie.raw.stock_eod_yfinance'

# Set global import_time
import_time = datetime.now().isoformat()


# Create Spark Session
connection_config_file="cfg_connections.yaml"
spark_app_name="raw_yfinance"
spark=create_spark_session(connection_config_file, spark_app_name)
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.raw;")

# Load nessie.raw.stock_eod_yfinance 
load_iceberg_raw_eod_yfinance(symbol_start_date_pairs, iceberg_table, table_schema_config_file)


[Stage 22:>                                                         (0 + 4) / 4]

## Load finalytics.stage.stock_eod_quote_yahoo and merge into fin.stock_eod_quote

In [None]:
pg_table = "stage.stock_eod_quote_yahoo"  # Replace with the PostgreSQL table name

pg_truncate_script = f"truncate table {pg_table};"

pg_merge_script = "call fin.usp_load_stock_eod();"

load_pg_finalytics(iceberg_table, pg_url, pg_driver, pg_table,  pg_truncate_script, pg_merge_script)

In [None]:
spark.sql(f'select count(*) from {iceberg_table}').show()

In [None]:
spark.sql('select * from nessie.raw.stock_eod_yfinance limit 10').show(truncate=False)

In [None]:
print(len(symbol_start_date_pairs))

In [None]:
quote = yf.Ticker('CLF')
current_date = date.today()
hist = quote.history(start='2025-01-03', end=current_date)
print(hist)

In [None]:
symbol_date_pair=('CLF', '2025-01-03')

fetch_yfinance_record(symbol_date_pair)