In [None]:
!pip install -U sentence-transformers

In [18]:
import numpy as np
import copy
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from typing import List
import bisect
import torch
import torch.optim as optim
import logging
import numpy as np
from tqdm import tqdm
from sentence_transformers import SentenceTransformer, models, losses, datasets, evaluation
from sentence_transformers.readers import InputExample
from sklearn.neighbors import KNeighborsClassifier
from annoy import AnnoyIndex
import random
from flask import Flask

## Download and format data

In [3]:
newsgroups_train = fetch_20newsgroups(subset='train')
newsgroups_test = fetch_20newsgroups(subset='test')

In [12]:
print(newsgroups_train.data[11])
print(newsgroups_train.target[11])
print(newsgroups_train.target_names)

From: david@terminus.ericsson.se (David Bold)
Subject: Re: Question for those with popular morality
Reply-To: david@terminus.ericsson.se
Distribution: world
Organization: Camtec Electronics (Ericsson), Leicester, England
Lines: 77
Nntp-Posting-Host: bangkok

In article 17570@freenet.carleton.ca, ad354@Freenet.carleton.ca (James Owens) writes:
>
>In a previous article, david@terminus.ericsson.se (David Bold) says:
>
>>
>>I don't mean to be rude, but I think that you've got hold of the wrong
>>end of a different stick...
>>
>>David
>
>I had a look at your posting again and I see what you mean!  I was so
>intent on explaining how Jung thought we could be more moral than God that
>I overlooked your main line of thought.
>
>You seem to be saying that, God being unknowable, His morality is unknowable.

