In [None]:
import pandas as pd
import os
import glob
import re
import CBE_utils as CBE

In [None]:
# Locations used by the notebook: raw input folders, processed output, and annotation tables.
# NOTE(review): input_dir, output_dir and annotation_dir are each assigned twice in this cell.
# The second assignment wins, so the three paths directly below have no effect at runtime.
# Presumably the first set is the full dataset and the second a technical test set — confirm.
input_dir = "/media/schmied.christopher/T7 Shield/Datasets/ECBL/raw/"
output_dir = "/media/schmied.christopher/T7 Shield/Datasets/ECBL/processed/"

annotation_dir = "/home/schmied.christopher/FMP_Docs/Projects/eu_os_ecbl_qc/annotation/"

# Effective values: these override the assignments above.
input_dir = "/home/schmiedc/FMP_Docs/Projects/ECBL_Project/QualityControl_analysis_revision/tech_test_input/"
output_dir = "/home/schmiedc/FMP_Docs/Projects/ECBL_Project/QualityControl_analysis_revision/tech_test_output/"

annotation_dir = "/home/schmiedc/FMP_Docs/Projects/ECBL_Project/QualityControl_analysis_revision/annotation/"

In [None]:
def list_folders(directory):
    """Return the names of the sub-directories directly inside ``directory``.

    Parameters
    ----------
    directory : str
        Path of the directory to inspect.

    Returns
    -------
    list of str
        Names (not full paths) of all entries that are directories, in the
        order reported by ``os.listdir``. An empty list is returned when the
        directory does not exist or cannot be read; the reason is printed.
    """
    try:
        # List all entries in the specified directory
        entries = os.listdir(directory)

    except FileNotFoundError:
        # Report the problem but keep the return type a list: callers iterate
        # over the result, and iterating over a message string would treat
        # each character as a folder name.
        print(f"The directory '{directory}' does not exist.")
        return []

    except PermissionError:
        print(f"Permission denied to access the directory '{directory}'.")
        return []

    # Filter out non-folder entries
    return [entry for entry in entries if os.path.isdir(os.path.join(directory, entry))]

In [None]:
def process_profile_metadata(profiles_dataframe, timestamp_dataframe, datetime_dataframe, source, batch_name):
    """Return a copy of the profiles table extended with plate and acquisition metadata.

    Parameters
    ----------
    profiles_dataframe : pandas.DataFrame
        Profiles table; its ``Metadata_Plate`` column is read, and a
        ``Metadata_Well`` column (if present) is renamed to
        ``Metadata_Well_randomized``.
    timestamp_dataframe : pandas.DataFrame or None
        Table from which the first ``user`` and ``instrument`` entries are
        taken; either value stays ``None`` when the table is ``None``, empty,
        or lacks the column.
    datetime_dataframe : datetime-like scalar
        Object providing ``date()``, ``time()`` and ``strftime()``; used for
        the imaging date, time and timezone offset columns.
    source : object
        Value stored in ``Metadata_source``.
    batch_name : str
        Batch identifier parsed as ``yyMMdd`` to obtain the staining date.

    Returns
    -------
    pandas.DataFrame
        New frame with the added ``Metadata_*`` columns; the input frame is
        not modified.
    """
    # Staining date: the batch name interpreted as yyMMdd, or a marker string
    # when it cannot be parsed.
    parsed_batch = pd.to_datetime(batch_name, format="%y%m%d", errors="coerce")
    staining_date = "invalid-date" if pd.isna(parsed_batch) else parsed_batch.strftime("%Y-%m-%d")

    # user / instrument: first entry of the respective timestamp column, if any
    timestamp_usable = timestamp_dataframe is not None and not timestamp_dataframe.empty

    def first_entry(column_name):
        if timestamp_usable and column_name in timestamp_dataframe.columns:
            return timestamp_dataframe[column_name].iloc[0]
        return None

    user = first_entry("user")
    instrument = first_entry("instrument")

    # Naming convention
    ## Metadata_Plate: B1001R1 -> plate name B1001, replicate R1, plate map B1001_R1
    plate_ids = profiles_dataframe["Metadata_Plate"].astype(str)
    plate_prefix = plate_ids.str[:-2]
    replicate_suffix = plate_ids.str[-2:]

    # Work on a consolidated copy so the caller's frame is left untouched
    renamed = profiles_dataframe.copy().rename(columns={"Metadata_Well": "Metadata_Well_randomized"})

    # All new columns are added in one assign call, in this order
    new_columns = {
        "Metadata_plate_name": plate_prefix,
        "Metadata_replicate_number": replicate_suffix,
        "Metadata_plate_map_name": plate_prefix + "_" + replicate_suffix,
        "Metadata_user": user,
        "Metadata_source": source,
        "Metadata_instrument": instrument,
        "Metadata_imaging_date": datetime_dataframe.date(),
        "Metadata_imaging_time": datetime_dataframe.time(),
        "Metadata_imaging_timezone": datetime_dataframe.strftime("%z"),
        "Metadata_staining_date": staining_date,
    }

    # Final copy de-fragments the frame for downstream use
    return renamed.assign(**new_columns).copy()

