# Code that creates annotations from ELAN files

This code converts ELAN (.eaf) files into .txt files with the following structure:

Speaker X: 

    tier1:something
    tier2:something
    tier3:something

And so on...

In [None]:
from xml.etree import ElementTree as ET
from collections import defaultdict

def extract_speaker_tiers(file_path):
    """Collect the annotations of an ELAN (.eaf) file, grouped by speaker and tier type.

    Tier IDs are split on their last underscore into ``<speaker>`` and
    ``<tier type>``. Tiers that have no ``TIER_ID`` attribute, or whose ID
    contains no underscore, are skipped.

    Args:
        file_path: Path of the EAF file; passed straight to ``ElementTree.parse``.

    Returns:
        A nested ``defaultdict`` where ``result[speaker][tier_type]`` is a list
        of ``(key, value)`` tuples in document order. For
        ``ALIGNABLE_ANNOTATION`` elements ``key`` is the
        ``(TIME_SLOT_REF1, TIME_SLOT_REF2)`` pair; for ``REF_ANNOTATION``
        elements it is the ``ANNOTATION_REF`` string. ``value`` is the text of
        the ``ANNOTATION_VALUE`` element (``None`` when that element is empty),
        or the string ``"None"`` when the element is missing.
    """
    root = ET.parse(file_path).getroot()

    # speaker -> tier type -> list of (key, value) tuples
    speaker_annotations = defaultdict(lambda: defaultdict(list))

    for tier in root.findall(".//TIER"):
        tier_name = tier.attrib.get('TIER_ID')

        # A missing TIER_ID yields None; skip it together with the IDs that do
        # not follow the "<speaker>_<tier type>" pattern instead of crashing.
        if not tier_name or "_" not in tier_name:
            continue
        speaker_base, tier_type = tier_name.rsplit("_", 1)
        tier_entries = None  # looked up lazily so empty tiers add no key

        # Time-aligned annotations are keyed by their pair of time-slot references
        for annotation in tier.findall(".//ALIGNABLE_ANNOTATION"):
            time_stamp_pair = (
                annotation.attrib.get('TIME_SLOT_REF1'),
                annotation.attrib.get('TIME_SLOT_REF2'),
            )
            anno_value_elem = annotation.find('ANNOTATION_VALUE')
            anno_value = anno_value_elem.text if anno_value_elem is not None else "None"

            if tier_entries is None:
                tier_entries = speaker_annotations[speaker_base][tier_type]
            tier_entries.append((time_stamp_pair, anno_value))

        # Reference annotations (sub-tiers) are keyed by the annotation they refer to
        for ref_annotation in tier.findall(".//REF_ANNOTATION"):
            annotation_ref = ref_annotation.attrib.get('ANNOTATION_REF')
            anno_value_elem = ref_annotation.find('.//ANNOTATION_VALUE')
            anno_value = anno_value_elem.text if anno_value_elem is not None else "None"

            if tier_entries is None:
                tier_entries = speaker_annotations[speaker_base][tier_type]
            tier_entries.append((annotation_ref, anno_value))

    return speaker_annotations

In [None]:
# Function to merge speakers' tiers
def merge_speakers_tiers(speakers_tiers, speakers_to_merge):
    """Fold the tiers of secondary speakers into their main speaker.

    Args:
        speakers_tiers: Mapping ``speaker -> {tier_type: [annotations]}``.
        speakers_to_merge: Mapping ``main speaker -> list of speakers`` whose
            tiers are added to the main speaker,
            e.g. ``{'Simeon': ['A']}``.

    Returns:
        A new dict. Each main speaker present in ``speakers_tiers`` gets the
        combined tiers of itself and its listed speakers; when several of them
        share a tier type, the annotation lists are concatenated (main speaker
        first) rather than overwritten. Speakers that only appear inside a
        merge list are dropped; all other speakers are kept unchanged. The
        input lists are never modified.
    """
    # Every speaker that is merged into somebody else; computed once so the
    # membership test below is O(1) per speaker.
    absorbed = {name for names in speakers_to_merge.values() for name in names}

    merged_speakers_tiers = {}
    for speaker, tiers in speakers_tiers.items():
        if speaker in speakers_to_merge:
            combined = {}
            # dict.fromkeys keeps order while dropping repeated names, so a
            # speaker listed twice (or the main speaker listed in its own
            # merge list) is not appended twice.
            for source in dict.fromkeys([speaker, *speakers_to_merge[speaker]]):
                for tier_type, annotations in speakers_tiers.get(source, {}).items():
                    combined.setdefault(tier_type, []).extend(annotations)
            merged_speakers_tiers[speaker] = combined
        elif speaker not in absorbed:
            # Not involved in any merge: keep as is
            merged_speakers_tiers[speaker] = tiers

    return merged_speakers_tiers

In [15]:
def save_speaker_annotations_to_file(speaker_annotations, save_path):
    """Write the speaker/tier/annotation hierarchy to a plain-text file.

    The file contains, for every speaker, a ``Speaker:`` line, followed for
    each of its tiers by a ``Tier Type:`` line (with the annotation count) and
    one ``Time Stamp: ..., Annotation: ...`` line per annotation.

    Args:
        speaker_annotations: Mapping ``speaker -> {tier_type: [(key, value), ...]}``.
        save_path: Destination path; an existing file is overwritten.
    """
    with open(save_path, 'w') as out:
        for speaker_name, speaker_tiers in speaker_annotations.items():
            out.write(f"Speaker: {speaker_name}\n")
            for tier_label, entries in speaker_tiers.items():
                out.write(f"  Tier Type: {tier_label}, Annotations Count: {len(entries)}\n")
                out.writelines(
                    f"    Time Stamp: {stamp}, Annotation: {text}\n"
                    for stamp, text in entries
                )

