# Data Preprocessing

# In [11]:
def _parse_bio_entities(bio_labels):
    """Collapse a per-character label sequence into entity spans.

    Labels use the ``<type>-<B|I>`` layout (for example ``disaster-B``);
    ``O`` marks a character that is outside any entity.

    Args:
        bio_labels: Sequence of label strings, one per character.

    Returns:
        A list of ``{'start', 'end', 'type'}`` dicts in ascending ``start``
        order. ``start`` and ``end`` are inclusive indices into
        ``bio_labels`` and the spans never overlap.
    """
    entities = []
    current = None

    for i, label in enumerate(bio_labels):
        if label == 'O':
            if current is not None:
                entities.append(current)
                current = None
            continue

        # Split on the LAST hyphen so that entity types which themselves
        # contain a hyphen (e.g. ``geo-loc-B``) are parsed instead of dropped.
        e_type, hyphen, prefix = label.rpartition('-')
        if not hyphen:
            continue  # Malformed label without a hyphen: ignored

        if prefix == 'I' and current is not None and current['type'] == e_type:
            # Continuation of the entity that is currently open
            current['end'] = i
        elif prefix in ('B', 'I'):
            # A B tag, or an I tag with no matching open entity, starts a
            # new entity (an orphan I is treated as if it were a B).
            if current is not None:
                entities.append(current)
            current = {'start': i, 'end': i, 'type': e_type}
        # Any other prefix is ignored and leaves the open entity untouched.

    if current is not None:
        entities.append(current)

    return entities


def bio_to_tags(data, separator='\002'):
    """Render BIO annotations as inline ``<type>...</type>`` tags.

    For every item the text in ``item['input']`` is combined with the labels
    in ``item['label_BIO']`` and the tagged string is stored in
    ``item['output']``. Items are modified in place.

    Items are left untouched (no ``'output'`` key is written) when the text
    or the label string is empty or missing, or when the number of labels
    differs from the number of characters in the text.

    Args:
        data: Iterable of dicts holding ``'input'`` and ``'label_BIO'``.
        separator: String that separates consecutive labels inside
            ``'label_BIO'``. Defaults to the control character ``chr(2)``.

    Returns:
        The same ``data`` object that was passed in.
    """
    for item in data:
        input_str = item.get('input', '')
        bio_str = item.get('label_BIO', '')

        if not input_str or not bio_str:
            continue

        bio_labels = bio_str.split(separator)
        if len(bio_labels) != len(input_str):
            continue  # Skip when lengths don't match

        # Entities arrive sorted and non-overlapping, so the output can be
        # assembled in one left-to-right pass without index shifting.
        pieces = []
        cursor = 0
        for ent in _parse_bio_entities(bio_labels):
            start = ent['start']
            stop = ent['end'] + 1  # 'end' is inclusive, slices are exclusive
            e_type = ent['type']
            pieces.append(input_str[cursor:start])
            pieces.append(f'<{e_type}>')
            pieces.append(input_str[start:stop])
            pieces.append(f'</{e_type}>')
            cursor = stop
        pieces.append(input_str[cursor:])

        item['output'] = ''.join(pieces)

    return data

