In [15]:
import pandas as pd
import numpy as np
from transformers import BertTokenizer, BertModel
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
import os
import pickle
from collections import defaultdict
import torch
import gc
import shutil

# --- Runtime ---------------------------------------------------------------
device = torch.device('cpu')

# --- Embedding / near-duplicate detection -----------------------------------
MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
MAX_LEN = 128                # tokenizer max_length (pad / truncate target)
EMBEDDING_BATCH_SIZE = 64    # headlines embedded per forward pass
SIMILARITY_THRESHOLD = 0.9   # similarity above which two headlines count as duplicates

# --- Per-ticker sampling -----------------------------------------------------
POSITIVE_PER_TICKER = 1500
NEGATIVE_PER_TICKER = 1500
TARGET_PER_TICKER = POSITIVE_PER_TICKER + NEGATIVE_PER_TICKER  # 3000

# --- Input reading -----------------------------------------------------------
CHUNKSIZE = 50000            # rows per chunk when streaming the raw CSV

We use contextual sentence embeddings to detect and remove near-duplicate headlines.

In [16]:
# Load the tokenizer that matches MODEL_NAME.
print("loading tokenizer...")
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)

# Load the full-precision encoder, then build an int8 dynamically-quantized
# version of it in which only the nn.Linear layers are quantized.
print("loading model and applying dynamic quantization...")
base_model = BertModel.from_pretrained(MODEL_NAME)
bert_model = torch.quantization.quantize_dynamic(
    model = base_model,
    qconfig_spec = {torch.nn.Linear},
    dtype = torch.qint8,
).eval()  # nn.Module.eval() returns the module itself

print("Model optimized (int8 quantized) ready.")

loading tokenizer...
loading model and applying dynamic quantization...




Model optimized (int8 quantized) ready.


