## Multi-labeled

In [None]:
# Paths of the two annotators' JSONL files that the cells below load and compare.
file_path1, file_path2 = 'Kian.jsonl', 'Annina.jsonl'

In [None]:
import json
import pandas as pd
import numpy as np
from sklearn.metrics import cohen_kappa_score
from collections import defaultdict
from itertools import combinations

# Label inventories, one list per annotation task.
label_mapping_speaker = [
    'Speaker 1',
    'Speaker 2',
    'Interviewee',
    'Interviewer',
    'Instructor',
]
label_mapping_emotion = [
    'Joy',
    'Sadness',
    'Anger',
    'Fear',
    'Surprise',
    'Disgust',
    'Trust',
    'Anticipation',
]

# Document ids that make up each group (modify as needed).
group1_ids = {1, 2, 3, 4}
group2_ids = {9, 11, 15}
group3_ids = {5, 6, 7, 8, 10, 12, 13, 14, 16}

# Group name -> set of document ids; iteration order fixes the result row order.
group_definitions = {
    'Group 1': group1_ids,
    'Group 2': group2_ids,
    'Group 3': group3_ids,
}

# Function to load JSONL data
def load_jsonl(file_path):
    """Read a JSON Lines file into a list of parsed records.

    Args:
        file_path: Path of the file to read; one JSON document per line.

    Returns:
        list: The parsed value of every non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's locale encoding.
    with open(file_path, 'r', encoding='utf-8') as f:
        # A blank line carries no record and would make json.loads raise,
        # so such lines are skipped instead of aborting the whole load.
        return [json.loads(line) for line in map(str.strip, f) if line]

# Function to aggregate annotations for each group
def aggregate_annotations_multilabel(spans, group_ids):
    """Concatenate the documents of one group and re-base their annotations.

    Args:
        spans: Iterable of dicts with keys ``'id'``, ``'text'`` and
            ``'annotations'`` (an iterable of ``(start, end, label)``).
        group_ids: Container of document ids that belong to the group.

    Returns:
        tuple: ``(text, annotations)`` where ``text`` is the selected
        documents' text joined in input order and ``annotations`` holds
        every ``(start, end, label)`` shifted by the length of the text
        that precedes its document.
    """
    text_parts = []
    shifted = []
    base = 0  # length of the text collected so far

    for doc in spans:
        if doc['id'] not in group_ids:
            continue
        chunk = doc['text']
        for begin, stop, tag in doc['annotations']:
            shifted.append((begin + base, stop + base, tag))
        text_parts.append(chunk)
        base += len(chunk)

    return "".join(text_parts), shifted

# Create one-hot encoding for multi-label annotations
def one_hot_encode_annotations(annotations, labels):
    """Turn per-position label collections into 0/1 indicator vectors.

    Args:
        annotations: Iterable of label collections, one per position.
        labels: Ordered label inventory; it defines the vector columns.

    Returns:
        list[list[int]]: One vector of ``len(labels)`` entries per
        position, with 1 in the column of every label present. Labels
        that are not in ``labels`` are ignored.
    """
    column_of = {name: col for col, name in enumerate(labels)}
    width = len(labels)
    rows = []
    for tags in annotations:
        row = [0] * width
        for tag in tags:
            col = column_of.get(tag)
            if col is not None:
                row[col] = 1
        rows.append(row)
    return rows

# Jaccard similarity calculation
def jaccard_similarity_for_labels(annotator1, annotator2):
    """Return the mean per-position Jaccard similarity of two annotators.

    Positions are paired with ``zip``, so the comparison stops at the end
    of the shorter sequence. A position contributes to the mean only when
    at least one annotator assigned a label there.

    Args:
        annotator1: Sequence of label lists, one list per position.
        annotator2: Sequence of label lists, one list per position.

    Returns:
        The mean of ``|A & B| / |A | B|`` over the contributing positions,
        or ``nan`` when no position contributes.
    """
    similarities = []
    for labels1, labels2 in zip(annotator1, annotator2):
        if not (labels1 or labels2):
            continue  # unlabeled by both annotators: not scored
        set1, set2 = set(labels1), set(labels2)
        # The union cannot be empty here because at least one list is non-empty.
        similarities.append(len(set1 & set2) / len(set1 | set2))

    if not similarities:
        # np.mean([]) also yields nan but emits a RuntimeWarning; return the
        # same value without the warning.
        return np.nan
    return np.mean(similarities)

# Pad vectors to ensure consistent length for comparison
def pad_to_match_length(vec1, vec2, pad_value=0):
    """Return copies of both lists, right-padded to the longer one's length.

    Args:
        vec1: First list.
        vec2: Second list.
        pad_value: Value appended to the shorter list (default 0).

    Returns:
        tuple: ``(padded_vec1, padded_vec2)``; both are new lists.
    """
    target = max(len(vec1), len(vec2))

    def _extend(vec):
        return vec + [pad_value] * (target - len(vec))

    return _extend(vec1), _extend(vec2)

# Calculate multi-label agreement for grouped data for each task separately
def calculate_multi_label_agreement_for_tasks(spans_annotator1, spans_annotator2, labels_speaker, labels_emotion, group_definitions):
    """Compute per-group agreement between two annotators for both tasks.

    For every group the documents are aggregated into one text, the
    annotations are expanded to a list of labels per character position,
    and Jaccard similarity plus mean Cohen's kappa are computed separately
    for the speaker labels and the emotion labels.

    Args:
        spans_annotator1: Span dicts (``'id'``, ``'text'``, ``'annotations'``)
            of the first annotator.
        spans_annotator2: Span dicts of the second annotator.
        labels_speaker: Label inventory of the speaker task.
        labels_emotion: Label inventory of the emotion task.
        group_definitions: Mapping of group name to the document ids in it.

    Returns:
        pandas.DataFrame: One row per group with the columns ``group``,
        ``speaker_jaccard``, ``speaker_kappa``, ``emotion_jaccard`` and
        ``emotion_kappa``. A kappa is ``None`` when neither annotator used
        any label of that task in the group.

    Raises:
        IndexError: If an annotation extends past the end of its
            aggregated text.
    """
    results = []
    for group_name, group_ids in group_definitions.items():
        # Aggregate annotations for each group
        text_1, annotations_1 = aggregate_annotations_multilabel(spans_annotator1, group_ids)
        text_2, annotations_2 = aggregate_annotations_multilabel(spans_annotator2, group_ids)

        # One list of labels per character position for each annotator
        annotator1_labels = _labels_per_position(len(text_1), annotations_1)
        annotator2_labels = _labels_per_position(len(text_2), annotations_2)

        # Both tasks run through the same pipeline, each with its own inventory
        speaker_jaccard, speaker_kappa = _task_agreement(annotator1_labels, annotator2_labels, labels_speaker)
        emotion_jaccard, emotion_kappa = _task_agreement(annotator1_labels, annotator2_labels, labels_emotion)

        results.append({
            'group': group_name,
            'speaker_jaccard': speaker_jaccard,
            'speaker_kappa': speaker_kappa,
            'emotion_jaccard': emotion_jaccard,
            'emotion_kappa': emotion_kappa
        })

    return pd.DataFrame(results)


def _labels_per_position(length, annotations):
    """Return, for each of ``length`` positions, the labels of the spans covering it."""
    per_position = [[] for _ in range(length)]
    for start, end, label in annotations:
        for i in range(start, end):
            per_position[i].append(label)
    return per_position


def _task_agreement(labels_1, labels_2, task_labels):
    """Return ``(jaccard, mean_kappa)`` restricted to the labels in ``task_labels``."""
    # Filter once per annotator and reuse the result for both metrics.
    task_1 = [[label for label in labels if label in task_labels] for labels in labels_1]
    task_2 = [[label for label in labels if label in task_labels] for labels in labels_2]

    jaccard = jaccard_similarity_for_labels(task_1, task_2)

    encoded_1 = one_hot_encode_annotations(task_1, task_labels)
    encoded_2 = one_hot_encode_annotations(task_2, task_labels)

    kappas = []
    for label_idx in range(len(task_labels)):
        vec_1 = [pos[label_idx] for pos in encoded_1]
        vec_2 = [pos[label_idx] for pos in encoded_2]
        vec_1, vec_2 = pad_to_match_length(vec_1, vec_2)
        # A label that neither annotator ever used is left out of the mean.
        if any(vec_1) or any(vec_2):
            kappas.append(cohen_kappa_score(vec_1, vec_2))
    kappa = np.mean(kappas) if kappas else None

    return jaccard, kappa

# Parse both annotators' JSONL files
data_annotator1 = load_jsonl(file_path=file_path1)
data_annotator2 = load_jsonl(file_path=file_path2)

# Prepare spans for both annotators
def get_spans_for_jaccard(data):
    """Convert loaded records into the span dicts used by the agreement code.

    Args:
        data: Iterable of dicts with keys ``'id'``, ``'text'`` and
            ``'label'`` (an iterable of ``[start, end, label]`` triples).

    Returns:
        list[dict]: One dict per record with keys ``'id'``, ``'text'`` and
        ``'annotations'`` (a list of ``(start, end, label)`` tuples).
    """
    def to_span(record):
        body = record['text']
        triples = []
        for begin, stop, tag in record['label']:
            triples.append((begin, stop, tag))
        return {'id': record['id'], 'text': body, 'annotations': triples}

    return [to_span(record) for record in data]

spans1 = get_spans_for_jaccard(data_annotator1)
spans2 = get_spans_for_jaccard(data_annotator2)

# Agreement per group, separately for the speaker and the emotion task
agreement_results_grouped_tasks = calculate_multi_label_agreement_for_tasks(
    spans_annotator1=spans1,
    spans_annotator2=spans2,
    labels_speaker=label_mapping_speaker,
    labels_emotion=label_mapping_emotion,
    group_definitions=group_definitions,
)

# Show which two files were compared, then the result table
print(file_path1, file_path2)
print(agreement_results_grouped_tasks)


Lea.jsonl Kian.jsonl
     group  speaker_jaccard  speaker_kappa  emotion_jaccard  emotion_kappa
0  Group 1         0.918472       0.853876         0.089169       0.054961
1  Group 2         0.462954       0.424625         0.082166       0.076222
2  Group 3         0.000000       0.000000         0.176105       0.180288


## Labeled Only

In [None]:
import json
import pandas as pd
import numpy as np
from collections import defaultdict
from itertools import combinations

# Label inventories, one list per annotation task.
label_mapping_speaker = [
    'Speaker 1',
    'Speaker 2',
    'Interviewee',
    'Interviewer',
    'Instructor',
]
label_mapping_emotion = [
    'Joy',
    'Sadness',
    'Anger',
    'Fear',
    'Surprise',
    'Disgust',
    'Trust',
    'Anticipation',
]

# Document ids that make up each group (modify as needed).
group1_ids = {1, 2, 3, 4}
group2_ids = {9, 11, 15}
group3_ids = {5, 6, 7, 8, 10, 12, 13, 14, 16}

# Group name -> set of document ids; iteration order fixes the result row order.
group_definitions = {
    'Group 1': group1_ids,
    'Group 2': group2_ids,
    'Group 3': group3_ids,
}

# Function to load JSONL data
def load_jsonl(file_path):
    """Read a JSON Lines file into a list of parsed records.

    Args:
        file_path: Path of the file to read; one JSON document per line.

    Returns:
        list: The parsed value of every non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    data = []
    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's locale encoding.
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # A blank line carries no record; json.loads would raise on it.
                continue
            data.append(json.loads(line))
    return data

