In [None]:
!pip install sentence-transformers pandas numpy pyarrow

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch>=1.11.0->sentence-transformers)
 

In [None]:
# Colab only: make Google Drive visible under /content/drive so the
# ML_Project data and embedding folders used below can be read and written.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
import os

output_directory = '/content/drive/MyDrive/ML_Project/Earning_calls_data'

# Sanity-check the Drive mount: report whether the data folder exists and whether it is a directory.
if not os.path.exists(output_directory):
    print(f"Path '{output_directory}' DOES NOT exist.")
else:
    print(f"Path '{output_directory}' exists.")
    print(
        "And it is a directory."
        if os.path.isdir(output_directory)
        else "But it is NOT a directory (it's a file or something else)."
    )

Path '/content/drive/MyDrive/ML_Project/Earning_calls_data' exists.
And it is a directory.


In [None]:
import torch
# Report the PyTorch build and whether a CUDA GPU is visible to it.
print(f"PyTorch version: {torch.__version__}")
_cuda_ok = torch.cuda.is_available()
print(f"CUDA available: {_cuda_ok}")
if not _cuda_ok:
    print("WARNING: PyTorch cannot find CUDA. GPU will NOT be used.")
else:
    _gpu_index = torch.cuda.current_device()
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Current GPU device: {_gpu_index}")
    print(f"GPU device name: {torch.cuda.get_device_name(_gpu_index)}")

PyTorch version: 2.6.0+cu124
CUDA available: True
CUDA version: 12.4
Current GPU device: 0
GPU device name: Tesla T4


In [None]:
import pandas as pd
import numpy as np
import os
import gc
import glob
from typing import Union, List, Optional
import torch
from sentence_transformers import SentenceTransformer

def generate_text_embeddings(
    model: SentenceTransformer,
    texts: Union[List[str], pd.Series],
    batch_size: int = 64,
    normalize: bool = True
) -> np.ndarray:
    """
    Encode `texts` with an already-loaded SentenceTransformer and return the vectors as a NumPy array.

    Items that `pd.notna` reports as missing are replaced by an empty string;
    every other item is converted with `str`, so the model always receives a
    list of strings. A progress bar is shown while encoding, and `normalize`
    is forwarded to `model.encode` as `normalize_embeddings`.
    """
    def _as_text(item) -> str:
        return str(item) if pd.notna(item) else ""

    cleaned_texts = list(map(_as_text, texts))
    encode_options = dict(
        batch_size=batch_size,
        show_progress_bar=True,
        normalize_embeddings=normalize,
    )
    return np.array(model.encode(cleaned_texts, **encode_options))

def _read_parquet_with_date_columns(
    file_path: str,
    read_columns: List[str],
    date_columns: List[str],
) -> pd.DataFrame:
    """Read `read_columns` from one Parquet file and copy its index into each of `date_columns`.

    Date metadata is never read as a stored column; it is taken from the index
    that `pd.read_parquet` restores. NOTE(review): assumes the input files were
    written with the date as the DataFrame index — confirm against the writer.
    """
    frame = pd.read_parquet(file_path, columns=read_columns)
    for date_col in date_columns:
        frame[date_col] = frame.index
    return frame


def _build_output_frame(
    df_chunk: pd.DataFrame,
    embeddings: np.ndarray,
    metadata_columns: List[str],
    embedding_column_name: str,
) -> pd.DataFrame:
    """Pair each row's metadata with its embedding vector, aligned by position.

    Raises:
        ValueError: if the number of embeddings differs from the number of rows.
    """
    if len(embeddings) != len(df_chunk):
        raise ValueError(
            f"Got {len(embeddings)} embeddings for {len(df_chunk)} rows; cannot align them."
        )
    df_metadata = df_chunk[metadata_columns].reset_index(drop=True)
    # One list per row -> a single list-valued column in the output frame.
    df_embeddings = pd.DataFrame({embedding_column_name: [list(row) for row in embeddings]})
    return pd.concat([df_metadata, df_embeddings], axis=1)


