## Preprocessing

In [None]:
! pip install --upgrade -q datasets

In [None]:
import pandas as pd
import re
import json
import os
import glob
from tqdm import tqdm
from datasets import load_dataset, DatasetDict, Dataset, load_from_disk
from sklearn.model_selection import train_test_split

In [None]:
def recalculate_line_positions_with_newlines(data_lines):
    """
    Compute the character offset at which every line begins in the joined text.

    The lines are treated as if they were joined by a single newline character,
    so each line pushes the start of the following line forward by its own
    length plus one.

    Parameters:
    data_lines (list of str): Lines of a text file, without their trailing newline.

    Returns:
    list: One integer per input line giving that line's starting offset in the
          combined text. An empty input yields an empty list.

    Example:
    ["Hello", "World"] -> [0, 6]
    ("World" starts after the 5 characters of "Hello" plus 1 newline.)
    """

    offsets = []
    cursor = 0  # Offset of the line that is about to be recorded

    for text in data_lines:
        offsets.append(cursor)
        cursor += len(text) + 1  # +1 accounts for the newline separator

    return offsets

def _load_answers(answer_file):
    """
    Read a tab-separated answer file and group its annotations by file ID.

    Rows with fewer than 5 columns, or with an empty text column, are skipped.
    Any columns beyond the sixth are ignored.

    Returns:
    dict: Maps file_id to a list of dictionaries with the keys 'tag_type',
          'start_line', 'end_line', 'text_info' and 'normalized_time'
          (None when the row has no sixth column).
    """
    answers = {}

    with open(answer_file, 'r', encoding='utf-8') as file:
        for line in file:
            parts = line.strip().split('\t')
            if len(parts) < 5:
                continue  # Blank or malformed row
            file_id, tag_type, start_line, end_line, text_info = parts[:5]
            if not text_info:
                # An empty string is a substring of every line and would tag them all.
                continue
            # Accept rows with trailing extra columns instead of dropping the value.
            normalized_time = parts[5] if len(parts) >= 6 else None
            answers.setdefault(file_id, []).append({
                'tag_type': tag_type,
                'start_line': int(start_line),
                'end_line': int(end_line),
                'text_info': text_info,
                'normalized_time': normalized_time
            })

    return answers


def parse_txt(data_files, answer_file=None):
    """
    Turn text files (and optional annotations) into prompt/completion records.

    Every non-blank line of every file becomes one record. When an answer file
    is given, each line is matched against the annotations of the same file ID
    (the file's base name up to its first '.').

    Parameters:
    data_files (list of str): Paths of the text files to process.
    answer_file (str, optional): Path to a tab-separated annotation file with at
                                 least 5 columns: file_id, tag_type, start_line,
                                 end_line and text_info. An optional sixth column
                                 holds a normalized value.

    Returns:
    list: A list of dictionaries. Each has a 'prompt' key containing the file ID,
          the line's starting character offset and the line text. When
          answer_file is given, each also has a 'completion' key: either the
          matched annotations joined by ' | ' and terminated by ' END', or
          'PHI: NULL' when nothing matched. Without answer_file there is no
          'completion' key.

    NOTE: an annotation is attached to a line whenever its text occurs in that
    line as a substring; the start/end values from the answer file are parsed
    but not used for matching. The same annotation can therefore be reported on
    several lines of one file. Within a single line, annotations with the same
    tag type and text are reported once.
    """

    output_data = []
    answers = _load_answers(answer_file) if answer_file else {}

    for data_file in tqdm(data_files, desc="Processing files"):
        # Process each .txt file
        file_name_no_ext = os.path.basename(data_file).split('.')[0]
        associated_answers = answers.get(file_name_no_ext, [])

        with open(data_file, 'r', encoding='utf-8') as file:
            data_lines = file.read().split('\n')
        line_positions = recalculate_line_positions_with_newlines(data_lines)

        for line_start_pos, line_content in zip(line_positions, data_lines):
            if not line_content.strip():
                continue

            # Collect matching annotations without modifying the shared answer
            # records, so their parsed offsets stay intact for later lines.
            found_answers = []
            seen = set()  # (tag_type, text_info) pairs already reported for this line
            for ans in associated_answers:
                key = (ans['tag_type'], ans['text_info'])
                if key in seen or ans['text_info'] not in line_content:
                    continue
                seen.add(key)
                found_answers.append(ans)

            json_entry = {
                "prompt": f"Please extract HIPAA related information from the given text: file ID:{file_name_no_ext}, start:{line_start_pos}, content:{line_content}\n\n###\n\n"
            }

            if found_answers:
                ans_text = ' | '.join(
                    f"Type: {ans['tag_type']}, Content: {ans['text_info']}"
                    + (f" (Normalized: {ans['normalized_time']})" if ans['normalized_time'] else "")
                    for ans in found_answers
                )
                json_entry["completion"] = f"{ans_text} END"
            elif answer_file:
                json_entry["completion"] = "PHI: NULL"

            output_data.append(json_entry)

    return output_data


In [None]:
# Define file paths and load datasets
file_path = "/mnt/nas/HYZ/AICUP/Dataset/"

# glob.glob returns matches in arbitrary, filesystem-dependent order. Sorting the
# file lists makes the generated examples come out in the same order on every run.
train_data_files1 = sorted(glob.glob(f'{file_path}Train/First_Dataset/*.txt'))
train_answer_file1 = f'{file_path}Train/First_answer.txt'
train_data1 = parse_txt(train_data_files1, train_answer_file1)

train_data_files2 = sorted(glob.glob(f'{file_path}Train/Second_Dataset/*.txt'))
train_answer_file2 = f'{file_path}Train/Second_answer.txt'
train_data2 = parse_txt(train_data_files2, train_answer_file2)

val_data_files = sorted(glob.glob(f'{file_path}Validation/*.txt'))
val_answer_file = f'{file_path}Validation_answer.txt'
val_data = parse_txt(val_data_files, val_answer_file)

# The test files have no answer file, so their records carry only a prompt.
test_data_files = sorted(glob.glob(f'{file_path}Test/*.txt'))
test_data = parse_txt(test_data_files, answer_file=None)

In [None]:
# Combine and convert datasets to pandas DataFrame
# (both training sets and the labelled validation set are pooled before splitting)
train_data_combined = train_data1 + train_data2 + val_data
train_df = pd.DataFrame(train_data_combined)
test_df = pd.DataFrame(test_data)

# Convert DataFrames to Hugging Face Dataset format
train_dataset = Dataset.from_pandas(train_df)
test_dataset = Dataset.from_pandas(test_df)

# Hold out 10% of the pooled examples for validation. The Dataset's own splitter
# returns Dataset objects directly, and the fixed seed makes the split
# reproducible across runs.
split_seed = 42
split = train_dataset.train_test_split(test_size=0.1, seed=split_seed)
train_dataset = split["train"]
val_dataset = split["test"]

# Create a DatasetDict for easy handling of the datasets
dataset_dict = DatasetDict({
    "train": train_dataset,
    "validation": val_dataset,
    "test": test_dataset
})

# Save the dataset dictionary to disk
dataset_dir = "/mnt/nas/HYZ/AICUP/dataset_dict_v2"
dataset_dict.save_to_disk(dataset_dir)

In [None]:
# Reload the saved dataset dictionary from disk.
# The path is the same literal that the saving cell writes to.
dataset_dir = "/mnt/nas/HYZ/AICUP/dataset_dict_v2"
dataset = load_from_disk(dataset_dir)

dataset  # Display the loaded dataset