In [1]:
import json
import os
import sys
from typing import Any

# Put the parent of the current working directory on the import path.
# NOTE(review): presumably this is what makes the `eda` imports below
# resolve when the notebook runs from its own folder — confirm against the
# repository layout.
sys.path.append(os.path.abspath(".."))

from eda.parsing import Conversations, Participants
from eda.utils import FOLDER_DIR

# Directory that `export_json_data` writes its JSON files into.
DATA_PATH = FOLDER_DIR / "data"

def export_json_data(name: str, data: dict[str, Any]) -> None:
    """Write ``data`` as indented JSON to ``DATA_PATH / name``.

    An existing file with the same name is overwritten.

    Args:
        name: File name, resolved relative to ``DATA_PATH``.
        data: JSON-serialisable mapping to write.

    Raises:
        OSError: If the target file cannot be opened for writing.
        TypeError: If ``data`` holds a value ``json.dump`` cannot serialise.
    """
    # ``ensure_ascii=False`` emits non-ASCII characters verbatim, so the file
    # has to be opened as UTF-8 explicitly. Without it the platform's locale
    # encoding is used, which can mis-encode the output or fail outright.
    with DATA_PATH.joinpath(name).open("w", encoding="utf-8") as fp:
        json.dump(data, fp, indent=4, ensure_ascii=False)


def create_json_data() -> dict[str, Any]:
    """Return a new export payload with empty ``metadata`` and ``data`` sections."""
    payload: dict[str, Any] = {}
    payload["metadata"] = {}
    payload["data"] = {}
    return payload

# Notebook-wide state: the analysis cells below read from these two objects.
participants = Participants()
conversations = Conversations(participants)

### Question 1

In [2]:
# Read every conversation with sentiment loading switched on; the next cell
# reads `line.sentiments` from the loaded lines.
conversations.read_all(parallel=True, load_sentiments=True)

In [3]:
from collections import Counter
from collections.abc import Iterable

from eda.models import ConversationLine, Generation
from eda.sentiments import SentimentType
from eda.utils import round_precise


def sentiments_from_lines(
    lines: Iterable[ConversationLine], exclude_true_neutrals: bool = False
) -> tuple[list[str], list[float]]:
    """Aggregate the sentiment counts of ``lines`` into percentage shares.

    A line is skipped when its sentiments report no loaded scores or no
    scores at all. With ``exclude_true_neutrals`` set, lines whose ``neutral``
    score equals exactly ``1.0`` are skipped as well.

    Args:
        lines: Conversation lines whose ``sentiments.score_counts`` are summed.
        exclude_true_neutrals: Leave purely neutral lines out of the totals.

    Returns:
        Two parallel lists: the display name of each sentiment and its share
        of the summed counts as a percentage, passed through
        ``round_precise(..., 2)``. Both lists are empty when no line
        contributed a positive count.
    """
    sentiments = Counter()

    for line in lines:
        line_sentiments = line.sentiments
        if not line_sentiments.has_loaded_scores():
            continue
        if not line_sentiments.has_scores():
            continue
        if exclude_true_neutrals and line_sentiments.neutral == 1.0:
            continue
        # In-place Counter addition keeps only entries with a positive total.
        sentiments += line_sentiments.score_counts

    if not sentiments:
        # Nothing was counted. Unpacking ``zip(*())`` into two names would
        # raise ValueError, so report the empty result explicitly.
        return [], []

    # Every remaining count is positive, so the total cannot be zero.
    total_sum = sum(sentiments.values())
    names = [
        SentimentType(sentiment_name).display_name for sentiment_name in sentiments
    ]
    scores = [
        round_precise(score / total_sum * 100, 2) for score in sentiments.values()
    ]
    return names, scores


def sentiment_percentages() -> list[dict[str, Any]]:
    """Compute the sentiment distribution of each generation's lines.

    Lines from every conversation are grouped by the generation of the
    participant who produced them; purely neutral lines are excluded from
    the aggregation.

    Returns:
        One single-key dict per generation, mapping the generation name to a
        ``{sentiment display name: percentage}`` dict.
    """
    grouped_lines = Generation.create_mapping()
    for conversation in conversations:
        for line in conversation.lines:
            grouped_lines[line.participant.generation].append(line)

    per_generation = []
    for generation, generation_lines in grouped_lines.items():
        labels, percentages = sentiments_from_lines(
            generation_lines, exclude_true_neutrals=True
        )

        # The incoming percentages are already rounded, so they are rescaled
        # against their own sum.
        percentage_sum = sum(percentages)
        rescaled = {
            label: percentage / percentage_sum * 100
            for label, percentage in zip(labels, percentages)
        }
        per_generation.append({generation.name: rescaled})

    return per_generation

