In [1]:
import os
import yaml
import nbimporter
from datetime import datetime, date
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType,  DateType, TimestampType
from pyspark.sql.functions import to_date, to_timestamp
from lab_table_manager import TableManager
import yfinance as yf
import time
import random
from lab_pg_database_manager import PGDatabaseManager

In [2]:
with open("cfg_connections.yaml","r") as file:
    config=yaml.safe_load(file)
    catalog_uri = config['docker_env']['catalog_uri'] 
    warehouse = config['docker_env']['warehouse']     # Minio Address to Write to
    storage_uri = config['docker_env']['storage_uri'] # Minio IP address from docker inspec

# Configure Spark with necessary packages and Iceberg/Nessie settings
conf = (
    pyspark.SparkConf()
        .setAppName('finalytics_app')
        # Include necessary packages
        .set('spark.jars.packages',
             'org.postgresql:postgresql:42.7.3,'
             'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,'
             'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1,'
             'software.amazon.awssdk:bundle:2.24.8,'
             'software.amazon.awssdk:url-connection-client:2.24.8')
        # Enable Iceberg and Nessie extensions
        .set('spark.sql.extensions', 
             'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,'
             'org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        # Configure Nessie catalog
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', catalog_uri)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        # Set Minio as the S3 endpoint for Iceberg storage
        .set('spark.sql.catalog.nessie.s3.endpoint', storage_uri)
        .set('spark.sql.catalog.nessie.warehouse', warehouse)
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')       
)   

# Start Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()  
# Create the "raw" namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.raw;")



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.postgresql#postgresql added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-62283837-6afd-4e53-9051-7102958a1c83;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.3 in central
	found org.checkerframework#checker-qual;3.42.0 in central
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.77.1 in central
	found software.amazon.awssdk#bundle;2.24.8 in central
	found software.amazon.awssdk#url-connection-client;2.24.8 in central
	found software.amazon.awssdk#utils;2.24

DataFrame[]

In [3]:
def fetch_yfinance_record(symbol_date_pairs):
    try:
        symbol, start_date = symbol_date_pairs
        # Fetch stock data using yfinance
        quote = yf.Ticker(symbol)
        current_date = date.today()
        hist = quote.history(start=start_date, end=current_date)

        # Reset index to include Date as a column and format it
        hist.reset_index(inplace=True)
        hist["Date"] = hist["Date"].dt.date
        
        # limit and stablize the fields of hist
        hist = hist[['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits']]
        
        # Add symbol and import_time to each row
        record_list = [
            tuple(row) + (symbol, import_time) for row in hist.itertuples(index=False)
        ]
        random_sleep_time = random.uniform(0.1, 0.9)
        time.sleep(random_sleep_time)

        # print(record_list)
        return record_list

    except Exception as e:
        print(f"Error fetching data for {symbol}: {e}")
        return []  # Return an empty list on error

In [4]:
def parallel_fetch_yfinance_record(symbol_date_pairs, record_schema):
    try:
        # Distribute (symbol, start_date) pairs across Spark workers
        record_rdd = spark.sparkContext.parallelize(symbol_date_pairs)

        
        # Fetch data in parallel
        mapped_record_rdd = record_rdd.flatMap(fetch_yfinance_record)

        # Convert RDD to DataFrame
        result_df = spark.createDataFrame(mapped_record_rdd, schema=record_schema)
        # result_df = spark.createDataFrame(mapped_record_rdd)

        # Show or save the results
        # result_df.show()
        return result_df
    except Exception as e:
        print(f"Error paralleling fetch: {e}")
        return spark.createDataFrame([])

In [5]:
def load_iceberg_raw_eod_yfinance(symbol_date_pairs, iceberg_sink_table, schema_config_file):
    try: 
        table_manager=TableManager(schema_config_file)
        schema_struct_type=table_manager.get_struct_type(iceberg_sink_table)   

        # print(schema_struct_type)
        create_table_script = table_manager.get_create_table_query(iceberg_sink_table)
        spark.sql(create_table_script)

        df_source=parallel_fetch_yfinance_record(symbol_date_pairs, schema_struct_type)
        
        # df_source.writeTo(iceberg_sink_table).append()
        df_source.write.mode("overwrite").saveAsTable(iceberg_sink_table)    
        # df_source.writeTo(iceberg_sink_table).overwritePartitions() 

        print(f"{iceberg_sink_table} has been loaded")
        
    except Exception as e:
        print(f"Error loading lceberg raw table: {e}")
        

In [6]:
def load_pg_finalytics(iceberg_source_table, pg_url, pg_driver, pg_sink_table, pg_truncate_script, pg_merge_script):   
    try:     
        df_source=spark.read.table(iceberg_source_table)          
        finalytics.execute_sql_script(pg_truncate_script)
        
        # Write DataFrame to PostgreSQL
        df_source.write.jdbc(url=pg_url, table=pg_sink_table, mode="append", properties={"driver": pg_driver})        
        finalytics.execute_sql_script(pg_merge_script)
    except Exception as e:
        print(f"Error loading pg finalytics: {e}")        


## Load nessie.raw.stock_eod_yfinance

In [7]:
# symbol_start_date_pairs = [
#     ('PRMW', '2024-12-20'),
#     ('CATC', '2024-12-20'),
#     ('SQSP', '2024-12-20'),
# ]

# Get finalytics connetion info
conn_config_file='cfg_connections.yaml'
finalytics=PGDatabaseManager(conn_config_file, 'finalytics')
pg_url=finalytics.jdbc_url
pg_driver=finalytics.driver

# Get symbol_start_date_pairs from finalytics
query="select symbol, start_date from fin.vw_etl_stock_eod_start_date limit 5"
# query="select symbol, start_date from fin.vw_etl_stock_eod_start_date"
symbol_start_date_pairs=finalytics.get_sql_script_result_list(query)
# print(symbol_start_date_pairs)


# Get iceberg table config info
table_schema_config_file='cfg_table_schemas.yaml'
iceberg_table='nessie.raw.stock_eod_yfinance'

# Set global import_time
import_time = datetime.now().isoformat()

# Load nessie.raw.stock_eod_yfinance 
load_iceberg_raw_eod_yfinance(symbol_start_date_pairs, iceberg_table, table_schema_config_file)


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
$NYCB: possibly delisted; no timezone found                         (0 + 4) / 4]
Error fetching data for NYCB: Can only use .dt accessor with datetimelike values
                                                                                