In [None]:
def pad_numbers(value):
    """Zero-pad the numeric part of a well identifier to two digits.

    A string that starts with one upper-case letter followed by digits is
    rewritten as that letter plus the digits padded to at least two places
    (``"A1"`` -> ``"A01"``). Only the leading letter and digits are kept;
    any characters after the digits are dropped.

    Parameters
    ----------
    value : object
        Well identifier, normally a string.

    Returns
    -------
    object
        The padded identifier, or ``value`` unchanged when it is not a string
        or does not start with the letter-plus-digits pattern.
    """
    # Non-text cells (e.g. NaN from an empty CSV field) cannot be matched by
    # the regex and would raise TypeError; hand them back untouched.
    if not isinstance(value, str):
        return value

    match = re.match(r"([A-Z])(\d+)", value)
    if match is None:
        return value

    letter, number = match.groups()
    return f"{letter}{number.zfill(2)}"  # Pad number to 2 digits

In [None]:
# Folder names found directly under input_dir; the processing loop treats each one as a data source.
source_list = list_folders(input_dir)

In [None]:
# Main processing loop.
# For every source folder and every plate folder below it, the latest batch is
# loaded (aggregated profiles + TimeStamp.csv), annotated with plate metadata,
# the replicate-specific plate layout and the pos./neg. control table, and
# written to <output_dir>/<source>/<staining date>_<plate map name>/<plate map name>.csv.
# Any plate that cannot be processed is reported and skipped, so that data
# from a previously processed plate is never reused by accident.

# Picklist (plate layout) file for each supported replicate suffix
REPLICATE_LAYOUTS = {
    'R1': 'Picklist_Replicate1.csv',
    'R2': 'Picklist_Replicate2.csv',
    'R3': 'Picklist_Replicate3.csv',
    'R4': 'Picklist_Replicate4.csv',
}

# list_folders may hand back a message string instead of a list when a
# directory cannot be read; iterating such a string would treat every
# character as a folder name, so only real lists are looped over.
if not isinstance(source_list, list):
    print(f"Error: {source_list}")

