# Example: Creating Custom Pipeline Components

**Requirements**: The mexca package must be installed with the components `spe`, `tra`, and `sen`. This can be done with the command `!pip install mexca[spe,tra,sen]`.

In this notebook, we will create a new customized component for mexca. All five components in the MEXCA pipeline can be replaced by customized components as long as they have an `apply` method which receives the same input and produces the same output as the standard components.
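
The requirement above amounts to duck typing: a replacement only needs a compatible `apply` method. The following is a minimal sketch of that idea with hypothetical names (`SupportsApply` and `UppercaseComponent` are not part of mexca), and it does not show how mexca itself validates components.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsApply(Protocol):
    """Hypothetical interface: any object that offers an `apply` method."""

    def apply(self, data: Any) -> Any: ...


class UppercaseComponent:
    """Toy stand-in for a custom component (not part of mexca)."""

    def apply(self, data: str) -> str:
        return data.upper()


component = UppercaseComponent()

# A runtime-checkable protocol only verifies that the method exists,
# not that its signature or return type matches.
print(isinstance(component, SupportsApply))
print(component.apply("hello"))
```

Because the check only covers the presence of `apply`, matching the input and output types of the standard component remains the author's responsibility.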

In this example, we will replace the sentiment extraction component with a new component that predicts topic probabilities for each transcribed sentence. For the topic prediction, we use a pretrained [DeBERTa](https://huggingface.co/docs/transformers/v4.34.1/en/model_doc/deberta#overview) cross-encoder model for natural language inference provided by the SentenceTransformers package on [Hugging Face Hub](https://huggingface.co/cross-encoder/nli-deberta-base). The model predicts topic probabilities via zero-shot classification, which is explained in more detail in this [blog post](https://joeddav.github.io/blog/2020/05/29/ZSL.html). It is implemented in PyTorch using the transformers package.
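
To make the zero-shot idea concrete, here is a minimal sketch of the scoring step in plain Python. It assumes the model returns one contradiction logit and one entailment logit per sentence-topic pair; the logits below are made up for illustration, and the helper `entailment_probability` is hypothetical.

```python
import math
from typing import Dict, Tuple


def entailment_probability(contradiction_logit: float, entailment_logit: float) -> float:
    """Softmax over the two logits, returning the share assigned to entailment."""
    # Subtract the maximum before exponentiating for numerical stability
    peak = max(contradiction_logit, entailment_logit)
    exp_contradiction = math.exp(contradiction_logit - peak)
    exp_entailment = math.exp(entailment_logit - peak)
    return exp_entailment / (exp_contradiction + exp_entailment)


# Made-up (contradiction, entailment) logits for one sentence and two topics
logits: Dict[str, Tuple[float, float]] = {
    "trade": (-2.0, 3.0),
    "justice": (2.5, -1.5),
}

probs = {
    topic: entailment_probability(contradiction, entailment)
    for topic, (contradiction, entailment) in logits.items()
}
print(probs)
```

Each topic is scored independently against the sentence, so the resulting probabilities do not have to sum to one across topics.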

## Creating the Custom Component

First, let's import the modules and classes that we need for the new component.

In [1]:
import inspect
import os
from typing import Any, Dict, List, Optional

from tqdm import tqdm
from intervaltree import IntervalTree, Interval
from IPython.display import Video
from urllib.request import urlopen
from mexca.audio import SpeakerIdentifier
from mexca.data import (
    BaseData,
    AudioTranscription,
    SentimentAnnotation,
    TranscriptionData,
)
from mexca.pipeline import Pipeline
from mexca.text import AudioTranscriber, SentimentExtractor
from transformers import (
    AutoModelForSequenceClassification,
    DebertaForSequenceClassification,
)
import torch

Before we actually create the new component, let's dive into the architecture of the `SentimentExtractor` component that we want to replace. Like all components, it has an `apply` method which takes an `AudioTranscription` object as input and returns a `SentimentAnnotation` object as output.

In [2]:
inspect.signature(SentimentExtractor.apply)

<Signature (self, transcription: mexca.data.AudioTranscription, show_progress: bool = True) -> mexca.data.SentimentAnnotation>

Our new topic extraction component will also work with audio transcription input, so we can leave the input as it is. However, it will return a different output, namely, an annotation with topic probabilities for each sentence. For our new component, we will create a new subclass of `SentimentAnnotation` to store topic probabilities. The `SentimentAnnotation` class stores data in an `IntervalTree` object which contains `Interval` objects. In our case, each `Interval` object stores data about one sentence, including our topic probabilities. Let's first create a new class for storing topic data for a single sentence.

In [3]:
class CustomTopicData(BaseData):
    """Store topic probabilities for a single sentence.

    Parameters
    ----------
    text: str
        Sentence text.
    topics: dict
        Probabilities (values) for each topic (keys).

    """

    text: str
    topics: Dict[str, float]

Now, we create a subclass of `SentimentAnnotation` which stores the data for all sentences. `SentimentAnnotation` has the properties `data_type` (indicating which data class is stored in the segments) and `serialization_name` (used for automatic JSON serialization) which we override in our custom subclass.

In [4]:
class CustomTopicAnnotation(SentimentAnnotation):
    """Store topic annotations for transcribed sentences.

    Parameters
    ----------
    filename: pydantic.FilePath
        Path to the transcribed file. Must point to a valid file.
    segments: intervaltree.IntervalTree
        Interval tree containing segments with topic data for all sentences.

    """

    # Override abstract properties
    @property
    def data_type(self) -> Any:
        return CustomTopicData

    @property
    def serialization_name(self) -> str:
        return "topic_annotation"
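
The override pattern can be illustrated in isolation with stand-in classes. This is a minimal sketch only: `BaseAnnotationSketch` and `TopicAnnotationSketch` are hypothetical, and using `serialization_name` as the top-level JSON key is an assumption made for illustration, not a description of mexca's serialization.

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseAnnotationSketch(ABC):
    """Hypothetical stand-in for an annotation base class (not mexca's)."""

    def __init__(self, segments: List[Dict[str, Any]]) -> None:
        self.segments = segments

    @property
    @abstractmethod
    def serialization_name(self) -> str:
        """Name used when serializing the annotation."""

    def to_json(self) -> str:
        # Assumption: the name serves as the top-level key of the JSON output
        return json.dumps({self.serialization_name: self.segments})


class TopicAnnotationSketch(BaseAnnotationSketch):
    # Overriding the abstract property makes the subclass instantiable
    @property
    def serialization_name(self) -> str:
        return "topic_annotation"


annotation = TopicAnnotationSketch([{"text": "Hello.", "topics": {"trade": 0.5}}])
print(annotation.to_json())
```

A property that forgets its `return` statement yields `None`, so it is worth checking each overridden property once after defining the subclass.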

We have now completely defined the output of our topic extractor component. In the next step, we create the new component itself. We create the `CustomTopicExtractor` class as a subclass of the `SentimentExtractor` class to inherit its methods and properties. First, we override the constructor method (i.e., `__init__`) and add a new argument `topics` to it. This way, we can specify for which topics the component should predict probabilities. Second, we override the `classifier` property to load the pretrained DeBERTa model. Our implementation uses lazy initialization, so the model is only loaded into memory when we access it for the first time. This can save memory and improve performance.

N.B.: mexca uses lazy initialization for all standard components to avoid loading all pretrained models into memory at the same time. This reduces the risk of runtime errors caused by insufficient working memory and improves performance.
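
As a minimal sketch of the lazy initialization pattern described above, the toy class below defers an expensive load until the first property access. `LazyComponent` and its placeholder "model" are hypothetical and only stand in for a real pretrained model.

```python
from typing import List, Optional


class LazyComponent:
    """Toy component illustrating lazy initialization (not part of mexca)."""

    def __init__(self) -> None:
        self._model: Optional[List[int]] = None
        self.load_count = 0  # Counts how often the expensive load runs

    @property
    def model(self) -> List[int]:
        # Load the stand-in model only on first access, then reuse it
        if self._model is None:
            self.load_count += 1
            self._model = list(range(3))  # Placeholder for an expensive load
        return self._model


component = LazyComponent()
print(component.load_count)  # Nothing is loaded at construction time

component.model
component.model
print(component.load_count)  # The load ran once and was reused afterwards
```

The sketch compares against `None` explicitly, which avoids reloading when the cached object happens to evaluate as falsy.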

In [5]:
class CustomTopicExtractor(SentimentExtractor):
    def __init__(
        self,
        topics: List[str],
        model_name: Optional[str] = None,
        device: Optional[torch.device] = None,
    ):
        super().__init__(model_name, device)
        self.topics = topics

    @property
    def classifier(self) -> DebertaForSequenceClassification:
        """The pretrained sequence classification model for topic prediction.
        Loaded automatically from `model_name`.
        """
        if not self._classifier:
            self._classifier = (
                AutoModelForSequenceClassification.from_pretrained(
                    self.model_name,
                ).to(self.device)
            )

        return self._classifier

    def apply(
        self, transcription: AudioTranscription, show_progress: bool = True
    ) -> CustomTopicAnnotation:
        """Extract topic probabilities from text.

        Iterates over the sentences in the audio transcription and predicts topic probabilities.

        Parameters
        ----------
        transcription: AudioTranscription
            The transcription of the speech segments in the audio file split into sentences.
            Returned by `AudioTranscriber`.
        show_progress: bool, optional, default=True
            Whether a progress bar is displayed or not.

        Returns
        -------
        CustomTopicAnnotation
            A data class object with the topic probabilities
            for each sentence.

        """

        # Create output object with empty interval tree
        topic_annotation = CustomTopicAnnotation(
            filename=transcription.filename, segments=IntervalTree()
        )

        # Iterate over sentences (this could be optimized by batching sentences)
        for sent in tqdm(
            transcription.segments,
            total=len(transcription.segments),
            disable=not show_progress,
        ):
            # Transform text and topics into tokens
            tokens = self.tokenizer(
                [sent.data.text for _ in self.topics],
                self.topics,
                padding=True,
                return_tensors="pt",
            ).to(self.device)
            # Get model predictions
            output = self.classifier(**tokens)
            # Transform logits to probabilities (scores)
            logits = output.logits.detach().cpu()
            # Omit neutral scores (dim 2) and only take contradiction (dim 0) and entailment (dim 1) scores
            scores = logits[:, [0, 1]].softmax(dim=1)
            # Add probabilities and topics to output
            topic_annotation.segments.add(
                Interval(
                    begin=sent.begin,
                    end=sent.end,
                    # Use custom data class
                    data=CustomTopicData(
                        text=sent.data.text,
                        topics={
                            key: val
                            for (key, val) in zip(self.topics, scores[:, 1])
                        },
                    ),
                )
            )

        return topic_annotation
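
The comment in `apply` notes that the loop could be optimized by batching sentences. One possible way to prepare such batches is sketched below in plain Python; the helpers `sentence_topic_pairs` and `batched` are hypothetical and are not part of mexca or the component above.

```python
from typing import Iterator, List, Sequence, Tuple

Pair = Tuple[str, str]


def sentence_topic_pairs(sentences: Sequence[str], topics: Sequence[str]) -> List[Pair]:
    """Pair every sentence with every topic, keeping sentence order."""
    return [(sentence, topic) for sentence in sentences for topic in topics]


def batched(items: Sequence[Pair], batch_size: int) -> Iterator[List[Pair]]:
    """Yield consecutive chunks of at most `batch_size` pairs."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


pairs = sentence_topic_pairs(
    ["First sentence.", "Second sentence."],
    ["trade", "justice", "migration"],
)
batches = list(batched(pairs, batch_size=4))
print([len(batch) for batch in batches])  # [4, 2]
```

Each batch could then be tokenized and classified in one forward pass, with the scores regrouped by sentence afterwards. Whether this is faster depends on sentence lengths and available memory.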

The custom topic extraction component is now complete. In the next section, we will apply it to two examples.

## Applying the Custom Component

### Test Example

Let's apply our new `CustomTopicExtractor` component to a test case. We first create an instance of the component class with three topics: Trade, Justice, and Migration. We also specify the name of the pretrained model on Hugging Face Hub. 

In [6]:
topics = ["trade", "justice", "migration"]

extractor = CustomTopicExtractor(
    topics=topics,
    model_name="cross-encoder/nli-deberta-base",
)

Then, we create some test input data. We construct an interval tree with intervals for two sentences and add it to an `AudioTranscription` object.

In [7]:
sentences = IntervalTree(
    [
        Interval(
            begin=0.0,
            end=1.0,
            data=TranscriptionData(
                index=0, text="This deal will greatly boost our economy."
            ),
        ),
        Interval(
            begin=1.0,
            end=2.0,
            data=TranscriptionData(
                index=1, text="The country decided to open its borders."
            ),
        ),
    ]
)

transcription = AudioTranscription(
    filename="debate.mp4",
    segments=sentences,
)

We can now use the `AudioTranscription` object as input for the `apply` method of our topic extractor component.

In [8]:
result = extractor.apply(transcription)

100%|██████████| 2/2 [00:01<00:00,  1.34it/s]


Let's iterate over the result and print the topic probabilities for each sentence.

In [9]:
for seg in result.segments:
    print(seg.data.topics)

{'trade': 0.9688225388526917, 'justice': 0.0038643144071102142, 'migration': 0.030821722000837326}
{'trade': 0.6463996767997742, 'justice': 0.015696685761213303, 'migration': 0.7641936540603638}


For the first sentence, the Trade topic has by far the highest probability; that is, it is the topic most likely entailed by the sentence. For the second sentence, however, Migration and Trade receive similarly high probabilities, since borders can refer to both topics.
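
To compare topics per sentence more systematically, the dictionaries can be sorted by probability. The sketch below uses values rounded from the printed output above; `rank_topics` is a hypothetical helper.

```python
from typing import Dict, List, Tuple


def rank_topics(topics: Dict[str, float]) -> List[Tuple[str, float]]:
    """Return (topic, probability) pairs sorted from most to least likely."""
    return sorted(topics.items(), key=lambda item: item[1], reverse=True)


# Values rounded from the output printed above
first = {"trade": 0.969, "justice": 0.004, "migration": 0.031}
second = {"trade": 0.646, "justice": 0.016, "migration": 0.764}

print(rank_topics(first)[0][0])   # trade
print(rank_topics(second)[0][0])  # migration
```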

### Real-World Example

We can also include our custom component in a MEXCA pipeline and apply it to a real-world example. We will use a video of the US presidential debate between Clinton and Trump in 2016 which can be found on [YouTube](https://www.youtube.com/watch?v=DBhrSdjePkk). First, let's download the video from a third-party website.

In [10]:
def download_example(url, filename):
    # Download the file only if it does not exist locally yet
    if not os.path.exists(filename):
        # Context managers close the connection and the file when done
        with urlopen(url) as video, open(filename, "wb") as file:
            file.write(video.read())

In [11]:
example_url = "https://books.psychstat.org/rdata/data/debate.mp4"
filename = "debate.mp4"

download_example(example_url, filename)

Video(filename)

Now, we create a MEXCA pipeline with three components: a `SpeakerIdentifier` to detect speech segments, an `AudioTranscriber` to transcribe the speech, and our `CustomTopicExtractor` to predict the topic probabilities. We specify that we want to detect speech for two different speakers. For the audio transcription, we select the smallest Whisper model to speed up the process.

In [12]:
pipeline = Pipeline(
    speaker_identifier=SpeakerIdentifier(
        num_speakers=2,
        use_auth_token="HF_TOKEN" # Replace this string with your token
    ),
    audio_transcriber=AudioTranscriber(whisper_model="tiny"),
    sentiment_extractor=extractor,
)

We apply the pipeline to the debate video. We specify that we only want to process the file from second 10 to second 30 and that the language is English. We also add that we don't want to merge the resulting output from the different components since we did not implement how our topic probabilities should be merged.

In [13]:
output = pipeline.apply(
    filepath=filename, process_subclip=(10, 30), language="en", merge=False
)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)

100%|██████████| 33/33 [00:03<00:00,  9.85it/s]



100%|██████████| 5/5 [00:11<00:00,  2.35s/it]
100%|██████████| 10/10 [00:01<00:00,  6.59it/s]
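
The huggingface/tokenizers warning in the output above suggests setting the `TOKENIZERS_PARALLELISM` environment variable explicitly. A minimal sketch of doing so from Python follows; it assumes the variable is set before the tokenizer is first used, for example in the first cell of the notebook.

```python
import os

# Follow the suggestion from the warning: disable tokenizer parallelism
# explicitly so that forked processes do not trigger the message.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

print(os.environ["TOKENIZERS_PARALLELISM"])  # false
```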


Let's take a look at the transcribed speech segments and the most likely topic for each sentence.

In [14]:
for seg in output.sentiment.segments:
    topic = list(seg.data.topics.keys())[
        torch.Tensor(list(seg.data.topics.values())).argmax()
    ]

    print("Sentence: ", seg.data.text, "\n Topic: ", topic)

Sentence:  Well, Donald, I know you live in your own reality, but that's 
 Topic:  trade
Sentence:  was against it once it was finally negotiated and the terms were laid out. 
 Topic:  trade
Sentence:  Do you need any other stuff? 
 Topic:  trade
Sentence:  Well, I hope. 
 Topic:  justice
Sentence:  called it the gold standard. 
 Topic:  justice
Sentence:  You called it the gold standard of trade deals. 
 Topic:  trade
Sentence:  He said it's the finest deal you've ever seen. 
 Topic:  trade
Sentence:  You called it the gold standard. 
 Topic:  justice
Sentence:  I wrote about that in. 
 Topic:  trade
Sentence:  And then you heard what I said about it, and all of a sudden you were against it. 
 Topic:  trade


The debate in this time interval seems to be mostly about trade deals. Also note that the sentences mentioning only the gold standard are associated with Justice rather than Trade.
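
The impression that trade dominates can be checked by counting the predicted topics. The sketch below copies the most likely topic per sentence from the output above into a list and tallies it with `collections.Counter`.

```python
from collections import Counter

# Most likely topic per sentence, copied from the output above
predicted_topics = [
    "trade", "trade", "trade", "justice", "justice",
    "trade", "trade", "justice", "trade", "trade",
]

counts = Counter(predicted_topics)
print(counts.most_common())  # [('trade', 7), ('justice', 3)]
```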

## Conclusion

In this example, we created a custom component for extracting topic probabilities from transcribed text. This was done by creating a subclass of an existing mexca component and imitating its input and output. This approach illustrates how new components for MEXCA can be created and how existing components can be modified.