nessie.raw.stock_eod_yfinance has been loaded


## Load finalytics.stage.stock_eod_quote_yahoo and merge into fin.stock_eod_quote

In [8]:
pg_table = "stage.stock_eod_quote_yahoo"  # Replace with the PostgreSQL table name

pg_truncate_script = f"truncate table {pg_table};"

pg_merge_script = "call fin.usp_load_stock_eod();"

load_pg_finalytics(iceberg_table, pg_url, pg_driver, pg_table,  pg_truncate_script, pg_merge_script)

                                                                                

In [9]:
spark.sql(f'select count(*) from {iceberg_table}').show()

+--------+
|count(1)|
+--------+
|       4|
+--------+



In [10]:
spark.sql('select * from nessie.raw.stock_eod_yfinance').show(truncate=False)

+----------+-----+-----+------+-----+--------+---------+------------+------+--------------------------+
|date      |open |high |low   |close|volume  |dividends|stock_splits|symbol|import_time               |
+----------+-----+-----+------+-----+--------+---------+------------+------+--------------------------+
|2024-11-14|1.9  |1.725|1.725 |1.725|129682  |0.0      |0.0         |EIGR  |2025-01-04T22:55:14.743363|
|2024-11-26|44.66|44.72|44.565|44.63|1429608 |0.0      |0.0         |VSTO  |2025-01-04T22:55:14.743363|
|2024-11-21|29.11|29.37|28.43 |28.55|36027438|0.0      |0.0         |MRO   |2025-01-04T22:55:14.743363|
|2024-12-10|12.86|14.08|12.42 |12.42|6127    |0.0      |0.0         |THCPU |2025-01-04T22:55:14.743363|
+----------+-----+-----+------+-----+--------+---------+------------+------+--------------------------+



In [11]:
# print(symbol_start_date_pairs)

In [12]:
symbol="c"
start_date ="2024-1-1"
# Fetch stock data using yfinance
quote = yf.Ticker(symbol)
current_date = date.today()
hist = quote.history(start=start_date, end=current_date)