def _run_verification(
    st_model: "SentenceTransformer",
    first_file_path: str,
    read_columns: List[str],
    date_columns: List[str],
    text_column: str,
    metadata_columns: List[str],
    embedding_column_name: str,
    batch_size: int,
    normalize: bool,
    output_dir: str,
    num_verification_rows: int,
) -> None:
    """Embed the first `num_verification_rows` rows of one file and save them to a verification Parquet file."""
    print(f"--- VERIFICATION MODE ENABLED: Processing only the first {num_verification_rows} rows from the first Parquet file found. ---")
    print(f"Verification: Loading data from first file: {first_file_path}")
    print(f"Reading actual Parquet columns: {read_columns} (Date will be from index)")

    try:
        df_first_file = _read_parquet_with_date_columns(first_file_path, read_columns, date_columns)
    except Exception as e:
        print(f"Error reading first file {first_file_path} for verification: {e}. Aborting.")
        return

    if df_first_file.empty:
        print(f"First file {first_file_path} is empty. Cannot perform verification.")
        return

    # Copy the head so the full-file frame can be released immediately.
    df_verification_chunk = df_first_file.head(num_verification_rows).copy()
    del df_first_file
    gc.collect()
    actual_verification_rows = len(df_verification_chunk)

    if df_verification_chunk.empty or df_verification_chunk[text_column].isnull().all():
        print(f"The first {actual_verification_rows} rows selected for verification are empty or all NaN in '{text_column}'. Halting.")
        return

    print(f"\nInput data for verification (first {actual_verification_rows} rows from {first_file_path}, 'Date' is from index):")
    print(df_verification_chunk[metadata_columns + [text_column]].head())

    print(f"Generating embeddings for {actual_verification_rows} verification texts...")
    embeddings_array = generate_text_embeddings(
        st_model, df_verification_chunk[text_column], batch_size, normalize
    )
    df_output_ver = _build_output_frame(
        df_verification_chunk, embeddings_array, metadata_columns, embedding_column_name
    )

    output_path_verification = os.path.join(
        output_dir, f'text_embeddings_verification_first_{actual_verification_rows}_rows.parquet'
    )
    df_output_ver.to_parquet(output_path_verification, index=False)

    print(f"\nVERIFICATION: Resulting combined DataFrame saved to {output_path_verification}")
    print(df_output_ver.head())
    df_output_ver.info()
    print("\n--- Computation halted after processing verification rows. ---")


