# Create Datasets (Optimized)

Create an HDF5 table from SHARADAR data that mirrors the WIKI_PRICES.csv format.
This script combines data from SHARADAR_SEP.csv (price data) and SHARADAR_ACTIONS.csv
(dividend and split information) to create a dataset compatible with the format used
in the ML4T examples.

This version is optimized for memory efficiency to prevent kernel crashes.

In [None]:
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
from pathlib import Path
import logging
import gc  # For garbage collection

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Define paths
DATA_DIR = Path('/home/noslen/alpaca-trading/data')
SHARADAR_DIR = DATA_DIR / 'SHARADAR'
OUTPUT_FILE = DATA_DIR / 'assets.h5'
WIKI_PRICES_PATH = DATA_DIR / 'WIKI_PRICES.csv'

### Load SHARADAR_SEP.csv price data (with chunking)

In [2]:
# Check the total number of rows first
sep_path = SHARADAR_DIR / 'SHARADAR_SEP.csv'
row_count = sum(1 for _ in open(sep_path)) - 1  # Subtract 1 for header
print(f"Total rows in {sep_path}: {row_count}")

# Define a reasonable chunk size (adjust based on your system's memory)
CHUNK_SIZE = 500000  # Process 500,000 rows at a time

# Preview the first few rows
preview_df = pd.read_csv(sep_path, nrows=5, parse_dates=['date'])
print("Preview of data:")
print(preview_df)

Total rows in /home/noslen/alpaca-trading/data/SHARADAR/SHARADAR_SEP.csv: 17300827
Preview of data:
   ticker       date  open  high   low  close   volume  closeadj  closeunadj  \
0   ABILF 2021-11-09  0.30  0.33  0.30   0.33   7500.0      0.33        0.33   
1   ABILF 2021-11-08  0.35  0.35  0.35   0.35      0.0      0.35        0.35   
2     AAC 2021-09-24  9.74  9.75  9.73   9.75  38502.0      9.75        9.75   
3   AAC.U 2021-09-24  9.95  9.95  9.90   9.90   2692.0      9.90        9.90   
4  AAC.WS 2021-09-24  0.92  0.92  0.87   0.89  38784.0      0.89        0.89   

  lastupdated  
0  2021-11-09  
1  2021-11-09  
2  2021-09-24  
3  2021-09-24  
4  2021-09-24  


### Load SHARADAR_ACTIONS.csv for dividend and split information

In [3]:
actions_path = SHARADAR_DIR / 'SHARADAR_ACTIONS.csv'
    
# Actions data should be much smaller, so we can load it all at once
actions_df = pd.read_csv(
    actions_path,
    parse_dates=['date'],
    index_col=None
)

print(actions_df.head())
print(f"loaded {len(actions_df)} rows from {actions_path}")

        date         action ticker                               name  \
0 2015-01-02       delisted   XWES         WORLD ENERGY SOLUTIONS INC   
1 2015-01-02  acquisitionby   XWES         WORLD ENERGY SOLUTIONS INC   
2 2015-01-02       dividend    WSR                    WHITESTONE REIT   
3 2015-01-02       dividend   WSCI                 WSI INDUSTRIES INC   
4 2015-01-02          split  WMLPQ  WESTMORELAND RESOURCE PARTNERS LP   

      value contraticker   contraname  
0  69.40000          NaN          NaN  
1  69.40000         ENOC  ENERNOC INC  
2   0.09500          NaN          NaN  
3   0.04000          NaN          NaN  
4   0.08333          NaN          NaN  
loaded 323840 rows from /home/noslen/alpaca-trading/data/SHARADAR/SHARADAR_ACTIONS.csv


### Extract dividend information from actions dataframe

In [4]:
# Filter for dividend actions
dividend_df = actions_df[actions_df['action'] == 'dividend'].copy()

# Create a dataframe with ticker, date, and dividend amount
dividend_df = dividend_df[['date', 'ticker', 'value']].rename(columns={'value': 'ex-dividend'})
# Sum up multiple dividends on the same day for the same ticker
dividend_df = dividend_df.groupby(['date', 'ticker']).sum().reset_index()