# Function to aggregate annotations for each group
def aggregate_annotations_multilabel(spans, group_ids):
    """Merge the documents of one group into a single text with re-based spans.

    Args:
        spans: Iterable of dicts with keys ``'id'``, ``'text'`` and
            ``'annotations'`` (an iterable of ``(start, end, label)``).
        group_ids: Container of document ids that belong to the group.

    Returns:
        tuple: ``(text, annotations)``; every annotation is shifted by the
        length of the text accumulated before its document.
    """
    merged_text = ""
    merged_annotations = []

    for doc in spans:
        if doc['id'] not in group_ids:
            continue
        # Everything merged so far precedes this document.
        shift = len(merged_text)
        body = doc['text']
        merged_annotations += [
            (begin + shift, stop + shift, tag)
            for begin, stop, tag in doc['annotations']
        ]
        merged_text = merged_text + body

    return merged_text, merged_annotations

# Create one-hot encoding for multi-label annotations
def one_hot_encode_annotations(annotations, labels):
    """Encode each position's labels as a 0/1 vector over ``labels``.

    Args:
        annotations: Iterable of label collections, one per position.
        labels: Ordered label inventory; it defines the vector columns.

    Returns:
        list[list[int]]: One indicator vector per position. Labels that
        are not in ``labels`` are ignored.
    """
    index_by_label = dict(zip(labels, range(len(labels))))

    def encode(tags):
        vec = [0] * len(labels)
        for tag in tags:
            if tag in index_by_label:
                vec[index_by_label[tag]] = 1
        return vec

    return [encode(tags) for tags in annotations]

