# Capital Markets Workflow (Simplified)

In [1]:
# 02/24/2025 12:08:00

# starting date with 1m data
# 2025-02-18 09:30:00-05:00

In [2]:
# Install packages
%pip install -U --quiet yfinance pandas 
%pip install -U --quiet langgraph langchain-community langchain-aws langchain-mongodb
%pip install -U --quiet pymongo
%pip install -U --quiet python-dotenv

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


## Set environment variables

In [3]:
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()

# Set environment variables
MONGODB_URI = os.getenv("MONGODB_URI")
AWS_REGION = os.getenv("AWS_REGION")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

# Set database name
DATABASE_NAME = "agentic_capital_markets"


## Getting Market Data from yfinance

### yfinance documentation
https://ranaroussi.github.io/yfinance/reference/

Yahoo!, Y!Finance, and Yahoo! finance are registered trademarks of Yahoo, Inc.

yfinance is not affiliated, endorsed, or vetted by Yahoo, Inc. It’s an open-source tool that uses Yahoo’s publicly available APIs, and is intended for research and educational purposes.

You should refer to Yahoo!’s terms of use (here), (here), and (here) for details on your rights to use the actual data downloaded. Remember - the Yahoo! finance API is intended for personal use only.


## Important note

### $SPY: possibly delisted; no price data found  (period=2y) (Yahoo error = "1m data not available for startTime=1676894054 and endTime=1739966054. Only 8 days worth of 1m granularity data are allowed to be fetched per request.")

In [4]:
# Import libraries
import pandas as pd
import yfinance as yf

https://stackoverflow.com/questions/76605981/can-not-download-6-months-tsla-with-interval-of-1-minute-in-yfinance

### Getting equities data

In [5]:
HISTORIAL_START_DATE = "2024-01-01"
HISTORIAL_END_DATE = "2025-02-18"

In [6]:
EQUITIES = 'SPY QQQ EEM XLE'

equities = yf.Tickers(EQUITIES)

# period='max' equals to 1 week

spy = equities.tickers['SPY'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])
qqq = equities.tickers['QQQ'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])
eem = equities.tickers['EEM'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])
xle = equities.tickers['XLE'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])

spy.tail(5)

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2025-02-14 11:30:00-05:00,610.01001,610.294983,609.27002,609.320129,2764489
2025-02-14 12:30:00-05:00,609.349976,610.090027,609.070007,609.929871,1443250
2025-02-14 13:30:00-05:00,609.929993,610.750671,609.880005,610.130005,2059667
2025-02-14 14:30:00-05:00,610.150024,610.302429,609.590027,609.669983,2430485
2025-02-14 15:30:00-05:00,609.659973,610.289978,609.544983,609.700012,5528441


In [7]:
spy.head(5)


Unnamed: 0_level_0,Open,High,Low,Close,Volume
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2024-01-02 09:30:00-05:00,472.540009,472.799988,471.359985,471.76001,19608168
2024-01-02 10:30:00-05:00,471.769989,472.450012,471.299988,472.329987,9085308
2024-01-02 11:30:00-05:00,472.320007,473.359985,472.269989,472.975006,9373953
2024-01-02 12:30:00-05:00,472.954987,473.670013,472.279999,472.660492,7394613
2024-01-02 13:30:00-05:00,472.660004,472.959686,471.545013,471.880096,9946269


### Getting Bonds (U.S. Treasuries) data

In [8]:
BONDS = 'TLT LQD HYG'

bonds = yf.Tickers(BONDS)

tlt = bonds.tickers['TLT'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])
lqd = bonds.tickers['LQD'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])
hyg = bonds.tickers['HYG'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])

tlt.tail(5)

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2025-02-14 11:30:00-05:00,89.644997,89.675003,89.239998,89.245003,3003815
2025-02-14 12:30:00-05:00,89.260002,89.339996,89.129997,89.129997,3241408
2025-02-14 13:30:00-05:00,89.135201,89.264999,89.120003,89.230003,1887286
2025-02-14 14:30:00-05:00,89.230003,89.309998,89.152802,89.195,1860117
2025-02-14 15:30:00-05:00,89.195,89.195099,89.129997,89.160004,3086403


### Getting Commodities data (Gold, Oil, etc.)

In [9]:
COMMODITIES = 'GLD USO'

commodities = yf.Tickers(COMMODITIES)