print(f"Extracted {len(dividend_df)} dividend records")
print(dividend_df.head())

Extracted 270003 dividend records
        date ticker  ex-dividend
0 2015-01-02      A      0.10000
1 2015-01-02   ABEV      0.04893
2 2015-01-02  ARPJQ      0.19660
3 2015-01-02    ATW      0.25000
4 2015-01-02    BDN      0.15000


### Extract split information from actions dataframe

In [5]:
# Filter for split actions
splits = actions_df[actions_df['action'] == 'split'].copy()
    
# Create a dataframe with ticker, date, and split ratio
split_df = splits[['date', 'ticker', 'value']].rename(columns={'value': 'split_ratio'})

print(f"Extracted {len(split_df)} split records")
print(split_df.head())

# Free up memory
del actions_df, splits
gc.collect()

Extracted 4088 split records
         date ticker  split_ratio
4  2015-01-02  WMLPQ      0.08333
12 2015-01-02   RFMD      0.25000
22 2015-01-02   GNTX      2.00000
59 2015-01-05  PSTRQ      0.10000
96 2015-01-07   UBFO      1.01000


0

## Transform SHARADAR data into WIKI_PRICES.csv format (processing in chunks)

WIKI_PRICES.csv columns:
ticker,date,open,high,low,close,volume,ex-dividend,split_ratio,adj_open,adj_high,adj_low,adj_close,adj_volume

SHARADAR_SEP.csv columns:
ticker,date,open,high,low,close,volume,closeadj,closeunadj,lastupdated

In [None]:
# Process the data in chunks to avoid memory issues
chunk_reader = pd.read_csv(
    sep_path,
    parse_dates=['date'],
    chunksize=CHUNK_SIZE
)

# Create an empty HDF5 store
with pd.HDFStore(OUTPUT_FILE, mode='w') as store:
    logger.info(f"Created HDF5 store at {OUTPUT_FILE}")

min_itemsize = {'ticker': max_ticker_len}
logger.info(f"Setting min_itemsize to {min_itemsize}")

# Process each chunk
chunk_count = 0
total_rows_processed = 0

for chunk in chunk_reader:
    chunk_count += 1
    chunk_size = len(chunk)
    total_rows_processed += chunk_size
    
    logger.info(f"Processing chunk {chunk_count} with {chunk_size} rows ({total_rows_processed} total rows processed)")
    
    # Start with the price data from this chunk
    wiki_chunk = chunk[['ticker', 'date', 'open', 'high', 'low', 'close', 'volume']].copy()
    
    # Add ex-dividend column (default to 0.0)
    wiki_chunk['ex-dividend'] = 0.0
    
    # Add split_ratio column (default to 1.0)
    wiki_chunk['split_ratio'] = 1.0

    # Update with actual dividend information
    if not dividend_df.empty:
        # Merge dividend information
        wiki_chunk = pd.merge(
            wiki_chunk, 
            dividend_df, 
            on=['ticker', 'date'], 
            how='left'
        )
        # Fill missing values with 0.0 and handle duplicates
        wiki_chunk['ex-dividend'] = wiki_chunk['ex-dividend_y'].fillna(wiki_chunk['ex-dividend_x'])
        wiki_chunk.drop(['ex-dividend_x', 'ex-dividend_y'], axis=1, inplace=True)

    # Update with actual split information
    if not split_df.empty:
        # Merge split information
        wiki_chunk = pd.merge(
            wiki_chunk, 
            split_df, 
            on=['ticker', 'date'], 
            how='left'
        )
        # Fill missing values with 1.0 and handle duplicates
        wiki_chunk['split_ratio'] = wiki_chunk['split_ratio_y'].fillna(wiki_chunk['split_ratio_x'])
        wiki_chunk.drop(['split_ratio_x', 'split_ratio_y'], axis=1, inplace=True)

    # Calculate adjusted values using closeadj/close ratio from SHARADAR
    # In SHARADAR, closeadj is already adjusted for both splits and dividends
    adj_ratio = chunk['closeadj'] / chunk['close']

    wiki_chunk['adj_open'] = chunk['open'] * adj_ratio
    wiki_chunk['adj_high'] = chunk['high'] * adj_ratio
    wiki_chunk['adj_low'] = chunk['low'] * adj_ratio
    wiki_chunk['adj_close'] = chunk['closeadj']
    wiki_chunk['adj_volume'] = chunk['volume']  # Volume typically doesn't need adjustment in this context

    # Set index to date and ticker for consistency with WIKI_PRICES format
    wiki_chunk = wiki_chunk.set_index(['date', 'ticker']).sort_index()
    
    # Append to HDF5 store
    with pd.HDFStore(OUTPUT_FILE, mode='a') as store:
        if chunk_count == 1:
            # For the first chunk, create the dataset
            store.put('sharadar/prices', wiki_chunk, format='table')
        else:
            # For subsequent chunks, append to the dataset
            store.append('sharadar/prices', wiki_chunk, format='table')
    
    # Free memory
    del wiki_chunk, chunk
    gc.collect()
    
    logger.info(f"Chunk {chunk_count} processed and saved")