# Jaccard similarity calculation
def jaccard_similarity_for_labels(annotator1, annotator2):
    """Return the mean Jaccard similarity over positions labeled by both.

    Positions are paired with ``zip``, so the comparison stops at the end
    of the shorter sequence. A position contributes to the mean only when
    both annotators assigned at least one label there.

    Args:
        annotator1: Sequence of label lists, one list per position.
        annotator2: Sequence of label lists, one list per position.

    Returns:
        The mean of ``|A & B| / |A | B|`` over the contributing positions,
        or ``nan`` when no position was labeled by both annotators.
    """
    similarities = []
    for labels1, labels2 in zip(annotator1, annotator2):
        if not (labels1 and labels2):
            continue  # scored only where both annotators labeled
        set1, set2 = set(labels1), set(labels2)
        # Both lists are non-empty here, so the union cannot be empty.
        similarities.append(len(set1 & set2) / len(set1 | set2))

    if not similarities:
        # np.mean([]) also yields nan but emits a RuntimeWarning; return the
        # same value without the warning.
        return np.nan
    return np.mean(similarities)

# Calculate multi-label agreement for grouped data for each task separately
def calculate_multi_label_agreement_for_tasks(spans_annotator1, spans_annotator2, labels_speaker, labels_emotion, group_definitions):
    """Compute the per-group Jaccard agreement for the speaker and emotion tasks.

    Args:
        spans_annotator1: Span dicts (``'id'``, ``'text'``, ``'annotations'``)
            of the first annotator.
        spans_annotator2: Span dicts of the second annotator.
        labels_speaker: Label inventory of the speaker task.
        labels_emotion: Label inventory of the emotion task.
        group_definitions: Mapping of group name to the document ids in it.

    Returns:
        pandas.DataFrame: One row per group with the columns ``group``,
        ``speaker_jaccard`` and ``emotion_jaccard``.
    """
    def per_position(text, annotations):
        # One label list per character of the aggregated text.
        slots = [[] for _ in range(len(text))]
        for begin, stop, tag in annotations:
            for pos in range(begin, stop):
                slots[pos].append(tag)
        return slots

    def restrict(position_labels, allowed):
        # Keep only the labels that belong to one task's inventory.
        return [[tag for tag in tags if tag in allowed] for tags in position_labels]

    rows = []
    for group_name, group_ids in group_definitions.items():
        text_1, annotations_1 = aggregate_annotations_multilabel(spans_annotator1, group_ids)
        text_2, annotations_2 = aggregate_annotations_multilabel(spans_annotator2, group_ids)

        labels_1 = per_position(text_1, annotations_1)
        labels_2 = per_position(text_2, annotations_2)

        rows.append({
            'group': group_name,
            'speaker_jaccard': jaccard_similarity_for_labels(
                restrict(labels_1, labels_speaker),
                restrict(labels_2, labels_speaker),
            ),
            'emotion_jaccard': jaccard_similarity_for_labels(
                restrict(labels_1, labels_emotion),
                restrict(labels_2, labels_emotion),
            ),
        })

    return pd.DataFrame(rows)