# Export the per-generation sentiment percentages to
# `sentiment_percentages.json` under DATA_PATH.
data = create_json_data()
data["metadata"]["title"] = "Sentiment percentages per generation"
data["data"] = sentiment_percentages()
export_json_data("sentiment_percentages.json", data)

In [4]:
# Read every conversation again, this time with prosodic loading switched on.
# NOTE(review): presumably this populates the `*phrases` attributes that the
# next cell counts — confirm in `Conversations.read_all`.
conversations.read_all(parallel=True, load_prosodic=True)

In [5]:

from collections import defaultdict
from collections.abc import Generator

from eda.models import Participant


def human_name_from_snake_case(name: str) -> str:
    """Turn a ``snake_case`` identifier into a space-separated label.

    Underscores become spaces; the first character is upper-cased and the
    rest lower-cased.
    """
    spaced = name.replace("_", " ")
    return spaced.capitalize()


def filter_prosodic_attributes(
    line: ConversationLine,
) -> Generator[tuple[str, str]]:
    """Yield ``(attribute name, value)`` pairs for the ``*phrases`` attributes of ``line``.

    Every instance attribute whose name ends in ``"phrases"`` is yielded, with
    the exception of ``overlapping_phrases``.
    """
    for attribute, value in vars(line).items():
        if attribute == "overlapping_phrases":
            continue
        if attribute.endswith("phrases"):
            yield attribute, value


def prosodic_frequencies(participant: Participant) -> dict[str, float]:
    """Return the average number of items per line for each prosodic attribute.

    For every line of ``participant`` the length of each attribute yielded by
    ``filter_prosodic_attributes`` is added up, and each total is divided by
    the number of lines. The result is empty when the participant has no lines.
    """
    feature_totals = defaultdict(int)
    line_count = 0
    for line_count, line in enumerate(
        conversations.participant_lines(participant), start=1
    ):
        for feature, phrases in filter_prosodic_attributes(line):
            feature_totals[feature] += len(phrases)

    return {
        feature: total / line_count for feature, total in feature_totals.items()
    }


def prosodic_counts_by_generation(data: dict[Generation, Any]) -> dict[str, dict[str, float]]:
    """Average the per-participant prosodic frequencies within each generation.

    Args:
        data: Mapping from generation to a list. The frequency dict of every
            participant is appended to the list of their generation, so the
            mapping is mutated; pass a fresh one on every call.

    Returns:
        ``{generation name: {human-readable feature name: mean frequency}}``,
        where the mean is taken over the participants of that generation.
        Features whose mean is zero are kept in the result.
    """
    for participant in participants:
        data[participant.generation].append(prosodic_frequencies(participant))

    result = {}
    for generation, participants_frequencies in data.items():
        # Sum with a plain mapping rather than ``Counter +=``: in-place Counter
        # addition drops every entry whose total is not positive, which would
        # silently remove features that never occur in a generation.
        feature_totals: dict[str, float] = defaultdict(float)
        for participant_frequencies in participants_frequencies:
            for prosodic_feature, frequency in participant_frequencies.items():
                feature_totals[prosodic_feature] += frequency

        # With no participants there are no totals either, so the division
        # below is never reached with a zero denominator.
        n_participants = len(participants_frequencies)
        result[generation.name] = {
            human_name_from_snake_case(prosodic_feature): total / n_participants
            for prosodic_feature, total in feature_totals.items()
        }
    return result


# `prosodic_counts_by_generation` appends into the mapping it receives, so a
# fresh mapping is created each time this cell runs. The result is exported to
# `prosodic_features.json` under DATA_PATH.
generation_map = Generation.create_mapping()
counts = prosodic_counts_by_generation(generation_map)
data = create_json_data()
data["metadata"]["title"] = "Average prosodic feature frequency per line by generation"
data["data"] = counts
export_json_data("prosodic_features.json", data)

### Question 2

