# Testing the Python Script

This script was run on Modal, which we used for GPU compute, to generate the BERT embeddings of the transcripts in the dataset. On an L4 8-core GPU with 16 GB memory, the runtime was about 5 minutes.

Upload the prepared input CSV file (the DataFrame generated by `prepare_earnings_data()`, saved as `input_data_modal.csv`) to the root directory of the Modal notebook.

In [1]:
import pandas as pd
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler

# Load the prepared input CSV; the relative path means it must sit in the
# notebook's working directory.
raw_data = pd.read_csv("input_data_modal.csv")
# Root directory for cached embeddings; used as the default `cache_dir` of
# create_finbert_cache().
CACHE_DIR = "cache/"

In [2]:
def test_train_split_by_date(df, date_col="adjusted_date", train_frac=0.75, val_frac=0.12):
    """
    Splits the DataFrame into train, validation, and test sets based on unique dates.
    
    Parameters:
    - df: pandas DataFrame containing the data.
    - date_col: str, name of the column containing date information.
    - train_frac: float, fraction of data to be used for training.
    - val_frac: float, fraction of data to be used for validation.
    
    Returns:
    - train_df: DataFrame for training set.
    - val_df: DataFrame for validation set.
    - test_df: DataFrame for test set.
    """
    # Ensure the DataFrame is sorted by date
    df = df.sort_values(date_col).reset_index(drop=True)
    
    # Get unique sorted dates
    dates = np.array(sorted(df[date_col].unique()))
    n_dates = len(dates)
    
    # Calculate split indices
    train_end = int(train_frac * n_dates)
    val_end   = int((train_frac + val_frac) * n_dates)
    
    # Split dates
    train_dates = dates[:train_end]
    val_dates   = dates[train_end:val_end]
    test_dates  = dates[val_end:]
    
    # Create DataFrames for each set
    train_df = df[df[date_col].isin(train_dates)].reset_index(drop=True)
    val_df   = df[df[date_col].isin(val_dates)].reset_index(drop=True)
    test_df  = df[df[date_col].isin(test_dates)].reset_index(drop=True)


    return train_df, val_df, test_df


def scale_features(train_df, val_df, test_df, feature_cols):
    """
    Standardise the selected feature columns using training-set statistics.

    The scaler is fitted on the training split only; the same fitted scaler is
    then applied to all three splits, so validation and test data never
    influence the scaling parameters.

    Parameters:
    - train_df: DataFrame for training set (used to fit the scaler).
    - val_df: DataFrame for validation set.
    - test_df: DataFrame for test set.
    - feature_cols: list of str, names of the columns to be scaled.

    Returns:
    - features_train: numpy array of scaled features for training set.
    - features_val: numpy array of scaled features for validation set.
    - features_test: numpy array of scaled features for test set.
    """
    fitted_scaler = StandardScaler().fit(train_df[feature_cols])

    features_train, features_val, features_test = (
        fitted_scaler.transform(split[feature_cols])
        for split in (train_df, val_df, test_df)
    )
    return features_train, features_val, features_test

In [3]:
import os, glob, hashlib, shutil
import torch
from torch.utils.data import Dataset
from transformers import AutoModel, AutoTokenizer

