In [1]:
from google.colab import drive
# Mount Google Drive so later cells can reach the project folder under /content/drive/.
drive.mount('/content/drive/')

import os
# CUDA_LAUNCH_BLOCKING=1 asks CUDA to run kernel launches synchronously (a debugging
# aid); it is deliberately set here, before torch is imported.
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
import torch
import logging
# Configure the root logger so that only ERROR-and-above records are shown.
logging.basicConfig(level=logging.ERROR)
# Choose the torch device once: CUDA when PyTorch can see a GPU, otherwise the CPU.
if not torch.cuda.is_available():
    # CPU fallback.
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")
else:
    # Report how many GPUs are visible and the name of the first one.
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

    device = torch.device("cuda")




Mounted at /content/drive/
There are 1 GPU(s) available.
We will use the GPU: Tesla T4


In [2]:
!pip install --upgrade pip

!pip install torch
!pip install allennlp
!pip install tqdm pandas beautifulsoup4 requests


Collecting pip
  Downloading pip-24.0-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.1.2
    Uninstalling pip-23.1.2:
      Successfully uninstalled pip-23.1.2
Successfully installed pip-24.0
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Downloading nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)


[0m^C


In [3]:
# Change the working directory to the claimDecomp project folder on the mounted Drive
# (relies on the Drive mount performed in the first cell).
%cd /content/drive/MyDrive/TUDelft/NLP Group Project/claimDecomp


/content/drive/.shortcut-targets-by-id/1ddSDwiHDKf6ffm9NsuyXSK6uep9cGT6k/NLP Group Project/claimDecomp


In [4]:
# # Execute the bash script
# !bash scripts/download_data.sh

In [5]:
# %cd /content/drive/MyDrive/TUDelft/NLP Group Project/claimDecomp/scripts
# !ls
# # !bash run_nli_models.sh

In [None]:
from google.colab import drive

# Step 1: Mount Google Drive
drive.mount('/content/drive')

# Step 2: Change directory to the project's scripts folder
%cd /content/drive/MyDrive/TUDelft/NLP Group Project/claimDecomp/scripts

# Step 3: (no code in this cell) torch and allennlp are installed by the
# pip cell earlier in the notebook.


# Step 4: List the contents to verify the script is present
!ls

# # Step 5: Run the Python script directly
# !python3 run_nli.py


In [None]:
import numpy
from overrides import overrides
from typing import List, Dict
from allennlp.common.util import JsonDict
from allennlp.data import Instance
from allennlp.predictors.predictor import Predictor
from allennlp.data.fields import LabelField


@Predictor.register("doc_nli")
class DocNliPredictor(Predictor):
    """Predictor for premise/hypothesis pairs, registered under the name "doc_nli".

    Every instance is built through the dataset reader's ``text_to_instance``.
    """

    def predict(self, premise: str, hypothesis: str,
                answer_score: float = None) -> JsonDict:
        """Run the model on a single premise/hypothesis pair.

        # Parameters
        premise : `str`
        hypothesis : `str`
        answer_score : `float`, optional (default = `None`)
            Forwarded to the dataset reader as the ``answer_score`` keyword.

        # Returns
        The output of ``predict_instance``, or ``None`` when the reader
        produced a falsy instance.
        """
        # Pass ``answer_score`` by keyword (as ``predict_batch`` does). Passed
        # positionally it would land in the third parameter of
        # ``text_to_instance``, which ``DocNliReader`` declares as ``label``.
        instance = self._dataset_reader.text_to_instance(
            premise,
            hypothesis,
            answer_score=answer_score
        )
        if instance:
            return self.predict_instance(instance)
        return None

    def predict_batch(self, premises: List[str], hypothesises: List[str],
                      answer_scores: List[float] = None):
        """Run the model on several premise/hypothesis pairs at once.

        Inputs are paired with ``zip``, so extra items in the longer list are
        ignored. Pairs for which the reader returns a falsy instance are
        skipped, which means the outputs may be shorter than the inputs.

        # Returns
        The output of ``predict_batch_instance`` for the instances that were
        built, or an empty list when none were built.
        """
        instances = []
        if answer_scores is None:
            for premise, hypothesis in zip(premises, hypothesises):
                instance = self._dataset_reader.text_to_instance(
                    premise,
                    hypothesis
                )
                if instance:
                    instances.append(instance)
        else:
            for premise, hypothesis, answer_score in zip(premises,
                                                         hypothesises,
                                                         answer_scores):
                instance = self._dataset_reader.text_to_instance(
                    premise,
                    hypothesis,
                    answer_score=answer_score
                )
                if instance:
                    instances.append(instance)
        # Nothing to predict on: avoid handing an empty batch to the model.
        if not instances:
            return []
        return self.predict_batch_instance(instances)

    @overrides
    def predictions_to_labeled_instances(
            self, instance: Instance, outputs: Dict[str, numpy.ndarray]
    ) -> List[Instance]:
        """Return a copy of ``instance`` labelled with the model's argmax prediction."""
        # This function is used to compute gradients of what the model predicted.
        new_instance = instance.duplicate()
        label = numpy.argmax(outputs["logits"])
        # Skip indexing, we have integer representations of the strings "entailment", etc.
        new_instance.add_field("label",
                               LabelField(int(label), skip_indexing=True))
        return [new_instance]

    @overrides
    def _json_to_instance(self, json_dict: JsonDict) -> Instance:
        """Build an instance from a dict with ``"premise"`` and ``"hypothesis"`` keys."""
        premise = json_dict["premise"]
        hypothesis = json_dict["hypothesis"]
        instance = self._dataset_reader.text_to_instance(
            premise,
            hypothesis
        )
        return instance


In [None]:
from typing import Dict, Optional
import json
import logging
from overrides import overrides

from allennlp.common.file_utils import cached_path
from allennlp.common.util import pad_sequence_to_length
from allennlp.data.dataset_readers.dataset_reader import DatasetReader
from allennlp.data.fields import Field, TextField, LabelField, MetadataField, ArrayField
from allennlp.data.instance import Instance
from allennlp.data.token_indexers import SingleIdTokenIndexer, TokenIndexer
from allennlp.data.tokenizers import Tokenizer, SpacyTokenizer, PretrainedTransformerTokenizer

logger = logging.getLogger(__name__)


@DatasetReader.register("doc_nli")
class DocNliReader(DatasetReader):
    """Dataset reader for document-level NLI examples stored as a JSON list.

    Each example is a dict with ``"premise"``, ``"hypothesis"`` and ``"label"``
    keys. The label ``'entailment'`` is mapped to ``"entail"``; every other
    label is mapped to ``"not_entail"``.

    # Parameters
    tokenizer : `Tokenizer`, optional (default = `SpacyTokenizer()`)
        A `PretrainedTransformerTokenizer` must be configured not to add
        special tokens itself; they are added once around the pair here.
    token_indexers : `Dict[str, TokenIndexer]`, optional
        Defaults to ``{"tokens": SingleIdTokenIndexer()}``.
    max_source_length : `int`, optional (default = `512`)
        Maximum number of tokens kept for the combined sequence. ``None``
        disables truncation.
    """

    def __init__(
        self,
        tokenizer: Optional[Tokenizer] = None,
        token_indexers: Dict[str, TokenIndexer] = None,
        max_source_length: Optional[int] = 512,
        **kwargs,
    ) -> None:
        # NOTE(review): manual sharding is requested here, but `_read` does not
        # shard its examples itself -- confirm this is intended for
        # distributed training.
        super().__init__(manual_distributed_sharding=True, **kwargs)
        self._tokenizer = tokenizer or SpacyTokenizer()
        if isinstance(self._tokenizer, PretrainedTransformerTokenizer):
            assert not self._tokenizer._add_special_tokens
        self._token_indexers = token_indexers or {"tokens": SingleIdTokenIndexer()}
        self.max_source_length = max_source_length

    @overrides
    def _read(self, file_path: str):
        """Yield one instance per example in the JSON file at ``file_path``."""
        file_path = cached_path(file_path)

        # Parse the whole file first so the handle is closed before any
        # instance is yielded to the consumer.
        with open(file_path, "r", encoding="utf-8") as doc_nli_file:
            doc_nli_examples = json.load(doc_nli_file)

        for example in doc_nli_examples:
            label = "entail" if example["label"] == 'entailment' else "not_entail"
            # The whole premise text is used as stored in the example.
            premise = example['premise']
            hypothesis = example["hypothesis"]
            instance = self.text_to_instance(premise,
                                             hypothesis,
                                             label,
                                             )
            if instance:
                yield instance

    @overrides
    def text_to_instance(
        self,  # type: ignore
        premise: str,
        hypothesis: str,
        label: str = None,
        answer_score: float = None
    ) -> Instance:
        """Tokenize a premise/hypothesis pair and wrap it in an `Instance`.

        # Parameters
        premise : `str`
        hypothesis : `str`
        label : `str`, optional (default = `None`)
            Added as a `LabelField` when truthy.
        answer_score : `float`, optional (default = `None`)
            Accepted for interface compatibility; no field is created from it.

        # Returns
        An `Instance` with ``"tokens"`` and ``"metadata"`` fields, plus
        ``"label"`` when a label was given.
        """
        fields: Dict[str, Field] = {}
        premise = self._tokenizer.tokenize(premise)
        hypothesis = self._tokenizer.tokenize(hypothesis)
        tokens = self._tokenizer.add_special_tokens(premise, hypothesis)

        # Truncate from the right; a `None` limit means "keep everything".
        if (self.max_source_length is not None
                and len(tokens) > self.max_source_length):
            tokens = tokens[:self.max_source_length]
        fields["tokens"] = TextField(tokens, self._token_indexers)

        # Metadata keeps the untruncated token texts of each side.
        metadata = {
            "premise_tokens": [x.text for x in premise],
            "hypothesis_tokens": [x.text for x in hypothesis],
        }
        fields["metadata"] = MetadataField(metadata)

        if label:
            fields["label"] = LabelField(label)

        return Instance(fields)


In [None]:
from typing import Dict, Optional

from overrides import overrides
import torch

from allennlp.data import TextFieldTensors, Vocabulary
from allennlp.models.model import Model
from allennlp.modules import FeedForward, Seq2SeqEncoder, Seq2VecEncoder, TextFieldEmbedder
from allennlp.nn import InitializerApplicator, util, Activation
from allennlp.nn.util import get_text_field_mask
from allennlp.training.metrics import CategoricalAccuracy, F1Measure
from allennlp.common import Params


@Model.register("qa_nli_classifier")
class QaNliClassifier(Model):
    """
    This `Model` implements a basic text classifier. After embedding the text into
    a text field, we will optionally encode the embeddings with a `Seq2SeqEncoder`. The
    resulting sequence is pooled using a `Seq2VecEncoder`, optionally concatenated with
    a scalar answer score, passed through a single tanh feedforward layer and then to
    a linear classification layer, which projects into the label space. If a
    `Seq2SeqEncoder` is not provided, we will pass the embedded text directly to the
    `Seq2VecEncoder`.
    Registered as a `Model` with name "qa_nli_classifier".
    # Parameters
    vocab : `Vocabulary`
    text_field_embedder : `TextFieldEmbedder`
        Used to embed the input text into a `TextField`
    seq2seq_encoder : `Seq2SeqEncoder`, optional (default=`None`)
        Optional Seq2Seq encoder layer for the input text.
    seq2vec_encoder : `Seq2VecEncoder`
        Required Seq2Vec encoder layer. If `seq2seq_encoder` is provided, this encoder
        will pool its output. Otherwise, this encoder will operate directly on the output
        of the `text_field_embedder`.
    feedforward : `FeedForward`, optional, (default = `None`)
        Accepted for configuration compatibility but not used: the model always builds
        its own one-layer tanh feedforward whose width matches the pooled vector.
    dropout : `float`, optional (default = `None`)
        Dropout percentage to use.
    num_labels : `int`, optional (default = `None`)
        Number of labels to project to in classification layer. By default, the classification layer will
        project to the size of the vocabulary namespace corresponding to labels.
    label_namespace : `str`, optional (default = `"labels"`)
        Vocabulary namespace corresponding to labels. By default, we use the "labels" namespace.
    namespace : `str`, optional (default = `"tokens"`)
        Vocabulary namespace used to turn token ids back into strings.
    initializer : `InitializerApplicator`, optional (default=`None`)
        If provided, will be used to initialize the model parameters.
    use_answer_score : `bool`, optional (default = `None`)
        If true, a scalar answer score is appended to the pooled vector, so `forward`
        must be given `answer_scores`.
    """

    def __init__(
        self,
        vocab: Vocabulary,
        text_field_embedder: TextFieldEmbedder,
        seq2vec_encoder: Seq2VecEncoder,
        seq2seq_encoder: Seq2SeqEncoder = None,
        feedforward: Optional[FeedForward] = None,
        dropout: float = None,
        num_labels: int = None,
        label_namespace: str = "labels",
        namespace: str = "tokens",
        initializer: Optional[InitializerApplicator] = None,
        use_answer_score: Optional[bool] = None,
        **kwargs,
    ) -> None:

        super().__init__(vocab, **kwargs)
        self._text_field_embedder = text_field_embedder
        self._seq2seq_encoder = seq2seq_encoder
        self._seq2vec_encoder = seq2vec_encoder

        self._use_answer_score = use_answer_score

        # Width of the vector entering the feedforward layer: the pooled encoding
        # plus one extra slot when the answer score is appended. For a 1024-wide
        # encoder this gives 1024 / 1025.
        pooled_dim = self._seq2vec_encoder.get_output_dim()
        if self._use_answer_score:
            pooled_dim += 1

        params = Params({
            'input_dim': pooled_dim,
            'hidden_dims': pooled_dim,
            'activations': 'tanh',
            'num_layers': 1
        })

        self._feedforward = FeedForward.from_params(params)

        # Size the classifier from the layer that actually feeds it.
        self._classifier_input_dim = self._feedforward.get_output_dim()

        if dropout:
            self._dropout = torch.nn.Dropout(dropout)
        else:
            self._dropout = None
        self._label_namespace = label_namespace
        self._namespace = namespace

        if num_labels:
            self._num_labels = num_labels
        else:
            self._num_labels = vocab.get_vocab_size(namespace=self._label_namespace)

        self._classification_layer = torch.nn.Linear(self._classifier_input_dim, self._num_labels)
        self._accuracy = CategoricalAccuracy()
        # Precision/recall/F1 are reported for label index 0.
        self._f1 = F1Measure(positive_label=0)
        self._loss = torch.nn.CrossEntropyLoss()
        if initializer is not None:
            initializer(self)

    def forward(  # type: ignore
        self,
        tokens: TextFieldTensors,
        label: torch.IntTensor = None,
        answer_scores: torch.FloatTensor = None,
        metadata: Dict = None,
    ) -> Dict[str, torch.Tensor]:

        """
        # Parameters
        tokens : `TextFieldTensors`
            From a `TextField`
        label : `torch.IntTensor`, optional (default = `None`)
            From a `LabelField`
        answer_scores : `torch.FloatTensor`, optional (default = `None`)
            Required when the model was built with `use_answer_score`; a new axis is
            inserted at dim 1 before concatenation with the pooled text vector.
        metadata : `Dict`, optional (default = `None`)
            Not used by the computation.
        # Returns
        An output dictionary consisting of:
            - `logits` (`torch.FloatTensor`) :
                A tensor of shape `(batch_size, num_labels)` representing
                unnormalized log probabilities of the label.
            - `probs` (`torch.FloatTensor`) :
                A tensor of shape `(batch_size, num_labels)` representing
                probabilities of the label.
            - `token_ids`, `label`, `tokens` :
                Token ids of the input and the human-readable entries added by
                `make_output_human_readable`.
            - `loss` : (`torch.FloatTensor`, optional) :
                A scalar loss to be optimised.
        # Raises
        ValueError
            If the model uses answer scores and `answer_scores` is not given.
        """

        embedded_text = self._text_field_embedder(tokens)
        mask = get_text_field_mask(tokens)
        if self._seq2seq_encoder:
            embedded_text = self._seq2seq_encoder(embedded_text, mask=mask)

        embedded_text = self._seq2vec_encoder(embedded_text, mask=mask)

        if self._dropout:
            embedded_text = self._dropout(embedded_text)

        if self._use_answer_score:
            if answer_scores is None:
                raise ValueError(
                    "answer_scores must be provided when use_answer_score is enabled"
                )
            answer_scores = torch.unsqueeze(answer_scores, 1)
            embedded_text = torch.cat([embedded_text, answer_scores],
                                      dim=-1)

        embedded_text = self._feedforward(embedded_text)

        logits = self._classification_layer(embedded_text)
        probs = torch.nn.functional.softmax(logits, dim=-1)

        output_dict = {"logits": logits, "probs": probs,
                       "token_ids": util.get_token_ids_from_text_field_tensors(
                           tokens)}
        # Adds the "label" and "tokens" entries to output_dict in place.
        self.make_output_human_readable(output_dict)
        if label is not None:
            loss = self._loss(logits, label.long().view(-1))
            output_dict["loss"] = loss
            self._accuracy(logits, label)
            self._f1(logits, label)

        return output_dict

    @overrides
    def make_output_human_readable(
        self, output_dict: Dict[str, torch.Tensor]
    ) -> Dict[str, torch.Tensor]:
        """
        Does a simple argmax over the probabilities, converts index to string label, and
        add `"label"` key to the dictionary with the result. Also converts
        `"token_ids"` back to token strings under the `"tokens"` key.
        """
        predictions = output_dict["probs"]
        if predictions.dim() == 2:
            predictions_list = [predictions[i] for i in range(predictions.shape[0])]
        else:
            predictions_list = [predictions]
        # Look the label vocabulary up once rather than once per prediction.
        index_to_label = self.vocab.get_index_to_token_vocabulary(self._label_namespace)
        classes = []
        for prediction in predictions_list:
            label_idx = prediction.argmax(dim=-1).item()
            # Fall back to the raw index when the vocabulary has no such entry.
            classes.append(index_to_label.get(label_idx, str(label_idx)))
        output_dict["label"] = classes
        tokens = []
        for instance_tokens in output_dict["token_ids"]:
            tokens.append(
                [
                    self.vocab.get_token_from_index(token_id.item(), namespace=self._namespace)
                    for token_id in instance_tokens
                ]
            )
        output_dict["tokens"] = tokens
        return output_dict

    def get_metrics(self, reset: bool = False) -> Dict[str, float]:
        """Return accuracy plus the precision, recall and F1 of the tracked label."""
        acc = self._accuracy.get_metric(reset)
        f1_metric = self._f1.get_metric(reset)
        metrics = {"accuracy": acc,
                   "precision": f1_metric['precision'],
                   "recall": f1_metric['recall'],
                   "f1": f1_metric['f1']}
        return metrics

    # NOTE(review): the predictor defined in this notebook is registered as
    # "doc_nli"; confirm "text_classifier" is the intended default.
    default_predictor = "text_classifier"

In [None]:
import argparse
from allennlp.common.util import import_module_and_submodules
from allennlp.predictors.predictor import Predictor


def main(args):
    """Load the selected NLI model archive and print its prediction for a sample pair.

    Args:
        args: Namespace with a ``model`` attribute naming the model to load:
            "nq-nli", "mnli" or "doc-nli". Underscores are accepted in place
            of hyphens (e.g. "doc_nli").

    Raises:
        ValueError: If ``args.model`` does not name a known model.
    """
    # model key -> (archive path, registered predictor name)
    known_models = {
        "nq-nli": ("./nli_models/nq-nli.tar.gz", "qa_nli"),
        "mnli": ("./nli_models/mnli.tar.gz", "textual_entailment"),
        "doc-nli": ("./nli_models/doc-nli.tar.gz", "qa_nli"),
    }
    # Treat "doc_nli" and "doc-nli" alike so both spellings resolve.
    model_key = str(args.model).replace("_", "-")
    if model_key not in known_models:
        raise ValueError('no model named {}'.format(args.model))
    model_path, model_name = known_models[model_key]

    # Local import keeps this function self-contained.
    import torch

    # Use the first GPU when one is visible; -1 selects the CPU.
    cuda_device = 0 if torch.cuda.is_available() else -1

    predictor = Predictor.from_path(
        model_path,
        model_name,
        cuda_device=cuda_device)

    premise = ("The jobless rate for Hispanics hit a record low of 3.9% in"
               " September, while African Americans maintained its lowest rate "
               "ever, 5.5%.")
    hypothesis = "The unemployment rate of African Americans is historically low."

    results = predictor.predict(
        premise=premise,
        hypothesis=hypothesis
    )
    print(results)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # The default must be one of the names main() recognises
    # ("nq-nli", "mnli", "doc-nli").
    parser.add_argument('--model', type=str, default='doc-nli',
                        help='which NLI model to load: nq-nli, mnli or doc-nli')
    # parse_known_args tolerates the extra "-f <kernel.json>" argument that a
    # Jupyter/Colab kernel puts in sys.argv, which parse_args would reject.
    parsed_args, _unknown_args = parser.parse_known_args()
    # Import the project packages so their registered components are available
    # before the archive is loaded.
    import_module_and_submodules("predictors")
    import_module_and_submodules("dataset_reader")
    import_module_and_submodules("model")
    main(parsed_args)