def process_and_save_embeddings(
    parquet_pattern: str,
    output_dir: str = 'text_embeddings_parts',
    text_column: str = 'full_text',
    metadata_columns_to_keep: Optional[List[str]] = None,
    embedding_column_name: str = 'embedding_vector',
    model_name: str = 'all-mpnet-base-v2',
    batch_size: int = 64,
    normalize: bool = True,
    part_size: int = 50000,
    verify_first_part_only: bool = False,
    num_verification_rows: int = 5,
    start_part_index: int = 4,
) -> None:
    """Embed the text column of every Parquet file matching a glob and save the results in parts.

    Each matching file (processed in sorted order) is split into chunks of at
    most `part_size` rows. Every chunk is embedded with a SentenceTransformer
    model and written to `output_dir/text_embeddings_part_NNN.parquet` with the
    requested metadata columns plus one embedding vector per row. The model is
    always released (and the CUDA cache emptied on GPU) before returning, even
    if an error is raised mid-run.

    Args:
        parquet_pattern: glob pattern of the input Parquet files.
        output_dir: directory for the output parts (created if missing).
        text_column: column holding the text to embed.
        metadata_columns_to_keep: columns copied into each output part. Any name
            equal to 'date' (case-insensitive) is filled from the DataFrame index
            rather than read from the file. Defaults to
            ['Date', 'transcriptid', 'gvkey'].
        embedding_column_name: name of the output column holding the vectors.
        model_name: SentenceTransformer model to load (on GPU when CUDA is available).
        batch_size: encoding batch size passed to `model.encode`.
        normalize: forwarded to `model.encode` as `normalize_embeddings`.
        part_size: maximum number of rows per output part; must be >= 1.
        verify_first_part_only: if True, embed only the first
            `num_verification_rows` rows of the first file, save them to a
            separate verification file, and stop.
        num_verification_rows: number of rows used in verification mode.
        start_part_index: number given to the first output part file. The
            default of 4 preserves the numbering that was previously hard-coded
            (NOTE(review): presumably to continue after parts 000-003 of an
            earlier run; pass 0 for a fresh output directory).

    Raises:
        ValueError: if `part_size` < 1.
    """
    if part_size < 1:
        raise ValueError(f"part_size must be >= 1, got {part_size}")
    if metadata_columns_to_keep is None:
        metadata_columns_to_keep = ['Date', 'transcriptid', 'gvkey']
    metadata_columns_to_keep = list(metadata_columns_to_keep)

    # Date-like metadata comes from the index, so only the remaining columns are read from disk.
    date_columns = [col for col in metadata_columns_to_keep if col.lower() == 'date']
    actual_data_columns_in_parquet = sorted(
        {text_column, *(col for col in metadata_columns_to_keep if col.lower() != 'date')}
    )

    os.makedirs(output_dir, exist_ok=True)

    # Look for inputs before loading the model so an empty glob costs nothing.
    all_files = sorted(glob.glob(parquet_pattern))
    if not all_files:
        print(f"No Parquet files found matching pattern: {parquet_pattern}. Exiting.")
        return

    if torch.cuda.is_available():
        target_device_for_model = 'cuda'
        print(f"process_and_save_embeddings: CUDA available. Attempting to load model '{model_name}' onto GPU.")
    else:
        target_device_for_model = 'cpu'
        print(f"process_and_save_embeddings: CUDA not available. Loading model '{model_name}' onto CPU.")

    try:
        st_model = SentenceTransformer(model_name, device=target_device_for_model)
        print(f"Successfully loaded SentenceTransformer model '{model_name}' onto device: {st_model.device}")
    except Exception as e:
        print(f"Error loading SentenceTransformer model '{model_name}' onto device '{target_device_for_model}': {e}")
        print("Aborting process.")
        return

    parts_saved = 0
    total_rows_saved = 0
    next_part_index = start_part_index
    try:
        if verify_first_part_only:
            _run_verification(
                st_model, all_files[0], actual_data_columns_in_parquet, date_columns,
                text_column, metadata_columns_to_keep, embedding_column_name,
                batch_size, normalize, output_dir, num_verification_rows,
            )
            return

        print(f"\nStarting full processing. Found {len(all_files)} files. Model: '{model_name}' on {st_model.device}.")
        print(f"Will read actual Parquet columns: {actual_data_columns_in_parquet} (Date will be derived from index).")
        print(f"Desired output metadata: {metadata_columns_to_keep}. Embedding in column: '{embedding_column_name}'")

        for file_number, file_path in enumerate(all_files, start=1):
            print(f"\n--- Processing file {file_number}/{len(all_files)}: {file_path} ---")
            try:
                df_file = _read_parquet_with_date_columns(
                    file_path, actual_data_columns_in_parquet, date_columns
                )
            except Exception as e:
                print(f"Error reading file {file_path}: {e}. Skipping this file.")
                continue

            if df_file.empty:
                print(f"File {file_path} is empty (after reading specified columns). Skipping.")
                del df_file
                gc.collect()
                continue

            total_rows_in_file = len(df_file)
            num_parts_in_file = (total_rows_in_file + part_size - 1) // part_size  # ceiling division
            print(f"Total rows in this file: {total_rows_in_file}. Processing in {num_parts_in_file} parts of up to {part_size} rows each.")

            for part_in_file_idx, start_row in enumerate(range(0, total_rows_in_file, part_size)):
                end_row = min(start_row + part_size, total_rows_in_file)
                df_chunk = df_file.iloc[start_row:end_row]

                if df_chunk[text_column].isnull().all():
                    print(f"Skipping empty/all-NaN text chunk (rows {start_row}-{end_row-1}).")
                    continue

                print(f"Processing chunk {part_in_file_idx + 1}/{num_parts_in_file} ({len(df_chunk)} texts)...")
                embeddings_array = generate_text_embeddings(
                    st_model, df_chunk[text_column], batch_size, normalize
                )
                df_output_part = _build_output_frame(
                    df_chunk, embeddings_array, metadata_columns_to_keep, embedding_column_name
                )

                output_path = os.path.join(output_dir, f'text_embeddings_part_{next_part_index:03}.parquet')
                df_output_part.to_parquet(output_path, index=False)
                print(f"Saved output part {next_part_index} ({len(df_output_part)} rows) to {output_path}")

                total_rows_saved += len(df_output_part)
                parts_saved += 1
                next_part_index += 1

                del df_chunk, embeddings_array, df_output_part
                gc.collect()

            del df_file
            gc.collect()
            print(f"--- Finished processing file: {file_path} ---")
    finally:
        # Runs on every exit path (verification return, errors, normal completion).
        print("\nReleasing SentenceTransformer model from memory...")
        del st_model
        if target_device_for_model == 'cuda':
            print("Emptying CUDA cache...")
            torch.cuda.empty_cache()
        gc.collect()

    if total_rows_saved > 0:
        print(
            f"\nAll {len(all_files)} files processed. Total {total_rows_saved} embeddings saved in "
            f"{parts_saved} parts (part numbers {start_part_index:03}-{next_part_index - 1:03}) "
            f"to directory: {output_dir}"
        )
    else:
        print("\nNo data was processed from any of the files found or all files were empty/problematic.")