def setup_finbert(model_name="yiyanghkust/finbert-tone"):
    """
    Load a pretrained FinBERT encoder and its tokenizer for frozen inference.

    The encoder is moved to the GPU when CUDA is available (CPU otherwise),
    switched to eval mode, and has gradients disabled on all parameters.

    Args:
        model_name (str): Model identifier passed to ``from_pretrained``.

    Returns:
        encoder: The FinBERT model, on ``device``, in eval mode, frozen.
        tokenizer: The matching tokenizer.
        device: torch.device the encoder was placed on.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    encoder = AutoModel.from_pretrained(model_name)

    encoder.to(device)
    encoder.eval()
    # Freeze FinBERT weights: no parameter should accumulate gradients.
    encoder.requires_grad_(False)

    return encoder, tokenizer, device


def chunks(text, tokenizer, max_tokens=512, overlap=50):
    """
    Splits the input text into chunks of token IDs with a maximum length and overlap.

    The text is tokenized once without special tokens and without truncation,
    then cut into windows of ``max_tokens`` IDs whose start positions advance
    by ``max_tokens - overlap``.

    Note: the chunks contain no special tokens. ``chunk_to_vector`` adds them
    and truncates to 512, so a chunk that is already 512 tokens long loses its
    trailing tokens at that step.

    Args:
        text (str): The input text to be chunked.
        tokenizer: Callable tokenizer returning a mapping with an "input_ids" entry.
        max_tokens (int): Maximum number of tokens per chunk. Must be positive.
        overlap (int): Number of overlapping tokens between consecutive chunks.
            Must satisfy ``0 <= overlap < max_tokens``.

    Returns:
        List of chunks, where each chunk is a list of token IDs. Empty when the
        text produces no tokens.

    Raises:
        ValueError: If ``max_tokens`` is not positive, or ``overlap`` is
            negative or not smaller than ``max_tokens``.
    """
    if max_tokens <= 0:
        raise ValueError(f"max_tokens must be positive, got {max_tokens}")
    if not 0 <= overlap < max_tokens:
        # overlap >= max_tokens would give a zero or negative stride (the window
        # would never advance); a negative overlap would silently skip tokens.
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < max_tokens, "
            f"got overlap={overlap}, max_tokens={max_tokens}"
        )

    token_ids = tokenizer(
        text,
        add_special_tokens=False,
        truncation=False,
        return_attention_mask=False
    )["input_ids"]

    stride = max_tokens - overlap
    return [
        token_ids[start:start + max_tokens]
        for start in range(0, len(token_ids), stride)
    ]


def chunk_to_vector(chunk_id_list, encoder, tokenizer, device, batch_size=16):
    """
    Takes in a list of chunks (each chunk is a list of token IDs) and uses the
    encoder to compute one vector per chunk: the hidden state at position 0
    (the CLS position once special tokens are added).

    Args:
        chunk_id_list: List of chunks, where each chunk is a list of token IDs
            without special tokens.
        encoder: FinBERT model; its output must expose ``last_hidden_state``.
        tokenizer: FinBERT tokenizer (provides ``prepare_for_model`` and ``pad``).
        device: cpu or gpu device the inputs are moved to.
        batch_size: Number of chunks to process in a batch.

    Returns:
        torch.Tensor of shape (num_chunks, hidden_dim), on ``device``. For an
        empty ``chunk_id_list`` the result has shape (0, hidden_dim), where
        hidden_dim is read from ``encoder.config.hidden_size`` when available
        (0 otherwise).
    """
    if not chunk_id_list:
        # torch.cat raises on an empty list, so return an explicit empty result
        # (this happens for a transcript that tokenizes to nothing).
        hidden_dim = getattr(getattr(encoder, "config", None), "hidden_size", 0)
        return torch.empty((0, hidden_dim), device=device)

    cls_vectors = []

    with torch.no_grad():
        # process in batches and prepare inputs by padding/truncating
        for start in range(0, len(chunk_id_list), batch_size):
            batch = chunk_id_list[start:start + batch_size]

            # Add special tokens and an attention mask to each raw chunk.
            prepared = [
                tokenizer.prepare_for_model(
                    chunk,
                    add_special_tokens=True,
                    max_length=512,
                    truncation=True,
                    return_attention_mask=True
                )
                for chunk in batch
            ]

            enc = tokenizer.pad(
                prepared,
                padding="max_length",
                max_length=512,
                return_tensors="pt"
            )

            # ensure batch dimension
            for key in ("input_ids", "attention_mask", "token_type_ids"):
                if key in enc and enc[key].dim() == 1:
                    enc[key] = enc[key].unsqueeze(0)

            model_inputs = {k: v.to(device) for k, v in enc.items()}

            hidden = encoder(**model_inputs).last_hidden_state  # (B, 512, hidden_dim)
            cls_vectors.append(hidden[:, 0, :])                 # (B, hidden_dim)

    return torch.cat(cls_vectors, dim=0)  # (num_chunks, hidden_dim)


def transcript_id(text):
    """
    Returns the hexadecimal MD5 digest of the UTF-8 encoded transcript text.

    Identical texts always map to the same identifier.
    """
    digest = hashlib.md5()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()


def build_cache(data, cache_dir, encoder, tokenizer, device, overlap=0):
    """
    Takes in a dataset of transcripts, labels, and financial features, computes FinBERT
    embeddings, and caches one ``.pt`` file per transcript to disk.

    Each file is named after the transcript's MD5 hash and stores a dict with
    "Z" (float16 chunk embeddings), "fin_features" (float16 tensor) and "y"
    (int label). Transcripts whose file already exists are skipped, so the
    function can be re-run to resume; this also means entries with identical
    transcript text share a single file (the first one processed wins).

    Args:
        data: List of tuples (transcript, label, fin_features).
        cache_dir: Directory to store cached embeddings (created if missing).
        encoder: FinBERT model.
        tokenizer: FinBERT tokenizer.
        device: cpu or gpu device.
        overlap: Number of overlapping tokens between consecutive chunks.
    """
    os.makedirs(cache_dir, exist_ok=True)
    total = len(data)

    for i, (transcript, y, fin_features) in enumerate(data):
        path = os.path.join(cache_dir, f"{transcript_id(transcript)}.pt")
        if os.path.exists(path):
            continue

        chunk_id_list = chunks(transcript, tokenizer, overlap=overlap)
        Z = chunk_to_vector(chunk_id_list, encoder, tokenizer, device, batch_size=8)

        record = {
            # Move to CPU before saving: a tensor saved from a CUDA device keeps
            # its device tag and cannot be loaded on a CPU-only machine without
            # map_location.
            "Z": Z.to(torch.float16).cpu(),
            "fin_features": torch.tensor(fin_features, dtype=torch.float16),
            "y": int(y),
        }

        # Write to a temporary name and rename, so an interrupted run never
        # leaves a truncated file that the exists-check above would then skip.
        tmp_path = path + ".tmp"
        torch.save(record, tmp_path)
        os.replace(tmp_path, path)

        # Progress is only reported on iterations that actually wrote a file.
        if (i + 1) % 50 == 0:
            n_files = len(glob.glob(os.path.join(cache_dir, "*.pt")))
            print(f"[{i+1}/{total}] cached | files={n_files}")

    print(f"Cached {total} transcripts → {cache_dir}")


def zip_cache(cache_dir, zip_path):
    """
    Zips the entire cache directory into a single .zip file.

    Args:
        cache_dir: Directory whose contents are archived (it becomes the
            archive root).
        zip_path: Destination path of the archive. A trailing ".zip" is
            optional; the archive is always written with a ".zip" extension.

    Raises:
        FileNotFoundError: If ``cache_dir`` does not exist.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not os.path.exists(cache_dir):
        raise FileNotFoundError(f"{cache_dir} does not exist")

    # make_archive appends the extension itself, so strip only a *trailing*
    # ".zip"; str.replace would also mangle ".zip" occurring elsewhere in the path.
    suffix = ".zip"
    base_name = zip_path[:-len(suffix)] if zip_path.endswith(suffix) else zip_path

    shutil.make_archive(
        base_name=base_name,
        format="zip",
        root_dir=cache_dir
    )
    print(f"Created zip file: {base_name + suffix}")