Yep, that's pretty much it. I'm not a Jew but I understand that this is the
Jewish way of thinking. However, the Jews believe that the Covenant between
YHWH and the Patriarchs (Abraham and Mos

In [13]:
train_examples = []
for i in range(newsgroups_train.filenames.shape[0]):
  train_examples.append(InputExample(texts=newsgroups_train.data[i], label=newsgroups_train.target[i]))
  # train_examples.append(InputExample(texts=newsgroups_train.data[i]))

test_examples = []
for i in range(newsgroups_test.filenames.shape[0]):
  test_examples.append(InputExample(texts=newsgroups_test.data[i], label=newsgroups_test.target[i]))
  # test_examples.append(InputExample(texts=newsgroups_test.data[i]))

In [None]:
from pprint import pprint

pprint(list(newsgroups_train.target_names))

['alt.atheism',
 'comp.graphics',
 'comp.os.ms-windows.misc',
 'comp.sys.ibm.pc.hardware',
 'comp.sys.mac.hardware',
 'comp.windows.x',
 'misc.forsale',
 'rec.autos',
 'rec.motorcycles',
 'rec.sport.baseball',
 'rec.sport.hockey',
 'sci.crypt',
 'sci.electronics',
 'sci.med',
 'sci.space',
 'soc.religion.christian',
 'talk.politics.guns',
 'talk.politics.mideast',
 'talk.politics.misc',
 'talk.religion.misc']


## Create Triplet dataset (Modified from SentenceLabelDataset)

In [14]:
class NewsgroupsDataset(Dataset):
    """
    Dataset for training with triplet loss.
    This dataset takes a list of sentences grouped by their label and uses this grouping to dynamically select a
    positive example from the same group and a negative example from the other domain for a selected anchor sentence.
    """

    def __init__(self, examples: List[InputExample], provide_positive: bool = True,
                 provide_negative: bool = True):
        """
        Converts input examples to a SentenceLabelDataset usable to train the model with
        SentenceTransformer.smart_batching_collate as the collate_fn for the DataLoader
        Assumes only one sentence per InputExample and labels as integers from 0 to max_num_labels
        and should be used in combination with dataset_reader.LabelSentenceReader.
        Labels with only one example are ignored.
        smart_batching_collate as collate_fn is required because it transforms the tokenized texts to the tensors.
        :param examples:
            the input examples for the training
        :param model
            the Sentence BERT model for the conversion
        :param provide_positive:
            set this to False, if you don't need a positive example (e.g. for BATCH_HARD_TRIPLET_LOSS).
        :param provide_negative:
            set this to False, if you don't need a negative example (e.g. for BATCH_HARD_TRIPLET_LOSS
            or MULTIPLE_NEGATIVES_RANKING_LOSS).
        """
        self.input = examples
        
        self.grouped_inputs = {}
        self.grouped_domain = {}
        self.example_length = examples.filenames.shape[0]

        self.convert_input_examples(examples)

        self.provide_positive = provide_positive
        self.provide_negative = provide_negative


    def convert_input_examples(self, examples: List[InputExample]):
        """
        Converts input examples to a SentenceLabelDataset.
        Create the dictionary for postive and negative
        :param examples:
            the input examples for the training
        """
        all_domains = examples.target_names
        # Group examples
        # Add examples with the same label to the same dict

        for ex_index in range(self.example_length):

            if examples.target[ex_index] in self.grouped_inputs:
                self.grouped_inputs[examples.target[ex_index]].append(ex_index)
            else:
                self.grouped_inputs[examples.target[ex_index]] = [ex_index]

        # Group examples
        # Add examples with the same first class domain to the same dict
        
        for ex_index in range(self.example_length):
            first_domain = all_domains[examples.target[ex_index]].split('.')[0]
            if first_domain in self.grouped_domain:
                self.grouped_domain[first_domain].append(ex_index)
            else:
                self.grouped_domain[first_domain] = [ex_index]


    def __getitem__(self, item):
        if not self.provide_positive and not self.provide_negative:
            return [self.input.data[item]], self.input.data[item]

        # Anchor element
        anchor = self.input.data[item]

        # Get positive from the same group
        if self.provide_positive:
            group_exclusive = copy.deepcopy(self.grouped_inputs[self.input.target[item]])
            group_exclusive.remove(item)
            positive_item_idx = np.random.choice(group_exclusive, 1)
            positive = self.input.data[positive_item_idx[0]]
        else:
            positive = ''
        # Get negative from the random choice of another first class domain
        if self.provide_negative:
            group_domain_index = self.input.target[item]
            group_domain = self.input.target_names[group_domain_index].split('.')[0]

            distinct_domains = list(self.grouped_domain.keys())
            distinct_domains.remove(group_domain)
            group_choice = np.random.choice(distinct_domains, 1)
            negative_item_idx = np.random.choice(self.grouped_domain[group_choice[0]], 1)
            negative = self.input.data[negative_item_idx[0]]
        else:
            negative = ''

        return InputExample(texts=[anchor, positive, negative])


    def __len__(self):
        return self.example_length

In [15]:
model = SentenceTransformer('distilbert-base-nli-mean-tokens')

In [26]:
print(model)

SentenceTransformer(
  (0): Transformer({'max_seq_length': 128, 'do_lower_case': False}) with Transformer model: DistilBertModel 
  (1): Pooling({'word_embedding_dimension': 768, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False})
)


In [16]:
dataset_train = NewsgroupsDataset(newsgroups_train)
dataset_test = NewsgroupsDataset(newsgroups_test)

dataloader_train = DataLoader(dataset_train, batch_size=16)
dataloader_test = DataLoader(dataset_test, batch_size=16)

In [25]:
train_loss = losses.TripletLoss(model=model)

anchor_list = []
postive_list = []
negative_list = []
for i in dataset_train:
    anchor_list.append(i.texts[0])
    postive_list.append(i.texts[1])
    negative_list.append(i.texts[2])

evaluator = evaluation.TripletEvaluator(anchor_list, postive_list, negative_list)

model.fit(train_objectives=[(dataloader_train, train_loss)], 
          epochs=1, 
          warmup_steps=100, 
          evaluator=evaluator, 
          evaluation_steps=500,
          output_path="./mymodel")

Epoch:   0%|          | 0/1 [00:00<?, ?it/s]

Iteration:   0%|          | 0/708 [00:00<?, ?it/s]

## Prediction model

In [13]:
def embedding(dataset, path):
    model = SentenceTransformer(path)
    embeddings = model.encode(dataset)
    print(len(embeddings))
    return embeddings

In [14]:
def annoy_index(embeddings):
    vector_length = embeddings[0].shape[0]
    num_example = len(embeddings)

    tree = AnnoyIndex(vector_length, 'angular')  # Length of item vector that will be indexed
    for i, vector in zip(range(num_example), embeddings):
        tree.add_item(i, vector)

    tree.build(100)

    index = []
    for vector in embeddings:
        index.append(tree.get_nns_by_vector(vector, 100))

    return index

In [15]:
def predict(x_train, y_train):
    knn = KNeighborsClassifier(n_neighbors=5)
    knn.fit(x_train, y_train)

    return knn

In [24]:
def pipeline(dataset, path, num_dataset: int = None, str_to_classify: str = None):
    """
    param dataset:
        dataset with label to train and get predictions
    
    param num_dataset:
        The index number of the dataset that is to be predicted its label
        if None, it will return the accuracy of the model

    param str_to_classify:
        The string that is to be predicted

    """
    if str_to_classify:
        dataset.data.append(str_to_classify)
        embeddings = embedding(dataset.data, path)
    else:
        embeddings = embedding(dataset.data, path)

    index = annoy_index(embeddings)

    if str_to_classify:
        if len(index)-1 in index[-1]:
            index[-1].remove(len(index)-1)
        knn = predict([embeddings[j] for j in index[-1]], [dataset.target[j] for j in index[-1]])
        prediction = knn.predict([embeddings[-1]])
        print(prediction)
        return prediction

    x_train_group = []
    y_train_group = []

    for i in index:
        x_train_group.append([embeddings[j] for j in i])
        y_train_group.append([dataset.target[j] for j in i])

    x_train_group = np.array(x_train_group)
    y_train_group = np.array(y_train_group)

    if type(num_dataset) == int:
        knn = predict(x_train_group[num_dataset], y_train_group[num_dataset])
        prediction = knn.predict([embeddings[num_dataset]])
        print(prediction)
        return prediction
    else:
        y_predict = []
        for i in range(x_train_group.shape[0]):
            knn = predict(x_train_group[i], y_train_group[i])
            y_predict.append(knn.predict([embeddings[i]]))

        num_prediction = 0
        for i in range(x_train_group.shape[0]):
            if y_predict[i] == dataset.target[i]:
                num_prediction += 1

        score = num_prediction / x_train_group.shape[0]
        print(score)
        return score

## Compare the result with pretrained model

In [112]:
pipeline(fetch_20newsgroups(subset='train'), "./mymodel")
pipeline(fetch_20newsgroups(subset='train'), "distilbert-base-nli-mean-tokens")

11314
0.8928760827293618
11314
0.7601202050556832


0.7601202050556832

## Predict given text

In [19]:
str_to_classify = 'I want to be elected as a president of France'
pipeline(dataset=fetch_20newsgroups(subset='train'), path="./mymodel", str_to_classify=str_to_classify)

11315
[19]


array([19])

## Predict given text from the dataset

In [25]:
pipeline(dataset=fetch_20newsgroups(subset='train'), path="./mymodel", num_dataset=0)
print(fetch_20newsgroups(subset='train').target[0])

11314
[7]
7