# Reset index to include Date as a column and format it
hist.reset_index(inplace=True)
print(hist)
# hist["Date"] = hist["Date"].dt.date
# print(hist)

# # limit and stablize the fields of hist
# hist = hist[['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits']]

# # Add symbol and import_time to each row
# record_list = [
#     tuple(row) + (symbol, import_time) for row in hist.itertuples(index=False)
# ]

# # print(record_list)
# print(record_list)

                         Date       Open       High        Low      Close  \
0   2024-01-02 00:00:00-05:00  49.458668  51.242268  49.352616  51.136215   
1   2024-01-03 00:00:00-05:00  51.338675  52.418473  50.355286  51.714676   
2   2024-01-04 00:00:00-05:00  51.984625  52.784836  51.733959  51.840012   
3   2024-01-05 00:00:00-05:00  52.090682  52.707710  51.955708  52.379917   
4   2024-01-08 00:00:00-05:00  52.052121  52.148530  51.116936  52.071400   
..                        ...        ...        ...        ...        ...   
249 2024-12-27 00:00:00-05:00  70.860001  71.529999  70.540001  71.000000   
250 2024-12-30 00:00:00-05:00  70.180000  70.830002  69.790001  70.389999   
251 2024-12-31 00:00:00-05:00  70.550003  70.959999  70.150002  70.389999   
252 2025-01-02 00:00:00-05:00  70.940002  71.160004  69.650002  69.940002   
253 2025-01-03 00:00:00-05:00  70.879997  71.089996  69.849998  71.000000   

       Volume  Dividends  Stock Splits  
0    24784900        0.0          

In [13]:
import yfinance as yf
from datetime import date
import time
import random
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, StringType

# Define the schema for the DataFrame
record_schema = StructType([
    StructField("Date", DateType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Volume", DoubleType(), True),
    StructField("Dividends", DoubleType(), True),
    StructField("Stock Splits", DoubleType(), True),
    StructField("Symbol", StringType(), True),
    StructField("ImportTime", StringType(), True)
])

def xfetch_yfinance_record(symbol_date_pair):
    try:
        symbol, start_date = symbol_date_pair
        # Fetch stock data using yfinance
        quote = yf.Ticker(symbol)
        current_date = date.today()
        hist = quote.history(start=start_date, end=current_date)

        # Reset index to include Date as a column and format it
        hist.reset_index(inplace=True)
        hist["Date"] = hist["Date"].dt.date
        
        # Limit and stabilize the fields of hist
        hist = hist[['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits']]
        print(hist)
        
        # Add symbol and import_time to each row
        import_time = str(date.today())
        record_list = [
            tuple(row) + (symbol, import_time) for row in hist.itertuples(index=False)
        ]
        
        random_sleep_time = random.uniform(0.1, 0.9)
        time.sleep(random_sleep_time)
     
        return record_list

    except Exception as e:
        print(f"Error fetching data for {symbol}: {e}")
        return []  # Return an empty list on error

def xparallel_fetch_yfinance_record(symbol_date_pairs, record_schema):
    try:
        # Initialize Spark session

        connection_config_file="cfg_connections.yaml"
        spark_app_name="YFinanceDataFetcher"
        spark=create_spark_session(connection_config_file, spark_app_name)
        
        # spark = SparkSession.builder.appName("YFinanceDataFetcher").getOrCreate()
        
        # Distribute (symbol, start_date) pairs across Spark workers
        record_rdd = spark.sparkContext.parallelize(symbol_date_pairs)
        
        # Fetch data in parallel using mapPartitions to avoid broadcasting Spark session
        mapped_record_rdd = record_rdd.mapPartitions(lambda partition: [record for pair in partition for record in xfetch_yfinance_record(pair)])
        
        # Convert RDD to DataFrame on the driver node
        result_df = spark.createDataFrame(mapped_record_rdd, schema=record_schema)  

        return result_df
    except Exception as e:
        print(f"Error paralleling fetch: {e}")
        return spark.createDataFrame([], schema=record_schema)
    finally:
        spark.stop()

# Example usage
symbol_date_pairs = [("AAPL", "2023-01-01"), ("GOOGL", "2023-01-01")]
result_df = xparallel_fetch_yfinance_record(symbol_date_pairs, record_schema)
# result_df.show()