In [1]:
# Directory name the project repository is cloned into; later cells also join it
# onto '/content' to build the import path for the `katspace` package.
GITHUB_DIR = "aspects-space"

In [None]:
!pip install ipympl
!pip install transformers datasets evaluate
!pip install accelerate -U
!pip install pynvml
!pip install stanza
!pip install mplcursors

#download the model from my google drive
!gdown --folder --id "1-OIgf-F7lNsSkDv3fuqVN3tpCNv2LgiF"

In [None]:
repo_path = GITHUB_DIR
!git clone "https://github.com/katrinrohrb/aspects-space-dev.git" $repo_path

In [4]:
import sys
import os

# Put the cloned repository at the front of sys.path so `katspace` can be imported.
# The checkout is expected under /content.
module_path = os.path.abspath(os.path.join('/content', GITHUB_DIR))
sys.path.insert(0, module_path)

# NOTE(review): the "Configuration file ... not found" notice in this cell's output
# presumably comes from this import - confirm in katspace.core.
from katspace.core import RESULTS_DIR, DATA_DIR

Configuration file /content/aspects-space/katspace_config.json not found in /content


In [6]:
%matplotlib ipympl

import matplotlib.pyplot as plt
import numpy as np

import numpy as np; np.random.seed(1)
import matplotlib.pyplot as plt

from google.colab import output
output.enable_custom_widget_manager()

import torch

from pathlib import Path

import pandas as pd
import gc

from transformers import AutoTokenizer

from transformers import logging
from transformers import AutoConfig
from transformers import BertForSequenceClassification
from transformers.modeling_outputs import (
    SequenceClassifierOutput,
)

from pynvml import *

import mplcursors
import importlib
import numpy as np

from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

import evaluate

accuracy = evaluate.load("accuracy")

import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

import numpy as np
import seaborn as sns
sns.set_theme("paper")
sns.set_style("whitegrid")
from itertools import islice

import sys
import os

module_path = os.path.abspath(os.path.join('/content', GITHUB_DIR))
sys.path.insert(0, module_path)

from katspace.core import RESULTS_DIR, DATA_DIR

RESULTS_DIR = Path(RESULTS_DIR, "test")
TXT_DIR = Path(DATA_DIR, "txt")
MODEL_DIR = Path("checkpoint-286")

import logging
LOG_FILENAME = Path("session.log")
logging.basicConfig(filename=LOG_FILENAME, encoding='utf-8', level=logging.INFO, force=True)
logging.info("Start logging")

label2id = {"perceived_space": 0, "action_space": 1, "visual_space": 2, "descriptive_space":3, "no_space":4}
id2label = {v : k for k, v in label2id.items()}

In [7]:
def plot_space_types(results):
  """Plot a 2D t-SNE projection of pooled outputs, coloured by predicted label.

  Args:
    results: one output object or a list of them. Each must expose
      `pooled_output` (a tensor) and `results["label"]` (label names that are
      keys of `label2id`).

  The figure is created before anything is drawn; previously a new figure was
  opened after the scatter call, so the legend ended up on an empty second figure.
  """
  if not isinstance(results, list):
    results = [results]

  data = np.concatenate([outputs.pooled_output.cpu().detach().numpy() for outputs in results])
  clusters = [label2id[label] for outputs in results for label in outputs.results["label"]]

  # Step 2: Dimensionality Reduction
  n_samples, n_features = data.shape
  # PCA cannot produce more components than min(n_samples, n_features).
  pca = PCA(n_components=min(50, n_samples, n_features))
  data_reduced = pca.fit_transform(data)

  # t-SNE requires perplexity < n_samples; 30.0 is the library default.
  tsne = TSNE(n_components=2, perplexity=min(30.0, max(1.0, n_samples - 1.0)))
  data_embedded = tsne.fit_transform(data_reduced)

  # Step 4: Visualization
  fig, ax = plt.subplots(figsize=(10, 8))
  cmap = ListedColormap(list(sns.color_palette().as_hex()[0:len(label2id)]))
  # Fixed vmin/vmax keep the class -> colour mapping stable even when some classes
  # do not occur in the data.
  scatter = ax.scatter(data_embedded[:, 0], data_embedded[:, 1], c=clusters,
                       cmap=cmap, vmin=0, vmax=len(label2id) - 1)

  # legend_elements() returns one handle per class id that is actually present
  # (in ascending order), so the labels are derived from those ids.
  handles, _ = scatter.legend_elements()
  names = {idx: name for name, idx in label2id.items()}
  present = sorted(set(clusters))
  ax.legend(handles=handles[:len(present)], labels=[names[idx] for idx in present])
  plt.show()