logger.info(f"All {total_rows_processed} rows processed and saved to {OUTPUT_FILE}")

2025-05-31 15:20:23,748 - INFO - Created HDF5 store at /home/noslen/alpaca-trading/data/assets.h5
2025-05-31 15:20:24,299 - INFO - Processing chunk 1 with 500000 rows (500000 total rows processed)
2025-05-31 15:20:25,856 - INFO - Chunk 1 processed and saved
2025-05-31 15:20:26,395 - INFO - Processing chunk 2 with 500000 rows (1000000 total rows processed)
2025-05-31 15:20:27,945 - INFO - Chunk 2 processed and saved
2025-05-31 15:20:28,483 - INFO - Processing chunk 3 with 500000 rows (1500000 total rows processed)
2025-05-31 15:20:30,196 - INFO - Chunk 3 processed and saved
2025-05-31 15:20:30,727 - INFO - Processing chunk 4 with 500000 rows (2000000 total rows processed)
2025-05-31 15:20:32,212 - INFO - Chunk 4 processed and saved
2025-05-31 15:20:32,738 - INFO - Processing chunk 5 with 500000 rows (2500000 total rows processed)
2025-05-31 15:20:34,507 - INFO - Chunk 5 processed and saved
2025-05-31 15:20:35,027 - INFO - Processing chunk 6 with 500000 rows (3000000 total rows processed

ValueError: Trying to store a string with len [9] in [ticker] column but
this column has a limit of [8]!
Consider using min_itemsize to preset the sizes on these columns

In [3]:
# Verify the results
with pd.HDFStore(OUTPUT_FILE, mode='r') as store:
    # Check what's in the store
    logger.info(f"HDF5 store contents: {store.keys()}")
    
    # Get info about the dataset
    prices = store.select('sharadar/prices', start=0, stop=5)
    logger.info(f"Sample of data (first 5 rows):\n{prices}")
    
    # Get total size
    total_size = store.get_storer('sharadar/prices').nrows
    logger.info(f"Total rows in dataset: {total_size}")

2025-06-01 21:38:48,615 - INFO - HDF5 store contents: ['/sharadar/prices', '/quandl/wiki/prices']
2025-06-01 21:38:48,657 - INFO - Sample of data (first 5 rows):
                    open    high    low  close     volume  ex-dividend  \
date       ticker                                                        
2015-01-02 A       41.18  41.310  40.37  40.56  1530798.0          0.1   
           AAUAF    0.95   0.980   0.94   0.95   182200.0          0.0   
           ABCB    25.83  25.855  24.92  25.27    73280.0          0.0   
           BMO     70.65  70.870  69.72  70.17   554563.0          0.0   
           CA1     30.70  30.890  30.28  30.69  3475466.0          0.0   

                   split_ratio   adj_open   adj_high    adj_low  adj_close  \
date       ticker                                                            
2015-01-02 A               1.0  37.854938  37.974442  37.110341     37.285   
           AAUAF           1.0   0.950000   0.980000   0.940000      0.950   
       

In [None]:

# Set up logging if not already configured
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# First, get all AAPL data, then filter for adj_close == 0.0
with pd.HDFStore(OUTPUT_FILE, mode='r') as store:
    # Step 1: Query just for AAPL data
    aapl_data = store.select('sharadar/prices', where='ticker == "AAPL"')
    logger.info(f"Retrieved {len(aapl_data)} rows of AAPL data")
    
    # Step 2: Filter in memory for adj_close == 0.0
    aapl_zero_high = aapl_data[aapl_data['adj_high'] == 0.0]
    aapl_zero_low = aapl_data[aapl_data['adj_low'] == 0.0]
    aapl_zero_open = aapl_data[aapl_data['adj_open'] == 0.0]
    aapl_zero_volume = aapl_data[aapl_data['adj_volume'] == 0.0]

    
    # Display information about the result
    logger.info(f"Found {len(aapl_zero_high)} AAPL records with adj_high = 0.0")
    logger.info(f"Found {len(aapl_zero_low)} AAPL records with adj_low = 0.0")
    logger.info(f"Found {len(aapl_zero_open)} AAPL records with adj_open = 0.0")
    logger.info(f"Found {len(aapl_zero_volume)} AAPL records with adj_volume = 0.0")
    
    # Show the data if any records were found
    if len(aapl_zero_high) > 0:
        logger.info(f"Sample of AAPL data with adj_high = 0.0:\n{aapl_zero_high.head()}")
    else:
        logger.info("No AAPL records found with adj_close = 0.0")

2025-06-01 21:48:00,933 - INFO - Retrieved 2604 rows of AAPL data
2025-06-01 21:48:00,935 - INFO - Found 0 AAPL records with adj_close = 0.0
2025-06-01 21:48:00,936 - INFO - No AAPL records found with adj_close = 0.0


In [9]:
#!/usr/bin/env python3
"""
Create an HDF5 table from SHARADAR data that mirrors the WIKI_PRICES.csv format.
This script combines data from SHARADAR_SEP.csv (price data) and SHARADAR_ACTIONS.csv
(dividend and split information) to create a dataset compatible with the format used
in the ML4T examples.

This version is optimized for memory efficiency to prevent kernel crashes and fixes
the string length issue with ticker symbols.
"""

import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
from pathlib import Path
import logging
import gc  # For garbage collection

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Define paths
DATA_DIR = Path('/home/noslen/alpaca-trading/data')
SHARADAR_DIR = DATA_DIR / 'SHARADAR'
OUTPUT_FILE = DATA_DIR / 'assets.h5'

# Define a reasonable chunk size (adjust based on your system's memory)
CHUNK_SIZE = 500000  # Process 500,000 rows at a time

def main():
    # Check the total number of rows first
    sep_path = SHARADAR_DIR / 'SHARADAR_SEP.csv'
    row_count = sum(1 for _ in open(sep_path)) - 1  # Subtract 1 for header
    logger.info(f"Total rows in {sep_path}: {row_count}")

    # Preview the first few rows
    preview_df = pd.read_csv(sep_path, nrows=5, parse_dates=['date'])
    logger.info("Preview of data:")
    logger.info(preview_df)

    # Load actions data (should be much smaller, so we can load it all at once)
    actions_path = SHARADAR_DIR / 'SHARADAR_ACTIONS.csv'
    actions_df = pd.read_csv(
        actions_path,
        parse_dates=['date'],
        index_col=None
    )
    logger.info(f"Loaded {len(actions_df)} rows from {actions_path}")

    # Extract dividend information
    dividend_df = actions_df[actions_df['action'] == 'dividend'].copy()
    dividend_df = dividend_df[['date', 'ticker', 'value']].rename(columns={'value': 'ex-dividend'})
    dividend_df = dividend_df.groupby(['date', 'ticker']).sum().reset_index()
    logger.info(f"Extracted {len(dividend_df)} dividend records")

    # Extract split information
    splits = actions_df[actions_df['action'] == 'split'].copy()
    split_df = splits[['date', 'ticker', 'value']].rename(columns={'value': 'split_ratio'})
    logger.info(f"Extracted {len(split_df)} split records")

    # Free up memory
    del actions_df, splits
    gc.collect()

    # Process the data in chunks
    chunk_reader = pd.read_csv(
        sep_path,
        parse_dates=['date'],
        chunksize=CHUNK_SIZE
    )

    # Find the maximum ticker length to set min_itemsize properly
    max_ticker_len = 0
    for chunk in pd.read_csv(sep_path, usecols=['ticker'], chunksize=CHUNK_SIZE):
        current_max = chunk['ticker'].str.len().max()
        if current_max > max_ticker_len:
            max_ticker_len = current_max
    
    logger.info(f"Maximum ticker symbol length: {max_ticker_len}")
    
    # Add some buffer to be safe
    min_itemsize = {'ticker': max_ticker_len + 2}
    logger.info(f"Setting min_itemsize to {min_itemsize}")

    # Create an empty HDF5 store
    with pd.HDFStore(OUTPUT_FILE, mode='w') as store:
        logger.info(f"Created HDF5 store at {OUTPUT_FILE}")

    # Process each chunk
    chunk_count = 0
    total_rows_processed = 0

    # Reset the chunk reader
    chunk_reader = pd.read_csv(
        sep_path,
        parse_dates=['date'],
        chunksize=CHUNK_SIZE
    )

    for chunk in chunk_reader:
        chunk_count += 1
        chunk_size = len(chunk)
        total_rows_processed += chunk_size
        
        logger.info(f"Processing chunk {chunk_count} with {chunk_size} rows ({total_rows_processed} total rows processed)")
        
        # Start with the price data from this chunk
        wiki_chunk = chunk[['ticker', 'date', 'open', 'high', 'low', 'close', 'volume']].copy()
        
        # Add ex-dividend column (default to 0.0)
        wiki_chunk['ex-dividend'] = 0.0
        
        # Add split_ratio column (default to 1.0)
        wiki_chunk['split_ratio'] = 1.0

        # Update with actual dividend information
        if not dividend_df.empty:
            # Merge dividend information
            wiki_chunk = pd.merge(
                wiki_chunk, 
                dividend_df, 
                on=['ticker', 'date'], 
                how='left'
            )
            # Fill missing values with 0.0 and handle duplicates
            wiki_chunk['ex-dividend'] = wiki_chunk['ex-dividend_y'].fillna(wiki_chunk['ex-dividend_x'])
            wiki_chunk.drop(['ex-dividend_x', 'ex-dividend_y'], axis=1, inplace=True)

        # Update with actual split information
        if not split_df.empty:
            # Merge split information
            wiki_chunk = pd.merge(
                wiki_chunk, 
                split_df, 
                on=['ticker', 'date'], 
                how='left'
            )
            # Fill missing values with 1.0 and handle duplicates
            wiki_chunk['split_ratio'] = wiki_chunk['split_ratio_y'].fillna(wiki_chunk['split_ratio_x'])
            wiki_chunk.drop(['split_ratio_x', 'split_ratio_y'], axis=1, inplace=True)

        # Calculate adjusted values using closeadj/close ratio from SHARADAR
        # In SHARADAR, closeadj is already adjusted for both splits and dividends
        adj_ratio = chunk['closeadj'] / chunk['close']

        wiki_chunk['adj_open'] = chunk['open'] * adj_ratio
        wiki_chunk['adj_high'] = chunk['high'] * adj_ratio
        wiki_chunk['adj_low'] = chunk['low'] * adj_ratio
        wiki_chunk['adj_close'] = chunk['closeadj']
        wiki_chunk['adj_volume'] = chunk['volume']  # Volume typically doesn't need adjustment in this context

        # Set index to date and ticker for consistency with WIKI_PRICES format
        wiki_chunk = wiki_chunk.set_index(['date', 'ticker']).sort_index()
        
        # Append to HDF5 store with min_itemsize parameter to handle longer ticker symbols
        with pd.HDFStore(OUTPUT_FILE, mode='a') as store:
            if chunk_count == 1:
                # For the first chunk, create the dataset with min_itemsize
                store.put('sharadar/prices', wiki_chunk, format='table', min_itemsize=min_itemsize)
            else:
                # For subsequent chunks, append to the dataset
                store.append('sharadar/prices', wiki_chunk, format='table', min_itemsize=min_itemsize)
        
        # Free memory
        del wiki_chunk, chunk
        gc.collect()
        
        logger.info(f"Chunk {chunk_count} processed and saved")

    logger.info(f"All {total_rows_processed} rows processed and saved to {OUTPUT_FILE}")

    # Verify the results
    with pd.HDFStore(OUTPUT_FILE, mode='r') as store:
        # Check what's in the store
        logger.info(f"HDF5 store contents: {store.keys()}")
        
        # Get info about the dataset
        prices = store.select('sharadar/prices', start=0, stop=5)
        logger.info(f"Sample of data (first 5 rows):\n{prices}")
        
        # Get total size
        total_size = store.get_storer('sharadar/prices').nrows
        logger.info(f"Total rows in dataset: {total_size}")

if __name__ == "__main__":
    main()


2025-05-31 15:25:10,385 - INFO - Total rows in /home/noslen/alpaca-trading/data/SHARADAR/SHARADAR_SEP.csv: 17300827
2025-05-31 15:25:10,389 - INFO - Preview of data:
2025-05-31 15:25:10,390 - INFO -    ticker       date  open  high   low  close   volume  closeadj  closeunadj  \
0   ABILF 2021-11-09  0.30  0.33  0.30   0.33   7500.0      0.33        0.33   
1   ABILF 2021-11-08  0.35  0.35  0.35   0.35      0.0      0.35        0.35   
2     AAC 2021-09-24  9.74  9.75  9.73   9.75  38502.0      9.75        9.75   
3   AAC.U 2021-09-24  9.95  9.95  9.90   9.90   2692.0      9.90        9.90   
4  AAC.WS 2021-09-24  0.92  0.92  0.87   0.89  38784.0      0.89        0.89   

  lastupdated  
0  2021-11-09  
1  2021-11-09  
2  2021-09-24  
3  2021-09-24  
4  2021-09-24  
2025-05-31 15:25:10,737 - INFO - Loaded 323840 rows from /home/noslen/alpaca-trading/data/SHARADAR/SHARADAR_ACTIONS.csv
2025-05-31 15:25:10,864 - INFO - Extracted 270003 dividend records
2025-05-31 15:25:10,886 - INFO - Extr

In [15]:

df = (pd.read_csv(WIKI_PRICES_PATH,
                 parse_dates=['date'],
                 index_col=['date', 'ticker'],
                 infer_datetime_format=True)
     .sort_index())

print(df.info())
with pd.HDFStore(OUTPUT_FILE) as store:
    store.put('quandl/wiki/prices', df)

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 15389314 entries, (Timestamp('1962-01-02 00:00:00'), 'ARNC') to (Timestamp('2018-03-27 00:00:00'), 'ZUMZ')
Data columns (total 12 columns):
 #   Column       Dtype  
---  ------       -----  
 0   open         float64
 1   high         float64
 2   low          float64
 3   close        float64
 4   volume       float64
 5   ex-dividend  float64
 6   split_ratio  float64
 7   adj_open     float64
 8   adj_high     float64
 9   adj_low      float64
 10  adj_close    float64
 11  adj_volume   float64
dtypes: float64(12)
memory usage: 1.4+ GB
None