# Parse both annotators' JSONL files
data_annotator1 = load_jsonl(file_path=file_path1)
data_annotator2 = load_jsonl(file_path=file_path2)

# Prepare spans for both annotators
def get_spans_for_jaccard(data):
    """Convert loaded records into the span dicts used by the agreement code.

    Args:
        data: Iterable of dicts with keys ``'id'``, ``'text'`` and
            ``'label'`` (an iterable of ``[start, end, label]`` triples).

    Returns:
        list[dict]: One dict per record with keys ``'id'``, ``'text'`` and
        ``'annotations'`` (a list of ``(start, end, label)`` tuples).
    """
    converted = []
    for record in data:
        body = record['text']
        triples = [(begin, stop, tag) for begin, stop, tag in record['label']]
        converted.append(dict(id=record['id'], text=body, annotations=triples))
    return converted

spans1 = get_spans_for_jaccard(data_annotator1)
spans2 = get_spans_for_jaccard(data_annotator2)

# Jaccard agreement per group, separately for the speaker and the emotion task
agreement_results_grouped_tasks = calculate_multi_label_agreement_for_tasks(
    spans_annotator1=spans1,
    spans_annotator2=spans2,
    labels_speaker=label_mapping_speaker,
    labels_emotion=label_mapping_emotion,
    group_definitions=group_definitions,
)