gld = commodities.tickers['GLD'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])
uso = commodities.tickers['USO'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits', 'Capital Gains'])

gld.tail(5)

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2025-02-14 11:30:00-05:00,266.920013,267.320007,266.529999,266.959991,885116
2025-02-14 12:30:00-05:00,266.970001,267.015015,266.040009,266.26001,927519
2025-02-14 13:30:00-05:00,266.290009,266.309998,265.894989,265.98999,660845
2025-02-14 14:30:00-05:00,265.98999,266.0,265.589996,265.931305,1392299
2025-02-14 15:30:00-05:00,265.929993,266.359985,265.450012,266.23999,2726240


### Getting Market Volatility Index data

In [10]:
MARKET_VOLATILITY = '^VIX'

market_volatility = yf.Tickers(MARKET_VOLATILITY)

vix = market_volatility.tickers['^VIX'].history(start=HISTORIAL_START_DATE, end=HISTORIAL_END_DATE, interval='1h').drop(columns=['Dividends', 'Stock Splits'])

vix.tail(5)

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2025-02-17 06:00:00-06:00,15.44,15.47,15.37,15.44,0
2025-02-17 07:00:00-06:00,15.42,15.44,15.37,15.41,0
2025-02-17 08:00:00-06:00,15.41,15.57,15.39,15.52,0
2025-02-17 09:00:00-06:00,15.45,15.57,15.41,15.44,0
2025-02-17 10:00:00-06:00,15.46,15.48,15.37,15.37,0


## We can leverage Scheduled Triggers in MongoDB Atlas to run the script at a specific recurring time.

Scheduled Triggers allow you to execute server-side logic on a regular schedule that you define. You can use scheduled Triggers to do work that happens on a periodic basis, such as updating a document every minute, generating a nightly report, or sending an automated weekly email newsletter.

https://www.mongodb.com/docs/atlas/atlas-ui/triggers/scheduled-triggers/

## We can leverage Atlas Functions as well

An Atlas Function is a piece of server-side JavaScript code that you write to define your app's behavior. You can call your app's Functions directly from a client app or define services that integrate and call Functions automatically.

Functions can call other Functions and include a built-in client for working with data in MongoDB Atlas clusters. They also include helpful global utilities, support common Node.js built-in modules, and can import and use external packages from the npm registry.

https://www.mongodb.com/docs/atlas/atlas-ui/triggers/functions/

Video Tutorial: https://www.youtube.com/watch?v=3Q6J1J1JZ3A

## MongoDB Database Setup

In [11]:
# Import libraries
from pymongo import MongoClient, ASCENDING

In [12]:
def get_mongo_client(mongo_uri):
    """Establish connection to the MongoDB and ping the database."""

    # gateway to interacting with a MongoDB database cluster
    client = MongoClient(mongo_uri, appname="ist.demo.agentic_capital_markets.python", tz_aware=True)

    # Ping the database to ensure the connection is successful
    try:
        client.admin.command("ping")
        print("Connection to MongoDB successful")
    except Exception as e:
        print(f"Error connecting to MongoDB: {e}")
        return None

    return client


if not MONGODB_URI:
    print("MONGO_URI not set in environment variables")

mongo_client = get_mongo_client(MONGODB_URI)

if mongo_client:
    # Pymongo client of database
    db = mongo_client.get_database(DATABASE_NAME)
else:
    print("Failed to connect to MongoDB. Exiting...")
    exit(1)

Connection to MongoDB successful


### Create Timeseries Collection

In [13]:
# Check if the 'yfinanceMarketHistoricalData' collection exists
if "yfinanceMarketHistoricalData" in db.list_collection_names():
    db.yfinanceMarketHistoricalData.drop()  # ⚠️ This will delete all data!

from bson.codec_options import CodecOptions
from bson.datetime_ms import DatetimeConversion

# For more information on datetime conversion options, see:
# https://www.mongodb.com/docs/languages/python/pymongo-driver/current/data-formats/dates-and-times/
codec_options = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_AUTO)