def plot_annotations(results, orig_labels = None, hover = False, annotations = None, compare = "prediction"):
  """Plot a 2D t-SNE projection of pooled outputs with optional interactive annotations.

  Args:
    results: iterable of output objects exposing `pooled_output` (a tensor) and
      `results["label"]` (predicted label names, keys of `label2id`).
    orig_labels: optional gold label names, one per data point. When None,
      `compare` is forced to "prediction".
    hover: passed to mplcursors; when False, several annotations can stay open.
    annotations: optional sequence of texts, indexed by data point.
    compare: "prediction" (colour by prediction), "gold" (colour by gold label)
      or "both" (gold drawn as smaller markers on top of the predictions).

  Raises:
    ValueError: if `compare` is not one of the supported modes.

  With compare="gold" the function used to fail with a NameError because the
  legend was always built from the prediction scatter.
  """
  if orig_labels is None:
    compare = "prediction"
  elif compare not in ("prediction", "gold", "both"):
    raise ValueError(f"compare must be 'prediction', 'gold' or 'both', got {compare!r}")

  data = np.concatenate([batch.pooled_output.cpu().detach().numpy() for batch in results])
  clusters = [label2id[label] for batch in results for label in batch.results["label"]]

  n_samples, n_features = data.shape
  # PCA cannot produce more components than min(n_samples, n_features).
  pca = PCA(n_components=min(50, n_samples, n_features))
  data_reduced = pca.fit_transform(data)

  # t-SNE requires perplexity < n_samples; 30.0 is the library default.
  tsne = TSNE(n_components=2, perplexity=min(30.0, max(1.0, n_samples - 1.0)))
  data_embedded = tsne.fit_transform(data_reduced)

  # Step 4: Visualization
  fig, ax = plt.subplots(figsize=(10, 8))
  cmap = ListedColormap(list(sns.color_palette().as_hex()[0:len(label2id)]))
  # Fixed vmin/vmax keep the class -> colour mapping identical for both scatters
  # and independent of which classes happen to occur.
  color_args = dict(cmap=cmap, vmin=0, vmax=len(label2id) - 1)

  legend_scatter, legend_ids = None, None
  if compare in ("prediction", "both"):
    legend_scatter = ax.scatter(data_embedded[:, 0], data_embedded[:, 1], c=clusters, **color_args)
    legend_ids = clusters
  if compare in ("gold", "both"):
    orig_clusters = [label2id[label] for label in orig_labels]
    gold_scatter = ax.scatter(data_embedded[:, 0], data_embedded[:, 1], c=orig_clusters, s=8, **color_args)
    if legend_scatter is None:
      legend_scatter, legend_ids = gold_scatter, orig_clusters

  if annotations is not None:
    multiple = not hover
    mplcursors.cursor(ax, hover = hover, multiple = multiple).connect(
      "add", lambda sel: sel.annotation.set_text(annotations[sel.index]))

  # legend_elements() returns one handle per class id that is actually present
  # (in ascending order), so the labels are derived from those ids.
  handles, _ = legend_scatter.legend_elements()
  names = {idx: name for name, idx in label2id.items()}
  present = sorted(set(legend_ids))
  ax.legend(handles=handles[:len(present)], labels=[names[idx] for idx in present])
  plt.show()

def get_sents(filename):
  """Return all lines of `filename` (looked up inside TXT_DIR) as a list of strings.

  Line endings are kept, exactly as the file iterator delivers them.
  """
  source_path = Path(TXT_DIR, filename)
  with source_path.open(mode="r", encoding="utf8") as handle:
    return list(handle)


#rename to indicate it reads from a file
def batch_generator(filename, batch_size = 700):
  """Yield the lines of a UTF-8 text file in lists of at most `batch_size` lines.

  Args:
    filename: path of the file to read.
    batch_size: maximum number of lines per yielded batch (must be >= 1).

  Yields:
    Non-empty lists of lines (line endings kept). Only the last batch can be
    shorter than `batch_size`.

  Raises:
    ValueError: if `batch_size` is smaller than 1.

  Empty batches are never yielded. Previously an empty list was produced at the
  end of any file whose line count was a multiple of `batch_size` (including an
  empty file), and a non-positive `batch_size` looped forever.
  """
  if batch_size < 1:
    raise ValueError(f"batch_size must be >= 1, got {batch_size}")

  with Path(filename).open(mode = "r", encoding = "utf8") as f:
    sentences = []
    for line in f:
      sentences.append(line)
      if len(sentences) == batch_size:
        yield sentences
        sentences = []
    # Flush the final, partially filled batch (if any).
    if sentences:
      yield sentences


def batch_from_iterable(iterable, batch_size = 700):
    """Yield consecutive lists of up to `batch_size` items taken from `iterable`.

    Iteration stops as soon as the source is exhausted; no empty list is yielded.
    """
    source = iter(iterable)
    while chunk := list(islice(source, batch_size)):
        yield chunk