# Show which two files were compared, then the result table
print(file_path1, file_path2)
print(agreement_results_grouped_tasks)


Lea.jsonl Kian.jsonl
     group  speaker_jaccard  emotion_jaccard
0  Group 1         0.928014         0.386147
1  Group 2         0.482335         0.291584
2  Group 3              NaN         0.441328


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


## Sentiment Only

In [None]:
# Emotion labels that count as positive sentiment.
positive_emotions = {
    'Joy',
    'Surprise',
    'Trust',
    'Anticipation',
}
# Emotion labels that count as negative sentiment.
negative_emotions = {
    'Sadness',
    'Anger',
    'Fear',
    'Disgust',
}

# Function to classify labels as positive or negative
def classify_sentiment_labels(labels, positive_emotions, negative_emotions):
    """Map each position's labels to ``"positive"``, ``"negative"`` or ``[]``.

    A position is ``"positive"`` if any of its labels is a positive
    emotion; positive is checked first and wins when both kinds occur.
    Otherwise it is ``"negative"`` if any label is a negative emotion, and
    an empty list if neither applies.

    Args:
        labels: Iterable of label collections, one per position.
        positive_emotions: Container of positive emotion labels.
        negative_emotions: Container of negative emotion labels.

    Returns:
        list: One entry per position, as described above.
    """
    def polarity(tags):
        for tag in tags:
            if tag in positive_emotions:
                return "positive"
        for tag in tags:
            if tag in negative_emotions:
                return "negative"
        return []  # no sentiment category applies

    return [polarity(tags) for tags in labels]

