In [None]:
import os
import pandas as pd
import numpy as np
import pickle
import torch
from tokenizers.implementations import ByteLevelBPETokenizer
from transformers import T5Config, T5ForConditionalGeneration
from math import ceil

# Build the byte-level BPE tokenizer from the CodeT5 vocabulary and merges files
codet5_vocab_file = "/Users/mac/Desktop/Code_Smell_Detection/dataset/codeT5/codet5-vocab.json"
codet5_merges_file = "/Users/mac/Desktop/Code_Smell_Detection/dataset/codeT5/codet5-merges.txt"
tokenizer = ByteLevelBPETokenizer(codet5_vocab_file, codet5_merges_file)

# Utility function to persist an object to disk with pickle
def save_model(file_name, model):
    """Serialize ``model`` into the file ``file_name`` using pickle.

    The file is opened in binary write mode, so any existing content at
    ``file_name`` is replaced.
    """
    with open(file_name, 'wb') as sink:
        pickle.Pickler(sink).dump(model)

# Embed a sequence using the T5 model
def embed_sequence(model, sequence):
    """Return a mean-pooled encoder embedding for one token-id sequence.

    The sequence is fed to ``model`` as a batch of one, both as ``input_ids``
    and as ``decoder_input_ids``. The result is the mean over the first axis
    of ``encoder_last_hidden_state[0]`` (presumably the token axis), returned
    as a NumPy array.

    The forward pass runs in evaluation mode and without gradient tracking,
    so dropout cannot make two embeddings of the same sequence differ and no
    autograd graph is kept. The model's previous training/eval mode is
    restored before returning.

    Args:
        model: A ``torch.nn.Module`` whose forward accepts ``input_ids`` and
            ``decoder_input_ids`` and whose output exposes
            ``encoder_last_hidden_state``.
        sequence: Non-empty sequence of integer token ids.

    Returns:
        numpy.ndarray: The pooled embedding.

    Raises:
        ValueError: If ``sequence`` is empty (the mean over zero tokens
            would otherwise silently produce NaNs or fail inside the model).
    """
    if len(sequence) == 0:
        raise ValueError("Cannot embed an empty token sequence")

    # Put the inputs on whatever device the model's weights live on.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    input_ids = torch.tensor(sequence).to(torch.int64).unsqueeze(0).to(device)

    was_training = model.training
    model.eval()  # disable dropout for a deterministic embedding
    try:
        with torch.no_grad():
            out = model(input_ids=input_ids, decoder_input_ids=input_ids)
    finally:
        model.train(was_training)

    pooled_embedding = torch.mean(out.encoder_last_hidden_state[0], dim=0)
    # .cpu() keeps the NumPy conversion valid when the model runs on an accelerator.
    return pooled_embedding.detach().cpu().numpy()

In [None]:
def safe_embed_sequence(model, sequence, max_tokens=512):
    """Embed a long token sequence in chunks and average the chunk embeddings.

    ``sequence`` is cut into consecutive slices of at most ``max_tokens``
    items; each slice is embedded with ``embed_sequence`` and the element-wise
    mean of the per-chunk embeddings is returned. Every chunk has the same
    weight, so a short final chunk counts as much as a full one.

    Args:
        model: Model forwarded to ``embed_sequence``.
        sequence: Non-empty sequence of token ids.
        max_tokens: Maximum chunk length; must be at least 1.

    Returns:
        The mean of the chunk embeddings along axis 0.

    Raises:
        ValueError: If ``max_tokens`` is smaller than 1 or ``sequence`` is
            empty (averaging zero chunks would otherwise return NaN).
    """
    if max_tokens < 1:
        raise ValueError("max_tokens must be a positive integer")
    if len(sequence) == 0:
        raise ValueError("Cannot embed an empty token sequence")

    embeddings = [
        embed_sequence(model, sequence[start:start + max_tokens])
        for start in range(0, len(sequence), max_tokens)
    ]
    # Aggregate the per-chunk embeddings with an unweighted average
    return np.mean(embeddings, axis=0)