def kat_pipe(model = None, batch_generator = None, tokenizer = None, config = None, batch_size=700, do_postprocess = True):
  """Lazily run `model` over batches of sentences and yield one output per batch.

  Args:
    model: model to run; it is switched to eval mode, and `model.device` decides
      where the encoded inputs are moved.
    batch_generator: iterable yielding lists of sentences.
    tokenizer: callable that turns a list of sentences into a dict of tensors.
    config: unused here; kept for interface compatibility.
    batch_size: unused here (batching is done by `batch_generator`); kept for
      interface compatibility.
    do_postprocess: when True, each model output is passed through `postprocess`.

  Yields:
    The (optionally post-processed) model output for every non-empty batch.

  Gradient tracking is disabled only around the forward pass. Wrapping the
  `yield` in `torch.no_grad()` (as before) left gradients disabled in the
  caller's code between two batches. Empty batches are skipped instead of being
  handed to the tokenizer.
  """
  torch.cuda.empty_cache()

  model.eval()

  def process_single_batch(sentences):
    with torch.no_grad():
      encoding = tokenizer(sentences, return_tensors="pt", padding = True, truncation = True)
      encoding = {k: v.to(model.device) for k, v in encoding.items()}
      output_batch = model(**encoding)
    # Release memory held by the previous batch before the next one is processed.
    gc.collect()
    torch.cuda.empty_cache()
    if do_postprocess:
      output_batch = postprocess(output_batch)
    return output_batch

  for sentences in batch_generator:
    if not sentences:
      continue
    yield process_single_batch(sentences)

 # build own postprocess function, need to get labels again
def postprocess(model_outputs, top_k=1):
  """Attach label predictions to a batch of model outputs.

  The class scores are the softmax of `model_outputs["logits"]` along dim 1, and
  the predictions are stored on `model_outputs.results`.

  Args:
    model_outputs: output object with a "logits" entry; one row per sentence is
      assumed (the softmax is taken over dim 1).
    top_k: 1 for the best label only, an integer k for the k best labels per
      sentence, or None for all labels.

  Returns:
    `model_outputs`, with `results` set to
      * top_k == 1: {"label": [...], "score": [...]}, one entry per sentence;
      * otherwise: one list per sentence of {"label", "score"} dicts, sorted by
        descending score and cut to `top_k` entries.

  The `top_k != 1` branch used to enumerate sentences instead of classes and
  called `.item()` on a whole row of scores, which raises for more than one label.
  """
  logits = model_outputs["logits"]

  scores = torch.nn.functional.softmax(logits, dim = 1)

  if top_k == 1:
    model_outputs.results = {"label": [id2label[sent_scores.argmax().item()] for sent_scores in scores],
                             "score": [sent_scores.max().item() for sent_scores in scores]}
    return model_outputs

  per_sentence = []
  for sent_scores in scores:
    ranked = [
      {"label": id2label[class_id], "score": class_score}
      for class_id, class_score in enumerate(sent_scores.tolist())
    ]
    ranked.sort(key=lambda entry: entry["score"], reverse=True)
    if top_k is not None:
      ranked = ranked[:top_k]
    per_sentence.append(ranked)
  model_outputs.results = per_sentence
  return model_outputs

from dataclasses import dataclass
@dataclass
class KatBertOutput(SequenceClassifierOutput):
    """Sequence-classification output extended with the pooled representation."""
    # Pooled representation that KatBert.forward feeds into the classifier head.
    pooled_output : torch.Tensor = None
    # Not annotated, hence a plain class attribute rather than a dataclass field;
    # postprocess() assigns the label predictions to it on the instance.
    results = None