# Function to calculate sentiment agreement
def calculate_sentiment_agreement(spans_annotator1, spans_annotator2, positive_emotions, negative_emotions, group_definitions):
    """Compute the per-group agreement on sentiment polarity.

    For every group the documents are aggregated, the annotations are
    expanded to labels per character position, each position is classified
    as positive or negative, and the two annotators are compared at the
    positions where both of them assigned a sentiment.

    Args:
        spans_annotator1: Span dicts (``'id'``, ``'text'``, ``'annotations'``)
            of the first annotator.
        spans_annotator2: Span dicts of the second annotator.
        positive_emotions: Container of labels counted as positive.
        negative_emotions: Container of labels counted as negative.
        group_definitions: Mapping of group name to the document ids in it.

    Returns:
        pandas.DataFrame: One row per group with the columns ``group`` and
        ``sentiment_jaccard``.
    """
    results = []
    for group_name, group_ids in group_definitions.items():
        # Aggregate annotations for each group
        text_1, annotations_1 = aggregate_annotations_multilabel(spans_annotator1, group_ids)
        text_2, annotations_2 = aggregate_annotations_multilabel(spans_annotator2, group_ids)

        # Convert aggregated annotations to list of labels per position for each annotator
        annotator1_labels = [[] for _ in range(len(text_1))]
        annotator2_labels = [[] for _ in range(len(text_2))]

        for start, end, label in annotations_1:
            for i in range(start, end):
                annotator1_labels[i].append(label)

        for start, end, label in annotations_2:
            for i in range(start, end):
                annotator2_labels[i].append(label)

        # Classify labels by sentiment (positive or negative)
        annotator1_sentiment = classify_sentiment_labels(annotator1_labels, positive_emotions, negative_emotions)
        annotator2_sentiment = classify_sentiment_labels(annotator2_labels, positive_emotions, negative_emotions)

        # Pair the positions first and only then filter. Filtering each
        # annotator's list on its own would shift the entries, so the values
        # compared would no longer belong to the same text position.
        both_classified = [
            (sentiment_1, sentiment_2)
            for sentiment_1, sentiment_2 in zip(annotator1_sentiment, annotator2_sentiment)
            if sentiment_1 and sentiment_2
        ]
        sentiment_jaccard = jaccard_similarity_for_labels(
            [[sentiment_1] for sentiment_1, _ in both_classified],
            [[sentiment_2] for _, sentiment_2 in both_classified]
        )

        # Append results for this group
        results.append({
            'group': group_name,
            'sentiment_jaccard': sentiment_jaccard
        })

    return pd.DataFrame(results)

# Sentiment agreement per group
sentiment_agreement_results = calculate_sentiment_agreement(
    spans_annotator1=spans1,
    spans_annotator2=spans2,
    positive_emotions=positive_emotions,
    negative_emotions=negative_emotions,
    group_definitions=group_definitions,
)

# Show which two files were compared, then the result table
print(file_path1, file_path2)
print(sentiment_agreement_results)


Kian.jsonl Inka.jsonl
    group  sentiment_jaccard
0  Group1           0.634883
1  Group2           0.757422
2  Group3           0.794830


## Speaker switch

In [None]:
def aggregate_annotations_multilabel(spans, group_ids):
    """Aggregate the text and annotations of the spans in one group.

    Args:
        spans: Iterable of dicts with keys ``'id'``, ``'text'`` and
            ``'annotations'`` (an iterable of ``(start, end, label)``).
        group_ids: Container of document ids that belong to the group.

    Returns:
        tuple: ``(text, annotations)`` where ``text`` is the selected
        documents' text concatenated in input order and ``annotations``
        holds every ``(start, end, label)`` expressed in coordinates of
        that concatenated text.
    """
    text_parts = []
    annotations = []
    offset = 0  # length of the text collected so far
    for span in spans:
        if span['id'] not in group_ids:
            continue
        # Span offsets are relative to their own document. Without this shift
        # every document after the first would be marked on the first
        # document's characters.
        annotations.extend(
            (start + offset, end + offset, label)
            for start, end, label in span['annotations']
        )
        text_parts.append(span['text'])
        offset += len(span['text'])
    return ''.join(text_parts), annotations

def identify_speaker_switches(labels):
    """Flag the positions whose label differs from the previous position's.

    Args:
        labels: Sequence of per-position labels.

    Returns:
        list[int]: Same length as ``labels``; 1 where the label differs
        from its predecessor, else 0. The first position is always 0.
    """
    return [
        1 if idx and labels[idx] != labels[idx - 1] else 0
        for idx in range(len(labels))
    ]

def get_spans_for_kappa(data):
    """Convert loaded records into the span dicts used by the agreement code.

    Args:
        data: Iterable of dicts with keys ``'text'`` and ``'label'`` (an
            iterable of ``[start, end, label]`` triples) and an optional
            ``'id'``.

    Returns:
        list[dict]: One dict per record with keys ``'id'`` (``None`` when
        the record has no id), ``'text'`` and ``'annotations'`` (a list of
        ``(start, end, label)`` tuples).
    """
    converted = []
    for record in data:
        body = record['text']
        triples = [(begin, stop, tag) for begin, stop, tag in record['label']]
        converted.append(dict(id=record.get('id'), text=body, annotations=triples))
    return converted