In [None]:
def embed_class_in_batches(df, model, df_path, batch_size=100):
    """Embed each row's ``method`` text as one sequence, saving after every batch.

    For every row the ``method`` value is tokenized with the module-level
    ``tokenizer`` and embedded with ``embed_sequence``; the result is stored
    in the ``embeded_sequence`` column of ``df`` (the frame is modified in
    place, and the column is created if missing). After each batch the whole
    frame is pickled to ``df_path``, so an interrupted run keeps the batches
    finished so far.

    A row whose embedding fails is reported and its ``embeded_sequence`` is
    reset to ``None``. This prevents a value left over from an earlier run
    (for example with a different model) from being saved as if it were new.

    Args:
        df: DataFrame with ``sample_id`` and ``method`` columns.
        model: Model forwarded to ``embed_sequence``.
        df_path: Pickle file to write; its parent directory is created if needed.
        batch_size: Number of rows per checkpoint; must be at least 1.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Make sure the first checkpoint does not fail on a missing directory.
    directory = os.path.dirname(df_path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    if 'embeded_sequence' not in df.columns:
        df['embeded_sequence'] = None
    # Rows are visited by position, so write by position as well; label-based
    # writes would hit the wrong row (or add one) on a non-default index.
    target_col = df.columns.get_loc('embeded_sequence')

    total_batches = ceil(len(df) / batch_size)
    for batch_idx in range(total_batches):
        start_idx = batch_idx * batch_size
        end_idx = min((batch_idx + 1) * batch_size, len(df))
        print(f"Processing batch {batch_idx + 1}/{total_batches} (rows {start_idx} to {end_idx})")

        for i in range(start_idx, end_idx):
            row = df.iloc[i]
            print(f"Row {i}: Sample ID = {row['sample_id']}")
            try:
                # Compute the embedding
                embedding = embed_sequence(model, tokenizer.encode(row['method']).ids)
                df.iat[i, target_col] = embedding
            except Exception as e:
                # Best effort: keep going, but never keep a stale embedding.
                df.iat[i, target_col] = None
                print(f"EXCEPTION at row {i}")
                print(e)

        # Save progress after each batch
        pd.to_pickle(df, df_path)
        print(f"Batch {batch_idx + 1} saved to {df_path}")

    print("All batches processed and saved.")
    print(df.head())


In [None]:
def embed_line_by_line_in_batches(df, model, df_path, batch_size=100):
    """Embed every non-empty line of each row's ``method`` text, saving per batch.

    Works on a copy of ``df``; the caller's frame is left untouched and the
    result is only available through the pickle written to ``df_path``.
    For each row, ``method`` is split on newlines, each non-empty line is
    tokenized with the module-level ``tokenizer`` and embedded with
    ``embed_sequence``. The element-wise sum and mean of the line embeddings
    are stored (each as a ``pd.Series`` of floats) in the
    ``embeded_sequence_sum`` and ``embeded_sequence_avg`` columns. Rows that
    yield no line embedding keep ``None`` in both columns.

    Args:
        df: DataFrame with ``sample_id`` and ``method`` columns.
        model: Model forwarded to ``embed_sequence``.
        df_path: Pickle file to write; its parent directory is created if needed.
        batch_size: Number of rows per checkpoint; must be at least 1.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    directory = os.path.dirname(df_path)
    if directory:
        # exist_ok avoids the check-then-create race of a manual exists() test.
        os.makedirs(directory, exist_ok=True)
    df = df.copy()
    df['embeded_sequence_sum'] = None
    df['embeded_sequence_avg'] = None

    total_rows = len(df)
    for start in range(0, total_rows, batch_size):
        end = min(start + batch_size, total_rows)
        df_batch = df.iloc[start:end]
        print(f"Processing batch {start} to {end} (total {total_rows})")
        for i, row in df_batch.iterrows():
            print(f"Processing row {i}: Sample ID {row['sample_id']}")
            source = row['method']
            if not isinstance(source, str):
                # e.g. NaN from an empty CSV cell; skip the row instead of aborting the run
                print(f"Skipping row {i}: 'method' is not a string ({type(source).__name__})")
                continue
            embeded = []
            for line in source.split('\n'):
                if not line:
                    continue
                try:
                    tokens = tokenizer.encode(line).ids
                    embeded.append(embed_sequence(model, tokens))
                except Exception as e:
                    print(f"Exception in line processing: {e}\nLine: {line}")
            if embeded:
                stacked = np.asarray(embeded)
                df.at[i, 'embeded_sequence_sum'] = pd.Series(np.sum(stacked, axis=0).tolist())
                df.at[i, 'embeded_sequence_avg'] = pd.Series(np.mean(stacked, axis=0).tolist())

        pd.to_pickle(df, df_path)
        print(f"Batch {start}-{end} saved to {df_path}")

    print("Processing complete.")