# Check if the 'yfinanceMarketHistoricalData' collection exists
if "yfinanceMarketHistoricalData" not in db.list_collection_names():
    # Create the collection with a time-series configuration
    db.create_collection(
        "yfinanceMarketHistoricalData",
        timeseries={
            "timeField": "timestamp",
            "metaField": "symbol",
            "granularity": "minutes"
        },
        codec_options=codec_options
        # expireAfterSeconds=604800  # Document expires 7 days (604800 seconds)
    )
    
    # Create an index on the `symbol` and `timestamp` fields
    db.yfinanceMarketHistoricalData.create_index(
        [("symbol", ASCENDING), ("timestamp", ASCENDING)]
    )
    
    print("Collection and index created successfully.")
else:
    print("The 'yfinanceMarketHistoricalData' collection already exists.")

Collection and index created successfully.


## Data Ingestion

In [14]:
# Checking pandas DataFrame info
spy.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1962 entries, 2024-01-02 09:30:00-05:00 to 2025-02-14 15:30:00-05:00
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Open    1962 non-null   float64
 1   High    1962 non-null   float64
 2   Low     1962 non-null   float64
 3   Close   1962 non-null   float64
 4   Volume  1962 non-null   int64  
dtypes: float64(4), int64(1)
memory usage: 92.0 KB


###  Transformations

In [15]:
from datetime import datetime
import pytz


def transform_data(symbol, df):
    """
    Transforms the given DataFrame by performing the following operations:
        - Reset the index (Datetime) and rename it to 'timestamp'.
        - Add a new column 'symbol' with the given value.
        - Convert all column names to lowercase.
        - Reorder columns as 'timestamp', 'symbol', 'open', 'high', 'low', 'close', 'volume'.
        - Convert 'timestamp' to UTC timezone if not already.
    Parameters:
        - symbol (str): The symbol name to be added in the 'symbol' column.
        - df (pd.DataFrame): The input DataFrame with 'Datetime' as the index.
    Returns:
        - pd.DataFrame: Transformed DataFrame.
    """
    # Reset index and explicitly rename 'Datetime' to 'timestamp'
    df = df.reset_index().rename(columns={'Datetime': 'timestamp'})

    # Convert timestamp to datetime if not already
    df['timestamp'] = pd.to_datetime(df['timestamp'])

    # Ensure timestamp is localized to US/Eastern only if it’s naive (has no timezone)
    if df['timestamp'].dt.tz is None:
        eastern = pytz.timezone('US/Eastern')
        df['timestamp'] = df['timestamp'].dt.tz_localize(
            eastern, ambiguous='NaT', nonexistent='shift_forward')

    # Convert 'timestamp' to UTC
    df['timestamp'] = df['timestamp'].dt.tz_convert('UTC')

    # Add the 'symbol' column
    df['symbol'] = symbol

    # Convert all column names to lowercase
    df.columns = df.columns.str.lower()

    # Reorder columns
    df = df[['timestamp', 'symbol', 'open', 'high', 'low', 'close', 'volume']]

    return df

In [16]:
# Equity transformation - SPY as example
df_spy = transform_data('SPY-USD', spy)
df_spy.head()

Unnamed: 0,timestamp,symbol,open,high,low,close,volume
0,2024-01-02 14:30:00+00:00,SPY-USD,472.540009,472.799988,471.359985,471.76001,19608168
1,2024-01-02 15:30:00+00:00,SPY-USD,471.769989,472.450012,471.299988,472.329987,9085308
2,2024-01-02 16:30:00+00:00,SPY-USD,472.320007,473.359985,472.269989,472.975006,9373953
3,2024-01-02 17:30:00+00:00,SPY-USD,472.954987,473.670013,472.279999,472.660492,7394613
4,2024-01-02 18:30:00+00:00,SPY-USD,472.660004,472.959686,471.545013,471.880096,9946269


In [17]:
df_spy.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1962 entries, 0 to 1961
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype              
---  ------     --------------  -----              
 0   timestamp  1962 non-null   datetime64[ns, UTC]
 1   symbol     1962 non-null   object             
 2   open       1962 non-null   float64            
 3   high       1962 non-null   float64            
 4   low        1962 non-null   float64            
 5   close      1962 non-null   float64            
 6   volume     1962 non-null   int64              
dtypes: datetime64[ns, UTC](1), float64(4), int64(1), object(1)
memory usage: 107.4+ KB


In [18]:
# Equity transformations
df_qqq = transform_data('QQQ-USD', qqq)
df_eem = transform_data('EEM-USD', eem)
df_xle = transform_data('XLE-USD', xle)