In [None]:
# Group name -> ids of the documents that belong to the group
group_definitions = dict(
    Group1={1, 2, 3, 4},
    Group2={9, 11, 15},
    Group3={5, 6, 7, 8, 10, 12, 13, 14, 16},
)

# Speaker labels kept for the switch analysis; all other labels are ignored
valid_labels = {
    'Speaker 1',
    'Speaker 2',
    'Interviewee',
    'Interviewer',
    'Instructor',
}

def calculate_speaker_switch_agreement_filtered(spans_annotator1, spans_annotator2, group_definitions, valid_labels):
    """Compute per-group Cohen's kappa on where the speaker label changes.

    For every group the documents are aggregated, the annotations whose
    label is in ``valid_labels`` are expanded to labels per character
    position, each position is reduced to its first label (or ``None``),
    and positions whose label differs from the previous one are flagged
    as switches. Kappa is computed on the two 0/1 switch vectors.

    Args:
        spans_annotator1: Span dicts (``'id'``, ``'text'``, ``'annotations'``)
            of the first annotator.
        spans_annotator2: Span dicts of the second annotator.
        group_definitions: Mapping of group name to the document ids in it.
        valid_labels: Container of the labels to take into account.

    Returns:
        pandas.DataFrame: One row per group with the columns ``group`` and
        ``switch_kappa``.
    """
    results = []
    for group_name, group_ids in group_definitions.items():
        # Aggregate annotations for each group
        text_1, annotations_1 = aggregate_annotations_multilabel(spans_annotator1, group_ids)
        text_2, annotations_2 = aggregate_annotations_multilabel(spans_annotator2, group_ids)

        # Convert aggregated annotations to list of labels per position for each annotator
        annotator1_labels = [[] for _ in range(len(text_1))]
        annotator2_labels = [[] for _ in range(len(text_2))]

        for start, end, label in annotations_1:
            if label in valid_labels:  # Only include valid labels
                for i in range(start, end):
                    annotator1_labels[i].append(label)

        for start, end, label in annotations_2:
            if label in valid_labels:  # Only include valid labels
                for i in range(start, end):
                    annotator2_labels[i].append(label)

        # Simplify to primary speaker label per position, ignore multi-label
        annotator1_primary_labels = [labels[0] if labels else None for labels in annotator1_labels]
        annotator2_primary_labels = [labels[0] if labels else None for labels in annotator2_labels]

        # Identify switch points for each annotator
        annotator1_switches = identify_speaker_switches(annotator1_primary_labels)
        annotator2_switches = identify_speaker_switches(annotator2_primary_labels)

        # cohen_kappa_score rejects inputs of different length. Pad the shorter
        # vector with 0 ("no switch"), the same convention the multi-label
        # agreement cell uses, so unequal text lengths do not abort the run.
        common_length = max(len(annotator1_switches), len(annotator2_switches))
        annotator1_switches = annotator1_switches + [0] * (common_length - len(annotator1_switches))
        annotator2_switches = annotator2_switches + [0] * (common_length - len(annotator2_switches))

        # Calculate Cohen's Kappa for switch points
        switch_kappa = cohen_kappa_score(annotator1_switches, annotator2_switches)

        # Append results for this group
        results.append({
            'group': group_name,
            'switch_kappa': switch_kappa
        })

    return pd.DataFrame(results)

# Speaker-switch kappa per group, restricted to the speaker labels
speaker_switch_agreement_results_filtered = calculate_speaker_switch_agreement_filtered(
    spans_annotator1=spans1,
    spans_annotator2=spans2,
    group_definitions=group_definitions,
    valid_labels=valid_labels,
)

# Show which two files were compared, then the result table
print(file_path1, file_path2)
print(speaker_switch_agreement_results_filtered)


Kian.jsonl Inka.jsonl
    group  switch_kappa
0  Group1      0.707508
1  Group2      0.707756
2  Group3      0.000000