In [None]:
# Read the data-class dataset and add a binary label:
# 0 when severity is 'none', 1 for every other severity value
df = pd.read_csv(
    '/Users/mac/Desktop/Code_Smell_Detection/dataset/data_class/data_class.csv'
).assign(label=lambda frame: np.where(frame['severity'] == 'none', 0, 1))

# Process with small model
small_config_path = "/Users/mac/Desktop/Code_Smell_Detection/dataset/codeT5/small/config.json"
small_model_path = "/Users/mac/Desktop/Code_Smell_Detection/dataset/codeT5/small/pytorch_model.bin"

small_config = T5Config.from_json_file(small_config_path)
model_small = T5ForConditionalGeneration(small_config)
# A model built with the constructor starts in training mode; switch to eval
# mode so dropout does not add noise to the embeddings.
model_small.eval()
# Deserialize the checkpoint on the CPU. The model itself is never moved off
# the CPU and load_state_dict copies values into its existing parameters, so
# mapping to 'cuda' gains nothing and fails on machines without CUDA.
model_small.load_state_dict(torch.load(small_model_path, map_location=torch.device('cpu')))

In [None]:
# Line-by-line embeddings with the small model, saved every 50 rows
embed_line_by_line_in_batches(
    df=df,
    model=model_small,
    df_path='/Users/mac/Desktop/Code_Smell_Detection/dataset/data_class/T5/df_dc_embeded_by_line_small.pkl',
    batch_size=50,
)

In [None]:
# Class-level (whole `method` text) embeddings with the small model, saved every 50 rows
embed_class_in_batches(
    df=df,
    model=model_small,
    df_path='/Users/mac/Desktop/Code_Smell_Detection/dataset/data_class/T5/df_dc_embeded_small.pkl',
    batch_size=50,
)

In [None]:

# Clean up the small model: drop the notebook-level name so the object can be
# reclaimed (once nothing else references it) before the base model is loaded
del model_small


In [None]:

# Process with base model
base_config_path = "/Users/mac/Desktop/Code_Smell_Detection/dataset/codeT5/base/config.json"
base_model_path = "/Users/mac/Desktop/Code_Smell_Detection/dataset/codeT5/base/pytorch_model.bin"

base_config = T5Config.from_json_file(base_config_path)
model_base = T5ForConditionalGeneration(base_config)
# A model built with the constructor starts in training mode; switch to eval
# mode so dropout does not add noise to the embeddings.
model_base.eval()
# Deserialize the checkpoint on the CPU. The model itself is never moved off
# the CPU and load_state_dict copies values into its existing parameters, so
# mapping to 'cuda' gains nothing and fails on machines without CUDA.
model_base.load_state_dict(torch.load(base_model_path, map_location=torch.device('cpu')))

In [None]:
# Embed line-by-line with the base model
# (uses the line-by-line embedder so the content matches the *_by_line_base.pkl file name)
embed_line_by_line_in_batches(
    df,
    model_base,
    '/Users/mac/Desktop/Code_Smell_Detection/dataset/data_class/T5/df_dc_embeded_by_line_base.pkl',
    batch_size=50,
)

In [None]:
# Class-level (whole `method` text) embeddings with the base model, saved every 50 rows
embed_class_in_batches(
    df=df,
    model=model_base,
    df_path='/Users/mac/Desktop/Code_Smell_Detection/dataset/data_class/T5/df_dc_embeded_base.pkl',
    batch_size=50,
)

In [None]:
# Clean up the base model: drop the notebook-level name so the object can be
# reclaimed once nothing else references it
del model_base