# In [None]:
import os
import pandas as pd
from dask import delayed, compute
import dask.dataframe as dd

# Define the base path to the files (pointing to the `files` folder itself)
base_path = '/path/to/files'

# Function to read text content and return a single row of data
@delayed
def process_text_file(patient_id, study_id, text_file_path):
    """Read a text file and return it as one row of data.

    Because of the ``@delayed`` decorator, calling this function does not
    read the file immediately; it returns a lazy Dask task whose result is
    produced when the task is computed.

    Args:
        patient_id: Patient identifier, stored as the first element of the row.
        study_id: Study identifier, stored as the second element of the row.
        text_file_path: Path of the text file to read.

    Returns:
        A list ``[patient_id, study_id, text_content]`` where ``text_content``
        is the entire content of the file as a string.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file content is not valid UTF-8.
    """
    # Decode explicitly as UTF-8: without an ``encoding`` argument ``open``
    # uses the platform's locale encoding, so the same file could decode
    # differently (or fail) from one machine to another.
    with open(text_file_path, 'r', encoding='utf-8') as f:
        text_content = f.read()
    return [patient_id, study_id, text_content]

def _iter_text_files(root):
    """Yield ``(patient_id, study_id, text_file_path)`` for each text file under *root*.

    The expected layout is ``root/<aggregation>/p<patient>/s<study>/<name>.txt``
    (aggregation directories are e.g. ``p10``, ``p11``). The leading ``p`` and
    ``s`` are stripped from the folder names to obtain the identifiers.

    Entries that are not directories are skipped at every level, so a stray
    file whose name happens to start with ``p`` or ``s`` does not abort the
    scan with ``NotADirectoryError``. Directory listings are sorted so that
    the order of the yielded items is reproducible across runs and machines
    (``os.listdir`` returns entries in arbitrary order).
    """
    for agg_folder in sorted(os.listdir(root)):
        agg_path = os.path.join(root, agg_folder)
        if not os.path.isdir(agg_path):
            continue

        for patient_folder in sorted(os.listdir(agg_path)):
            patient_path = os.path.join(agg_path, patient_folder)
            if not (patient_folder.startswith("p") and os.path.isdir(patient_path)):
                continue
            patient_id = patient_folder[1:]  # Remove 'p' prefix

            for study_folder in sorted(os.listdir(patient_path)):
                study_path = os.path.join(patient_path, study_folder)
                if not (study_folder.startswith("s") and os.path.isdir(study_path)):
                    continue
                study_id = study_folder[1:]  # Remove 's' prefix

                for file_name in sorted(os.listdir(study_path)):
                    if file_name.endswith(".txt"):
                        yield patient_id, study_id, os.path.join(study_path, file_name)


# List of delayed tasks, one per text file found under `base_path`
tasks = [
    process_text_file(patient_id, study_id, text_file_path)
    for patient_id, study_id, text_file_path in _iter_text_files(base_path)
]

# Execute tasks in parallel and collect results.
# `compute` returns one result per task, i.e. one [patient, study, text] row each.
results = compute(*tasks, num_workers=20)  # Adjust number of workers to 20 for parallel execution

# Convert results into a Dask DataFrame
column_names = ["Patient Identifier", "Study Identifier", "Text Content"]
df = dd.from_pandas(pd.DataFrame(results, columns=column_names), npartitions=20)

# Optionally, if you want to convert it to a Pandas DataFrame for final analysis
df = df.compute()

# Display the DataFrame to user.
# NOTE(review): `ace_tools` appears to exist only in a hosted sandbox
# environment and is not generally installable — confirm. Fall back to a plain
# print when it is missing so the script does not fail with ImportError after
# all the work has already been done.
try:
    import ace_tools as tools
except ImportError:
    print(df)
else:
    tools.display_dataframe_to_user(name="Patient Study Data", dataframe=df)