In [None]:
# 0. Free memory left over from earlier cells before the model is loaded.
print("Attempting to free memory before starting the main processing...")
gc.collect()

# 1. Input pattern and output directory
input_data_directory = '/content/drive/MyDrive/ML_Project/Earning_calls_data'
file_name_pattern = 'earning_calls_full_part_*.parquet'
INPUT_PARQUET_PATTERN = os.path.join(input_data_directory, file_name_pattern)

OUTPUT_EMBEDDINGS_DIR = '/content/drive/MyDrive/ML_Project/Earning_calls_embedding'

# 2. Text column, metadata columns, and embedding column name
TEXT_COLUMN_NAME = 'full_text'
METADATA_COLUMNS_TO_APPEAR_IN_OUTPUT = ['Date', 'transcriptid', 'gvkey']
EMBEDDING_COLUMN_OUTPUT_NAME = 'embedding_vector'

# 3. Run settings
MODEL_NAME = 'all-mpnet-base-v2'
# True  -> quick smoke test: embed only the first NUM_VERIFICATION_ROWS rows of the first
#          file, save them to a separate verification file, and stop.
# False -> full run over every file matching INPUT_PARQUET_PATTERN.
RUN_VERIFICATION_ONLY = False
NUM_VERIFICATION_ROWS = 5
BATCH_SIZE = 128
PART_SIZE = 25000  # rows per output part file (ignored in verification mode)

# 4. Ensure the output directory exists
os.makedirs(OUTPUT_EMBEDDINGS_DIR, exist_ok=True)
print(f"Input pattern: {INPUT_PARQUET_PATTERN}")
print(f"Output directory: {OUTPUT_EMBEDDINGS_DIR}")