# Bonds transformations
df_tlt = transform_data('TLT-USD', tlt)
df_lqd = transform_data('LQD-USD', lqd)
df_hyg = transform_data('HYG-USD', hyg)

# Commodities transformations
df_gld = transform_data('GLD-USD', gld)
df_uso = transform_data('USO-USD', uso)

# Market Volatility transformations
df_vix = transform_data('VIX-USD', vix)

In [19]:
df_uso.sort_index(ascending=False).head()

Unnamed: 0,timestamp,symbol,open,high,low,close,volume
1961,2025-02-14 20:30:00+00:00,USO-USD,75.75,75.860001,75.709999,75.830002,279120
1960,2025-02-14 19:30:00+00:00,USO-USD,75.7948,75.875,75.709999,75.75,315061
1959,2025-02-14 18:30:00+00:00,USO-USD,75.860001,76.029999,75.629997,75.800003,342881
1958,2025-02-14 17:30:00+00:00,USO-USD,75.884903,76.010002,75.784401,75.845001,284468
1957,2025-02-14 16:30:00+00:00,USO-USD,75.860001,76.099998,75.809998,75.959999,277052


### Clean Data

In [20]:
# Ensure that the collection is empty by deleting all documents
db.yfinanceMarketHistoricalData.delete_many({})

print("The 'yfinanceMarketHistoricalData' collection is now empty.")

The 'yfinanceMarketHistoricalData' collection is now empty.


### Insert data into MongoDB time series collection

In [21]:
# df_spy sort by timestamp in ascending order
df_spy = df_spy.sort_index(ascending=True)
df_spy.tail(20)

Unnamed: 0,timestamp,symbol,open,high,low,close,volume
1942,2025-02-12 15:30:00+00:00,SPY-USD,601.44397,603.320007,600.169983,600.659912,3948328
1943,2025-02-12 16:30:00+00:00,SPY-USD,600.630005,603.98999,600.599976,603.789978,4270148
1944,2025-02-12 17:30:00+00:00,SPY-USD,603.780029,604.549988,603.090027,603.570007,2558860
1945,2025-02-12 18:30:00+00:00,SPY-USD,603.589905,603.759888,602.530029,603.080017,2782930
1946,2025-02-12 19:30:00+00:00,SPY-USD,603.113525,604.22998,603.059998,603.320007,4147162
1947,2025-02-12 20:30:00+00:00,SPY-USD,603.325012,603.590027,602.570007,603.359985,4583717
1948,2025-02-13 14:30:00+00:00,SPY-USD,604.47998,606.419922,603.200012,606.080017,7644381
1949,2025-02-13 15:30:00+00:00,SPY-USD,606.0849,607.48999,606.0,606.690002,3563509
1950,2025-02-13 16:30:00+00:00,SPY-USD,606.700012,606.960022,605.719971,606.539978,2320305
1951,2025-02-13 17:30:00+00:00,SPY-USD,606.539978,606.969971,604.98999,606.200012,2449362


In [22]:
# df_spy sort by timestamp in descending order
df_spy = df_spy.sort_index(ascending=False)
df_spy.tail(20)

Unnamed: 0,timestamp,symbol,open,high,low,close,volume
19,2024-01-04 19:30:00+00:00,SPY-USD,468.570007,469.040009,468.070007,468.269012,11836431
18,2024-01-04 18:30:00+00:00,SPY-USD,469.329987,469.470001,468.509613,468.579895,7022819
17,2024-01-04 17:30:00+00:00,SPY-USD,469.608795,469.619995,468.665009,469.320099,4754153
16,2024-01-04 16:30:00+00:00,SPY-USD,470.660004,470.820007,469.26001,469.609985,6137719
15,2024-01-04 15:30:00+00:00,SPY-USD,469.76001,470.959991,469.619995,470.649994,5975815
14,2024-01-04 14:30:00+00:00,SPY-USD,468.299988,470.149994,468.109985,469.765015,12633397
13,2024-01-03 20:30:00+00:00,SPY-USD,469.274994,469.795013,468.170013,468.730011,18962548
12,2024-01-03 19:30:00+00:00,SPY-USD,470.950012,471.179993,468.970001,469.26001,12079010
11,2024-01-03 18:30:00+00:00,SPY-USD,470.339996,471.190002,468.950012,470.940002,11787244
10,2024-01-03 17:30:00+00:00,SPY-USD,469.579987,470.769989,469.420502,470.329987,6375672