In [None]:
# Read every conversation with tagged-text loading switched on; the lemma
# analysis in the next cell reads `line.tagged`.
conversations.read_all(parallel=True, load_tagged=True)

In [17]:
from dataclasses import dataclass
from typing import Optional

from eda.language import TaggedText

# Number of highest-count lemmas requested per generation.
TOP_N_LEMMAS = 20
# POS filter handed to `top_lemmas_by_generation`; None disables the filter.
ALLOWED_POS_VALUES = None
# Lemma frequencies are reported per this many words.
PER_WORDS = 1000

@dataclass(frozen=True)
class PosTaggedLemma:
    """A lemma paired with the name of its part-of-speech tag.

    The dataclass is frozen, which makes instances hashable; they are used as
    dictionary keys when lemma occurrences are counted.
    """
    lemma: str
    pos_name: str


@dataclass(frozen=True)
class GenerationLemmaInfo:
    """Top-lemma summary for a single generation.

    ``pt_lemmas`` and ``lemma_counts`` are parallel lists: the count at index
    ``i`` belongs to the lemma at index ``i``.
    """
    # Most frequent lemmas, highest count first.
    pt_lemmas: list[PosTaggedLemma]
    # Occurrence count of each entry of ``pt_lemmas``.
    lemma_counts: list[int]
    # Number of tagged words collected for the generation, counted before the
    # lemma-length and POS filters are applied.
    n_total_words: int


def top_lemmas_by_generation(
    conversations,
    *,
    top_n: int = 10,
    min_lemma_length: int = 3,
    allowed_pos_values: Optional[set] = None,
) -> dict[Generation, GenerationLemmaInfo]:
    """Find the most frequent POS-tagged lemmas spoken by each generation.

    Args:
        conversations: Iterable of conversations; each conversation is
            iterated to obtain its lines.
        top_n: Maximum number of lemmas kept per generation.
        min_lemma_length: Lemmas shorter than this many characters are ignored.
        allowed_pos_values: When given, only words whose ``pos`` is in this
            set are counted; ``None`` disables the filter.

    Returns:
        A mapping from generation to its ``GenerationLemmaInfo``. The lemma and
        count lists are ordered by descending count and are empty when no word
        of that generation passes the filters. ``n_total_words`` counts every
        tagged word of the generation, filtered or not.
    """
    text_by_generation: dict[Generation, list[TaggedText]] = Generation.create_mapping()

    for conversation in conversations:
        for line in conversation:
            generation = line.participant.generation
            text_by_generation[generation].extend(line.tagged)

    lemmas_by_generation = {}
    for generation, words in text_by_generation.items():
        # Count occurrences directly; a lemma is identified by its text
        # together with its POS name.
        counts = Counter()
        for word in words:
            if len(word.lemma) < min_lemma_length:
                continue
            if allowed_pos_values is not None and word.pos not in allowed_pos_values:
                continue
            counts[PosTaggedLemma(word.lemma, word.pos_name)] += 1

        # Split the (lemma, count) pairs without ``zip(*...)``, which cannot be
        # unpacked into two names when there are no pairs at all.
        top_entries = counts.most_common(top_n)
        lemmas_by_generation[generation] = GenerationLemmaInfo(
            [pt_lemma for pt_lemma, _ in top_entries],
            [count for _, count in top_entries],
            len(words),
        )

    return lemmas_by_generation

def important_lemmas() -> dict[str, Any]:
    """Compute the relative frequency of each generation's top lemmas.

    Returns:
        ``{generation name: {lemma: frequency}}`` where the frequency is the
        lemma's occurrences per ``PER_WORDS`` words of that generation, passed
        through ``round_precise(..., 2)``. Lemmas are ordered by descending
        count. A generation without top lemmas maps to an empty dict.
    """
    top_lemmas = top_lemmas_by_generation(
        conversations, top_n=TOP_N_LEMMAS, allowed_pos_values=ALLOWED_POS_VALUES
    )

    result = {key.name: {} for key in Generation.create_mapping()}
    for generation, info in top_lemmas.items():
        # The top list distinguishes lemmas by POS tag, but the output is keyed
        # by the bare lemma. Merge the counts of entries sharing a lemma so
        # that a lower-ranked POS variant does not overwrite a higher-ranked one.
        merged_counts: dict[str, int] = {}
        for pt_lemma, count in zip(info.pt_lemmas, info.lemma_counts):
            merged_counts[pt_lemma.lemma] = merged_counts.get(pt_lemma.lemma, 0) + count

        # Merging can disturb the descending order, so sort again; the sort is
        # stable, which keeps the original order among equal counts.
        ranked = sorted(merged_counts.items(), key=lambda item: item[1], reverse=True)

        gen_name = generation.name
        for lemma, count in ranked:
            lemma_frequency = round_precise((count / info.n_total_words) * PER_WORDS, 2)
            result[gen_name][lemma] = lemma_frequency
    return result