class KatBert(BertForSequenceClassification):
    """BERT sequence classifier whose dict-style output also carries the pooled output.

    `forward` computes the logits (and the loss when labels are given) and returns a
    `KatBertOutput` with an additional `pooled_output`.
    """
    # Imported in the class body so the annotations in forward()'s signature resolve
    # when the method is defined. Names bound here are NOT visible inside the method
    # body, which is why the loss classes are referenced through `torch.nn` below
    # (they used to raise NameError as soon as `labels` was passed).
    from typing import Optional, Tuple, Union

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        """Run the encoder and the classification head.

        Args:
            input_ids, attention_mask, token_type_ids, position_ids, head_mask,
            inputs_embeds, output_attentions, output_hidden_states: forwarded
                unchanged to `self.bert`.
            labels: optional targets; when given, a loss is computed according to
                `self.config.problem_type` (inferred from `num_labels` and the
                label dtype when unset).
            return_dict: whether to return a `KatBertOutput`; defaults to
                `self.config.use_return_dict`.

        Returns:
            A `KatBertOutput` (loss, logits, pooled_output, hidden_states,
            attentions), or, when `return_dict` is falsy, a tuple
            `(loss?, logits, *extra_encoder_outputs)`. `pooled_output` is the
            value after `self.dropout` has been applied.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Index 1 of the encoder output is the pooled representation.
        pooled_output = outputs[1]

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = torch.nn.MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = torch.nn.CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = torch.nn.BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return KatBertOutput(
            loss=loss,
            logits=logits,
            pooled_output = pooled_output,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


In [8]:
# Transient hover cursor: an annotation is shown only while the pointer is over an artist.
cursor = mplcursors.cursor(hover=mplcursors.HoverMode.Transient)


@cursor.connect("add")
def on_add(sel):
    """Label the selected element with "<x centre>: <height>" just above its top edge."""
    # NOTE(review): indexing `sel.artist` presumably expects a container of
    # patches (e.g. bars) - confirm which plot this cursor is meant for.
    left, bottom, box_width, box_height = sel.artist[sel.index].get_bbox().bounds
    sel.annotation.set(
        text=f"{left+box_width/2}: {box_height}",
        position=(0, 20),
        anncoords="offset points",
    )
    sel.annotation.xy = (left + box_width / 2, bottom + box_height)

In [None]:
# Load configuration, tokenizer and the fine-tuned classifier from the local checkpoint.
checkpoint = MODEL_DIR

config = AutoConfig.from_pretrained(checkpoint, label2id=label2id, id2label=id2label)

tokenizer = AutoTokenizer.from_pretrained("dbmdz/bert-base-german-cased")

# Not used by the visible cells; kept in case later cells rely on it.
transformers_module = importlib.import_module("transformers")

# KatBert is defined in this notebook, so it can be referenced directly instead of
# being looked up through sys.modules[__name__].
model_class = KatBert
# Fall back to the CPU when no GPU is attached instead of failing on "cuda".
model = model_class.from_pretrained(
    checkpoint,
    config = config,
    device_map = "cuda" if torch.cuda.is_available() else "cpu",
)

In [10]:
#rename this function to results_from_file_batched
def get_heads(filename, batch_size = 700):
  """Run the classifier over a text file from TXT_DIR and collect all batch outputs.

  Args:
    filename: file name relative to TXT_DIR.
    batch_size: number of lines per batch (default 700, the previously hard-coded
      value), mirroring the parameter of `heads_from_iterable`.

  Returns:
    List with one post-processed model output per batch. Uses the notebook-level
    `model`, `tokenizer` and `config`.
  """
  filename = Path(TXT_DIR, filename)
  iterator = kat_pipe(model = model, tokenizer = tokenizer, config = config,
                      batch_generator = batch_generator(filename, batch_size = batch_size),
                      batch_size = batch_size, do_postprocess = True)

  return list(iterator)

def heads_from_iterable(iterable, batch_size = 700):
  """Run the classifier over an iterable of sentences and collect all batch outputs.

  The sentences are grouped into batches of `batch_size`. The notebook-level
  `model`, `tokenizer` and `config` are used, and post-processing is enabled.
  """
  batches = batch_from_iterable(iterable, batch_size = batch_size)
  return list(
    kat_pipe(model = model, tokenizer = tokenizer, config = config,
             batch_generator = batches, batch_size = batch_size, do_postprocess = True)
  )

In [None]:
!wget https://raw.githubusercontent.com/katrinrohrb/aspects-space-dev/refs/heads/colabtest/data/annotations.csv annotations.csv

In [None]:
# Read the downloaded annotation table and display it as the cell result.
ds = pd.read_csv("annotations.csv")
ds

In [None]:
dirname = TXT_DIR.as_posix()
filelist = ["Adalbert_Stifter_-_Der_Nachsommer.txt", "Johann_Wolfgang_von_Goethe_-_Die_Leiden_des_jungen_Werther.txt",
            "Heinrich_von_Kleist_-_Michael_Kohlhaas.txt", "Franz_Kafka_-_Der_Prozeß.txt"]
!mkdir $dirname
for txt in filelist:
  !wget --directory-prefix={dirname} https://raw.githubusercontent.com/katrinrohrb/aspects-space-dev/refs/heads/colabtest/data/txt/{txt}
!ls -l $dirname

In [27]:
# Classify every line of the Kleist text; get_heads returns one output object per batch.
filename = "Heinrich_von_Kleist_-_Michael_Kohlhaas.txt"
results_kleist = get_heads(filename)

In [None]:
# The lines of the same file serve as annotation texts, so annotation i belongs to data
# point i; with hover=False several annotations can stay open at once.
plot_annotations(results_kleist, annotations = get_sents(filename), hover=False)