In [None]:
import warnings

warnings.filterwarnings("ignore")
import os

if "jbook" in os.getcwd():
    os.chdir(os.path.abspath(os.path.join("../..")))
FORCE = False

# Text Quality Analysis (TQA) 
Review text quality is an indicator of the content's richness, coherence, and informativeness. In this section, we integrate two complementary quality assessment measures, a lexical/syntactic complexity score and a perplexity-based coherence score, into a weighted sum. This approach provides a balanced evaluation, capturing both the structural diversity and the natural-language fluency of the reviews.

## Syntactic and Lexical Complexity Assessment
The syntactic and lexical complexity assessment evaluates review quality using a composite score derived from multiple syntactic and lexical measures, weighted as follows:

- **Syntactic Extent Score** (40%): Reflects the richness of content using counts of nouns, verbs, adjectives, and adverbs.
- **Syntactic Diversity Score** (20%): Captures variety in language using an entropy-based calculation.
- **Syntactic Complexity Score** (10%): Assesses the density of key parts of speech relative to total word count.
- **Lexical Complexity Score** (20%): Evaluates text complexity using unique word proportion, special character usage, and word length variation.
- **Typography Score** (10%): Incorporates quality signals such as limited digit use, minimal special characters, and proper terminal punctuation.
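As a minimal sketch of how these weights combine, assuming each component has already been scaled to [0, 1] (the scaling is not specified here), the composite is a weighted sum. The names `COMPONENT_WEIGHTS` and `composite_complexity_score` are hypothetical illustrations, not part of the `discover` package.

```python
# Hypothetical sketch: weights taken from the list above; components assumed scaled to [0, 1].
COMPONENT_WEIGHTS = {
    "syntactic_extent": 0.40,
    "syntactic_diversity": 0.20,
    "syntactic_complexity": 0.10,
    "lexical_complexity": 0.20,
    "typography": 0.10,
}


def composite_complexity_score(components: dict[str, float]) -> float:
    """Weighted sum of the component scores; a missing component contributes 0."""
    return sum(
        weight * components.get(name, 0.0)
        for name, weight in COMPONENT_WEIGHTS.items()
    )


example = {
    "syntactic_extent": 0.5,
    "syntactic_diversity": 0.8,
    "syntactic_complexity": 0.6,
    "lexical_complexity": 0.7,
    "typography": 1.0,
}
print(round(composite_complexity_score(example), 2))  # 0.66
```

Because the weights sum to 1, the composite stays in [0, 1] whenever the components do.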

A high Lexical and Syntactic Complexity Score typically indicates a text rich in linguistic features, with varied sentence structures and a well-balanced mix of nouns, verbs, and modifiers (like adjectives and adverbs). This variety is particularly valuable for tasks like Aspect-Based Sentiment Analysis (ABSA), where structural complexity can signal content with nuanced aspects and sentiments.

## Coherence - Perplexity-Based Quality Assessment
This measure evaluates review quality by applying 14 linguistic and structural filters, each assigned a weight derived from the relative perplexity difference between the full dataset and the filtered subset. The filters assess features like adjective presence, punctuation ratios, word repetition, and special character use. The weights emphasize the filters that most reduce perplexity, so the score rewards features associated with fluent, coherent text. The final score is a normalized weighted sum of these filter indicators.

Lower perplexity generally indicates higher fluency, coherence, and grammatical correctness, which are key indicators of text quality. This component is useful for flagging low-quality or noisy text that is hard for a language model to predict or that deviates significantly from standard linguistic norms.
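For reference, perplexity is the exponential of the average negative log-likelihood that a language model assigns to each token. The sketch below assumes per-token natural-log probabilities are already available from some model; which model this pipeline uses is not stated here, and `perplexity` is a hypothetical helper.

```python
import math


def perplexity(token_log_probs: list[float]) -> float:
    """Perplexity from per-token natural-log probabilities: exp(mean negative log-likelihood)."""
    if not token_log_probs:
        raise ValueError("need at least one token")
    mean_nll = -sum(token_log_probs) / len(token_log_probs)
    return math.exp(mean_nll)


# A model that assigns probability 0.25 to every token is as "surprised" as a
# uniform choice among 4 options, so its perplexity is 4.
print(round(perplexity([math.log(0.25)] * 10), 6))  # 4.0
```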

## Weighted Scoring Approach
To create a balanced quality score, the Syntactic and Lexical Complexity Score and the Perplexity-Based Score are combined with tailored weights that emphasize their respective strengths.

- **Lexical and Syntactic Complexity Weight**: Typically given more weight when the task demands detailed and linguistically rich text, such as ABSA, where richer syntactic content improves aspect and sentiment extraction.
- **Perplexity-Based Weight**: Often assigned a moderate weight to capture coherence and fluency, favoring grammatically sound, predictable text without sacrificing syntactic diversity.

The final **Text Quality Score** is a weighted average of these two components, providing a single score that balances both syntactic richness and linguistic fluency. The remainder of this notebook will execute the text quality scoring pipeline, computing and integrating the two quality measures into a final text quality score. The specifics of the measures are provided in {ref}`appendix:tqa`.
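A minimal sketch of the final combination follows. The weights of 0.7 and 0.3 are hypothetical and chosen only for illustration; the actual weights are configured elsewhere and are not shown here. Dividing by the weight total keeps the result a true weighted average even if the configured weights do not sum to 1.

```python
def text_quality_score(
    complexity_score: float,
    coherence_score: float,
    complexity_weight: float = 0.7,  # hypothetical weight, for illustration only
    coherence_weight: float = 0.3,  # hypothetical weight, for illustration only
) -> float:
    """Weighted average of the two component scores (weights normalized to sum to 1)."""
    total = complexity_weight + coherence_weight
    if total <= 0:
        raise ValueError("weights must sum to a positive value")
    return (
        complexity_weight * complexity_score + coherence_weight * coherence_score
    ) / total


print(round(text_quality_score(0.6, 0.9), 2))  # 0.69
```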

## Purpose of this Notebook
Reproducibility and code transparency are critical principles in scientific research, reinforcing the credibility and trustworthiness of our findings. Clean, repeatable script-based workflows that trace the journey from raw data to clean data and final analyses are essential components of this reproducibility.

To this point, our tendency to abstract away *implementation details* in favor of narrative coherence, and to focus on what *we* believe is important, has run counter to these principles. In this notebook, we lean into code transparency and reproducibility. We describe our text quality analysis methodology in detail, then walk through configuring, constructing, and executing the workflow, thereby illustrating how configurations, tasks, and stages are materialized into operational pipelines here and in downstream workflows. By exposing these *details*, we hope to establish a shared understanding that will allow for abstraction in downstream tasks without sacrificing clarity or trust.

Here we go.

## Import Libraries

In [None]:
from discover.container import DiscoverContainer
from discover.infra.config.flow import FlowConfigReader
from discover.flow.data_prep.tqa.stage import TQAStage
from discover.core.flow import PhaseDef, DataPrepStageDef

## Dependency Container

In [None]:
container = DiscoverContainer()
container.init_resources()
container.wire(
    modules=[
        "discover.flow.data_prep.stage",
    ],
)

## Text Quality Analysis Pipeline
Here, we configure the pipeline, construct the `TQAStage` object and run it.

Voilà! How's that for code transparency? 

![reproducibility](../figs/reproducibility.jpeg)



> The trustworthiness of the results of such pipelines rests entirely on their ability to be reproduced with fidelity, which is difficult if pipelines are not documented or recorded minutely and consistently. This difficulty has led to a reproducibility crisis...{cite}`rupprechtImprovingReproducibilityData2020`. 

Code snippets like the one above hide exactly the implementation details on which reproducibility depends. The remainder of this notebook unpacks those details, task by task.

## Text Quality Analysis Pipeline Tasks
The tasks we'll define below execute in three stages:
1. **Syntactic and Lexical Complexity Scoring Tasks**: These tasks compute the syntactic/lexical complexity scores for each review.
2. **Perplexity-Based Coherence Scoring Tasks**: Coherence scores for each review are computed here.
3. **Text Quality Scoring Task**: Finally, we compute the composite score, combining the above two measures into a weighted sum, reflecting a balanced evaluation of the quality of each review.

Here we go.

### Syntactic and Lexical Complexity Scoring Tasks
The **Syntactic and Lexical Complexity Score** is computed through a series of tasks:

1. **NLPTask**: Leverages SparkNLP to tokenize the review text and assign part-of-speech (POS) tags to each word.
2. **ComputeSyntacticStatsTask**: Calculates syntactic metrics, such as raw counts of each POS and the proportions of review text represented by each POS.
3. **ComputeLexicalStatsTask**: Computes lexical complexity statistics, including unique word counts, word proportions, and word length metrics.
4. **ComputeSyntacticLexicalScoresTask**: Combines the syntactic and lexical metrics to produce a comprehensive complexity score.
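Each task consumes the output of the previous one and adds new columns. As a rough, Spark-free analogy, assuming a plain `dict` stands in for a DataFrame row, the chaining looks like this. `run_tasks`, `tokenize`, and `word_count` are hypothetical illustrations, not the `discover` API.

```python
from functools import reduce
from typing import Callable

# Hypothetical stand-ins: a dict plays the role of a row, and each "task" returns
# a copy with new fields, mirroring how Task.run(DataFrame) -> DataFrame adds columns.
Row = dict
TaskFn = Callable[[Row], Row]


def run_tasks(row: Row, tasks: list[TaskFn]) -> Row:
    """Apply tasks in order, feeding each task's output into the next."""
    return reduce(lambda acc, task: task(acc), tasks, row)


def tokenize(row: Row) -> Row:
    return {**row, "tokens": row["content"].split()}


def word_count(row: Row) -> Row:
    return {**row, "stats_word_count": len(row["tokens"])}


result = run_tasks({"content": "great app, works well"}, [tokenize, word_count])
print(result["stats_word_count"])  # 4
```

The ordering matters: `word_count` depends on the `tokens` field that `tokenize` adds, just as the scoring task depends on columns produced by the statistics tasks.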

Well, I checked StackExchange, and no, these tasks aren't going to define themselves. So, Alex, my SparkNLP coding hat, please!

#### NLPTask
The **NLPTask** class utilizes a Spark ML Pipeline to streamline NLP preprocessing on text data within a Spark DataFrame. It begins by assembling raw text into a structured document format using SparkNLP's `DocumentAssembler`. The input is then tokenized, preparing it for POS tagging with a pretrained `PerceptronModel`. Finally, the `Finisher` stage converts the processed NLP annotations into plain lists of tokens and POS tags, making them easily accessible and usable in the DataFrame. 

In [None]:
# %load -r 19-140 discover/flow/data_prep/tqa/task.py
import math
import os

import matplotlib.pyplot as plt
import pyspark.pandas as ps
from pyspark.ml import Pipeline
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from sparknlp.annotator import PerceptronModel, Tokenizer
from sparknlp.base import DocumentAssembler, Finisher

from discover.flow.base.task import Task
from discover.infra.service.logging.task import task_logger
from discover.infra.utils.file.io import IOService


# ------------------------------------------------------------------------------------------------ #
#                                       NLP TASK                                                   #
# ------------------------------------------------------------------------------------------------ #
class NLPTask(Task):
    """
    A class to perform NLP preprocessing on a specified content column in a Spark DataFrame.
    This task includes tokenization, POS tagging, and formatting of the output as plain lists.

    Attributes
    ----------
    column : str
        The name of the column containing content data to process (default is "content").

    Methods
    -------
    run(data: DataFrame) -> DataFrame
        Executes the NLP pipeline on the provided DataFrame, adding token and POS tag columns.

    _build_pipeline() -> Pipeline
        Constructs a Spark ML Pipeline with stages for document assembly, tokenization,
        POS tagging, and output formatting using a Finisher.
    """

    def __init__(self, column: str = "content") -> None:
        super().__init__()
        self._column = column

    @task_logger
    def run(self, data: DataFrame) -> DataFrame:
        """
        Executes the NLP pipeline on the input DataFrame, applying tokenization and POS tagging,
        and returns the transformed DataFrame with additional columns for tokens and POS tags.

        Parameters
        ----------
        data : DataFrame
            The Spark DataFrame containing the content data column specified during initialization.

        Returns
        -------
        DataFrame
            A transformed Spark DataFrame with new columns: 'tp_tokens' and 'tp_pos', containing lists
            of tokens and POS tags, respectively.
        """
        pipeline = self._build_pipeline()
        return pipeline.fit(data).transform(data)

    def _build_pipeline(self) -> Pipeline:
        """
        Builds and returns a Spark ML Pipeline with stages for document assembly, tokenization,
        POS tagging, and a Finisher for output formatting.

        Returns
        -------
        Pipeline
            A configured Spark Pipeline that performs NLP tasks including tokenization, POS tagging,
            and result formatting for easy integration into a DataFrame.
        """
        # Assembles raw content data into a Spark NLP document
        document_assembler = (
            DocumentAssembler().setInputCol(self._column).setOutputCol("document")
        )

        # Tokenizer splits words for NLP processing
        tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("tokens")

        # POS Tagging with a pretrained model
        pos = (
            PerceptronModel.pretrained("pos_ud_ewt", "en")
            .setInputCols(["document", "tokens"])
            .setOutputCol("pos_tags")
        )

        # Finisher converts annotations to plain lists for DataFrame output
        finisher = (
            Finisher()
            .setInputCols(["tokens", "pos_tags"])
            .setOutputCols(["tp_tokens", "tp_pos"])
        )

        # Create and return Pipeline with the defined stages
        pipeline = Pipeline(
            stages=[
                document_assembler,
                tokenizer,
                pos,
                finisher,
            ]
        )
        return pipeline

#### Compute Syntactic Complexity Measures
This code processes a Spark DataFrame containing lists of part-of-speech (POS) tags for each text entry. First, it counts the total number of POS tags per entry. It then calculates the counts of specific POS types, such as nouns, verbs, adjectives, and adverbs. Next, it computes the proportion of each POS type relative to the total POS count, ensuring these ratios are only calculated when the total count is greater than zero. Finally, the code cleans up by removing any intermediate columns that are no longer needed.
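For intuition, the sketch below performs the same computation on a single Python list of tags, mirroring the Spark column logic without Spark. The example tags are hypothetical, and the tag set assumes the Universal Dependencies labels (`NOUN`, `VERB`, `ADJ`, `ADV`) that the Spark expressions match.

```python
from collections import Counter

# Universal Dependencies tags matched by the Spark `filter` expressions
TRACKED_TAGS = {"nouns": "NOUN", "verbs": "VERB", "adjectives": "ADJ", "adverbs": "ADV"}


def syntactic_stats(pos_tags: list[str]) -> dict[str, float]:
    """Counts and proportions of tracked POS tags for one entry."""
    counts = Counter(pos_tags)
    total = len(pos_tags)
    stats = {}
    for name, tag in TRACKED_TAGS.items():
        stats[f"pos_n_{name}"] = counts[tag]
        # Same zero-guard as F.when(pos_count > 0, ...).otherwise(0) in the Spark code
        stats[f"pos_p_{name}"] = counts[tag] / total if total > 0 else 0.0
    return stats


# Hypothetical tags for "The new update works nicely ."
tags = ["DET", "ADJ", "NOUN", "VERB", "ADV", "PUNCT"]
print(round(syntactic_stats(tags)["pos_p_nouns"], 3))  # 0.167
```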

In [None]:
# %load -r 145-249 discover/flow/data_prep/tqa/task.py
class ComputeSyntacticStatsTask(Task):
    """
    A task to compute Part-of-Speech (POS) statistics for a specified column in a PySpark DataFrame.

    This task generates counts and proportions for specific POS tags (nouns, verbs, adjectives, adverbs, determiners)
    based on POS tags available in the input DataFrame. These statistics are useful for analyzing the linguistic
    characteristics of the text in each row.

    Attributes:
        column (str): The name of the column containing the text or POS data to analyze. Defaults to "content".

    Methods:
        run(data: DataFrame) -> DataFrame:
            Executes the POS statistics calculations on the specified column of the input DataFrame and returns
            the DataFrame with the new POS statistics columns.

    POS Statistics Columns:
        pos_n_nouns (int): The number of noun tags in the text.
        pos_n_verbs (int): The number of verb tags in the text.
        pos_n_adjectives (int): The number of adjective tags in the text.
        pos_n_adverbs (int): The number of adverb tags in the text.
        pos_n_determiners (int): The number of determiner tags in the text.
        pos_p_nouns (float): The proportion of noun tags relative to the total POS tags.
        pos_p_verbs (float): The proportion of verb tags relative to the total POS tags.
        pos_p_adjectives (float): The proportion of adjective tags relative to the total POS tags.
        pos_p_adverbs (float): The proportion of adverb tags relative to the total POS tags.
        pos_p_determiners (float): The proportion of determiner tags relative to the total POS tags.
    """

    def __init__(self, column: str = "content") -> None:
        """
        Initializes the ComputeSyntacticStatsTask with the specified text or POS column.

        Args:
            column (str): The name of the column containing the POS data. Defaults to "content".
        """
        super().__init__()
        self._column = column

    @task_logger
    def run(self, data: DataFrame) -> DataFrame:
        """
        Executes the POS statistics calculations on the specified column.

        The function calculates the counts and proportions of specific POS tags (nouns, verbs, adjectives, adverbs,
        determiners) within each entry of the specified column. The resulting statistics are added as new columns
        in the DataFrame.

        Args:
            data (DataFrame): The input PySpark DataFrame containing the POS tags as a list in the "tp_pos" column.

        Returns:
            DataFrame: The input DataFrame with additional POS statistics columns.
        """

        # `tp_pos` contains the list of POS tags produced by NLPTask for each entry
        # Step 1: Calculate total POS tag count per entry
        data = data.withColumn("pos_count", F.size("tp_pos"))

        # Step 2: Calculate counts of specific POS tags (NOUN, VERB, ADJ, ADV, DET)
        data = data.withColumn(
            "pos_n_nouns", F.expr("size(filter(tp_pos, x -> x = 'NOUN'))")
        )
        data = data.withColumn(
            "pos_n_verbs", F.expr("size(filter(tp_pos, x -> x = 'VERB'))")
        )
        data = data.withColumn(
            "pos_n_adjectives", F.expr("size(filter(tp_pos, x -> x = 'ADJ'))")
        )
        data = data.withColumn(
            "pos_n_adverbs", F.expr("size(filter(tp_pos, x -> x = 'ADV'))")
        )
        data = data.withColumn(
            "pos_n_determiners", F.expr("size(filter(tp_pos, x -> x = 'DET'))")
        )

        # Step 3: Calculate proportions of specific POS tags (0 when an entry has no tags)
        data = data.withColumn(
            "pos_p_nouns",
            F.when(
                F.col("pos_count") > 0, F.col("pos_n_nouns") / F.col("pos_count")
            ).otherwise(0),
        )
        data = data.withColumn(
            "pos_p_verbs",
            F.when(
                F.col("pos_count") > 0, F.col("pos_n_verbs") / F.col("pos_count")
            ).otherwise(0),
        )
        data = data.withColumn(
            "pos_p_adjectives",
            F.when(
                F.col("pos_count") > 0, F.col("pos_n_adjectives") / F.col("pos_count")
            ).otherwise(0),
        )
        data = data.withColumn(
            "pos_p_adverbs",
            F.when(
                F.col("pos_count") > 0, F.col("pos_n_adverbs") / F.col("pos_count")
            ).otherwise(0),
        )
        data = data.withColumn(
            "pos_p_determiners",
            F.when(
                F.col("pos_count") > 0, F.col("pos_n_determiners") / F.col("pos_count")
            ).otherwise(0),
        )

        # Drop the intermediate total-count column
        data = data.drop("pos_count")

        return data

#### Compute Lexical Complexity Measures
This task calculates various lexical metrics for text data in a PySpark DataFrame. It starts by computing basic character-based statistics, such as the total character count, digit count, and punctuation count, and then calculates their respective proportions. Next, the text is tokenized into words to determine word counts, unique word counts, and the ratio of unique to total words, providing insights into vocabulary richness. The task also measures word length statistics, including the minimum, maximum, mean, and standard deviation, to capture the complexity of the text. 
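A pure-Python sketch of the same statistics for one string can help sanity-check the Spark expressions on small inputs. It approximates Spark's `split(content, '\s+')` with `str.split()` (which differs when the text has leading whitespace), and Python's `\w` is Unicode-aware whereas Java's is ASCII by default, so results may differ on non-ASCII text. `lexical_stats` is a hypothetical helper.

```python
import re
import statistics


def lexical_stats(text: str) -> dict[str, float]:
    """Approximate single-string version of the lexical statistics."""
    char_count = len(text)
    digits = len(re.findall(r"[0-9]", text))  # digit characters only
    special = len(re.findall(r"[^\w\s]", text))  # neither word char nor whitespace
    words = text.split()
    lengths = [len(w) for w in words]
    unique_prop = len(set(words)) / len(words) if words else 0.0
    return {
        "stats_char_count": char_count,
        "stats_digits_proportion": digits / char_count if char_count else 0.0,
        "stats_special_chars_proportion": special / char_count if char_count else 0.0,
        "stats_word_count": len(words),
        "stats_unique_word_proportion": unique_prop,
        "stats_word_repetition_ratio": 1 - unique_prop,
        "stats_word_length_mean": statistics.fmean(lengths) if lengths else 0.0,
        # Population standard deviation, matching the divide-by-n in the Spark expression
        "stats_word_length_std": statistics.pstdev(lengths) if len(lengths) > 1 else 0.0,
    }


print(lexical_stats("App crashed 2 times!!")["stats_word_length_mean"])  # 4.5
```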

In [None]:
# %load -r 253-395 discover/flow/data_prep/tqa/task.py
class ComputeLexicalStatsTask(Task):
    """
    A task to compute basic text statistics for a specified column in a PySpark DataFrame.

    This task generates various statistics for text data, such as character count, digit and punctuation counts,
    word count, unique word count, and word length statistics, which are useful for analyzing the content and structure
    of text in each row.

    Attributes:
        column (str): The name of the column containing the text data to analyze. Defaults to "content".

    Methods:
        run(data: DataFrame) -> DataFrame:
            Executes the basic statistics calculations on the specified column of the input DataFrame and returns
            the DataFrame with the new statistics columns.

    Basic Statistics Columns:
        stats_char_count (int): The total number of characters in the text.
        stats_digits_count (int): The total number of digits in the text.
        stats_digits_proportion (float): The proportion of digits to total characters.
        stats_special_chars_count (int): The total number of punctuation marks in the text.
        stats_special_chars_proportion (float): The proportion of punctuation marks to total characters.
        stats_word_count (int): The total number of words in the text.
        stats_unique_word_count (int): The total number of unique words in the text.
        stats_unique_word_proportion (float): The proportion of unique words to total words.
        stats_word_repetition_ratio (float): One minus the unique word proportion.
        stats_word_length_min (int): The minimum word length in the text.
        stats_word_length_max (int): The maximum word length in the text.
        stats_word_length_mean (float): The mean word length in the text.
        stats_word_length_std (float): The standard deviation of word lengths in the text.
    """

    def __init__(self, column: str = "content") -> None:
        """
        Initializes the ComputeLexicalStatsTask with the specified text column.

        Args:
            column (str): The name of the column containing the text data to analyze. Defaults to "content".
        """
        super().__init__()
        self._column = column

    @task_logger
    def run(self, data: DataFrame) -> DataFrame:
        # 1. Character count
        data = data.withColumn("stats_char_count", F.length(self._column))

        # 2. Digits count (characters matching [0-9])
        data = data.withColumn(
            "stats_digits_count",
            F.expr(f"regexp_count({self._column}, '[0-9]')"),
        )

        # 3. Digits proportion
        data = data.withColumn(
            "stats_digits_proportion",
            F.when(
                F.col("stats_char_count") > 0,
                F.col("stats_digits_count") / F.col("stats_char_count"),
            ).otherwise(0),
        )

        # 4. Special chars count (characters that are neither word characters nor whitespace)
        data = data.withColumn(
            "stats_special_chars_count",
            F.expr(f"regexp_count({self._column}, r'[^\\w\\s]')"),
        )

        # 5. Special chars proportion
        data = data.withColumn(
            "stats_special_chars_proportion",
            F.when(
                F.col("stats_char_count") > 0,
                F.col("stats_special_chars_count") / F.col("stats_char_count"),
            ).otherwise(0),
        )

        # 6. Split content into words
        data = data.withColumn("words", F.split(F.col(self._column), "\\s+"))

        # 7. Word count
        data = data.withColumn("stats_word_count", F.size("words"))

        # 8. Unique word count
        data = data.withColumn("unique_words", F.array_distinct("words"))
        data = data.withColumn("stats_unique_word_count", F.size("unique_words"))

        # 9. Unique word proportion
        data = data.withColumn(
            "stats_unique_word_proportion",
            F.when(
                F.col("stats_word_count") > 0,
                F.col("stats_unique_word_count") / F.col("stats_word_count"),
            ).otherwise(0),
        )

        # 10. Word repetition ratio
        data = data.withColumn(
            "stats_word_repetition_ratio", 1 - F.col("stats_unique_word_proportion")
        )

        # Drop intermediate columns
        data = data.drop("words", "unique_words")

        # 11. Word length statistics
        # Split content into words and calculate word lengths
        data = data.withColumn(
            "word_lengths",
            F.expr(f"transform(split({self._column}, '\\\\s+'), x -> length(x))"),
        )

        # Minimum word length
        data = data.withColumn("stats_word_length_min", F.array_min("word_lengths"))

        # Maximum word length
        data = data.withColumn("stats_word_length_max", F.array_max("word_lengths"))

        # Mean word length
        data = data.withColumn(
            "stats_word_length_mean",
            F.expr(
                "aggregate(transform(word_lengths, x -> CAST(x AS DOUBLE)), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) / size(word_lengths)"
            ),
        )

        # Standard deviation of word length
        data = data.withColumn(
            "stats_word_length_std",
            F.when(
                F.size("word_lengths") > 1,
                F.sqrt(
                    F.expr(
                        "aggregate(transform(word_lengths, x -> CAST(x AS DOUBLE)), CAST(0.0 AS DOUBLE), (acc, x) -> acc + pow(x - stats_word_length_mean, 2)) / size(word_lengths)"
                    )
                ),
            ).otherwise(0),
        )

        # Drop intermediate column if not needed
        data = data.drop("word_lengths")

        return data

#### Compute Syntactic/Lexical Complexity Score
To evaluate the richness and complexity of text data, this task calculates a **Syntactic/Lexical Complexity Score** composed of three main components:

1. **POS Diversity Score**: Using an entropy-based approach, this score quantifies the variety of part-of-speech (POS) tags, such as nouns, verbs, adjectives, and adverbs. It highlights how diverse the grammatical structures are within the text.

2. **POS Density Score**: By examining the ratio of content-word POS tags (nouns, verbs, adjectives, and adverbs) to the word count, this score captures the intensity of POS usage. It indicates how much of the text is made up of content-bearing words.

3. **Lexical Complexity Score**: This metric assesses vocabulary sophistication. It factors in the proportion of unique words, the prevalence of special characters, and the variability in word lengths, offering a detailed look at the text’s lexical intricacy.

Each component score is scaled by its configured weight, and the weighted components are summed to form the overall **Syntactic/Lexical Complexity Score**.
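The three components can be sketched in plain Python as below, mirroring the UDF bodies in `ComputeSyntacticLexicalScoresTask`. The function names are hypothetical, and the top-level weights passed to `syntactic_lexical_score` are placeholders; the real values are supplied through the task's constructor and are not shown here.

```python
import math


def pos_diversity(p_nouns, p_verbs, p_adjectives, p_adverbs) -> float:
    """Entropy (natural log) over the four POS proportions, skipping zeros."""
    return -sum(
        p * math.log(p) for p in (p_nouns, p_verbs, p_adjectives, p_adverbs) if p > 0
    )


def pos_density(n_nouns, n_verbs, n_adjectives, n_adverbs, word_count) -> float:
    """Share of words tagged as nouns, verbs, adjectives, or adverbs."""
    if word_count <= 0:
        return 0.0
    return (n_nouns + n_verbs + n_adjectives + n_adverbs) / word_count


def lexical_complexity(unique_word_prop, special_chars_prop, word_length_std) -> float:
    """Fixed 0.4 / 0.3 / 0.3 blend used inside the lexical complexity UDF."""
    return 0.4 * unique_word_prop + 0.3 * special_chars_prop + 0.3 * word_length_std


def syntactic_lexical_score(diversity, density, lexical, weights=(1.0, 1.0, 1.0)) -> float:
    """Sum of the weighted components; the default weights are placeholders."""
    w_div, w_den, w_lex = weights
    return w_div * diversity + w_den * density + w_lex * lexical


print(round(pos_diversity(0.25, 0.25, 0.25, 0.25), 4))  # 1.3863
```

Because the four proportions are shares of *all* POS tags (determiners, punctuation, and so on included), they need not sum to 1, so the entropy is computed over an unnormalized distribution.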

In [None]:
# %load -r 400-579 discover/flow/data_prep/tqa/task.py
class ComputeSyntacticLexicalScoresTask(Task):
    """
    A task to compute the syntactic/lexical complexity score from three components:
    POS diversity, POS density, and lexical complexity.

    Attributes:
        pos_diversity_weight (float): The weight assigned to the POS diversity component.
        pos_density_weight (float): The weight assigned to the POS density component.
        lexical_complexity_weight (float): The weight assigned to the lexical complexity component.
        column (str): Column containing review text.
        new_column (str): The name of the output column to store the computed score.
    """

    def __init__(
        self,
        pos_diversity_weight: float,
        pos_density_weight: float,
        lexical_complexity_weight: float,
        column: str = "content",
        new_column: str = "tqa_syntactic_lexical_score",
    ) -> None:
        """
        Initializes the ComputeSyntacticLexicalScoresTask with specified weights and output column name.

        Args:
            pos_diversity_weight (float): Weight for the POS diversity component.
            pos_density_weight (float): Weight for the POS density component.
            lexical_complexity_weight (float): Weight for the lexical complexity component.
            column (str): Column containing review text. Defaults to "content".
            new_column (str): Name of the output column for the score. Defaults to "tqa_syntactic_lexical_score".
        """
        super().__init__()
        self._pos_diversity_weight = pos_diversity_weight
        self._pos_density_weight = pos_density_weight
        self._lexical_complexity_weight = lexical_complexity_weight
        self._column = column
        self._new_column = new_column

    @task_logger
    def run(self, data: DataFrame) -> DataFrame:
        """
        Executes the TQA score computation by applying several components as UDFs.

        Args:
            data (DataFrame): The input PySpark DataFrame containing text data and related features.

        Returns:
            DataFrame: The input DataFrame with additional columns for each component score
            and the final TQA score.
        """

        # Define UDFs for each computation

        @F.udf("float")
        def compute_pos_diversity_score(
            content, pos_p_nouns, pos_p_verbs, pos_p_adjectives, pos_p_adverbs
        ):
            """
            Computes the POS diversity score using an entropy-based calculation.

            Args:
                content (str): The text content of the review.
                pos_p_nouns (float): Proportion of nouns in the content.
                pos_p_verbs (float): Proportion of verbs in the content.
                pos_p_adjectives (float): Proportion of adjectives in the content.
                pos_p_adverbs (float): Proportion of adverbs in the content.

            Returns:
                float: The computed POS diversity score.
            """
            if content is not None and len(content) > 2:
                pos_tags = [pos_p_nouns, pos_p_verbs, pos_p_adjectives, pos_p_adverbs]
                pos_diversity = -sum(p * math.log(p) for p in pos_tags if p > 0)
                return float(pos_diversity * self._pos_diversity_weight)
            return 0.0

        @F.udf("float")
        def compute_pos_density_score(
            content,
            pos_n_nouns,
            pos_n_verbs,
            pos_n_adjectives,
            pos_n_adverbs,
            stats_word_count,
        ):
            """
            Computes the POS density score: the number of noun, verb, adjective,
            and adverb tags relative to the word count.

            Args:
                content (str): The text content of the review.
                pos_n_nouns (int): Number of nouns in the content.
                pos_n_verbs (int): Number of verbs in the content.
                pos_n_adjectives (int): Number of adjectives in the content.
                pos_n_adverbs (int): Number of adverbs in the content.
                stats_word_count (int): Total word count in the content.

            Returns:
                float: The computed POS density score.
            """
            if content is not None and len(content) > 2 and stats_word_count > 0:
                pos_density = (
                    pos_n_nouns + pos_n_verbs + pos_n_adjectives + pos_n_adverbs
                ) / stats_word_count
                return float(pos_density * self._pos_density_weight)
            return 0.0

        @F.udf("float")
        def compute_lexical_complexity_score(
            content,
            stats_unique_word_proportion,
            stats_special_chars_proportion,
            stats_word_length_std,
        ):
            """
            Computes the lexical complexity score based on unique word proportion,
            special character proportion, and word length standard deviation.

            Args:
                content (str): The text content of the review.
                stats_unique_word_proportion (float): Proportion of unique words in the content.
                stats_special_chars_proportion (float): Proportion of special characters in the content.
                stats_word_length_std (float): Standard deviation of word lengths in the content.

            Returns:
                float: The computed lexical complexity score.
            """
            if content is not None and len(content) > 2:
                lexical_complexity = (
                    0.4 * stats_unique_word_proportion
                    + 0.3 * stats_special_chars_proportion
                    + 0.3 * stats_word_length_std
                )
                return float(lexical_complexity * self._lexical_complexity_weight)
            return 0.0

        # Apply UDFs to create new columns
        data = data.withColumn(
            "tqm_pos_diversity_score",
            compute_pos_diversity_score(
                F.col(self._column),
                F.col("pos_p_nouns"),
                F.col("pos_p_verbs"),
                F.col("pos_p_adjectives"),
                F.col("pos_p_adverbs"),
            ),
        )

        data = data.withColumn(
            "tqm_pos_density_score",
            compute_pos_density_score(
                F.col(self._column),
                F.col("pos_n_nouns"),
                F.col("pos_n_verbs"),
                F.col("pos_n_adjectives"),
                F.col("pos_n_adverbs"),
                F.col("stats_word_count"),
            ),
        )

        data = data.withColumn(
            "tqm_lexical_complexity_score",
            compute_lexical_complexity_score(
                F.col(self._column),
                F.col("stats_unique_word_proportion"),
                F.col("stats_special_chars_proportion"),
                F.col("stats_word_length_std"),
            ),
        )

        # Final score: sum of the component scores, each already scaled by its weight
        data = data.withColumn(
            self._new_column,
            F.col("tqm_pos_diversity_score")
            + F.col("tqm_lexical_complexity_score")
            + F.col("tqm_pos_density_score"),
        )

        return data

Next, we assess the coherence of each review. 

### Compute Perplexity-Based Coherence Score Pipeline
The perplexity-based coherence scores are computed in three steps.

1. **ComputePerplexityFilters**: Binary indicators for 14 data quality heuristics are added to the dataset. They capture the presence of key parts of speech (adjectives, adverbs, determiners, nouns, verbs) that matter in sentiment analysis, along with punctuation, length, capitalization, and repetition properties of the text. We denote the matrix of these indicators $I$, where $I_i(\text{review})=1$ indicates that the review satisfies the $i^{th}$ filter criterion.

2. **ComputePerplexityWeights**: Each of the 14 filters is applied to the dataset, creating 14 subsets. The average perplexity of each subset is computed, and each filter's weight reflects the relative reduction in perplexity it achieves:
$$w_i=\max\bigg(0,\frac{PP_{all}-PP_i}{PP_{all}}\bigg)$$
where $PP_{all}$ is the average perplexity of the full dataset, and $PP_i$ is the average perplexity of the dataset with the $i^{th}$ filter applied. Filters that increase perplexity receive a weight of zero.

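3. **ComputeCoherenceScores**: Perplexity-based coherence scores are computed as the weighted average of the indicators $I_i$, using the weights $w_i$ created above:
$$\text{score}_{review}=\frac{\displaystyle\sum_{i=1}^Fw_iI_i(\text{review})}{\displaystyle\sum_{i=1}^Fw_i}$$
where $F=14$ is the number of filters. Because the indicators are binary and the weights are non-negative, each score lies in $[0,1]$.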
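To make the two formulas concrete, here is a toy worked example in plain Python. The perplexity values and indicators are made up for illustration (they are not results from this dataset), and only three of the filter names are used.

```python
# Toy worked example of the perplexity weight and coherence score formulas.
# All numbers are hypothetical; they are not results from this dataset.

pp_all = 40.0  # hypothetical average perplexity of the full dataset

# Hypothetical average perplexity of the subset passing each filter (PP_i)
pp_filtered = {
    "tqf_has_noun": 30.0,
    "tqf_has_verb": 36.0,
    "tqf_high_punctuation_ratio": 50.0,
}

# w_i = max(0, (PP_all - PP_i) / PP_all)
weights = {
    name: max(0.0, (pp_all - pp_i) / pp_all) for name, pp_i in pp_filtered.items()
}

# Indicator values I_i(review) for a single review
indicators = {"tqf_has_noun": 1, "tqf_has_verb": 0, "tqf_high_punctuation_ratio": 1}

# score = sum_i w_i * I_i(review) / sum_i w_i
score = sum(weights[n] * indicators[n] for n in weights) / sum(weights.values())

print(weights)
print(round(score, 4))  # 0.7143
```

The punctuation filter raises perplexity (50 > 40), so its weight is clipped to zero and its indicator has no effect on the score.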

Each step is implemented as a separate task, described below.

#### Compute Perplexity Filters
This task creates binary indicators for 14 filters, selected from more than 50 candidate filters based on consistent perplexity improvements {cite}`sharmaTextQualityBasedPruning2024`.

1. **Linguistic Features**: New columns are created to indicate whether the text contains specific parts of speech:
   - **Adjective**: `tqf_has_adjective` checks for at least one adjective.
   - **Adverb**: `tqf_has_adverb` checks for at least one adverb.
   - **Determiner**: `tqf_has_determiner` checks for at least one determiner.
   - **Noun**: `tqf_has_noun` checks for at least one noun.
   - **Verb**: `tqf_has_verb` checks for at least one verb.

2. **Punctuation and Special Character Checks**:
   - **Terminal Punctuation**: `tqf_has_terminal_punctuation` checks if the text ends with a period, exclamation mark, or question mark.
   - **High Special Characters Ratio**: `tqf_high_special_chars_ratio` is `True` if special characters make up more than 25% of the text.
   - **High Punctuation Ratio**: `tqf_high_punctuation_ratio` is `True` if punctuation makes up more than 25% of the text.

3. **Content Length and Structure**:
   - **Word Count Range**: `tqf_word_count_range` is `True` if the word count is between 4 and 255 words.
   - **Stop Word Match**: `tqf_stop_word_match` checks if at least two of eight common stop words (*the*, *be*, *to*, *of*, *and*, *that*, *have*, *with*) appear as space-separated tokens in the text.
   - **First Letter Capitalized**: `tqf_first_letter_cap` indicates if the first letter of the text is uppercase.
   - **Not All Caps**: `tqf_no_all_caps` is `True` if the text is not written entirely in uppercase.

4. **Word Repetition and Special Character Checks**:
   - **High Word Repetition**: `tqf_high_word_repetition` flags text with a word repetition ratio of 20% or higher.
   - **No Special Characters**: `tqf_no_special_chars` is `True` if the text contains no special characters beyond standard punctuation.

Finally, the task drops the token and POS-tag columns (`tp_tokens`, `tp_pos`), which are no longer needed, and returns the enriched DataFrame.
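The string-level filters can be checked without Spark. The following pure-Python sketch is not part of the pipeline: it mirrors the regular expressions and stop-word list used in `ComputePerplexityFiltersTask`, and the helper name `text_filters` is hypothetical. The POS-based and statistics-based filters are omitted because they depend on columns computed in earlier stages.

```python
import re

# Stop words and special-character pattern copied from ComputePerplexityFiltersTask
STOP_WORDS = {"the", "be", "to", "of", "and", "that", "have", "with"}
SPECIAL_CHARS_PATTERN = r"[^a-zA-Z0-9\s.,!?;:'\"()\-]"


def text_filters(content: str) -> dict:
    """Compute the string-based tqf_ indicators for one review (hypothetical helper)."""
    # Split on single spaces like Spark's split(content, ' '); matching is case-sensitive
    tokens = content.split(" ")
    return {
        # Ends with ., ! or ?
        "tqf_has_terminal_punctuation": re.search(r"[.!?]$", content) is not None,
        # At least two distinct stop words present as whole tokens
        "tqf_stop_word_match": sum(word in tokens for word in STOP_WORDS) >= 2,
        # First character is an uppercase ASCII letter
        "tqf_first_letter_cap": re.match(r"[A-Z]", content[:1]) is not None,
        # False when the text contains no lowercase letters at all
        "tqf_no_all_caps": re.search(r"^[^a-z]*$", content) is None,
        # True when no character falls outside letters, digits, whitespace and basic punctuation
        "tqf_no_special_chars": re.search(SPECIAL_CHARS_PATTERN, content) is None,
    }


print(text_filters("The app is great and easy to use."))
print(text_filters("WORST APP EVER #FAIL"))
```

Note that the capitalized "The" does not count toward the stop-word match, because the comparison is case-sensitive in both this sketch and the Spark expression.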

In [None]:
# %load -r 604-714 discover/flow/data_prep/tqa/task.py
class ComputePerplexityFiltersTask(Task):
    """
    A task to compute Text Quality Assessment (TQA) statistics for reviews in a PySpark DataFrame.

    This task generates various boolean flags based on the presence of certain parts of speech, punctuation patterns,
    and statistical ratios in the review text. These flags can be used to assess the quality and characteristics
    of each review.

    Methods:
        run(data: DataFrame) -> DataFrame:
            Executes the TQA statistics calculations on the specified columns of the input DataFrame and returns the
            DataFrame with the new TQA columns.

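    TQA Filter Columns:
        tqf_has_adjective (bool): True if the review has at least one adjective.
        tqf_has_adverb (bool): True if the review has at least one adverb.
        tqf_has_determiner (bool): True if the review has at least one determiner.
        tqf_has_noun (bool): True if the review has at least one noun.
        tqf_has_terminal_punctuation (bool): True if the review ends with terminal punctuation (., !, or ?).
        tqf_has_verb (bool): True if the review has at least one verb.
        tqf_high_special_chars_ratio (bool): True if the proportion of special characters is greater than 0.25.
        tqf_high_punctuation_ratio (bool): True if the proportion of punctuation is greater than 0.25.
        tqf_word_count_range (bool): True if the word count is greater than 3 and less than 256.
        tqf_stop_word_match (bool): True if at least two of the listed stop words are present.
        tqf_first_letter_cap (bool): True if the first character is an uppercase letter.
        tqf_no_all_caps (bool): True if the review contains at least one lowercase letter.
        tqf_high_word_repetition (bool): True if the word repetition ratio is at least 0.2.
        tqf_no_special_chars (bool): True if the review contains no special characters.
    """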

    def __init__(self) -> None:
        super().__init__()

    @task_logger
    def run(self, data: DataFrame) -> DataFrame:

        # 1. Whether review has at least one adjective
        data = data.withColumn("tqf_has_adjective", F.col("pos_n_adjectives") > 0)

        # 2. Whether review has at least one adverb
        data = data.withColumn("tqf_has_adverb", F.col("pos_n_adverbs") > 0)

        # 3. Whether review has at least one determiner
        data = data.withColumn("tqf_has_determiner", F.col("pos_n_determiners") > 0)

        # 4. Whether review has at least one noun
        data = data.withColumn("tqf_has_noun", F.col("pos_n_nouns") > 0)

        # 5. Whether the review contains terminal punctuation (., !, or ?)
        data = data.withColumn(
            "tqf_has_terminal_punctuation", F.col("content").rlike("[.!?]$")
        )

        # 6. Whether review has at least one verb
        data = data.withColumn("tqf_has_verb", F.col("pos_n_verbs") > 0)

        # 7. Whether special characters to words ratio is greater than 0.25
        data = data.withColumn(
            "tqf_high_special_chars_ratio",
            F.col("stats_special_chars_proportion") > 0.25,
        )

        # 8. Whether punctuation to words ratio is greater than 0.25
        data = data.withColumn(
            "tqf_high_punctuation_ratio",
            F.col("stats_punctuation_proportion") > 0.25,
        )

        # 9. Whether word count is in the range > 3 and < 256
        data = data.withColumn(
            "tqf_word_count_range",
            (F.col("stats_word_count") > 3) & (F.col("stats_word_count") < 256),
        )

        # 10. Stop word match
        # List of stop words to search for
        stop_words = ["the", "be", "to", "of", "and", "that", "have", "with"]

        # Create a 0/1 indicator for each stop word (exact, case-sensitive token match)
        conditions = [
            F.expr(f"array_contains(split(content, ' '), '{word}')").cast("int")
            for word in stop_words
        ]

        # Sum the conditions and check if at least 2 stop words are present
        data = data.withColumn(
            "tqf_stop_word_match", F.when(sum(conditions) >= 2, True).otherwise(False)
        )

        # 11. Create a new column "tqf_first_letter_cap" based on the first letter being uppercase
        data = data.withColumn(
            "tqf_first_letter_cap", F.expr("substring(content, 1, 1) rlike '^[A-Z]'")
        )

        # 12. Create a new column "tqf_no_all_caps": True unless the content has no lowercase letters
        data = data.withColumn("tqf_no_all_caps", ~F.col("content").rlike("^[^a-z]*$"))

        # 13. Create a new column "tqf_high_word_repetition" if 'stats_word_repetition_ratio' >= 0.2
        data = data.withColumn(
            "tqf_high_word_repetition", F.col("stats_word_repetition_ratio") >= 0.2
        )

        # 14. Define the regex pattern for special characters (non-alphanumeric, non-punctuation)
        special_chars_pattern = r"[^a-zA-Z0-9\s.,!?;:'\"()\-]"

        # Set tqf_no_special_chars to True if content has no special characters
        data = data.withColumn(
            "tqf_no_special_chars", ~F.col("content").rlike(special_chars_pattern)
        )

        # Delete tokens and POS tags from the dataset
        data = data.drop("tp_tokens", "tp_pos")

        return data

#### Compute and Save Perplexity Weights
This task computes weights based on the average perplexity of text reviews, leveraging a set of predefined filters. Here's how it works:

1. **Check for Existing Weights**: The task first checks if the perplexity weights file already exists at the specified file path (`self._pp_filepath`). If the file exists, the task skips the computation.

2. **Calculate Overall Average Perplexity**: The code computes the overall average perplexity (`pp_all`) across all reviews in the input DataFrame.

3. **Identify and Sort Filters**: It gathers all columns in the DataFrame that start with a specific prefix (`self._pp_filter_prefix`), which correspond to the text quality filters. These filters are then sorted alphabetically, so the weights are computed and saved in a deterministic order.

4. **Compute Weights for Each Filter**:
   - For each filter, the code filters the DataFrame to include only reviews where the filter condition is `True`.
   - It then calculates the average perplexity for this subset of reviews (`avg_perplexity_i`).
   - The weight for each filter is computed as `(pp_all - avg_perplexity_i) / pp_all`, the relative reduction in perplexity obtained by restricting the data to reviews that pass the filter.

5. **Save Weights to File**: The computed weights are converted into a Pandas DataFrame with one row per filter (columns `filter` and `weight`) and saved to the file using `IOService.write()`.

This task generates and saves weights that quantify the impact of various text quality filters on review perplexity, providing a basis for coherence assessments.
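The weight computation can be sketched with pandas on a toy DataFrame. This is a minimal analogue, not the task itself: `compute_perplexity_weights` is a hypothetical helper, it skips the file check and `IOService.write()`, and it returns the `(filter, weight)` table directly. It applies the `max(0, ·)` clipping from the weight formula, and, as an added assumption, gives zero weight to a filter that matches no reviews.

```python
import pandas as pd


def compute_perplexity_weights(
    data: pd.DataFrame, column: str = "dqp_perplexity", prefix: str = "tqf_"
) -> pd.DataFrame:
    """pandas sketch: one (filter, weight) row per filter column, sorted by name."""
    # Overall average perplexity (PP_all)
    pp_all = data[column].mean()
    rows = []
    for filter_col in sorted(c for c in data.columns if c.startswith(prefix)):
        # Perplexities of the reviews that pass this filter
        subset = data.loc[data[filter_col].astype(bool), column]
        # An empty subset has no average; treating it as PP_all yields zero weight
        pp_i = subset.mean() if len(subset) else pp_all
        rows.append({"filter": filter_col, "weight": max(0.0, (pp_all - pp_i) / pp_all)})
    return pd.DataFrame(rows, columns=["filter", "weight"])


toy = pd.DataFrame(
    {
        "dqp_perplexity": [20.0, 40.0, 60.0, 80.0],
        "tqf_has_noun": [True, True, False, False],
        "tqf_no_all_caps": [False, True, True, True],
    }
)
print(compute_perplexity_weights(toy))
```

Here `PP_all` is 50; the `tqf_has_noun` subset averages 30 (weight 0.4), while the `tqf_no_all_caps` subset averages 60 and is clipped to weight 0.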

In [None]:
# %load -r 718-791 discover/flow/data_prep/tqa/task.py
class ComputePerplexityWeights(Task):
    """
    A class to compute and save perplexity weights for various filters in a PySpark DataFrame.

    This task calculates the overall average perplexity and the average perplexity for each
    filter column that starts with a specified prefix. It then computes weights for each filter
    and writes the results to a specified file.

    Attributes:
        _column (str): The name of the column containing perplexity values.
        _pp_filepath (str): The file path to save the perplexity weights.
        _pp_filter_prefix (str): The prefix used to identify filter columns.
    """

    def __init__(
        self,
        column: str = "dqp_perplexity",
        pp_filepath: str = "models/tqa/pp_weights.csv",
        pp_filter_prefix: str = "tqf_",
    ) -> None:
        """
        Initializes the ComputePerplexityWeights class with the specified parameters.

        Args:
            column (str): The name of the column containing perplexity values. Defaults to 'dqp_perplexity'.
            pp_filepath (str): The file path to save the perplexity weights. Defaults to 'models/tqa/pp_weights.csv'.
            pp_filter_prefix (str): The prefix used to identify filter columns. Defaults to 'tqf_'.
        """
        super().__init__()
        self._column = column
        self._pp_filepath = pp_filepath
        self._pp_filter_prefix = pp_filter_prefix

    @task_logger
    def run(self, data: DataFrame) -> DataFrame:
        """
        Computes and saves perplexity weights for filter columns in the DataFrame.

        The method calculates the overall average perplexity and the average perplexity
        for each filter column. It then computes a weight for each filter based on the
        difference between the overall and individual perplexity values, and saves the
        weights to a specified file.

        Args:
            data (DataFrame): The input PySpark DataFrame containing the perplexity values and filter columns.

        Returns:
            DataFrame: The input DataFrame (unmodified).
        """
        if not os.path.exists(self._pp_filepath):
            # Calculate the overall average perplexity
            pp_all = data.agg(F.avg(self._column)).first()[0]
            w_i = {}

            # Get all columns that start with the filter prefix and sort them
            filters = [
                col for col in data.columns if col.startswith(self._pp_filter_prefix)
            ]
            filters = sorted(filters)

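            # Compute the average perplexity and the weight for each filter
            for filter_col in filters:
                filtered_data = data.filter(F.col(filter_col) == True)  # noqa
                avg_perplexity_i = filtered_data.agg(F.avg(self._column)).first()[0]
                if avg_perplexity_i is None:
                    # No review passes this filter, so it cannot reduce perplexity
                    w_i[filter_col] = 0.0
                else:
                    # w_i = max(0, (PP_all - PP_i) / PP_all): filters that raise perplexity get zero weight
                    w_i[filter_col] = max(0.0, (pp_all - avg_perplexity_i) / pp_all)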

            # Convert the weights dictionary to a DataFrame and write to file
            weights = pd.DataFrame.from_dict(
                w_i, orient="index", columns=["weight"]
            ).reset_index(names="filter")
            IOService.write(data=weights, filepath=self._pp_filepath)

        return data

#### Compute Perplexity-Based Coherence Score
The `ComputeCoherenceScoreTask` class calculates a coherence score for each review from the weighted filter indicators.

1. **Initialization**: 
   - The class is initialized with the output column name (`new_column`) and the file path to the perplexity weights (`pp_filepath`).
   - The weights themselves are loaded when the task runs, not at initialization.

2. **Loading Weights**:
   - In `run`, the weights are read from the file into a Pandas DataFrame, converted to a PySpark DataFrame, and collected back to the driver as a list of `Row` objects.
   - Each row holds a filter name and its weight, which are iterated over to construct the weighted sum expression.

3. **Score Calculation**:
   - The total weight is computed by summing all individual weights.
   - A weighted sum expression is constructed by multiplying each filter indicator (converted to double) by its corresponding weight and normalizing by the total weight.
   - The coherence score is added to the DataFrame as a new column, specified by `new_column`.

4. **Output**: 
   - The method returns the modified PySpark DataFrame, now enriched with the computed coherence score.

This class calculates a coherence score that reflects the overall text quality by leveraging precomputed perplexity weights and binary filter indicators, providing a quantitative measure of coherence for each text entry.
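The same normalized weighted sum can be sketched in pandas, starting from a `(filter, weight)` table like the one the weights task writes. This is an illustration only: `coherence_scores` is a hypothetical helper, the weights are made up, and raising an error when the weights sum to zero is an added safeguard that the task does not include.

```python
import pandas as pd


def coherence_scores(data: pd.DataFrame, weights: pd.DataFrame) -> pd.Series:
    """Normalized weighted sum of tqf_ indicators, given a (filter, weight) table."""
    w = weights.set_index("filter")["weight"]
    total_weight = w.sum()
    if total_weight <= 0:
        raise ValueError("filter weights sum to zero; cannot normalize")
    # Cast boolean indicators to float; mul(axis=1) aligns columns with the weight index
    indicators = data[w.index].astype(float)
    return indicators.mul(w, axis=1).sum(axis=1) / total_weight


weights = pd.DataFrame({"filter": ["tqf_has_noun", "tqf_has_verb"], "weight": [0.3, 0.1]})
reviews = pd.DataFrame(
    {"tqf_has_noun": [True, False, True], "tqf_has_verb": [True, True, False]}
)
print(coherence_scores(reviews, weights))
```

A review passing every weighted filter scores 1.0; one passing only `tqf_has_verb` scores 0.1 / 0.4 = 0.25.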

In [None]:
# %load -r 795-851 discover/flow/data_prep/tqa/task.py
class ComputeCoherenceScoreTask(Task):
    def __init__(
        self,
        new_column: str = "tqa_coherence_score",
        pp_filepath: str = "models/tqa/pp_weights.csv",
    ):
        """
        Initializes the ComputeCoherenceScoreTask with the output column and the weights file path.

        Args:
            new_column (str): The name of the output column for the computed coherence score. Defaults to "tqa_coherence_score".
            pp_filepath (str): Path to the file containing the perplexity weights. Defaults to "models/tqa/pp_weights.csv".
        """
        super().__init__()
        self._new_column = new_column
        self._pp_filepath = pp_filepath

    @task_logger
    def run(self, data: DataFrame) -> DataFrame:
        """
        Computes the coherence score based on the binary indicators and the assigned weights.

        Args:
            data (DataFrame): The input PySpark DataFrame containing binary indicators for filters.

        Returns:
            DataFrame: The input DataFrame with an additional column for the computed coherence score.
        """
        # Load perplexity weights
        self._weights_pandas = IOService.read(self._pp_filepath).reset_index(drop=True)
        # Convert to spark DataFrame
        weights_spark = ps.DataFrame(self._weights_pandas).to_spark()

        # Collect weights to the driver as a list of Row objects (fields accessible by name)
        weights_list = weights_spark.collect()

        # Compute the total sum of the weights
        total_weight = sum(item["weight"] for item in weights_list)

        # Compute the weighted sum of filter indicators
        filter_sum_expr = sum(
            [
                F.col(item["filter"]).cast("double") * F.lit(item["weight"])
                for item in weights_list
            ]
        ) / F.lit(
            total_weight
        )  # Normalize by the total sum of the weights

        # Add the computed coherence score as a new column
        data = data.withColumn(self._new_column, filter_sum_expr)

        return data

## Compute Text Quality Score
Finally, we compute the text quality score as a composite of the **Syntactic/Lexical Complexity Score** and the **Perplexity-Based Coherence Score**. We first scale each score to the range [0,1] using min-max scaling. The final score is then the weighted sum of both scaled scores:

$$\text{score}_{review}=w_1\times{\text{SLC}}_{review} + w_2\times{\text{PBC}}_{review}$$ 
where $\text{SLC}_{review}$ is the **Syntactic/Lexical Complexity Score**, $\text{PBC}_{review}$ is the **Perplexity-Based Coherence Score**, and $w_1$ and $w_2$ are their respective weights (0.4 and 0.6 by default). The combined score is then min-max scaled once more so that it spans [0,1].
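The scaling and combination steps can be sketched in pandas as follows. The column names and default weights come from `ComputeTextQualityScore`; the helper names are hypothetical, and mapping a constant column to 0.0 (instead of dividing by zero) is an assumption of this sketch.

```python
import pandas as pd


def min_max(s: pd.Series) -> pd.Series:
    """Scale a Series to [0, 1]; a constant Series maps to 0.0 (guard against division by zero)."""
    lo, hi = s.min(), s.max()
    if hi == lo:
        return s * 0.0
    return (s - lo) / (hi - lo)


def text_quality_score(df: pd.DataFrame, w1: float = 0.4, w2: float = 0.6) -> pd.Series:
    """Weighted sum of the min-max scaled component scores, rescaled to [0, 1]."""
    slc = min_max(df["tqa_syntactic_lexical_score"])
    pbc = min_max(df["tqa_coherence_score"])
    return min_max(w1 * slc + w2 * pbc)


scores = pd.DataFrame(
    {
        "tqa_syntactic_lexical_score": [1.0, 2.0, 3.0],
        "tqa_coherence_score": [10.0, 40.0, 25.0],
    }
)
print(text_quality_score(scores))
```

Here the scaled components are `[0, 0.5, 1]` and `[0, 1, 0.5]`, the weighted sums are `[0, 0.8, 0.7]`, and the final rescaling gives approximately `[0, 1, 0.875]`.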

In [None]:
# %load -r 855-944 discover/flow/data_prep/tqa/task.py
class ComputeTextQualityScore(Task):
    """
    A task to compute a final Text Quality Assessment (TQA) score by normalizing and combining two TQA scores
    using specified weights.

    Attributes:
        new_column (str): Column containing the final text quality score.
        tqa_syntactic_weight (float): The weight assigned to the syntactic/lexical complexity score. Defaults to 0.4.
        tqa_perplexity_weight (float): The weight assigned to the perplexity-based coherence score. Defaults to 0.6.
        _data (DataFrame): The DataFrame holding the data after computation.
    """

    def __init__(
        self,
        new_column: str = "tqa_score",
        tqa_syntactic_weight: float = 0.4,
        tqa_perplexity_weight: float = 0.6,
    ):
        """
        Initializes the ComputeTextQualityScore with specified weights for combining the two TQA scores.

        Args:
            tqa_syntactic_weight (float): Weight for the syntactic/lexical complexity score. Defaults to 0.4.
            tqa_perplexity_weight (float): Weight for the perplexity-based coherence score. Defaults to 0.6.
        """
        super().__init__()
        self._new_column = new_column
        self._tqa_syntactic_weight = tqa_syntactic_weight
        self._tqa_perplexity_weight = tqa_perplexity_weight
        self._data = None

    @task_logger
    def run(self, data: DataFrame) -> DataFrame:
        """
        Normalizes two TQA scores to the range [0, 1] and computes a final TQA score
        using the specified weights.

        Args:
            data (DataFrame): The input PySpark DataFrame containing "tqa_syntactic_lexical_score" and "tqa_coherence_score" columns.

        Returns:
            DataFrame: The PySpark DataFrame with normalized scores and the final combined TQA score.
        """
        def min_max_scale(df: DataFrame, column: str) -> DataFrame:
            """Min-max scale `column` to [0, 1]; a constant column is mapped to 0.0."""
            min_value, max_value = df.select(F.min(column), F.max(column)).first()
            if max_value == min_value:
                # Avoid division by zero when all values are identical
                return df.withColumn(column, F.lit(0.0))
            return df.withColumn(
                column, (F.col(column) - min_value) / (max_value - min_value)
            )

        # Normalize both component scores to [0, 1]
        data = min_max_scale(data, "tqa_syntactic_lexical_score")
        data = min_max_scale(data, "tqa_coherence_score")

        # Combine scores using weights
        data = data.withColumn(
            self._new_column,
            self._tqa_syntactic_weight * F.col("tqa_syntactic_lexical_score")
            + self._tqa_perplexity_weight * F.col("tqa_coherence_score"),
        )

        # Rescale the combined score so that it spans [0, 1]
        data = min_max_scale(data, self._new_column)
        self._data = data
        return data

## Building the Text Quality Analysis (TQA) Pipeline
### Pipeline Configuration
The configuration for the `tqa` pipeline has four principal components:
1. **`stage_name`**: Specifies the name of the stage for the stage builder.
2. **`source_config`**: Specifies the stage input dataset configuration.
3. **`destination_config`**: Defines the configuration for the output dataset.
4. **`tasks`**: A list of tasks (defined above) and their arguments.

We'll begin by defining the input and output dataset configurations. The input dataset, `review`, was created in the `tqd` stage of the `dataprep` phase. Since we are performing NLP operations on a distributed (PySpark) DataFrame, we set `nlp` and `distributed` to `True`. This ensures that the Spark session includes the required Spark NLP packages.

Next, we add the tasks and their parameters to the pipeline configuration.

Here, we've configured the eight tasks defined above in our `tqa` configuration. Each specifies the `class_name`, the path for the `module` containing the class, and `params`, a dictionary containing the task's arguments. Most notably, these arguments include the weights for the `ComputeSyntacticLexicalScoresTask` and `ComputeTextQualityScore` tasks. With the **Text Quality Analysis Pipeline** fully specified, we can build the `DataPrepStage` object that encapsulates the pipeline tasks.
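As an illustration of the shape described above, here is a hypothetical, abbreviated configuration written as a Python dict. Only two of the tasks are shown; the destination values, the `stage_name` value, and the module path (inferred from the `%load` paths used in this section) are assumptions rather than the project's actual configuration.

```python
# Hypothetical, abbreviated tqa stage configuration (values are illustrative assumptions)
tqa_config = {
    "stage_name": "tqa",
    "source_config": {
        "phase": "dataprep",
        "stage": "tqd",
        "name": "review",
        "nlp": True,  # Spark session with Spark NLP packages
        "distributed": True,  # PySpark DataFrame
    },
    "destination_config": {
        "phase": "dataprep",
        "stage": "tqa",
        "name": "review",
        "nlp": True,
        "distributed": True,
    },
    "tasks": [
        {
            "class_name": "ComputePerplexityWeights",
            "module": "discover.flow.data_prep.tqa.task",
            "params": {"pp_filepath": "models/tqa/pp_weights.csv"},
        },
        {
            "class_name": "ComputeTextQualityScore",
            "module": "discover.flow.data_prep.tqa.task",
            "params": {"tqa_syntactic_weight": 0.4, "tqa_perplexity_weight": 0.6},
        },
    ],
}

# Each task entry carries exactly the keys that TaskBuilder.build reads
for task in tqa_config["tasks"]:
    assert set(task) == {"class_name", "module", "params"}
```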

## Stage
A Stage object orchestrates the execution of a pipeline's tasks. The `Stage` base class below exposes the `build` method, which takes a stage configuration and constructs a Stage object.

In [None]:
# %load -r 21-73 discover/flow/base/stage.py
from abc import ABC, abstractmethod
from typing import List

from discover.core.flow import PhaseDef, StageDef
from discover.core.namespace import NestedNamespace
from discover.flow.base.task import Task, TaskBuilder


# ------------------------------------------------------------------------------------------------ #
#                                        STAGE                                                     #
# ------------------------------------------------------------------------------------------------ #
class Stage(ABC):
    """Abstract base class for Stage pipelines."""

    def __init__(
        self,
        source_config: dict,
        destination_config: dict,
        tasks: List[Task],
        force: bool = False,
    ) -> None:
        self._source_config = NestedNamespace(source_config)
        self._destination_config = NestedNamespace(destination_config)
        self._tasks = tasks
        self._force = force

    @property
    @abstractmethod
    def phase(self) -> PhaseDef:
        """Phase"""

    @property
    @abstractmethod
    def stage(self) -> StageDef:
        """Stage"""

    @abstractmethod
    def run(self) -> str:
        """Stage execution"""

    @classmethod
    def build(cls, stage_config: dict, force: bool = False) -> "Stage":
        tasks = [
            TaskBuilder.build(task_config) for task_config in stage_config["tasks"]
        ]
        return cls(
            source_config=stage_config["source_config"],
            destination_config=stage_config["destination_config"],
            tasks=tasks,
            force=force,
        )

The `build` method delegates task construction to the `TaskBuilder` class defined below.

In [None]:
# %load -r 130-200 discover/flow/base/task.py
class TaskBuilder:
    """
    A builder class for constructing task instances from configuration data.

    The `TaskBuilder` class provides a static method, `build`, which reads task configuration
    data and dynamically creates an instance of the specified task class using the provided
    parameters. This allows for flexible and dynamic task instantiation based on configuration.

    Methods
    -------
    build(task_config: dict) -> object
        Constructs and returns an instance of a task class using the specified configuration.

    Examples
    --------
    >>> task_config = {
    ...     'module': 'mypackage.mymodule',
    ...     'class_name': 'NormalizationTask',
    ...     'params': {'param1': value1, 'param2': value2}
    ... }
    >>> task_instance = TaskBuilder.build(task_config)
    """

    @staticmethod
    def build(task_config):
        """
        Builds and returns an instance of a task class based on the provided configuration.

        Parameters
        ----------
        task_config : dict
            A dictionary containing task configuration with the following keys:
                - 'module' (str): The name of the module containing the task class.
                - 'class_name' (str): The name of the task class to instantiate.
                - 'params' (dict): Parameters to pass to the task's constructor.

        Returns
        -------
        object
            An instance of the specified task class initialized with the provided parameters.

        Raises
        ------
        ModuleNotFoundError
            If the specified module cannot be found.
        AttributeError
            If the specified class does not exist in the module.
        TypeError
            If the class constructor does not accept the provided parameters.

        Examples
        --------
        >>> task_config = {
        ...     'module': 'mypackage.mymodule',
        ...     'class_name': 'NormalizationTask',
        ...     'params': {'param1': value1, 'param2': value2}
        ... }
        >>> task_instance = TaskBuilder.build(task_config)
        """
        module = task_config["module"]
        class_name = task_config["class_name"]
        params = task_config["params"]
        return instantiate_class(
            module=module,
            class_name=class_name,
            params=params,
        )

The `TaskBuilder.build` method is responsible for dynamically creating each task from its configuration. Here's a breakdown of its functionality:

1. **Extract Configuration Details**: 
   - `module = task_config["module"]`: Retrieves the module name from the `task_config` dictionary.
   - `class_name = task_config["class_name"]`: Retrieves the class name from the `task_config` dictionary.
   - `params = task_config["params"]`: Retrieves the parameters for the class instantiation from the `task_config` dictionary.

2. **Instantiate the Task Class**: 
   - The `instantiate_class` function is called with the extracted `module`, `class_name`, and `params`. This function dynamically imports the specified module, retrieves the class, and creates an instance of it using the provided parameters.

This builder allows for flexible and extensible workflows: tasks can be added or modified simply by updating the configuration, without changing the underlying code. It is particularly useful for building configurable data pipelines or frameworks that support various types of tasks.

Next, we examine the `instantiate_class` function.

In [None]:
# %load -r 29-68 discover/flow/base/task.py
def instantiate_class(module: str, class_name: str, params: dict):
    """
    Dynamically imports a module and instantiates a class with the given parameters.

    Parameters
    ----------
    module : str
        The name of the module from which to import the class (e.g., 'mypackage.mymodule').
    class_name : str
        The name of the class to instantiate from the module.
    params : dict
        A dictionary of additional parameters to pass to the class constructor.

    Returns
    -------
    object
        An instance of the specified class with the provided parameters.

    Raises
    ------
    ModuleNotFoundError
        If the specified module cannot be found.
    AttributeError
        If the specified class does not exist in the module.
    TypeError
        If the class constructor does not accept the provided parameters.

    Examples
    --------
    >>> obj = instantiate_class(
    ...     module='mypackage.mymodule',
    ...     class_name='Normalizer',
    ...     params={'param1': value1, 'param2': value2}
    ... )
    """
    module = importlib.import_module(module)
    cls = getattr(module, class_name)
    return cls(**params)

The `instantiate_class` function dynamically imports and instantiates a class using the `importlib` library. Here's a breakdown of what it does:

1. **Dynamic Module Import**: 
   - `importlib.import_module(module)` dynamically imports a module by name, passed as a string in the `module` argument; the local name `module` is then rebound to the imported module object. This allows modules to be imported at runtime rather than at the top of the script.

2. **Retrieve Class Reference**: 
   - `getattr(module, class_name)` fetches the class reference from the imported module using the `class_name` string. This retrieves the class definition, allowing the code to reference the class dynamically.

3. **Instantiate Class**: 
   - `cls(**params)` creates an instance of the class, passing `params` as keyword arguments to the class's constructor. This allows for flexible instantiation of classes with varying parameters.

With this function, we are able to load and instantiate classes dynamically based on runtime information, such as configurations or external inputs.
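The same three steps can be exercised without the pipeline by pointing a task-style configuration at a standard-library class. In this sketch, `fractions.Fraction` stands in for a task class, and the local variable is named `mod` instead of rebinding `module`; otherwise it follows the function above.

```python
import importlib


def instantiate_class(module: str, class_name: str, params: dict):
    # Import the module by name, look up the class, then construct it with keyword arguments
    mod = importlib.import_module(module)
    cls = getattr(mod, class_name)
    return cls(**params)


# A task-style configuration pointing at a standard-library class instead of a pipeline task
task_config = {
    "module": "fractions",
    "class_name": "Fraction",
    "params": {"numerator": 3, "denominator": 4},
}
obj = instantiate_class(
    module=task_config["module"],
    class_name=task_config["class_name"],
    params=task_config["params"],
)
print(obj)  # 3/4
```

A misspelled `class_name` surfaces as an `AttributeError` from `getattr`, matching the exceptions listed in the docstring.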

Inheriting from the `Stage` class, the `DataPrepStage` defines orchestration for the data preparation pipelines. Let's take a look.

### Data Preparation Stage
The `run` method begins by checking whether the endpoint already exists and evaluating the `force` parameter to decide the appropriate execution path. If the endpoint exists and the `force` parameter is not set, the method simply returns the `asset_id`. However, if execution is forced or the endpoint does not exist, the method proceeds to remove any existing dataset associated with the endpoint. It then sequentially executes each task in the list, using the output of one task as the input for the next. Once all tasks have been executed, the final processed dataset is saved to the repository according to the destination dataset configuration.
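The control flow of `run` can be sketched in a few lines. This is a hypothetical illustration, not the project's API: a plain dict stands in for `DatasetRepo`, string keys stand in for asset IDs, tasks are plain callables on pandas DataFrames, and the function name `run_stage` is invented for this example.

```python
from functools import reduce
from typing import Callable, Dict, List

import pandas as pd


def run_stage(
    repo: Dict[str, pd.DataFrame],
    source_id: str,
    destination_id: str,
    tasks: List[Callable[[pd.DataFrame], pd.DataFrame]],
    force: bool = False,
) -> str:
    """Sketch of the DataPrepStage.run control flow using a dict as the repository."""
    # Skip all work when the destination exists and execution is not forced
    if destination_id in repo and not force:
        return destination_id
    # Forced (or first) run: remove any existing destination dataset
    repo.pop(destination_id, None)
    # Chain the tasks: each task's output becomes the next task's input
    data = reduce(lambda df, task: task(df), tasks, repo[source_id])
    repo[destination_id] = data
    return destination_id


repo = {"review": pd.DataFrame({"x": [1, 2, 3]})}
tasks = [lambda df: df.assign(y=df["x"] * 2), lambda df: df[df["y"] > 2]]
run_stage(repo, "review", "review_tqa", tasks)
print(repo["review_tqa"])
```

Calling `run_stage` again without `force=True` returns the destination ID immediately, without executing any task.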

In [None]:
# %load -r 22-204 discover/flow/data_prep/stage.py
from typing import List, Union

import pandas as pd
import pyspark
from dependency_injector.wiring import Provide, inject

from discover.assets.dataset import Dataset
from discover.assets.idgen import AssetIDGen
from discover.container import DiscoverContainer
from discover.core.flow import DataPrepStageDef, PhaseDef
from discover.flow.base.stage import Stage
from discover.flow.base.task import Task
from discover.infra.persistence.repo.dataset import DatasetRepo
from discover.infra.service.logging.stage import stage_logger


# ------------------------------------------------------------------------------------------------ #
#                                    DATA PREP STAGE                                               #
# ------------------------------------------------------------------------------------------------ #
class DataPrepStage(Stage):
    """
    A stage class for preparing datasets, handling loading, processing, and saving of data.

    The `DataPrepStage` class orchestrates the execution of data preparation tasks,
    including loading source datasets, applying a series of tasks, and saving the processed
    data to a destination. It uses a repository for dataset persistence and can be configured
    to force execution even if the destination dataset already exists.

    Parameters
    ----------
    source_config : dict
        Configuration for the source dataset, including details like phase, stage, and name.
    destination_config : dict
        Configuration for the destination dataset, including details like phase, stage, and name.
    tasks : List[Task]
        A list of tasks to execute as part of the data preparation stage.
    force : bool, optional
        Whether to force execution if the destination dataset endpoint already exists (default is False).
    repo : DatasetRepo, optional
        A repository for dataset persistence, injected via dependency injection (default is `DiscoverContainer.repo.dataset_repo`).
    **kwargs : dict
        Additional keyword arguments for stage configuration.

    Attributes
    ----------
    _repo : DatasetRepo
        The repository instance used for dataset persistence.
    _destination_asset_id : str
        The generated asset ID for the destination dataset based on the configuration.
    _logger : logging.Logger
        Logger instance for logging events related to the data preparation stage.

    Methods
    -------
    run() -> str
        Executes the stage by loading the source data, applying tasks, and saving the result;
        returns the destination asset ID.
    _endpoint_exists(asset_id: str) -> bool
        Checks if the dataset endpoint already exists in the repository.
    _load_source_data() -> Union[pd.DataFrame, pyspark.sql.DataFrame]
        Loads the source dataset from the repository and returns its content.
    _create_destination_dataset(data: Union[pd.DataFrame, pyspark.sql.DataFrame]) -> Dataset
        Creates the destination dataset with the processed data and configuration details.
    _remove_destination_dataset() -> None
        Removes the destination dataset from the repository.
    _save_destination_dataset(dataset: Dataset) -> None
        Saves the processed dataset to the repository using the destination asset ID.

    Notes
    -----
    The `DataPrepStage` class leverages dependency injection to retrieve a dataset repository instance.
    It ensures that datasets are properly loaded and saved based on the specified configurations.
    """

    @inject
    def __init__(
        self,
        source_config: dict,
        destination_config: dict,
        tasks: List[Task],
        force: bool = False,
        repo: DatasetRepo = Provide[DiscoverContainer.repo.dataset_repo],
        **kwargs,
    ) -> None:
        super().__init__(
            source_config=source_config,
            destination_config=destination_config,
            tasks=tasks,
            force=force,
        )
        self._repo = repo

        self._destination_asset_id = AssetIDGen.get_asset_id(
            asset_type=self._destination_config.asset_type,
            phase=PhaseDef.from_value(value=self._destination_config.phase),
            stage=DataPrepStageDef.from_value(value=self._destination_config.stage),
            name=self._destination_config.name,
        )

        self._logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

    @property
    def phase(self) -> PhaseDef:
        return PhaseDef.from_value(value=self._destination_config.phase)

    @property
    def stage(self) -> DataPrepStageDef:
        return DataPrepStageDef.from_value(value=self._destination_config.stage)

    @stage_logger
    def run(self) -> str:
        """Executes the stage by loading the source data, applying tasks, and saving the result.

        If the destination dataset already exists and `force` is False, the stage is skipped.

        Returns:
            str: The asset ID of the destination dataset, whether reused or newly created.
        """
        # Reuse the existing destination dataset unless execution is forced.
        if (
            self._endpoint_exists(asset_id=self._destination_asset_id)
            and not self._force
        ):
            return self._destination_asset_id

        # Forced run or missing endpoint: remove any stale destination dataset first.
        if self._endpoint_exists(asset_id=self._destination_asset_id):
            self._remove_destination_dataset()

        data = self._load_source_data()

        # Chain the tasks: the output of each task is the input to the next.
        for task in self._tasks:
            data = task.run(data=data)

        dataset = self._create_destination_dataset(data=data)
        self._save_destination_dataset(dataset=dataset)

        return self._destination_asset_id

    def _endpoint_exists(self, asset_id: str) -> bool:
        """Checks if the dataset endpoint already exists in the repository."""
        return self._repo.exists(asset_id=asset_id)

    def _load_source_data(self) -> Union[pd.DataFrame, pyspark.sql.DataFrame]:
        """Loads the source dataset from the repository and returns its content."""
        source_asset_id = AssetIDGen.get_asset_id(
            asset_type=self._source_config.asset_type,
            phase=PhaseDef.from_value(value=self._source_config.phase),
            stage=DataPrepStageDef.from_value(value=self._source_config.stage),
            name=self._source_config.name,
        )
        dataset = self._repo.get(
            asset_id=source_asset_id,
            distributed=self._source_config.distributed,
            nlp=self._source_config.nlp,
        )

        if self._source_config.distributed:
            # Rename the pandas index column if it exists
            if "__index_level_0__" in dataset.content.columns:
                dataset.content = dataset.content.withColumnRenamed(
                    "__index_level_0__", "pandas_index"
                )
        return dataset.content

    def _create_destination_dataset(
        self, data: Union[pd.DataFrame, pyspark.sql.DataFrame]
    ) -> Dataset:
        """Creates the destination dataset with the processed data and configuration details."""
        return Dataset(
            phase=PhaseDef.from_value(self._destination_config.phase),
            stage=DataPrepStageDef.from_value(self._destination_config.stage),
            name=self._destination_config.name,
            content=data,
            nlp=self._destination_config.nlp,
            distributed=self._destination_config.distributed,
        )

    def _remove_destination_dataset(self) -> None:
        """Removes the destination dataset from the repository."""
        self._repo.remove(asset_id=self._destination_asset_id)

    def _save_destination_dataset(self, dataset: Dataset) -> None:
        """Saves the processed dataset to the repository using the destination asset ID."""
        self._repo.add(dataset=dataset)

## Summary

In this notebook, we defined the tasks comprising the **Text Quality Analysis Pipeline** and constructed the pipeline stage responsible for its orchestration. Starting from the stage configuration, we built a stage object whose builder dynamically instantiated the Task classes specified in the configuration and attached them to the `DataPrepStage`. The `run` method executes the pipeline and persists the results in the dataset repository.

This process is encapsulated in the following code cell, which will now run the **Text Quality Analysis Pipeline**. 


In [None]:
# Obtain the configuration
reader = FlowConfigReader()
stage_config = reader.get_stage_config(
    phase=PhaseDef.DATAPREP, stage=DataPrepStageDef.TQA
)

# Build and run the Text Quality Analysis (TQA) stage
stage = TQAStage.build(stage_config=stage_config, force=FORCE)
asset_id = stage.run()

## Summary and Transition to the Data Quality Analysis (DQA)
With the **Text Quality Analysis (TQA) Pipeline** now complete, we have computed the linguistic measures that contribute to a holistic assessment of text quality for NLP applications. These text quality measures are key inputs to our next stage: the **Data Quality Analysis (DQA)**.

In the DQA, we'll widen our aperture, integrating sentiment, typographical, and linguistic metrics across several dimensions of data quality to uncover areas of concern and devise further data processing interventions.