def create_finbert_cache(raw_data, cache_dir=CACHE_DIR, return_days=1, overlap=50):
    """
    High-level function to create FinBERT caches from the raw data.

    Splits the data by date into train/val/test, scales the financial features
    with training-set statistics, pairs each transcript with its label and
    scaled features, then builds and zips one embedding cache per split under
    ``<cache_dir>/<split>/<return_days>`` (archive:
    ``<cache_dir>/cache_<split>_<return_days>.zip``).

    Args:
        raw_data: DataFrame with the columns "transcript", "adjusted_date"
            (default split column), the four financial feature columns, and
            the label column for the chosen horizon.
        cache_dir: Root directory to store cached embeddings and zip files.
        return_days: Label horizon; 1 uses "r1d_direction", 5 uses
            "r5d_direction".
        overlap: Number of overlapping tokens between consecutive chunks.

    Raises:
        ValueError: If ``return_days`` is not 1 or 5.
    """
    label_cols = {1: "r1d_direction", 5: "r5d_direction"}
    if return_days not in label_cols:
        # Previously any value other than 1 silently fell back to the 5-day
        # labels while the output was still named after the requested value.
        raise ValueError(f"return_days must be 1 or 5, got {return_days!r}")
    label_col = label_cols[return_days]
    feature_cols = ["abvol_20d", "abcallday_r1", "abcallday_r5", "abcallday_r20"]

    encoder, tokenizer, device = setup_finbert()

    # Both tuples are ordered (train, val, test).
    split_frames = test_train_split_by_date(raw_data)
    split_features = scale_features(*split_frames, feature_cols)

    for split, frame, features in zip(("train", "val", "test"), split_frames, split_features):
        # One (transcript, label, scaled feature row) tuple per example.
        examples = list(zip(frame["transcript"].tolist(), frame[label_col], features))

        split_cache_dir = os.path.join(cache_dir, split, str(return_days))
        zip_path = os.path.join(cache_dir, f"cache_{split}_{return_days}.zip")
        build_cache(examples, split_cache_dir, encoder, tokenizer, device, overlap=overlap)
        zip_cache(split_cache_dir, zip_path)

