In [0]:
#01_data_ingestion
#ingests stock price data from yfinance and stores it as a Delta table under /Volumes/workspace/default/yfinance-stock-data

#dependencies installation
%pip install yfinance

#imports
import yfinance as yf
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, date_format, from_utc_timestamp
import datetime



In [0]:
#Widgets: notebook parameters for the ingestion run

#Free-text widgets as (name, default value, label): tickers first, then start/end dates
text_widgets = [
    ("tickers", "AAPL,MSFT,TSLA", "Enter Tickers (comma-separated)"),
    ("start_date", "2025-01-01", "Enter Start Date (YYYY-MM-DD)"),
    ("end_date", "2025-09-20", "Enter End Date (YYYY-MM-DD)"),
]
for widget_name, default_value, widget_label in text_widgets:
    dbutils.widgets.text(widget_name, default_value, widget_label)

#Intervals offered by the dropdown; "1d" is preselected
valid_intervals = [
    "1m", "5m", "15m", "30m",
    "1h",
    "1d", "1wk", "1mo",
]
dbutils.widgets.dropdown("interval", "1d", valid_intervals, "Select Interval")

In [0]:
spark = SparkSession.builder.getOrCreate()
path = "/Volumes/workspace/default/yfinance-stock-data"

#Columns written to the Delta table (before Updated_On is added), in order
output_columns = ["ticker", "Datetime", "Open", "High", "Low", "Close", "Volume"]

#Exact (case-insensitive) yfinance field name -> output column name.
#Whole-name matching is deliberate: substring matching mislabels columns when the
#ticker symbol itself contains a field name (e.g. "Close_LOW" contains "low").
field_names = {
    "date": "Datetime",
    "datetime": "Datetime",
    "open": "Open",
    "high": "High",
    "low": "Low",
    "close": "Close",
    "volume": "Volume",
}


def _normalize_columns(pdf):
    """Return a copy of ``pdf`` whose column labels use the output column names.

    For MultiIndex columns only the first level is kept; this assumes the
    default yfinance layout where level 0 is the price field and level 1 the
    ticker. Labels that are not a known field name are kept unchanged.
    """
    if isinstance(pdf.columns, pd.MultiIndex):
        labels = pdf.columns.get_level_values(0)
    else:
        labels = pdf.columns
    normalized = pdf.copy()
    normalized.columns = [
        field_names.get(str(label).strip().lower(), str(label)) for label in labels
    ]
    return normalized


#Widget Data
#Tickers: trim whitespace, drop blanks (e.g. trailing comma) and repeated symbols
tickers = list(dict.fromkeys(
    t.strip() for t in dbutils.widgets.get("tickers").split(",") if t.strip()
))
if not tickers:
    raise ValueError("No tickers provided. Enter at least one ticker symbol")

#Start/End Date
start_date = dbutils.widgets.get("start_date")
end_date = dbutils.widgets.get("end_date")

#Validate Date Inputs
parsed_dates = []
for d in [start_date, end_date]:
    try:
        parsed_dates.append(datetime.datetime.strptime(d, "%Y-%m-%d"))
    except ValueError:
        raise ValueError(f"Invalid date format: {d}. Use YYYY-MM-DD")

#yfinance treats `end` as exclusive, so an empty or reversed range returns no rows
if parsed_dates[0] >= parsed_dates[1]:
    raise ValueError(
        f"start_date ({start_date}) must be earlier than end_date ({end_date})"
    )

#Interval
interval = dbutils.widgets.get("interval")

all_data = []
skipped_tickers = []

for ticker in tickers:
    #Pass in parameters to download
    df = yf.download(
        ticker,
        start=start_date,
        end=end_date,
        interval=interval,
        auto_adjust=True
    )

    #Unknown symbols or ranges without data yield an empty frame; skip them
    #instead of failing later on missing columns
    if df is None or df.empty:
        skipped_tickers.append(ticker)
        continue

    #Move the date index into a column, then map labels to the output names
    df = _normalize_columns(df.reset_index())

    missing = [c for c in output_columns if c != "ticker" and c not in df.columns]
    if missing:
        raise KeyError(
            f"Download for {ticker} is missing expected columns: {missing}"
        )

    df["ticker"] = ticker
    all_data.append(df[output_columns])

if skipped_tickers:
    print(f"No data returned for: {', '.join(skipped_tickers)}")

if not all_data:
    raise ValueError(
        f"No data returned for any ticker between {start_date} and {end_date}"
    )

long_pdf = pd.concat(all_data, ignore_index=True)

#Updated_On is the load time converted to Asia/Singapore, stored as a formatted string
sdf = spark.createDataFrame(long_pdf) \
    .withColumn("Updated_On", date_format(from_utc_timestamp(current_timestamp(), "Asia/Singapore"), "yyyy-MM-dd HH:mm:ss"))

#drop delta, for testing
# dbutils.fs.rm(path, True) 

#save as delta
#append mode: re-running with an overlapping date range adds rows for the same
#ticker/Datetime again, each with a newer Updated_On
sdf.write.format("delta") \
    .mode("append") \
    .save(path)


In [0]:
#Create view to verify data ingestion
# sdf.createOrReplaceTempView("stock_data")

# spark.sql("""
#     SELECT 
#         -- ticker, 
#         -- Datetime, 
#         -- Close
#         -- MAX(Datetime)
#         *
#     FROM 
#         stock_data
#     WHERE TRUE 
#         AND ticker IN ('MSFT','AAPL')
#         AND Datetime >= '2025-09-10'
#         -- AND ticker LIKE '%AAPL%'
#     ORDER BY 
#         Datetime DESC
# """).show(20)

#Read the Delta table back and preview up to 100 rows that have a Close value
verification_query = """
            SELECT 
                * 
            FROM 
                delta.`/Volumes/workspace/default/yfinance-stock-data`
            WHERE TRUE
                AND Close IS NOT NULL
          """
ingested_rows = spark.sql(verification_query)
ingested_rows.show(100)

In [0]:
%sql
-- One row per (ticker, datetime): keep the most recently loaded copy.
-- ORDER BY UPDATED_ON DESC makes the newest load win; ascending order would
-- keep the oldest copy and hide any later re-ingested values.
-- DISTINCT is not needed because ROW_NUMBER() = 1 already yields a single row per key.
SELECT
  *
FROM 
  delta.`/Volumes/workspace/default/yfinance-stock-data`
WHERE TRUE
  AND Close IS NOT NULL
QUALIFY
  ROW_NUMBER() OVER (PARTITION BY TICKER, DATETIME ORDER BY UPDATED_ON DESC) = 1