def loaddata(filename):
    """
    Read a BIO format file whose lines hold a text column and a label column
    separated by a tab. The first line is treated as a header and skipped.

    Within each column the individual characters / BIO tags are separated by
    the control character chr(2). Blank lines and lines without a tab are
    reported and skipped. When the number of characters and tags differ, both
    are truncated to the shorter length so the stored text and tags stay
    aligned.

    Args:
        filename: Path of the UTF-8 encoded file to read.

    Returns:
        A list of dicts with the keys 'input' (the text) and 'label_BIO'
        (the tags joined by chr(2)). An empty list is returned when the file
        does not exist or is not valid UTF-8.
    """
    data = []
    line_count = 0
    success_count = 0

    try:
        with open(filename, 'r', encoding='utf-8') as f:
            # Skip header; the default keeps an empty file from raising
            # StopIteration.
            next(f, None)

            for line in f:
                line_count += 1
                # Only trailing whitespace (including the newline) is removed:
                # leading whitespace belongs to the text column and stripping
                # it would shift the characters against their tags.
                line = line.rstrip()
                if not line:
                    print(f"Skipping empty line (Line {line_count})")
                    continue

                # Split text and label by tab
                try:
                    text_part, label_part = line.split('\t', 1)
                except ValueError:
                    print(f"Format error (Line {line_count}): Missing tab separator")
                    continue

                # Parse text part (character-level splitting)
                input_chars = text_part.split('\002')
                original_text = ''.join(input_chars)

                # Parse label part (BIO tag splitting)
                bio_tags = label_part.split('\002')

                # Length validation (prioritize valid sections)
                min_len = min(len(input_chars), len(bio_tags))
                if len(input_chars) != len(bio_tags):
                    print(f"Length mismatch warning (Line {line_count}): "
                          f"Characters {len(input_chars)} vs Tags {len(bio_tags)}, truncated")
                    input_chars = input_chars[:min_len]
                    bio_tags = bio_tags[:min_len]

                # Verify character and tag consistency
                kept_text = ''.join(input_chars)
                if kept_text != original_text[:min_len]:
                    print(f"Character misalignment warning (Line {line_count}): "
                          "Split characters don't match original text, possible \002 escape issue")

                # Build data item from the (possibly truncated) characters so
                # that the text stays aligned with the stored tags.
                data_item = {
                    'input': kept_text,
                    'label_BIO': '\002'.join(bio_tags)
                }
                data.append(data_item)
                success_count += 1

    except FileNotFoundError:
        print(f"Error: File {filename} not found")
        return []
    except UnicodeDecodeError:
        print(f"Error: File {filename} is not UTF-8 encoded")
        return []

    print(f"Successfully read {success_count}/{line_count} valid records (Skipped {line_count - success_count} exceptions)")
    return data

import re

def delpkl(name):
    """Attach per-type entity lists extracted from each item's tagged output.

    For every item the text in ``item['output']`` is scanned for
    ``<time>``, ``<Location>`` and ``<disaster>`` tag pairs and the enclosed
    strings are stored in ``item['label_json']`` under the keys ``'time'``,
    ``'location'`` and ``'disaster'``. If the output contains ``</think>``,
    only the text after its first occurrence is scanned.

    Items without an ``'output'`` value receive empty lists instead of
    raising ``KeyError``.

    Args:
        name: Iterable of dicts; each is modified in place.

    Returns:
        The same ``name`` object that was passed in.
    """
    for item in name:
        output = item.get('output') or ''

        # Keep everything after the FIRST closing think marker, so a later
        # occurrence of the marker cannot cut the scanned text short.
        _, marker, remainder = output.partition('</think>')
        if marker:
            output = remainder

        # Extract all time entities
        times = re.findall(r'<time>(.*?)</time>', output)
        # Extract all location entities
        locations = re.findall(r'<Location>(.*?)</Location>', output)
        # Extract all disaster entities
        disasters = re.findall(r'<disaster>(.*?)</disaster>', output)

        item['label_json'] = {
            'time': times,
            'location': locations,
            'disaster': disasters
        }

    return name

# Read ./data_and_checkpoints/test.txt; `a` is the list of
# {'input', 'label_BIO'} records returned by loaddata (empty on read errors).
a = loaddata('./data_and_checkpoints/test.txt')

# In [12]:
import os
import pickle

# Add the inline-tagged 'output' string to every record of `a` (in place).
b = bio_to_tags(a)
# delpkl annotates the records of `b` in place; its return value is not relied
# on here, so `c` is bound explicitly to the annotated list rather than to
# whatever delpkl happens to return.
delpkl(b)
c = b
# Persist the annotated records next to the input data.
with open(os.path.join('data_and_checkpoints', "test.pkl"), "wb") as f:
    pickle.dump(b, f)
print('The data was successfully saved as a pkl file.')