for source in (source_list if isinstance(source_list, list) else []):

    print(source)

    source_path = os.path.join(input_dir, source)

    plate_list = list_folders(source_path)

    if not isinstance(plate_list, list):
        print(f"Error: {plate_list}")
        continue

    for plate_name in plate_list:

        print(plate_name)

        folder_path = os.path.join(source_path, plate_name)

        print(folder_path)

        # The new data contains another layer with the batch date
        batch_list = list_folders(folder_path)

        if not isinstance(batch_list, list) or len(batch_list) == 0:
            print(f"Error: {plate_name} no batch folders")
            continue

        # Only process the latest batch
        # (assumes yyMMdd folder names, which sort chronologically as text)
        latest_batch_name = sorted(batch_list)[-1]

        print(latest_batch_name)

        # Skip processing batch if already present
        replicate_number = plate_name[-2:]
        plate_map_name = plate_name[:-2] + "_" + replicate_number
        staining_date = pd.to_datetime(latest_batch_name, format="%y%m%d", errors="coerce")

        if not pd.isna(staining_date):
            staining_date = staining_date.strftime("%Y-%m-%d")
        else:
            staining_date = "invalid-date"

        date_plate_map_name = staining_date + "_" + plate_map_name
        output_plate_path = os.path.join(output_dir, source, date_plate_map_name)
        filename = os.path.join(output_plate_path, plate_map_name + ".csv")

        if os.path.exists(filename):
            print(f"Info: Skipping {plate_map_name}: already processed (early skip)")
            continue

        batch_path = os.path.join(folder_path, latest_batch_name)

        # IMPORTANT: there can be multiple merged files.
        # glob returns matches in arbitrary order, so sort to make the choice reproducible.
        profiles_name = plate_name + "*_CP_Profiles_Aggregated.csv"
        profiles_file_list = sorted(glob.glob(os.path.join(batch_path, profiles_name)))

        if len(profiles_file_list) == 0:
            print(f"Error: {plate_name} no profiles")
            continue

        profiles_file = profiles_file_list[0]

        if len(profiles_file_list) > 1:
            print(f"Warning: {plate_name} has {len(profiles_file_list)} profiles files, using {profiles_file}")

        # read profiles file
        try:
            profiles = pd.read_csv(profiles_file)
        except Exception as e:
            print(f"Error reading {profiles_file}: {str(e)}")
            continue

        # Check row count of original file
        row_count = profiles.shape[0]
        print(f"Info: {plate_name} has {row_count} rows")

        if row_count == 0:
            # The replicate is read from the first row further down
            print(f"Error: {plate_name} profiles file is empty")
            continue

        # read timestamp file and get datetime info from it
        timestamp_file_path = os.path.join(batch_path, "TimeStamp.csv")

        try:
            timestamp = pd.read_csv(timestamp_file_path)
            imaging_datetime = pd.to_datetime(timestamp['date'].iloc[0], errors='coerce')
        except Exception as e:
            print(f"Error reading {timestamp_file_path}: {str(e)}")
            continue

        if pd.isna(imaging_datetime):
            # The metadata step derives date, time and timezone from this value
            # and needs a real timestamp.
            print(f"Error: {plate_name} has no valid imaging date in {timestamp_file_path}")
            continue

        processed_profiles = process_profile_metadata(profiles, timestamp, imaging_datetime, source, latest_batch_name)

        # load correct plate layout based on replication number
        replicate_number = processed_profiles['Metadata_replicate_number'].iloc[0]
        replicate_layout_name = REPLICATE_LAYOUTS.get(replicate_number)

        if replicate_layout_name is None:
            print(f"Error: {replicate_number} not valid")
            continue

        print(f"{replicate_layout_name} selected")

        plate_layout_path = os.path.join(annotation_dir, 'plate_layout', replicate_layout_name)

        try:
            plate_layout = pd.read_csv(plate_layout_path)

            # Issue in the plate layout the well numbers are not padded
            plate_layout['Destination Well'] = plate_layout['Destination Well'].apply(pad_numbers)
            plate_layout['Source Well'] = plate_layout['Source Well'].apply(pad_numbers)

        except Exception as e:
            print(f"Error reading {plate_layout_path}: {str(e)}")
            continue

        # get info for pos. and neg. controls
        controls_layout_path = os.path.join(annotation_dir, "pos_neg_ctrl.csv")

        try:
            controls_layout = pd.read_csv(controls_layout_path)

            # Issue in the plate layout the well numbers are not padded
            controls_layout['Metadata_Well'] = controls_layout['Metadata_Well'].apply(pad_numbers)

        except Exception as e:
            print(f"Error reading {controls_layout_path}: {str(e)}")
            continue

        # Add the picklist name as a column (concat instead of item assignment
        # to avoid fragmenting the frame)
        processed_profiles = pd.concat(
            [processed_profiles, pd.Series(replicate_layout_name, index=processed_profiles.index, name="Metadata_picklist_name")],
            axis=1).copy()

        # Destination well are the randomized wells
        # Source well are the well IDs of the compound mother plate
        plate_layout = plate_layout.rename(
            columns={"Destination Well": "Metadata_Well_randomized", "Source Well": "Metadata_Well"}
        )
        processed_profiles = pd.merge(processed_profiles, plate_layout, on='Metadata_Well_randomized')
        processed_profiles = pd.merge(processed_profiles, controls_layout, on='Metadata_Well')

        # Both merges are inner joins: wells missing from an annotation table
        # are dropped and duplicated keys multiply rows, so report any change.
        merged_row_count = processed_profiles.shape[0]
        if merged_row_count != row_count:
            print(f"Warning: {plate_name} has {merged_row_count} rows after annotation (was {row_count})")

        # Bring metadata columns to the front
        features = CBE.get_feature_vector(processed_profiles)
        metadata_cols = [c for c in processed_profiles.columns if c not in features]
        processed_profiles = processed_profiles[metadata_cols + list(features)]

        # create output directory if not exists and save output file
        os.makedirs(output_plate_path, exist_ok=True)
        processed_profiles.to_csv(filename, index=False)

        print(f"{staining_date} staining date")
        print(f"{plate_map_name} loaded and resaved")
        print("Finished plate")

print("Finished all plates")