def _parse_saved_annotation_key(text):
    """Rebuild an annotation key from the text written after ``Time Stamp:``."""
    import ast  # local import: ast is not imported at the top of the notebook

    if text.startswith("(") and text.endswith(")"):
        # Time-slot pairs are saved as the repr of a tuple, e.g. "('ts1', 'ts2')"
        try:
            parsed = ast.literal_eval(text)
        except (ValueError, SyntaxError):
            parsed = None
        if isinstance(parsed, tuple):
            return parsed
        # Not a valid tuple literal: fall back to a plain split of the inner text
        return tuple(text[1:-1].split(", "))
    # Reference annotations are saved as a bare annotation id, e.g. "a12"
    return text


def read_saved_speaker_annotations_from_file(file_path):
    """Load a file written by ``save_speaker_annotations_to_file``.

    Args:
        file_path: Path of the saved annotations text file.

    Returns:
        A nested ``defaultdict`` where ``result[speaker][tier_type]`` is a list
        of ``(key, annotation_text)`` tuples. ``key`` is a tuple for lines
        whose time stamp was saved as a tuple, and a plain string otherwise.
        ``annotation_text`` is always a string (a saved ``None`` is read back
        as ``"None"``); surrounding whitespace is stripped.
    """
    speaker_annotations = defaultdict(lambda: defaultdict(list))
    current_speaker = None
    current_tier_type = None

    with open(file_path, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if line.startswith("Speaker:"):
                current_speaker = line[len("Speaker:"):].strip()
            elif line.startswith("Tier Type:"):
                # The tier label is everything before the ", Annotations Count" part
                current_tier_type = line[len("Tier Type:"):].split(",")[0].strip()
            elif line.startswith("Time Stamp:"):
                rest = line[len("Time Stamp:"):].strip()
                # Split on the first separator only, so annotation text that
                # itself contains "Annotation: " is kept whole, and an empty
                # annotation no longer raises IndexError.
                key_text, _, anno_value = rest.partition(", Annotation:")
                key = _parse_saved_annotation_key(key_text.strip())
                speaker_annotations[current_speaker][current_tier_type].append(
                    (key, anno_value.strip())
                )

    return speaker_annotations

In [None]:
import os

# Input and output locations
input_directory = 'Data/Simeon/Floyd ELDP 2023 deposit'  # Replace with your input directory
output_directory_annotations = 'Data/Simeon/Floyd_unmerged'  # Replace with your output directory for annotations
output_directory_merged = 'Data/Simeon/Floyd_merged'  # Replace with your output directory for merged annotations

# Make sure both output directories exist before anything is written to them
for directory_to_create in (output_directory_annotations, output_directory_merged):
    if not os.path.exists(directory_to_create):
        os.makedirs(directory_to_create)

In [None]:

# First loop: read each ELAN file, extract the annotations, and save them to a text file
for filename in os.listdir(input_directory):
    if not filename.endswith(".eaf"):
        continue

    elan_file_path = os.path.join(input_directory, filename)
    speaker_annotations = extract_speaker_tiers(elan_file_path)

    # Replace the ".eaf" extension instead of appending to it, so that
    # "X.eaf" is saved as "X.txt" (not "X.eaf.txt"). The File column of the
    # speakers CSV used by the merge step lists names of the form "X.txt".
    base_name = os.path.splitext(filename)[0]
    save_path = os.path.join(output_directory_annotations, f"{base_name}.txt")
    save_speaker_annotations_to_file(speaker_annotations, save_path)

In [None]:
import os
import pandas as pd

# One dict per (file, speaker); turned into a DataFrame in a later cell
data_rows = []

# Directory that holds the text files written by the extraction step
txt_dir_path = "Data/Simeon/Floyd_unmerged"

for filename in os.listdir(txt_dir_path):
    if not filename.endswith(".txt"):
        continue

    # Speaker currently being read and the tier lines seen for it so far
    current_speaker = None
    tier_labels = []

    with open(os.path.join(txt_dir_path, filename), 'r') as file:
        stripped_lines = [raw_line.strip() for raw_line in file.readlines()]

        # The trailing None marks the end of the file, so the last speaker is
        # flushed by the same branch that handles a change of speaker.
        for line in [*stripped_lines, None]:
            is_speaker_line = line is not None and line.startswith('Speaker: ')

            if line is None or is_speaker_line:
                # Store what was collected for the previous speaker, if any
                if current_speaker:
                    data_rows.append({
                        'File': filename,
                        'Speaker': current_speaker,
                        'Tier_Labels': ', '.join(tier_labels),
                    })

                if is_speaker_line:
                    # Start collecting for the new speaker
                    current_speaker = line.replace('Speaker: ', '')
                    tier_labels = []
            elif line.startswith('Tier Type: '):
                # Keeps the rest of the line, including the annotation count
                tier_labels.append(line.replace('Tier Type: ', ''))

In [None]:
# Build the summary table from the collected rows, ordered by file and then by speaker name
df = pd.DataFrame.from_records(data_rows).sort_values(by=['File', 'Speaker'])

# Persist the table as a CSV file, without the index column
df.to_csv("Data/Simeon/speakers.csv", index=False)

In [None]:
# Preview the first rows of the speaker summary table
df.head()

In [None]:
# '''
# Do not use this code: it does not work in Jupyter, and it would need to be properly corrected to run as a Python script if that is ever required.
# '''

# # Second Loop: Read each saved annotations file, print the first 3 annotations, and merge speakers based on user input
# for filename in os.listdir(output_directory_annotations):
#     if filename.endswith(".txt"):
#         annotations_file_path = os.path.join(output_directory_annotations, filename)
#         speaker_annotations = read_saved_speaker_annotations_from_file(annotations_file_path)
        
#         # Print the first 3 annotations for each speaker and tier type
#         for speaker, tiers in speaker_annotations.items():
#             print(f"Speaker: {speaker}")
#             for tier_type, annotations in tiers.items():
#                 print(f"\tTier Type: {tier_type}, \n\tFirst 3 Annotations: {annotations[:3]}")
        
#         # Ask the user which speakers to merge
#         speakers_to_merge = {}  # Example: {'Simeon': ['A']}
#         user_input = input("Please enter speakers to merge (e.g., Simeon:A,Speaker2:Speaker3) or 'skip': ")
#         if user_input != 'skip':
#             for merge_pair in user_input.split(","):
#                 main_speaker, merge_speaker = merge_pair.split(":")
#                 if main_speaker in speakers_to_merge:
#                     speakers_to_merge[main_speaker].append(merge_speaker)
#                 else:
#                     speakers_to_merge[main_speaker] = [merge_speaker]
        
#         # Merge speakers and save to a new file
#         merged_speaker_annotations = merge_speakers_tiers(speaker_annotations, speakers_to_merge)
#         save_path = os.path.join(output_directory_merged, f"merged_{filename}")
#         save_speaker_annotations_to_file(merged_speaker_annotations, save_path)

# Code to merge speakers (manually) in case they were not grouped as expected

In [11]:
# Function to achieve a transitive merge of the speakers

def transitive_merge(speakers_to_merge):
    """Expand chained merge instructions into one flat list per main speaker.

    For example ``{'A': ['B'], 'B': ['C']}`` becomes ``{'A': ['B', 'C']}``.

    Args:
        speakers_to_merge: Mapping ``speaker -> list of speakers`` to merge
            into it. The mapping and its lists are not modified.

    Returns:
        A dict mapping each remaining main speaker to every speaker reachable
        from it through the merge lists. Each list contains no duplicates and
        never contains the main speaker itself (so cycles are harmless). A
        speaker that is absorbed by another main speaker does not keep an
        entry of its own, regardless of the order of the input keys.
    """
    transitive_speakers_to_merge = {}

    # Speakers already absorbed by some main speaker
    absorbed = set()

    for speaker in speakers_to_merge:
        if speaker in absorbed:
            continue

        # Depth-first walk over the merge lists, starting from this speaker
        reachable = []
        seen = {speaker}
        stack = [speaker]
        while stack:
            current_speaker = stack.pop()
            for to_merge in speakers_to_merge.get(current_speaker, []):
                if to_merge in seen:
                    continue
                seen.add(to_merge)
                reachable.append(to_merge)
                stack.append(to_merge)

        # A main speaker recorded earlier may turn out to be reachable from
        # this one; its entry is superseded by the larger group.
        for to_merge in reachable:
            transitive_speakers_to_merge.pop(to_merge, None)
        absorbed.update(reachable)

        transitive_speakers_to_merge[speaker] = reachable

    return transitive_speakers_to_merge


In [12]:
def merge_speaker_tiers(speakers_tiers, speakers_to_merge):
    """Merge speakers' tiers, following chained merge instructions.

    Args:
        speakers_tiers: Mapping ``speaker -> {tier_type: [annotations]}``.
        speakers_to_merge: Mapping ``main speaker -> list of speakers`` to
            merge into it. Chains are followed: if ``A`` absorbs ``B`` and
            ``B`` absorbs ``C``, then ``A`` receives the tiers of both. A
            listed speaker that is missing from ``speakers_tiers`` is ignored
            and its own merge list is not followed.

    Returns:
        A new dict with one entry per remaining speaker. Tier types shared by
        merged speakers have their annotation lists concatenated (main speaker
        first). Absorbed speakers have no entry of their own. Neither input is
        modified.
    """
    from collections import deque  # local import: only defaultdict is imported at file level

    merged_speakers_tiers = {}

    # Speakers whose tiers were folded into another speaker
    merged_speaker_set = set()

    for speaker, tiers in speakers_tiers.items():
        if speaker in merged_speaker_set:
            continue

        if speaker not in speakers_to_merge:
            # Not a main speaker: keep as is (removed again below if a later
            # main speaker absorbs it)
            merged_speakers_tiers[speaker] = tiers
            continue

        # Copy the lists so extending them does not touch the caller's data
        merged_tiers = {tier_type: list(annotations) for tier_type, annotations in tiers.items()}

        # Work on a copy of the merge list; `seen` stops cyclic chains
        pending = deque(speakers_to_merge[speaker])
        seen = {speaker}
        while pending:
            next_speaker = pending.popleft()
            if next_speaker in seen:
                continue
            seen.add(next_speaker)

            if next_speaker not in speakers_tiers:
                continue

            for tier_type, annotations in speakers_tiers[next_speaker].items():
                merged_tiers.setdefault(tier_type, []).extend(annotations)
            merged_speaker_set.add(next_speaker)
            # The absorbed speaker may already have been emitted if it came
            # earlier in the iteration order than its main speaker.
            merged_speakers_tiers.pop(next_speaker, None)

            pending.extend(speakers_to_merge.get(next_speaker, []))

        merged_speakers_tiers[speaker] = merged_tiers

    return merged_speakers_tiers

In [13]:
# Directory with the per-file annotation text files (input) and directory
# where the merged versions are written (output)
input_dir = "Data/Simeon/Floyd_unmerged"
output_dir = "Data/Simeon/Floyd_merged"

# Read the merge guidelines CSV into a DataFrame.
# The cells below read its 'File', 'Speaker' and 'Merge_With' columns.
csv_path = "Data/Simeon/speakers_with_guidelines.csv"
df = pd.read_csv(csv_path)

# Group the rows by 'File' so each annotation file is processed once
grouped = df.groupby('File')

In [17]:
# Preview the first ten rows of the merge guidelines table
df.head(10)

Unnamed: 0,File,Speaker,Tier_Labels,Merge_With
0,QUSF2018_02_03S1 _pub.txt,A,"dt, Annotations Count: 205, mb, Annotations Co...",
1,QUSF2018_02_03S1 _pub.txt,B,"dt, Annotations Count: 243, mb, Annotations Co...",
2,QUSF2018_02_03S1 _pub.txt,C,"dt, Annotations Count: 52, mb, Annotations Cou...",
3,QUSF2018_02_03S1 _pub.txt,Hermelinda Tituaña,"po, Annotations Count: 53, tn, Annotations Cou...",C
4,QUSF2018_02_03S1 _pub.txt,Rosario Tupiza,"po, Annotations Count: 247, tn, Annotations Co...",B
5,QUSF2018_02_03S1 _pub.txt,Simeon,"po, Annotations Count: 205, tn, Annotations Co...",A
6,QUSF2018_02_03S2_pub.txt,A,"po, Annotations Count: 715, dt, Annotations Co...",
7,QUSF2018_02_03S2_pub.txt,B,"po, Annotations Count: 399, dt, Annotations Co...",
8,QUSF2018_02_09S1_pub.txt,A,"dt, Annotations Count: 81, mb, Annotations Cou...",
9,QUSF2018_02_09S1_pub.txt,B,"dt, Annotations Count: 38, mb, Annotations Cou...",


In [16]:
# Process the guideline rows one annotation file at a time
for file_name, group in grouped:
    # Map every speaker that has a Merge_With entry to the speakers it absorbs.
    # NOTE(review): str.split() breaks the entry on whitespace, so a multi-word
    # speaker name in Merge_With would be split into several names - confirm
    # the expected format of that column.
    speakers_to_merge = {
        speaker: merge_with.split()
        for speaker, merge_with in zip(group['Speaker'], group['Merge_With'])
        if pd.notna(merge_with)
    }

    # Expand chained merge instructions
    transitive_speakers_to_merge = transitive_merge(speakers_to_merge)

    # The File value is used as-is as the file name inside input_dir
    input_file_path = os.path.join(input_dir, file_name)

    # Load the saved annotations; report and skip files that are missing
    try:
        speakers_data = read_saved_speaker_annotations_from_file(input_file_path)
    except FileNotFoundError:
        print(f"File not found: {input_file_path}")
        continue

    # Merge the speakers and write the result under the same file name in output_dir
    merged_speakers_data = merge_speakers_tiers(speakers_data, transitive_speakers_to_merge)
    output_file_path = os.path.join(output_dir, file_name)
    save_speaker_annotations_to_file(merged_speakers_data, output_file_path)

FileNotFoundError: [Errno 2] No such file or directory: 'Data/Simeon/Floyd_unmerged/QUSF2018_02_03S1 _pub'

# Code that worked to test the final processing

In [None]:

# ELAN file used for this manual check
elan_file_path = "Data/Simeon/Floyd ELDP 2023 deposit/QUSF2018_02_03S1 _pub.eaf"

# Extract the annotations grouped by speaker and tier type
speaker_annotations = extract_speaker_tiers(elan_file_path)

# For debugging, print each speaker followed by the first 3 annotations of each of its tiers
for speaker, tiers in speaker_annotations.items():
    report_lines = [f"Speaker: {speaker}"]
    report_lines.extend(
        f"\tTier Type: {tier_type}, \n\tFirst 3 Annotations: {annotations[:3]}"
        for tier_type, annotations in tiers.items()
    )
    print("\n".join(report_lines))

In [None]:
# Main speaker -> speakers whose tiers are folded into it
speakers_to_merge = {'Simeon': ['A'], 'Rosario Tupiza': ['B'], 'Hermelinda Tituaña': ['C']}

# Combine the tiers according to the mapping above
merged_speakers_tiers = merge_speakers_tiers(speaker_annotations, speakers_to_merge)

# For debugging, print each merged speaker followed by the first 3 annotations of each of its tiers
for speaker, tiers in merged_speakers_tiers.items():
    report_lines = [f"Speaker: {speaker}"]
    report_lines.extend(
        f"\tTier Type: {tier_type}, \n\tFirst 3 Annotations: {annotations[:3]}"
        for tier_type, annotations in tiers.items()
    )
    print("\n".join(report_lines))

# Utility functions

In [None]:
# Parse the EAF file again to take a closer look at the XML structure
root = (tree := ET.parse(elan_file_path)).getroot()

# TIER_ID of every tier, in the order the tiers appear in the XML
ordered_tier_names = [tier.attrib.get('TIER_ID') for tier in root.findall(".//TIER")]

# Leading and trailing slices of the list to get a sense of the ordering
# (despite the variable names, each slice holds up to 100 names)
first_10_tiers = ordered_tier_names[:100]
last_10_tiers = ordered_tier_names[-100:]

first_10_tiers, last_10_tiers, len(ordered_tier_names)

In [None]:
# Re-run the function to count the number of annotations in each tier and sub-tier
def count_annotations_in_each_tier(all_layers_annotations):
    """Return a dict mapping each tier to the number of annotations stored for it.

    Args:
        all_layers_annotations: Mapping ``tier -> collection of annotations``.
    """
    return {
        tier: len(annotations)
        for tier, annotations in all_layers_annotations.items()
    }

# Count the number of annotations in each tier and sub-tier.
# NOTE(review): all_layers_annotations is not assigned at module level in the
# cells shown here (it is a local variable of extract_and_debug_all_layers) -
# confirm where it is expected to come from.
annotation_counts = count_annotations_in_each_tier(all_layers_annotations)

# Display every tier whose annotation count is greater than 1
{k: v for k, v in annotation_counts.items() if v > 1}


In [None]:
# Number of tiers listed between 'Rosario Tupiza_po' and 'Hermelinda Tituaña_po'
start_tier = 'Rosario Tupiza_po'
end_tier = 'Hermelinda Tituaña_po'
# NOTE(review): if encountered_tiers_and_subtiers is the set built in
# extract_and_debug_all_layers, the order of this list is arbitrary and the
# distance computed below is not meaningful - confirm.
tiers_list = list(encountered_tiers_and_subtiers)

try:
    start_index = tiers_list.index(start_tier)
    end_index = tiers_list.index(end_tier)
except ValueError:
    num_tiers_between = "One of the tiers not found"
else:
    # Exclude the start and end tiers themselves
    num_tiers_between = end_index - start_index - 1

num_tiers_between

In [None]:

# Second loop: for each saved annotations file, show a sample of its content,
# ask which speakers to merge, and save the merged result
for filename in os.listdir(output_directory_annotations):
    if not filename.endswith(".txt"):
        continue

    annotations_file_path = os.path.join(output_directory_annotations, filename)
    speaker_annotations = read_saved_speaker_annotations_from_file(annotations_file_path)

    # Show the first 3 annotations of every tier of every speaker
    for speaker, tiers in speaker_annotations.items():
        print(f"Speaker: {speaker}")
        for tier_type, annotations in tiers.items():
            print(f"\tTier Type: {tier_type}, \n\tFirst 3 Annotations: {annotations[:3]}")

    # Build the merge mapping from the user's answer, e.g. {'Simeon': ['A']}
    speakers_to_merge = {}
    user_input = input("Please enter speakers to merge (e.g., Simeon:A,Speaker2:Speaker3) or 'skip': ")
    if user_input != 'skip':
        for merge_pair in user_input.split(","):
            main_speaker, merge_speaker = merge_pair.split(":")
            speakers_to_merge.setdefault(main_speaker, []).append(merge_speaker)

    # Merge the speakers and save the result under a "merged_" file name
    merged_speaker_annotations = merge_speakers_tiers(speaker_annotations, speakers_to_merge)
    save_path = os.path.join(output_directory_merged, f"merged_{filename}")
    save_speaker_annotations_to_file(merged_speaker_annotations, save_path)


# Discarded code for testing

In [None]:
import xml.etree.ElementTree as ET

# Parse the EAF file using ElementTree
tree = ET.parse("Data/Simeon/Floyd ELDP 2023 deposit/QUSF2018_02_03S1 _pub.eaf")
root = tree.getroot()

# Tier ID -> list of (start time-slot ref, end time-slot ref, annotation text)
# for every alignable annotation found in that tier
tier_data_xml = {
    tier.attrib.get('TIER_ID'): [
        (
            annotation.attrib.get('TIME_SLOT_REF1'),
            annotation.attrib.get('TIME_SLOT_REF2'),
            annotation.find(".//ANNOTATION_VALUE").text,
        )
        for annotation in tier.findall(".//ALIGNABLE_ANNOTATION")
    ]
    for tier in root.findall(".//TIER")
}

tier_data_xml


In [None]:
# Define the output file path
output_file_path = "Outputs/Clean txt/tests1.txt"

# Write one "[tier name]" section per tier, with one "(start - end) text" line
# per annotation and a blank line after each section.
# The file is written as UTF-8 explicitly because parse_annotation_file reads
# it back with encoding='utf-8'; the platform default encoding may differ.
with open(output_file_path, 'w', encoding='utf-8') as outfile:
    for tier_name, annotations in tier_data_xml.items():
        outfile.write(f"[{tier_name}]\n")
        outfile.writelines(
            f"({start_time} - {end_time}) {text}\n"
            for start_time, end_time, text in annotations
        )
        outfile.write("\n")

output_file_path

In [None]:
# Step 1: Parse the provided file to extract the content

def parse_annotation_file(file_path):
    """Parse a text file made of ``[speaker]`` headers and ``(start - end) text`` lines.

    Args:
        file_path: Path of the UTF-8 text file to read.

    Returns:
        ``{"conversations": [...]}`` where each item is
        ``{"speaker": name, "annotations": [{"start", "end", "text"}, ...]}``,
        in file order. A speaker is only included when it has a non-empty name
        and at least one annotation. Only lines starting with ``(ts`` are
        treated as annotations.

    Raises:
        ValueError: If an annotation line does not contain exactly one
            ``" - "`` separator between the start and end references.
    """
    conversations = []
    current_speaker = None
    annotations = []

    with open(file_path, 'r', encoding='utf-8') as file:
        for raw_line in file:
            line = raw_line.strip()

            if line.startswith('[') and line.endswith(']'):
                # New speaker header: store what was collected for the previous one
                if current_speaker and annotations:
                    conversations.append({
                        "speaker": current_speaker,
                        "annotations": annotations
                    })
                    annotations = []

                current_speaker = line[1:-1]  # name without the brackets

            elif line.startswith('(ts') and ')' in line:
                # Split at the FIRST ')' only, so annotation text that itself
                # contains parentheses is kept whole instead of being cut off.
                time_span, _, text = line.partition(')')
                start, end = time_span[1:].split(' - ')
                annotations.append({
                    "start": start,
                    "end": end,
                    "text": text.strip()
                })

    # Store the annotations of the last speaker
    if current_speaker and annotations:
        conversations.append({
            "speaker": current_speaker,
            "annotations": annotations
        })

    return {"conversations": conversations}

# Parse the text file written in the previous cell and display the result
parsed_data = parse_annotation_file("Outputs/Clean txt/tests1.txt")
parsed_data


In [None]:
import json

# Destination of the JSON version of the parsed data
json_filename = "Outputs/Clean txt/tests1.json"

# Serialize first, then write the text in one go; non-ASCII characters are
# kept as-is rather than escaped
serialized = json.dumps(parsed_data, indent=4, ensure_ascii=False)
with open(json_filename, 'w') as json_file:
    json_file.write(serialized)

json_filename

In [None]:
import json

# Load parsed annotations
with open("Outputs/Clean txt/tests1.json", 'r') as file:
    parsed_data = json.load(file)

conversations = parsed_data["conversations"]

# Instruction/response pairs: annotation i of a conversation is paired with
# annotation i of the conversation that follows it in the file
dataset = []

# enumerate gives the position directly; looking it up with list.index() was
# quadratic and returned the first match when two conversations compare equal
for current_index, conversation in enumerate(conversations):
    annotations = conversation["annotations"]
    if not annotations:
        continue

    # The last conversation has nothing after it to pair with
    if current_index + 1 < len(conversations):
        next_annotations = conversations[current_index + 1]["annotations"]
    else:
        next_annotations = []

    # zip stops at the shorter list, so annotations without a counterpart are skipped
    for annotation, next_annotation in zip(annotations, next_annotations):
        dataset.append({
            "instruction": annotation["text"],
            "response": next_annotation["text"],
        })

# Convert the dataset into the desired JSON format
formatted_data = {
    "instruction,response": [
        {
            "instruction": entry["instruction"],
            "response": entry["response"]
        }
        for entry in dataset
    ]
}

# Save the formatted data to a new JSON file
output_path = "Outputs/Clean txt/tests1_F.json"
with open(output_path, 'w') as file:
    json.dump(formatted_data, file, indent=4)

output_path

In [None]:
# Re-parse the EAF file using ElementTree due to the internal reset
tree = ET.parse("Data/Simeon/Floyd ELDP 2023 deposit/QUSF2018_02_03S1 _pub.eaf")
root = tree.getroot()

# (start ref, end ref) -> annotation texts of all tiers that share that pair
time_stamp_grouped_texts = {}

for tier in root.findall(".//TIER"):
    for annotation in tier.findall(".//ALIGNABLE_ANNOTATION"):
        time_stamp_pair = (
            annotation.attrib.get('TIME_SLOT_REF1'),
            annotation.attrib.get('TIME_SLOT_REF2'),
        )

        # Read the text first, then file it under its time-slot pair
        anno_value = annotation.find(".//ANNOTATION_VALUE").text
        time_stamp_grouped_texts.setdefault(time_stamp_pair, []).append(anno_value)

# Preview the first 15 entries
dict(list(time_stamp_grouped_texts.items())[:15])

In [None]:
import xml.etree.ElementTree as ET

# Parse the EAF file again
tree = ET.parse("Data/Simeon/Floyd ELDP 2023 deposit/QUSF2018_02_03S1 _pub.eaf")
root = tree.getroot()

# (start ref, end ref) -> non-empty annotation texts of all tiers for that pair
aggregated_texts_by_time_stamp = {}

for tier in root.findall(".//TIER"):
    for annotation in tier.findall(".//ALIGNABLE_ANNOTATION"):
        time_stamp_pair = (
            annotation.attrib.get('TIME_SLOT_REF1'),
            annotation.attrib.get('TIME_SLOT_REF2'),
        )
        anno_value = annotation.find(".//ANNOTATION_VALUE").text

        print(f"annotation in {annotation} is {anno_value}\n")

        # Empty annotations (None or "") are not aggregated
        if not anno_value:
            continue
        aggregated_texts_by_time_stamp.setdefault(time_stamp_pair, []).append(anno_value)

# Collapse each list of texts into a single space-separated string
aggregated_texts_by_time_stamp = {
    time_stamp: ' '.join(texts)
    for time_stamp, texts in aggregated_texts_by_time_stamp.items()
}

# Preview the first 5 entries
dict(list(aggregated_texts_by_time_stamp.items())[:5])


In [None]:
# Import the ElementTree library again
import xml.etree.ElementTree as ET

# Re-parse the EAF file as the environment was reset
tree = ET.parse("Data/Simeon/Floyd ELDP 2023 deposit/QUSF2018_02_03S1 _pub.eaf")
root = tree.getroot()

# (start ref, end ref) -> non-empty annotation texts of all tiers for that pair
aggregated_texts_by_time_stamp = {}

for tier in root.findall(".//TIER"):
    for annotation in tier.findall(".//ALIGNABLE_ANNOTATION"):
        time_slot_ref1 = annotation.attrib.get('TIME_SLOT_REF1')
        time_slot_ref2 = annotation.attrib.get('TIME_SLOT_REF2')
        time_stamp_pair = (time_slot_ref1, time_slot_ref2)
        anno_value = annotation.find(".//ANNOTATION_VALUE").text

        # Empty annotations (None or "") are not aggregated
        if anno_value:
            aggregated_texts_by_time_stamp.setdefault(time_stamp_pair, []).append(anno_value)

# Join the aggregated texts for each time stamp
for time_stamp, texts in aggregated_texts_by_time_stamp.items():
    aggregated_texts_by_time_stamp[time_stamp] = ' '.join(texts)

# Define the output file path
output_file_path_with_metadata = "Outputs/Clean txt/tests2.txt"

# Extract metadata from the root element of the EAF file
elan_version = root.attrib.get('VERSION', 'Unknown')
author = root.attrib.get('AUTHOR', 'Unknown')
date = root.attrib.get('DATE', 'Unknown')

# Write metadata and dialogues to the text file.
# UTF-8 is requested explicitly so non-ASCII annotation text is written the
# same way on every platform instead of depending on the locale encoding.
with open(output_file_path_with_metadata, 'w', encoding='utf-8') as outfile:
    # Write metadata
    outfile.write("ELAN File: QUSF2018_02_03S1 _pub.eaf\n")
    outfile.write(f"ELAN Version: {elan_version}\n")
    outfile.write(f"Author: {author}\n")
    outfile.write(f"Date: {date}\n")

    # Add a separator
    outfile.write("\nDialogues:\n--------------\n")

    # Write one line per time-slot pair
    for time_stamp, text in aggregated_texts_by_time_stamp.items():
        outfile.write(f"{time_stamp} {text}\n")

output_file_path_with_metadata

In [None]:
def print_debug_dialogues(debug_count, file_path):
    """Print the first ``debug_count`` alignable annotations of an EAF file.

    Annotations are visited tier by tier, in document order. One line is
    printed per annotation with its time-slot pair, tier ID and text. The
    function returns nothing.

    Args:
        debug_count: Maximum number of annotations to print.
        file_path: Path of the EAF file; passed to ``ElementTree.parse``.
    """
    root = ET.parse(file_path).getroot()

    # time-slot pair -> {tier ID: annotation text}; built while scanning
    all_tiers_annotations = {}

    printed_so_far = 0

    for tier in root.findall(".//TIER"):
        tier_name = tier.attrib.get('TIER_ID')

        for annotation in tier.findall(".//ALIGNABLE_ANNOTATION"):
            time_stamp_pair = (
                annotation.attrib.get('TIME_SLOT_REF1'),
                annotation.attrib.get('TIME_SLOT_REF2'),
            )

            # A missing ANNOTATION_VALUE element is reported as the string "None"
            value_elem = annotation.find(".//ANNOTATION_VALUE")
            anno_value = "None" if value_elem is None else value_elem.text

            all_tiers_annotations.setdefault(time_stamp_pair, {})[tier_name] = anno_value

            # Stop printing (but keep scanning) once the limit is reached
            if printed_so_far < debug_count:
                print(f"Time Stamp: {time_stamp_pair}, Tier: {tier_name}, Annotation: {anno_value}")
                printed_so_far += 1

In [None]:
# Test the function with a sample ELAN file, printing up to 1000 annotations for debugging
print_debug_dialogues(1000, "Data/Simeon/Floyd ELDP 2023 deposit/QUSF2018_02_03S1 _pub.eaf")

In [None]:
from collections import defaultdict

def extract_and_debug_all_layers(debug_count, file_path):
    """Collect every annotation of an EAF file, grouped by tier or sub-tier.

    Alignable annotations are stored under their tier's ``TIER_ID`` as
    ``((TIME_SLOT_REF1, TIME_SLOT_REF2), text)``. Reference annotations are
    stored under a synthetic name ``"<TIER_ID>_ref_to_<ANNOTATION_REF>"`` as
    ``(ANNOTATION_REF, text)``. While collecting, up to ``debug_count`` lines
    are printed per tier; alignable and reference annotations of one tier
    share that budget.

    Args:
        debug_count: Maximum number of debug lines printed for each tier.
        file_path: Path of the ELAN ``.eaf`` (XML) file to read.

    Returns:
        A tuple ``(names, layers)``: ``names`` is the set of all tier and
        synthetic sub-tier names seen, ``layers`` is a ``defaultdict(list)``
        mapping each name to its list of entries in document order.
    """
    document_root = ET.parse(file_path).getroot()

    layers = defaultdict(list)
    seen_names = set()

    def read_value(node):
        # Text of the nested ANNOTATION_VALUE; the literal string "None" when
        # the element is absent (an empty element yields text None).
        value_node = node.find(".//ANNOTATION_VALUE")
        if value_node is None:
            return "None"
        return value_node.text

    for tier_elem in document_root.findall(".//TIER"):
        tier_id = tier_elem.attrib.get('TIER_ID')
        seen_names.add(tier_id)

        # Debug-line budget; restarts for every tier.
        shown = 0

        # Time-aligned annotations are keyed by their pair of time-slot refs.
        for aligned in tier_elem.findall(".//ALIGNABLE_ANNOTATION"):
            slot_pair = (
                aligned.attrib.get('TIME_SLOT_REF1'),
                aligned.attrib.get('TIME_SLOT_REF2'),
            )
            text = read_value(aligned)
            layers[tier_id].append((slot_pair, text))

            if shown < debug_count:
                print(f"Time Stamp: {slot_pair}, Tier: {tier_id}, Annotation: {text}")
                shown += 1

        # Reference annotations become one synthetic sub-tier per referenced
        # annotation ID.
        for referring in tier_elem.findall(".//REF_ANNOTATION"):
            target_id = referring.attrib.get('ANNOTATION_REF')
            own_id = referring.attrib.get('ANNOTATION_ID')

            layer_name = f"{tier_id}_ref_to_{target_id}"
            seen_names.add(layer_name)

            text = read_value(referring)
            layers[layer_name].append((target_id, text))

            if shown < debug_count:
                print(f"Reference Annotation ID: {own_id}, Sub-Tier: {layer_name}, Annotation: {text}")
                shown += 1

    return seen_names, layers

In [None]:
# Try the function on a sample ELAN file; a limit of 1 prints at most one
# debug line per tier (the counter restarts for every tier)
encountered_tiers_and_subtiers, all_layers_annotations = extract_and_debug_all_layers(1, "Data/Simeon/Floyd ELDP 2023 deposit/QUSF2018_02_03S1 _pub.eaf")

# Display the set of all encountered tiers and sub-tiers
encountered_tiers_and_subtiers

In [None]:
def write_annotations_to_txt(output_file_path, all_layers_annotations):
    """Write every tier's annotations to a text file, one section per tier.

    Each section starts with ``"<tier>:"``, followed by one indented line per
    annotation and a blank separator line. Tiers and annotations are written
    in the iteration order of ``all_layers_annotations``.

    Args:
        output_file_path: Destination path; an existing file is overwritten.
            The parent directory must already exist.
        all_layers_annotations: Mapping of tier name to an iterable of
            ``(key, text)`` pairs, where ``key`` is whatever identifies the
            annotation (a time-slot pair or a reference ID in this notebook).

    Returns:
        None.
    """
    # Explicit UTF-8: annotation text may contain non-ASCII characters, and
    # the platform default encoding is not guaranteed to be able to encode
    # them.
    with open(output_file_path, 'w', encoding='utf-8') as f:
        for tier, annotations in all_layers_annotations.items():
            f.write(f"{tier}:\n")

            f.writelines(
                f"  Time Stamp: {time_stamp_pair}, Annotation: {anno_value}\n"
                for time_stamp_pair, anno_value in annotations
            )

            # Blank line between tiers
            f.write("\n")

# Destination of the flat per-tier dump. The "Outputs/Clean txt" directory
# must already exist: opening a file for writing does not create directories.
output_txt_file_path = "Outputs/Clean txt/tests3.txt"

# Write the annotations collected by extract_and_debug_all_layers to the text file
write_annotations_to_txt(output_txt_file_path, all_layers_annotations)

# Bare expression so the notebook cell shows the path that was written
output_txt_file_path


In [None]:
from collections import defaultdict

def write_grouped_annotations_to_txt(output_file_path, all_layers_annotations):
    """Write annotations to a text file, merged and sorted per parent tier.

    A layer named ``"<parent>_ref_to_<id>"`` is merged into ``<parent>``;
    any other layer is its own parent. Each parent's merged entries are
    sorted by ``(key, text)`` and written as numbered lines under a
    ``"<parent>:"`` heading, followed by a blank line.

    Keys are compared exactly as given, so string IDs sort lexicographically
    (``"a10"`` before ``"a2"``). Entries whose text is ``None`` sort before
    entries with text when their keys are equal.

    Args:
        output_file_path: Destination path; an existing file is overwritten.
            The parent directory must already exist.
        all_layers_annotations: Mapping of layer name to a list of
            ``(key, text)`` pairs. The mapping and its lists are not
            modified.

    Returns:
        None.

    Raises:
        TypeError: If the keys merged into one parent tier are not mutually
            comparable (for example tuples mixed with strings).
    """
    def sort_key(entry):
        # Same ordering as comparing the (key, text) tuples directly, except
        # that a None text no longer raises TypeError when it has to be
        # compared with a string under an equal key.
        key, text = entry
        return (key, text is not None, "" if text is None else text)

    # Group and sort before touching the file, so a failure here does not
    # leave a truncated output file behind.
    parent_tier_groups = defaultdict(list)
    for tier, annotations in all_layers_annotations.items():
        # split() returns the whole name when the marker is absent
        parent_tier = tier.split('_ref_to_')[0]
        parent_tier_groups[parent_tier].extend(annotations)

    for grouped_annotations in parent_tier_groups.values():
        grouped_annotations.sort(key=sort_key)

    # Explicit UTF-8: annotation text may contain non-ASCII characters, and
    # the platform default encoding is not guaranteed to be able to encode
    # them.
    with open(output_file_path, 'w', encoding='utf-8') as f:
        for parent_tier, grouped_annotations in parent_tier_groups.items():
            f.write(f"{parent_tier}:\n")

            for i, (time_stamp_pair, anno_value) in enumerate(grouped_annotations, start=1):
                f.write(f"  Sequence {i}, Time Stamp: {time_stamp_pair}, Annotation: {anno_value}\n")

            # Blank line between tiers
            f.write("\n")

# Destination of the per-parent-tier grouped dump (the directory must already exist)
grouped_output_txt_file_path = "Outputs/Clean txt/tests4.txt"

# Write the extracted and grouped annotations to a text file.
# Reuses all_layers_annotations returned by extract_and_debug_all_layers in an earlier cell.
write_grouped_annotations_to_txt(grouped_output_txt_file_path, all_layers_annotations)

# Bare expression so the notebook cell shows the path that was written
grouped_output_txt_file_path


In [None]:
def write_grouped_by_timestamp_to_txt(output_file_path, all_layers_annotations):
    """Write each parent tier's annotations with their sub-tier counterparts.

    Layers are grouped by parent tier (the part of the layer name before
    ``"_ref_to_"``). Within a group the first layer, in the mapping's
    iteration order, is treated as the main tier and all later ones as
    sub-tiers. For every main-tier key, in sorted order, the main annotation
    is written, followed by one line per sub-tier holding that sub-tier's
    annotation stored under the same key, or ``None`` when there is none or
    it is empty, and then a blank line.

    NOTE(review): the sub-tier lookup uses the main tier's key. With data
    from ``extract_and_debug_all_layers`` the main keys are time-slot pairs
    while sub-tier keys are annotation-reference IDs, so the lookup would
    appear never to match -- confirm the intended join key.

    Args:
        output_file_path: Destination path; an existing file is overwritten.
            The parent directory must already exist.
        all_layers_annotations: Mapping of layer name to an iterable of
            ``(key, text)`` pairs.

    Returns:
        None.
    """
    # Bucket the layers under their parent tier, keeping the mapping's order.
    segregated_annotations = defaultdict(list)
    for tier, annotations in all_layers_annotations.items():
        # split() returns the whole name when the marker is absent
        parent_tier = tier.split('_ref_to_')[0]
        segregated_annotations[parent_tier].append((tier, annotations))

    # Explicit UTF-8: annotation text may contain non-ASCII characters, and
    # the platform default encoding is not guaranteed to be able to encode
    # them.
    with open(output_file_path, 'w', encoding='utf-8') as f:
        for parent_tier, tier_and_annotations_list in segregated_annotations.items():
            f.write(f"{parent_tier}:\n")

            # The first layer of the group is the main tier; a later duplicate
            # key overwrites an earlier one.
            _, main_tier_annotations = tier_and_annotations_list[0]
            main_tier_dict = {}
            for time_stamp_pair, main_anno_value in main_tier_annotations:
                main_tier_dict[time_stamp_pair] = main_anno_value

            # One key -> text lookup table per remaining (sub-tier) layer
            sub_tier_dicts = {}
            for sub_tier_name, sub_tier_annotations in tier_and_annotations_list[1:]:
                lookup = sub_tier_dicts.setdefault(sub_tier_name, {})
                for ref_anno_id, sub_anno_value in sub_tier_annotations:
                    lookup[ref_anno_id] = sub_anno_value

            for time_stamp_pair in sorted(main_tier_dict):
                main_anno_value = main_tier_dict[time_stamp_pair]
                f.write(f"  Time Stamp: {time_stamp_pair}, Annotation: {main_anno_value}\n")

                for sub_tier_name, sub_tier_dict in sub_tier_dicts.items():
                    matching_sub_tier_annotation = sub_tier_dict.get(time_stamp_pair)

                    # Missing and empty annotations are both written as 'None'
                    f.write(f"  Annotation: {matching_sub_tier_annotation if matching_sub_tier_annotation else 'None'}  # from {sub_tier_name}\n")

                # Blank line between time stamps
                f.write("\n")

In [None]:
# Destination of the dump grouped by time stamp (the directory must already exist)
grouped_by_timestamp_output_txt_file_path = "Outputs/Clean txt/tests5.txt"

# Write the extracted and grouped-by-time-stamp annotations to a text file.
# Reuses all_layers_annotations returned by extract_and_debug_all_layers in an earlier cell.
write_grouped_by_timestamp_to_txt(grouped_by_timestamp_output_txt_file_path, all_layers_annotations)

# Bare expression so the notebook cell shows the path that was written
grouped_by_timestamp_output_txt_file_path