In [17]:
def get_bert_embedding(texts, batch_size = 64):
    """Embed texts as L2-normalised, attention-masked mean-pooled vectors.

    Relies on the module-level ``tokenizer``, ``bert_model`` and ``MAX_LEN``.

    Args:
        texts: Sequence of strings to embed (sliced positionally into batches).
        batch_size: Number of texts per forward pass; must be >= 1.

    Returns:
        A 2-D ``numpy`` array with one row per input text. Rows have unit
        L2 norm, so a dot product between rows is their cosine similarity.
        For empty ``texts`` an empty array of shape ``(0, 0)`` is returned
        (no forward pass is run, so the embedding width is not known).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

    # np.vstack([]) raises, so handle "nothing to embed" before touching the model.
    if len(texts) == 0:
        return np.empty((0, 0), dtype = np.float32)

    all_embeddings = []
    for start in range(0, len(texts), batch_size):
        batch_texts = list(texts[start:start + batch_size])

        # Tokenize. Every batch is padded/truncated to exactly MAX_LEN tokens.
        encoded = tokenizer(
            batch_texts,
            add_special_tokens = True,
            max_length = MAX_LEN,
            padding = 'max_length',
            truncation = True,
            return_attention_mask = True,
            return_tensors = 'pt',
        )
        input_ids = encoded['input_ids']
        attention_mask = encoded['attention_mask']

        with torch.no_grad():
            outputs = bert_model(input_ids = input_ids, attention_mask = attention_mask)
            token_embeddings = outputs.last_hidden_state

            # Mean pooling over real tokens only: padding positions are zeroed
            # by the mask and excluded from the divisor.
            input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
            sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
            sum_mask = torch.clamp(input_mask_expanded.sum(1), min = 1e-9)
            embeddings = (sum_embeddings / sum_mask).numpy()

            # L2-normalise each row; the epsilon guards against a zero vector.
            norms = np.linalg.norm(embeddings, axis = 1, keepdims = True)
            embeddings = embeddings / (norms + 1e-9)

        all_embeddings.append(embeddings)

    return np.vstack(all_embeddings)

In [18]:
def deduplicate_ticker_data(df_ticker):
    """Remove exact and near-duplicate headlines from one ticker's rows.

    Two passes are applied:

    1. Exact duplicates: rows sharing the same ``headline`` are collapsed to
       the one with the largest ``abs(event_sentiment_score)``.
    2. Near duplicates: headlines are embedded with ``get_bert_embedding`` and
       every pair whose similarity exceeds ``SIMILARITY_THRESHOLD`` loses the
       row with the smaller absolute score (the later row on ties).

    The input frame is not modified.

    Args:
        df_ticker: DataFrame with at least ``headline`` and
            ``event_sentiment_score`` columns.

    Returns:
        A DataFrame with a fresh 0..n-1 index. An empty input is returned
        as-is.
    """
    if len(df_ticker) == 0:
        return df_ticker

    # Pass 1: exact duplicates. `assign` builds a new frame, so the helper
    # column never leaks into the caller's DataFrame.
    df_ticker = (
        df_ticker
        .assign(abs_score = df_ticker['event_sentiment_score'].abs())
        .sort_values(by = 'abs_score', ascending = False)
        .drop_duplicates(subset = ['headline'], keep = 'first')
        .drop(columns = ['abs_score'])
        .reset_index(drop = True)
    )

    if len(df_ticker) < 2:
        return df_ticker

    # Pass 2: embedding-based near duplicates.
    texts = df_ticker['headline'].astype(str).str.strip().tolist()
    embeddings = get_bert_embedding(texts, batch_size = EMBEDDING_BATCH_SIZE)

    # NOTE: this materialises a full n x n similarity matrix (plus the triu
    # copy), so memory grows quadratically with the number of rows.
    sim_matrix = np.dot(embeddings, embeddings.T)
    # k = 1 keeps only the strict upper triangle: each pair once, no self-pairs.
    high_sim_indices = np.where(np.triu(sim_matrix, k = 1) > SIMILARITY_THRESHOLD)

    to_remove = set()
    scores = df_ticker['event_sentiment_score'].values

    # Greedy resolution: a pair is skipped once either member has already
    # been discarded; otherwise the weaker-scored member is discarded.
    for idx1, idx2 in zip(high_sim_indices[0], high_sim_indices[1]):
        if idx1 in to_remove or idx2 in to_remove:
            continue

        if abs(scores[idx1]) >= abs(scores[idx2]):
            to_remove.add(idx2)
        else:
            to_remove.add(idx1)

    keep_indices = [i for i in range(len(df_ticker)) if i not in to_remove]
    return df_ticker.iloc[keep_indices].reset_index(drop = True)


In [19]:
def balance_samples(df_dedup, n_positive = None, n_negative = None, random_state = 42):
    """Down-sample one ticker's rows to capped positive and negative sets.

    If the frame has no ``label`` column, one is derived from
    ``event_sentiment_score``: ``1`` when the score is > 0.2, ``0`` when it is
    < 0, and ``2`` otherwise. Only labels ``1`` and ``0`` are sampled; rows
    labelled ``2`` are dropped. The input frame is not modified.

    Args:
        df_dedup: DataFrame with ``event_sentiment_score`` and, optionally,
            ``label``.
        n_positive: Maximum number of label-1 rows to keep. Defaults to the
            module-level ``POSITIVE_PER_TICKER``.
        n_negative: Maximum number of label-0 rows to keep. Defaults to the
            module-level ``NEGATIVE_PER_TICKER``.
        random_state: Seed passed to ``DataFrame.sample``.

    Returns:
        A DataFrame with a fresh 0..n-1 index holding the sampled positive
        rows followed by the sampled negative rows. When nothing is sampled
        the result is empty but still carries the input columns plus
        ``label``, so callers can index ``result['label']`` safely.
    """
    has_label = 'label' in df_dedup.columns

    if len(df_dedup) == 0:
        empty = df_dedup.iloc[0:0].reset_index(drop = True)
        if not has_label:
            empty = empty.assign(label = pd.Series(dtype = 'int64'))
        return empty

    if has_label:
        labeled = df_dedup
    else:
        # `assign` returns a new frame, so the caller's frame gains no column.
        labeled = df_dedup.assign(
            label = df_dedup['event_sentiment_score'].apply(
                lambda x: 1 if x > 0.2 else (0 if x < 0 else 2)
            )
        )

    # Resolve the caps lazily so explicit arguments never touch the globals.
    if n_positive is None:
        n_positive = POSITIVE_PER_TICKER
    if n_negative is None:
        n_negative = NEGATIVE_PER_TICKER

    sampled_parts = []
    for label_value, cap in ((1, n_positive), (0, n_negative)):
        candidates = labeled[labeled['label'] == label_value]
        n_take = min(len(candidates), cap)
        if n_take > 0:
            sampled_parts.append(candidates.sample(n = n_take, random_state = random_state))

    if not sampled_parts:
        # Keep the schema instead of returning a column-less DataFrame.
        return labeled.iloc[0:0].reset_index(drop = True)

    return pd.concat(sampled_parts, ignore_index = True)

In [20]:
def _stats_path_for(output_path):
    """Return the path of the statistics CSV written next to output_path."""
    root, ext = os.path.splitext(output_path)
    return f"{root}_stats{ext or '.csv'}"


def _append_frames(frames, output_path, write_header):
    """Concatenate frames and append the four output columns to output_path."""
    batch_df = pd.concat(frames, ignore_index=True)
    batch_df = batch_df[['ticker', 'headline', 'event_sentiment_score', 'label']]
    batch_df.to_csv(output_path, mode='a', header=write_header, index=False)


def _count_label(frame, label_value):
    """Count rows of frame whose 'label' equals label_value (0 if no such column)."""
    if 'label' not in frame.columns:
        return 0
    return int((frame['label'] == label_value).sum())


def process_data_pipeline_stratified(input_path, output_path, temp_base_dir="./temp_pipeline"):
    """Build a per-ticker balanced positive/negative dataset from a raw CSV.

    Step 1 streams ``input_path`` in chunks of ``CHUNKSIZE`` rows, drops rows
    with missing or empty fields and headlines already seen in earlier chunks,
    labels the rest and spools up to ``8 * POSITIVE_PER_TICKER`` positive and
    ``8 * NEGATIVE_PER_TICKER`` negative rows per ticker into temporary CSV
    files. Neutral rows are discarded.

    Step 2 reads each ticker's spool file, runs ``deduplicate_ticker_data``
    and ``balance_samples`` on it and appends the result to ``output_path``.
    A per-ticker statistics CSV is written next to the output
    (``<name>_stats<ext>``).

    Args:
        input_path: Raw CSV with ``headline``, ``event_sentiment_score`` and
            ``ticker`` columns.
        output_path: Destination CSV. An existing file is removed first.
        temp_base_dir: Scratch directory. It is DELETED (if present) at the
            start of the run and again at the end.

    Returns:
        None. Results are written to disk.
    """
    # parameters: collect 8x the final target per label before deduplication
    RAW_POS_LIMIT = int(POSITIVE_PER_TICKER * 8)
    RAW_NEG_LIMIT = int(NEGATIVE_PER_TICKER * 8)

    print("Stratified Collection Targets per Ticker:")
    print(f"  - Raw Positive needed: {RAW_POS_LIMIT}")
    print(f"  - Raw Negative needed: {RAW_NEG_LIMIT}")
    print("  - Neutral: Ignored (to save space)")

    # directory preparation
    temp_ticker_dir = os.path.join(temp_base_dir, "raw_tickers")
    if os.path.exists(temp_base_dir):
        shutil.rmtree(temp_base_dir)
    os.makedirs(temp_ticker_dir, exist_ok=True)

    # os.makedirs('') raises, so only create a directory when the output path
    # actually has a directory component.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    if os.path.exists(output_path):
        os.remove(output_path)

    # Step 1: Scanning & Stratified Sampling (Pos/Neg)
    print("="*60)
    print("Step 1: Scanning & Stratified Sampling (Pos/Neg)")
    print("="*60)

    # counter structure: counts[ticker] = {'pos': 0, 'neg': 0}
    ticker_counts = defaultdict(lambda: {'pos': 0, 'neg': 0})
    # tickers whose positive AND negative quotas are both full
    ticker_completed = set()
    # ticker -> spool file. Files get sequential names so that ticker strings
    # containing path separators or OS-reserved names cannot break the path.
    ticker_files = {}
    # hashes of every headline kept so far (shared across all tickers)
    seen_hashes = set()

    chunk_iter = pd.read_csv(input_path, chunksize=CHUNKSIZE, low_memory=False)

    for i, chunk in enumerate(tqdm(chunk_iter, desc="Reading Chunks")):
        # 1. basic cleaning
        chunk = chunk.dropna(subset=['headline', 'event_sentiment_score', 'ticker'])
        chunk = chunk.assign(headline=chunk['headline'].astype(str).str.strip())
        chunk = chunk[chunk['headline'] != '']

        # 2. fast deduplication (String Hash)
        # NOTE(review): rows are only compared against hashes from EARLIER
        # chunks, so a headline repeated inside the same chunk passes through
        # here. Confirm whether that is intended.
        current_hashes = chunk['headline'].apply(hash)
        is_new = ~current_hashes.isin(seen_hashes)
        seen_hashes.update(current_hashes[is_new])
        chunk = chunk[is_new]

        if len(chunk) == 0: continue

        # 3. exclude tickers that are already full
        chunk = chunk[~chunk['ticker'].isin(ticker_completed)]
        if len(chunk) == 0: continue

        # 4. label -- 1: Positive (> 0.2), 0: Negative (< 0), 2: Neutral
        labels = chunk['event_sentiment_score'].apply(
            lambda x: 1 if x > 0.2 else (0 if x < 0 else 2)
        )
        chunk = chunk.assign(label=labels)

        # 5. only keep Positive (1) and Negative (0)
        chunk = chunk[chunk['label'].isin([0, 1])]
        if len(chunk) == 0: continue

        # 6. distribution logic: top up each ticker's quotas from this chunk
        for ticker, group in chunk.groupby('ticker'):
            current_counts = ticker_counts[ticker]
            to_save_list = []

            for key, label_value, limit in (('pos', 1, RAW_POS_LIMIT), ('neg', 0, RAW_NEG_LIMIT)):
                needed = max(0, limit - current_counts[key])
                candidates = group[group['label'] == label_value]
                if needed > 0 and len(candidates) > 0:
                    taken = candidates.head(needed)
                    to_save_list.append(taken)
                    current_counts[key] += len(taken)

            # if there is data to save
            if to_save_list:
                to_save = pd.concat(to_save_list)
                save_path = ticker_files.get(ticker)
                if save_path is None:
                    save_path = os.path.join(temp_ticker_dir, f"{len(ticker_files):06d}.csv")
                    ticker_files[ticker] = save_path
                header = not os.path.exists(save_path)
                to_save.to_csv(save_path, mode='a', header=header, index=False)

            # check if the ticker is full
            if current_counts['pos'] >= RAW_POS_LIMIT and current_counts['neg'] >= RAW_NEG_LIMIT:
                ticker_completed.add(ticker)

        if i % 10 == 0:
            gc.collect()

    print(f"\nStep 1 Completed. Collected data for {len(ticker_counts)} tickers.")

    # === Step 2: Processing per ticker===

    print("\n" + "="*60)
    print("Step 2: Processing tickers (Deduplication + Balancing)")
    print("="*60)

    tickers_to_process = sorted(ticker_counts.keys())
    write_header = True
    processed_buffer = []
    stats_list = []

    for ticker in tqdm(tickers_to_process, desc="Processing Tickers"):
        file_path = ticker_files.get(ticker)
        if file_path is None or not os.path.exists(file_path): continue

        try:
            df = pd.read_csv(file_path)

            # 1. deduplication
            df_dedup = deduplicate_ticker_data(df)

            # 2. sampling (ensure 1.5k/1.5k)
            df_final = balance_samples(df_dedup)

            # statistics
            stats_list.append({
                'ticker': ticker,
                'raw_pos_collected': _count_label(df, 1),
                'raw_neg_collected': _count_label(df, 0),
                'final_pos': _count_label(df_final, 1),
                'final_neg': _count_label(df_final, 0),
            })

            if len(df_final) > 0:
                processed_buffer.append(df_final)

            # flush every 5 buffered tickers to keep memory bounded
            if len(processed_buffer) >= 5:
                _append_frames(processed_buffer, output_path, write_header)
                write_header = False
                processed_buffer = []
                gc.collect()

            os.remove(file_path)

        except Exception as e:
            # best effort: report the ticker and carry on with the next one
            print(f"Error processing {ticker}: {e}")
            continue

    if len(processed_buffer) > 0:
        _append_frames(processed_buffer, output_path, write_header)

    # save statistics (never the same file as output_path)
    pd.DataFrame(stats_list).to_csv(_stats_path_for(output_path), index=False)
    shutil.rmtree(temp_base_dir)
    print(f"\nDone! Final data saved to: {output_path}")

In [21]:
input_path = "../data/raw/raw_data.csv"
output_path = "../data/processed/balanced_data.csv"

process_data_pipeline_stratified(input_path, output_path)

Stratified Collection Targets per Ticker:
  - Raw Positive needed: 12000
  - Raw Negative needed: 12000
  - Neutral: Ignored (to save space)
Step 1: Scanning & Stratified Sampling (Pos/Neg)


Reading Chunks: 2132it [21:12,  1.68it/s]



Step 1 Completed. Collected data for 499 tickers.

Step 2: Processing tickers (Deduplication + Balancing)


Processing Tickers: 100%|██████████| 499/499 [1:44:09<00:00, 12.53s/it]  


Done! Final data saved to: ../data/processed/balanced_data.csv