# if __name__ == "__main__":
#     raw_data = prepare_earnings_data()
#     create_finbert_cache(raw_data,cache_dir=CACHE_DIR,return_days=1,overlap=50)
#     create_finbert_cache(raw_data,cache_dir=CACHE_DIR,return_days=5,overlap=50)


In [4]:
# Build and zip the train/val/test caches for the 1-day return labels,
# using a 50-token overlap between consecutive chunks.
create_finbert_cache(raw_data,cache_dir=CACHE_DIR,return_days=1,overlap=50)

config.json:   0%|          | 0.00/533 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/439M [00:00<?, ?B/s]

You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


model.safetensors:   0%|          | 0.00/439M [00:00<?, ?B/s]

[50/2316] cached | files=43
[100/2316] cached | files=90
[150/2316] cached | files=138
[200/2316] cached | files=182
[250/2316] cached | files=209
[300/2316] cached | files=257
[400/2316] cached | files=313
[450/2316] cached | files=355
[550/2316] cached | files=409
[650/2316] cached | files=456
[700/2316] cached | files=504
[750/2316] cached | files=554
[800/2316] cached | files=600
[900/2316] cached | files=693
[1000/2316] cached | files=777
[1050/2316] cached | files=824
[1100/2316] cached | files=866
[1150/2316] cached | files=911
[1200/2316] cached | files=957
[1250/2316] cached | files=1004
[1350/2316] cached | files=1099
[1400/2316] cached | files=1146
[1450/2316] cached | files=1190
[1500/2316] cached | files=1238
[1550/2316] cached | files=1283
[1600/2316] cached | files=1329
[1650/2316] cached | files=1366
[1700/2316] cached | files=1412
[1750/2316] cached | files=1460
[1800/2316] cached | files=1508
[1850/2316] cached | files=1557
[1950/2316] cached | files=1641
[2000/2316] 