In [23]:
# Pandas DataFrame to dictionary, example with SPY
spy_records = df_spy.to_dict(orient="records")
spy_records

[{'timestamp': Timestamp('2025-02-14 20:30:00+0000', tz='UTC'),
  'symbol': 'SPY-USD',
  'open': 609.6599731445312,
  'high': 610.2899780273438,
  'low': 609.5449829101562,
  'close': 609.7000122070312,
  'volume': 5528441},
 {'timestamp': Timestamp('2025-02-14 19:30:00+0000', tz='UTC'),
  'symbol': 'SPY-USD',
  'open': 610.1500244140625,
  'high': 610.3024291992188,
  'low': 609.5900268554688,
  'close': 609.6699829101562,
  'volume': 2430485},
 {'timestamp': Timestamp('2025-02-14 18:30:00+0000', tz='UTC'),
  'symbol': 'SPY-USD',
  'open': 609.9299926757812,
  'high': 610.7506713867188,
  'low': 609.8800048828125,
  'close': 610.1300048828125,
  'volume': 2059667},
 {'timestamp': Timestamp('2025-02-14 17:30:00+0000', tz='UTC'),
  'symbol': 'SPY-USD',
  'open': 609.3499755859375,
  'high': 610.0900268554688,
  'low': 609.0700073242188,
  'close': 609.9298706054688,
  'volume': 1443250},
 {'timestamp': Timestamp('2025-02-14 16:30:00+0000', tz='UTC'),
  'symbol': 'SPY-USD',
  'open': 610

In [24]:
def insert_market_data(db, df):
    """
    Inserts the transformed DataFrame into the MongoDB time-series collection `yfinanceMarketHistoricalData`.
    
    Parameters:
        - db (pymongo.database.Database): The MongoDB database object.
        - df (pd.DataFrame): The transformed DataFrame containing market data.
    
    Returns:
    - dict: MongoDB insert result.
    """
    # Confirm timestamp is aware and in UTC
    if df['timestamp'].dt.tz is None:
        # If timestamp is naive, localize to UTC
        df['timestamp'] = pd.to_datetime(df['timestamp']).dt.tz_localize('UTC')
    else:
        # If timestamp is aware, convert to UTC
        df['timestamp'] = df['timestamp'].dt.tz_convert('UTC')
    
    # Convert timestamp to Python datetime object (ensuring stored as ISODate)
    df['timestamp'] = df['timestamp'].apply(lambda x: x.to_pydatetime())
    
    # Convert DataFrame to list of dictionaries
    records = df.to_dict(orient="records")
    
    # Insert data into the collection
    result = db.yfinanceMarketHistoricalData.insert_many(records)
    
    return {"inserted_count": len(result.inserted_ids)}

In [25]:
print("Inserting SPY market data...")
inserted_count = insert_market_data(db=db, df=df_spy)
print(f"Inserted {inserted_count} documents.")

print("Inserting QQQ market data...")
inserted_count = insert_market_data(db=db, df=df_qqq)
print(f"Inserted {inserted_count} documents.")

print("Inserting EEM market data...")
inserted_count = insert_market_data(db=db, df=df_eem)
print(f"Inserted {inserted_count} documents.")

print("Inserting XLE market data...")
inserted_count = insert_market_data(db=db, df=df_xle)
print(f"Inserted {inserted_count} documents.")

print("Inserting TLT market data...")
inserted_count = insert_market_data(db=db, df=df_tlt)
print(f"Inserted {inserted_count} documents.")

print("Inserting LQD market data...")
inserted_count = insert_market_data(db=db, df=df_lqd)
print(f"Inserted {inserted_count} documents.")

print("Inserting HYG market data...")
inserted_count = insert_market_data(db=db, df=df_hyg)
print(f"Inserted {inserted_count} documents.")

print("Inserting GLD market data...")
inserted_count = insert_market_data(db=db, df=df_gld)
print(f"Inserted {inserted_count} documents.")

print("Inserting USO market data...")
inserted_count = insert_market_data(db=db, df=df_uso)
print(f"Inserted {inserted_count} documents.")

print("Inserting VIX market data...")
inserted_count = insert_market_data(db=db, df=df_vix)
print(f"Inserted {inserted_count} documents.")

Inserting SPY market data...
Inserted {'inserted_count': 1962} documents.
Inserting QQQ market data...
Inserted {'inserted_count': 1962} documents.
Inserting EEM market data...
Inserted {'inserted_count': 1962} documents.
Inserting XLE market data...
Inserted {'inserted_count': 1962} documents.
Inserting TLT market data...
Inserted {'inserted_count': 1962} documents.
Inserting LQD market data...
Inserted {'inserted_count': 1962} documents.
Inserting HYG market data...
Inserted {'inserted_count': 1962} documents.
Inserting GLD market data...
Inserted {'inserted_count': 1962} documents.
Inserting USO market data...
Inserted {'inserted_count': 1962} documents.
Inserting VIX market data...
Inserted {'inserted_count': 4021} documents.


In [26]:
from pprint import pprint

# Query and sort data by timestamp in descending order
cursor = db.yfinanceMarketHistoricalData.find({"symbol": "SPY-USD"}).sort("timestamp", -1)

# Pretty-print the results
for document in cursor:
    pprint(document)

{'_id': ObjectId('67bc8238cf828636c3756558'),
 'close': 609.7000122070312,
 'high': 610.2899780273438,
 'low': 609.5449829101562,
 'open': 609.6599731445312,
 'symbol': 'SPY-USD',
 'timestamp': datetime.datetime(2025, 2, 14, 20, 30, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC')),
 'volume': 5528441}
{'_id': ObjectId('67bc8238cf828636c3756559'),
 'close': 609.6699829101562,
 'high': 610.3024291992188,
 'low': 609.5900268554688,
 'open': 610.1500244140625,
 'symbol': 'SPY-USD',
 'timestamp': datetime.datetime(2025, 2, 14, 19, 30, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC')),
 'volume': 2430485}
{'_id': ObjectId('67bc8238cf828636c375655a'),
 'close': 610.1300048828125,
 'high': 610.7506713867188,
 'low': 609.8800048828125,
 'open': 609.9299926757812,
 'symbol': 'SPY-USD',
 'timestamp': datetime.datetime(2025, 2, 14, 18, 30, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC')),
 'volume': 2059667}
{'_id': ObjectId('67bc8238cf828636c375655b'),
 'close': 609.9298706054688,
 'high': 610.

### Data Aggregation

In [27]:
# Aggregate the data by hour
# Hourly aggregation to get high, low, open, and close values
hourly_aggregation = db.yfinanceMarketHistoricalData.aggregate([
    {
        "$match": { "symbol": "SPY-USD" }
    },
    {
        "$group": {
            "_id": {
                "symbol": "$symbol",
                "time": {
                    "$dateTrunc": {
                        "date": "$timestamp",
                        "unit": "hour",
                        "binSize": 1
                    }
                }
            },
            "high": { "$max": "$high" },
            "low": { "$min": "$low" },
            "open": { "$first": "$open" },
            "close": { "$last": "$close" }
        }
    },
    {
        "$sort": { "_id.time": 1 }
    }
])

# Store the aggregation result in a list
hourly_results = list(hourly_aggregation)

# Pretty-print the results
for document in hourly_results:
    pprint(document)

# Count the number of documents in the results
print(f"Total documents: {len(hourly_results)}")


{'_id': {'symbol': 'SPY-USD',
         'time': datetime.datetime(2024, 1, 2, 14, 0, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC'))},
 'close': 471.760009765625,
 'high': 472.79998779296875,
 'low': 471.3599853515625,
 'open': 472.5400085449219}
{'_id': {'symbol': 'SPY-USD',
         'time': datetime.datetime(2024, 1, 2, 15, 0, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC'))},
 'close': 472.3299865722656,
 'high': 472.45001220703125,
 'low': 471.29998779296875,
 'open': 471.7699890136719}
{'_id': {'symbol': 'SPY-USD',
         'time': datetime.datetime(2024, 1, 2, 16, 0, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC'))},
 'close': 472.9750061035156,
 'high': 473.3599853515625,
 'low': 472.2699890136719,
 'open': 472.32000732421875}
{'_id': {'symbol': 'SPY-USD',
         'time': datetime.datetime(2024, 1, 2, 17, 0, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC'))},
 'close': 472.6604919433594,
 'high': 473.6700134277344,
 'low': 472.2799987792969,
 'open': 472.9549865722656}
{'_id