# Export the top lemmas, together with the parameters needed to interpret the
# frequencies, to `top_lemmas.json` under DATA_PATH.
data = create_json_data()
data["metadata"]["title"] = "Top lemmas by generation"
data["metadata"]["top_n_lemmas"] = TOP_N_LEMMAS
data["metadata"]["per_n_words"] = PER_WORDS
data["data"] = important_lemmas()
export_json_data("top_lemmas.json", data)

### Question 3

In [None]:

from collections.abc import Callable
from functools import cache

from eda.language import AttributedWord
from eda.models import MacroRegion, ParticipantLines


def participant_macro_region(participant: Participant) -> MacroRegion:
    """Return the macro region of the conversation identified by ``participant.conversation_code``."""
    return conversations.conversation(participant.conversation_code).macro_region


def generate_participant_words(lines: ParticipantLines) -> Generator[AttributedWord]:
    """Yield, line by line, every normalised word whose ``is_linguistic`` flag is truthy."""
    for line in lines:
        for word in line.normalised_words:
            if word.is_linguistic:
                yield word

@cache
def participants_dialect_percentages(
    *, rounder: Callable[[int | float, int], int | float] = round_precise
) -> list[int | float]:
    """Compute, for every participant, the percentage of their words that are dialect.

    Only the words yielded by ``generate_participant_words`` are considered.
    A participant with no such words gets ``rounder(0.0, 2)`` rather than
    triggering a division by zero, so the result always holds exactly one
    entry per participant.

    The result is memoised per ``rounder`` by ``functools.cache``. Every call
    returns the same list object, so callers must not mutate it.
    NOTE(review): the cache is not invalidated when the module-level
    ``participants`` or ``conversations`` change — confirm this is acceptable.

    Args:
        rounder: Called as ``rounder(percentage, 2)`` on each raw percentage.

    Returns:
        One percentage per participant, in the iteration order of
        ``participants``.
    """
    percentages = []

    for participant in participants:
        participant_lines = conversations.participant_lines(participant)
        dialect_words = total_words = 0
        for word in generate_participant_words(participant_lines):
            # Adds the result of ``is_dialect`` directly (presumably a bool,
            # counted as 0 or 1 — verify in ``AttributedWord``).
            dialect_words += word.is_dialect(strict=False)
            total_words += 1

        # Guard the division: a participant may have no linguistic words.
        dialect_share = dialect_words / total_words * 100 if total_words else 0.0
        percentages.append(rounder(dialect_share, 2))

    return percentages

def labeled_percentages() -> dict[str, list[dict[str, Any]]]:
    """Label each participant's dialect percentage and group the records by generation.

    Returns:
        ``{generation name: [record, ...]}`` where each record holds the
        participant's ``region`` (geographic origin), ``dialect_percentage``
        and lower-cased ``macro_region`` name.

    Raises:
        ValueError: If the number of dialect percentages does not match the
            number of participants.
    """
    result: dict[str, list[dict[str, Any]]] = {
        key.name: value
        for key, value in Generation.create_mapping().items()
    }

    # The percentages are positional: one per participant, in iteration order.
    # ``strict=True`` turns a length mismatch into an error instead of
    # silently pairing participants with the wrong values.
    for participant, percentage in zip(
        participants, participants_dialect_percentages(), strict=True
    ):
        participant_data = {
            "region": participant.geographic_origin,
            "dialect_percentage": percentage,
            "macro_region": participant.macro_region.name.lower()
        }
        result[participant.generation.name].append(participant_data)
    return result

# Export the labelled per-participant dialect percentages to
# `dialect_percentages.json` under DATA_PATH.
data = create_json_data()
data["metadata"]["title"] = "Dialect word percentages"
data["data"] = labeled_percentages()
export_json_data("dialect_percentages.json", data)