In [None]:
!pip install sentence_transformers -qq
from sentence_transformers.evaluation import BinaryClassificationEvaluator
from sentence_transformers.readers import InputExample
from sentence_transformers import SentenceTransformer, losses
from sentence_transformers.evaluation import TripletEvaluator

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/156.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/156.5 kB[0m [31m1.1 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━[0m [32m112.6/156.5 kB[0m [31m1.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m156.5/156.5 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# class sentence_transformers.evaluation.BinaryClassificationEvaluator(sentences1: List[str], sentences2: List[str], labels: List[int], name: str = '', batch_size: int = 32, show_progress_bar: bool = False, write_csv: bool = True)
# Evaluate a model based on the similarity of the embeddings by calculating the accuracy of identifying similar and dissimilar sentences. The metrics are the cosine similarity as well as Euclidean and Manhattan distance. The returned score is the accuracy with a specified metric.

# The results are written in a CSV. If a CSV already exists, then values are appended.

# The labels need to be 0 for dissimilar pairs and 1 for similar pairs.

# Parameters
# sentences1 – The first column of sentences

# sentences2 – The second column of sentences

# labels – labels[i] is the label for the pair (sentences1[i], sentences2[i]). Must be 0 or 1

# name – Name for the output

# batch_size – Batch size used to compute embeddings

# show_progress_bar – If true, prints a progress bar

# write_csv – Write results to a CSV file

In [None]:
BinaryClassificationEvaluator.__call__
   def __init__(
        self,
        sentences1: List[str],
        sentences2: List[str],
        labels: List[int],
        name: str = "",
        batch_size: int = 32,
        show_progress_bar: bool = False,
        write_csv: bool = True,
   ):
        """Keep the sentence pairs and their labels and prepare the CSV layout.

        Args:
            sentences1: First sentence of every pair.
            sentences2: Second sentence of every pair; must be as long as ``sentences1``.
            labels: One entry per pair; every entry must equal 0 or 1.
            name: Optional tag inserted into the CSV file name.
            batch_size: Stored as ``self.batch_size``.
            show_progress_bar: Stored as ``self.show_progress_bar``. ``None`` is replaced by
                whether the logger's effective level is INFO or DEBUG.
            write_csv: Stored as ``self.write_csv``.

        Raises:
            AssertionError: If the three lists differ in length or a label is neither 0 nor 1.
        """
        self.sentences1 = sentences1
        self.sentences2 = sentences2
        self.labels = labels

        # Validate after storing, exactly one label per sentence pair.
        assert len(self.sentences1) == len(self.sentences2)
        assert len(self.sentences1) == len(self.labels)
        for label in labels:
            assert label in (0, 1)

        self.write_csv = write_csv
        self.name = name
        self.batch_size = batch_size

        if show_progress_bar is None:
            show_progress_bar = logger.getEffectiveLevel() in (logging.INFO, logging.DEBUG)
        self.show_progress_bar = show_progress_bar

        name_suffix = "_" + name if name else ""
        self.csv_file = f"binary_classification_evaluation{name_suffix}_results.csv"

        # Column layout: two bookkeeping columns, then the same seven metrics
        # repeated for each of the four similarity functions.
        similarity_prefixes = ("cossim", "manhattan", "euclidean", "dot")
        metric_suffixes = (
            "accuracy",
            "accuracy_threshold",
            "f1",
            "precision",
            "recall",
            "f1_threshold",
            "ap",
        )
        self.csv_headers = ["epoch", "steps"] + [
            f"{prefix}_{suffix}" for prefix in similarity_prefixes for suffix in metric_suffixes
        ]
def __call__(self, model, output_path: str = None, epoch: int = -1, steps: int = -1) -> float:
    """Score ``model`` and optionally append the metrics to a CSV file.

    Args:
        model: Passed straight through to ``self.compute_metrices``.
        output_path: Directory for the CSV file. Nothing is written when it is
            ``None`` or when ``self.write_csv`` is falsy.
        epoch: Value written to the first CSV column.
        steps: Value written to the second CSV column.

    Returns:
        The largest ``"ap"`` value among all entries returned by
        ``self.compute_metrices``.
    """
    metrics_by_fct = self.compute_metrices(model)

    # Main score is the max of Average Precision (AP)
    best_ap = max(entry["ap"] for entry in metrics_by_fct.values())

    # Headers without an underscore ("epoch", "steps") are already covered by
    # the two leading values; every other header is "<function>_<metric>".
    row = [epoch, steps]
    for column in self.csv_headers:
        fct_name, separator, metric_name = column.partition("_")
        if separator:
            row.append(metrics_by_fct[fct_name][metric_name])

    if output_path is None or not self.write_csv:
        return best_ap

    csv_path = os.path.join(output_path, self.csv_file)
    starts_new_file = not os.path.isfile(csv_path)
    with open(csv_path, newline="", mode="w" if starts_new_file else "a", encoding="utf-8") as f:
        writer = csv.writer(f)
        if starts_new_file:
            # The header row is only written when the file is created.
            writer.writerow(self.csv_headers)
        writer.writerow(row)

    return best_ap



def compute_metrices(self, model):
    """Embed both sentence columns and score every pair with four similarity functions.

    Args:
        model: Object exposing ``encode(sentences, batch_size=..., show_progress_bar=...,
            convert_to_numpy=True)``.

    Returns:
        A dict keyed by ``"cossim"``, ``"manhattan"``, ``"euclidean"`` and ``"dot"``.
        Each value is a dict with the keys ``accuracy``, ``accuracy_threshold``, ``f1``,
        ``f1_threshold``, ``precision``, ``recall`` and ``ap``.
    """
    encode_options = dict(
        batch_size=self.batch_size, show_progress_bar=self.show_progress_bar, convert_to_numpy=True
    )

    try:
        # Hashable inputs can be de-duplicated through a set, so that each
        # distinct sentence is embedded only once.
        unique_sentences = list(set(self.sentences1 + self.sentences2))
        unique_embeddings = model.encode(unique_sentences, **encode_options)
        embedding_of = dict(zip(unique_sentences, unique_embeddings))
        embeddings1 = [embedding_of[sent] for sent in self.sentences1]
        embeddings2 = [embedding_of[sent] for sent in self.sentences2]
    except TypeError:
        # Otherwise embed everything in order and split the result in two,
        # e.g. if the sentences are images for evaluating a CLIP model.
        split_at = len(self.sentences1)
        all_embeddings = model.encode(self.sentences1 + self.sentences2, **encode_options)
        embeddings1 = all_embeddings[:split_at]
        embeddings2 = all_embeddings[split_at:]

    cosine_scores = 1 - paired_cosine_distances(embeddings1, embeddings2)
    manhattan_distances = paired_manhattan_distances(embeddings1, embeddings2)
    euclidean_distances = paired_euclidean_distances(embeddings1, embeddings2)

    # Pairwise dot products, kept as a plain Python list.
    dot_scores = [
        np.dot(left, right) for left, right in zip(np.asarray(embeddings1), np.asarray(embeddings2))
    ]

    labels = np.asarray(self.labels)

    # (short key, display name, per-pair scores, True when a larger score means "more similar")
    score_table = (
        ("cossim", "Cosine-Similarity", cosine_scores, True),
        ("manhattan", "Manhattan-Distance", manhattan_distances, False),
        ("euclidean", "Euclidean-Distance", euclidean_distances, False),
        ("dot", "Dot-Product", dot_scores, True),
    )

    output_scores = {}
    for short_name, name, scores, reverse in score_table:
        acc, acc_threshold = self.find_best_acc_and_threshold(scores, labels, reverse)
        f1, precision, recall, f1_threshold = self.find_best_f1_and_threshold(scores, labels, reverse)
        # Distances are negated so that a larger value always ranks a pair higher.
        sign = 1 if reverse else -1
        ap = average_precision_score(labels, scores * sign)

        logger.info(
            "Accuracy with {}:           {:.2f}\t(Threshold: {:.4f})".format(name, acc * 100, acc_threshold)
        )
        logger.info("F1 with {}:                 {:.2f}\t(Threshold: {:.4f})".format(name, f1 * 100, f1_threshold))
        logger.info("Precision with {}:          {:.2f}".format(name, precision * 100))
        logger.info("Recall with {}:             {:.2f}".format(name, recall * 100))
        logger.info("Average Precision with {}:  {:.2f}\n".format(name, ap * 100))

        output_scores[short_name] = dict(
            accuracy=acc,
            accuracy_threshold=acc_threshold,
            f1=f1,
            f1_threshold=f1_threshold,
            precision=precision,
            recall=recall,
            ap=ap,
        )

    return output_scores

In [None]:
# class sentence_transformers.evaluation.TripletEvaluator(anchors: List[str], positives: List[str], negatives: List[str], main_distance_function: Optional[sentence_transformers.evaluation.SimilarityFunction.SimilarityFunction] = None, name: str = '', batch_size: int = 16, show_progress_bar: bool = False, write_csv: bool = True)
# Evaluate a model based on a triplet: (sentence, positive_example, negative_example).
# Checks if distance(sentence, positive_example) < distance(sentence, negative_example).

# Parameters
# anchors – Sentences to check similarity to. (e.g. a query)

# positives – List of positive sentences

# negatives – List of negative sentences

# main_distance_function – One of 0 (Cosine), 1 (Euclidean) or 2 (Manhattan). Defaults to None, returning all 3.

# name – Name for the output

# batch_size – Batch size used to compute embeddings

# show_progress_bar – If true, prints a progress bar

# write_csv – Write results to a CSV file



In [None]:
!pip install lightning -qq
import os
import re
import random
import pickle
import numpy as np
import pandas as pd
import torch
import lightning as L

from glob import glob
from tqdm.auto import tqdm
from collections import defaultdict
from sklearn.model_selection import KFold
from torch.utils.data import DataLoader
from sklearn.metrics.pairwise import paired_cosine_distances, paired_euclidean_distances, paired_manhattan_distances

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m9.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m841.5/841.5 kB[0m [31m29.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m801.6/801.6 kB[0m [31m34.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.7/23.7 MB[0m [31m27.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m823.6/823.6 kB[0m [31m24.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.1/14.1 MB[0m [31m43.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m731.7/731.7 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m410.6/410.6 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━

In [None]:
from google.colab import drive
drive.mount('/content/drive')
!unzip -j -qq "/content/drive/MyDrive/code.zip" -d "/content"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd

# Load the training pairs, the test set and the sample submission from the
# current working directory.
train = pd.read_csv('./sample_train.csv')
test = pd.read_csv('./test.csv')
submit = pd.read_csv('./sample_submission.csv')

In [None]:
test

In [None]:
def _problem_id(code_path):
    """Return the integer that follows 'problem' in the part of the base name before the first '_'."""
    return int(os.path.basename(code_path).split('_')[0].split('problem')[1])


# Reload the training pairs and attach the problem id parsed from each snippet's file name.
train = pd.read_csv('./sample_train.csv')
train['code1_problem'] = train['code1_path'].apply(_problem_id)
train['code2_problem'] = train['code2_path'].apply(_problem_id)

In [None]:
# For every path matching './*cpp', take the text between the first and the
# second underscore, drop the '.cpp' suffix and parse the rest as an integer.
tmp = [int(path.split('_')[1].replace('.cpp', '')) for path in glob('./*cpp')]

In [None]:
tmp

NameError: name 'tmp' is not defined

In [None]:
# Group the text of every './*.cpp' file under the integer problem id parsed
# from its file name (the digits after 'problem', before the first '_').
label_texts = defaultdict(list)
code_paths = glob('./*.cpp')
for code_path in tqdm(code_paths):
    problem_tag = os.path.basename(code_path).partition('_')[0]
    label = int(problem_tag.split('problem')[1])
    with open(code_path, 'r', encoding='utf-8') as f:
        source_text = f.read()
    label_texts[label].append(source_text)

  0%|          | 0/250000 [00:00<?, ?it/s]

In [None]:
# Array of the problem ids (the keys of label_texts), in insertion order.
labels = np.array(list(label_texts))

In [None]:
labels

In [None]:
train['code1']

In [None]:
from sentence_transformers.evaluation import BinaryClassificationEvaluator
import csv
BinaryClassificationEvaluator.__call__
class CustomEvaluator(BinaryClassificationEvaluator):
    """BinaryClassificationEvaluator whose main score is the best accuracy.

    Only ``__call__`` is overridden: it returns the highest ``"accuracy"`` over
    all similarity functions reported by ``compute_metrices`` and writes the
    same CSV row layout described by ``self.csv_headers``.
    """

    def __call__(self, model, output_path: str = None, epoch: int = -1, steps: int = -1) -> float:
        """Score ``model`` and optionally append the metrics to a CSV file.

        Args:
            model: Passed straight through to ``self.compute_metrices``.
            output_path: Directory for the CSV file. Nothing is written when it
                is ``None`` or when ``self.write_csv`` is falsy.
            epoch: Value written to the first CSV column.
            steps: Value written to the second CSV column.

        Returns:
            The largest ``"accuracy"`` value among all entries returned by
            ``self.compute_metrices``.
        """
        metrics_by_fct = self.compute_metrices(model)
        best_accuracy = max(entry["accuracy"] for entry in metrics_by_fct.values())

        # Headers without an underscore ("epoch", "steps") are already covered
        # by the two leading values; the rest are "<function>_<metric>".
        row = [epoch, steps]
        for column in self.csv_headers:
            fct_name, separator, metric_name = column.partition("_")
            if separator:
                row.append(metrics_by_fct[fct_name][metric_name])

        if output_path is None or not self.write_csv:
            return best_accuracy

        csv_path = os.path.join(output_path, self.csv_file)
        starts_new_file = not os.path.isfile(csv_path)
        with open(csv_path, newline="", mode="w" if starts_new_file else "a", encoding="utf-8") as f:
            writer = csv.writer(f)
            if starts_new_file:
                # The header row is only written when the file is created.
                writer.writerow(self.csv_headers)
            writer.writerow(row)

        return best_accuracy

In [None]:
# Number of cross-validation folds (passed to KFold as n_splits).
N_SPLIT = 5
# Number of training epochs per fold (passed to model.fit as epochs).
EPOCHS = 5
# Batch size used for the training DataLoader and for the evaluators.
batchsize = 4

In [None]:
# Evaluator built from every (code1, code2, similar) row of the training frame.
# No `name` is passed, so the evaluator's default is used.
evaluator = CustomEvaluator(sentences1 = train['code1'].values.tolist(),
                            sentences2 = train['code2'].values.tolist(),
                            labels = train['similar'].values.tolist(),

                            batch_size = batchsize,
                            show_progress_bar = True,
                            write_csv = True,)

In [None]:
import gc

# One seed for both the fold split and the validation down-sampling, so that
# re-running a fold (e.g. after a kernel restart) sees exactly the same data.
SEED = 42

kf = KFold(n_splits=N_SPLIT, shuffle=True, random_state=SEED)
for fold_idx, (train_index, val_index) in enumerate(kf.split(labels)):
    # An OOM error occurs on every fold, so the kernel has to be restarted between folds.
    # To train a single fold per kernel session, uncomment and adjust:
    # if fold_idx != 0: continue
    labels_train_fold = labels[train_index]
    labels_val_fold = labels[val_index]

    # Validation pairs: both snippets must belong to held-out problems.
    val_df = train[train['code1_problem'].isin(labels_val_fold) & train['code2_problem'].isin(labels_val_fold)]

    # Balance the validation set by down-sampling both classes to the size of
    # the smaller one. The sampling is seeded so thresholds tuned on this set
    # are comparable across runs.
    negatives = val_df[val_df['similar'] == 0]
    positives = val_df[val_df['similar'] == 1]
    label_min = min(len(negatives), len(positives))
    val_df = pd.concat(
        [
            negatives.sample(label_min, random_state=SEED),
            positives.sample(label_min, random_state=SEED),
        ],
        axis=0,
    )

    # One training example per source file, labelled with its problem id.
    train_examples = [
        InputExample(texts=[code_text], label=label_train)
        for label_train in labels_train_fold
        for code_text in label_texts[label_train]
    ]

    train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=batchsize)

    val_evaluator = CustomEvaluator(
        sentences1=val_df['code1'].values.tolist(),
        sentences2=val_df['code2'].values.tolist(),
        labels=val_df['similar'].values.tolist(),
        batch_size=batchsize,
        show_progress_bar=True,
        write_csv=True,
    )
    # model = SentenceTransformer('flax-sentence-embeddings/st-codesearch-distilroberta-base')
    model = SentenceTransformer('microsoft/codereviewer')
    model.forward = torch.compile(model.forward, mode="reduce-overhead")
    train_loss = losses.BatchHardSoftMarginTripletLoss(model=model)
    model.fit(
        use_amp=True,
        train_objectives=[(train_dataloader, train_loss)],
        epochs=EPOCHS,
        # Warm up for the number of batches in one pass over the training examples.
        warmup_steps=len(train_examples)//batchsize,
        save_best_model=True,
        evaluator=val_evaluator,
        # NOTE: `{fold_idx=}` expands to e.g. "fold_idx=0", so the directory is
        # "./checkpoints/codereviewer-fold_idx=0".
        output_path=f'./checkpoints/codereviewer-{fold_idx=}',
    )

    # Best-effort cleanup before the next fold loads a fresh model: drop the
    # references, collect cycles, and hand cached GPU blocks back to the
    # driver. This may not be enough to avoid the OOM noted above.
    del model, train_loss
    gc.collect()
    torch.cuda.empty_cache()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/2.13k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/892M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.29k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/575k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/294k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/1.87k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/913 [00:00<?, ?B/s]

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Epoch:   0%|          | 0/5 [00:00<?, ?it/s]

Iteration:   0%|          | 0/50000 [00:00<?, ?it/s]

In [None]:
# Predict the `similar` label for every test pair with each fold's best checkpoint.
#
# Reads the same './test.csv' that is loaded earlier in this notebook.
# NOTE(review): this cell previously pointed at './data/test.csv' — confirm which path is correct.
test_df = pd.read_csv('./test.csv')
sentences1 = test_df['code1'].values.tolist()
sentences2 = test_df['code2'].values.tolist()
# Encode every distinct snippet only once, then look the embeddings up per pair.
sentences = list(set(sentences1 + sentences2))

# Accuracy columns written by the evaluator; each has a matching "<name>_threshold" column.
score_names = ['cossim_accuracy', 'manhattan_accuracy', 'euclidean_accuracy', 'dot_accuracy']

preds = []
for fold_idx in range(N_SPLIT):
    checkpoint_dir = f'./checkpoints/codereviewer-{fold_idx=}'
    model = SentenceTransformer(checkpoint_dir)
    embeddings = model.encode(
        sentences, batch_size=batchsize, show_progress_bar=True, convert_to_numpy=True
    )
    emb_dict = dict(zip(sentences, embeddings))
    embeddings1 = np.asarray([emb_dict[sent] for sent in sentences1])
    embeddings2 = np.asarray([emb_dict[sent] for sent in sentences2])

    # Pick the metric with the highest validation accuracy for this fold, and
    # take its threshold from the row where that same metric peaked.
    eval_df = pd.read_csv(f'{checkpoint_dir}/eval/binary_classification_evaluation_results.csv')
    max_score_name = eval_df[score_names].max().idxmax()
    best_row = eval_df.loc[eval_df[max_score_name].idxmax()]
    max_score_threshold = best_row[f"{max_score_name}_threshold"]

    # Similarities predict "similar" above the threshold, distances below it.
    if max_score_name == 'cossim_accuracy':
        cosine_scores = 1 - paired_cosine_distances(embeddings1, embeddings2)
        pred = (cosine_scores > max_score_threshold) * 1
    elif max_score_name == 'manhattan_accuracy':
        manhattan_distances = paired_manhattan_distances(embeddings1, embeddings2)
        pred = (manhattan_distances < max_score_threshold) * 1
    elif max_score_name == 'euclidean_accuracy':
        euclidean_distances = paired_euclidean_distances(embeddings1, embeddings2)
        pred = (euclidean_distances < max_score_threshold) * 1
    elif max_score_name == 'dot_accuracy':
        # Row-wise dot product as an ndarray, so the comparison is element-wise
        # (a plain Python list cannot be compared with a float).
        dot_scores = np.sum(embeddings1 * embeddings2, axis=1)
        pred = (dot_scores > max_score_threshold) * 1
    else:
        raise ValueError(f"Unexpected score column: {max_score_name!r}")
    preds.append(pred)

# Shape: (number of folds, number of test pairs); one row of 0/1 predictions per fold.
preds = np.array(preds)

In [None]:
train.head()
#code1_path, code2_path, code1, code2, similar

In [None]:
test

In [None]:
model = SentenceTransformer('flax-sentence-embeddings/st-codesearch-distilroberta-base')

In [None]:
model = SentenceTransformer('mixedbread-ai/mxbai-embed-large-v1')

In [None]:
model = SentenceTransformer('microsoft/codereviewer')