# 5. Run the pipeline in the selected mode
run_mode_label = "VERIFICATION MODE" if RUN_VERIFICATION_ONLY else "FULL PROCESSING MODE"
print(f"\nStarting the embedding generation process in {run_mode_label}...")
process_and_save_embeddings(
    parquet_pattern=INPUT_PARQUET_PATTERN,
    output_dir=OUTPUT_EMBEDDINGS_DIR,
    text_column=TEXT_COLUMN_NAME,
    metadata_columns_to_keep=METADATA_COLUMNS_TO_APPEAR_IN_OUTPUT,
    embedding_column_name=EMBEDDING_COLUMN_OUTPUT_NAME,
    model_name=MODEL_NAME,
    batch_size=BATCH_SIZE,
    normalize=True,
    part_size=PART_SIZE,
    verify_first_part_only=RUN_VERIFICATION_ONLY,
    num_verification_rows=NUM_VERIFICATION_ROWS,
)

print("\n--- Script execution cell finished. ---")

Attempting to free memory before starting the main processing...
Input pattern: /content/drive/MyDrive/ML_Project/Earning_calls_data/earning_calls_full_part_*.parquet
Output directory: /content/drive/MyDrive/ML_Project/Earning_calls_embedding

Starting the embedding generation process in FULL PROCESSING MODE...
process_and_save_embeddings: CUDA available. Attempting to load model 'all-mpnet-base-v2' onto GPU.
Successfully loaded SentenceTransformer model 'all-mpnet-base-v2' onto device: cuda:0

Starting full processing. Found 2 files. Model: 'all-mpnet-base-v2' on cuda:0.
Will read actual Parquet columns: ['full_text', 'gvkey', 'transcriptid'] (Date will be derived from index).
Desired output metadata: ['Date', 'transcriptid', 'gvkey']. Embedding in column: 'embedding_vector'

--- Processing file 1/2: /content/drive/MyDrive/ML_Project/Earning_calls_data/earning_calls_full_part_001.parquet ---
Total rows in this file: 100000. Processing in 4 parts of up to 25000 rows each.
Processing ch

Batches:   0%|          | 0/196 [00:00<?, ?it/s]

Saved output part 4 (25000 rows) to /content/drive/MyDrive/ML_Project/Earning_calls_embedding/text_embeddings_part_004.parquet
Processing chunk 2/4 (25000 texts)...


Batches:   0%|          | 0/196 [00:00<?, ?it/s]

Saved output part 5 (25000 rows) to /content/drive/MyDrive/ML_Project/Earning_calls_embedding/text_embeddings_part_005.parquet
Processing chunk 3/4 (25000 texts)...


Batches:   0%|          | 0/196 [00:00<?, ?it/s]

Saved output part 6 (25000 rows) to /content/drive/MyDrive/ML_Project/Earning_calls_embedding/text_embeddings_part_006.parquet
Processing chunk 4/4 (25000 texts)...


Batches:   0%|          | 0/196 [00:00<?, ?it/s]

Saved output part 7 (25000 rows) to /content/drive/MyDrive/ML_Project/Earning_calls_embedding/text_embeddings_part_007.parquet
--- Finished processing file: /content/drive/MyDrive/ML_Project/Earning_calls_data/earning_calls_full_part_001.parquet ---

--- Processing file 2/2: /content/drive/MyDrive/ML_Project/Earning_calls_data/earning_calls_full_part_002.parquet ---
Total rows in this file: 16274. Processing in 1 parts of up to 25000 rows each.
Processing chunk 1/1 (16274 texts)...


Batches:   0%|          | 0/128 [00:00<?, ?it/s]

Saved output part 8 (16274 rows) to /content/drive/MyDrive/ML_Project/Earning_calls_embedding/text_embeddings_part_008.parquet
--- Finished processing file: /content/drive/MyDrive/ML_Project/Earning_calls_data/earning_calls_full_part_002.parquet ---

Releasing SentenceTransformer model from memory...
Emptying CUDA cache...

All 2 files processed. Total 116274 embeddings saved in 9 parts to directory: /content/drive/MyDrive/ML_Project/Earning_calls_embedding

--- Script execution cell finished. ---
