# IMPORT MODULES

In [15]:
import os
from io import StringIO
import pandas as pd
import chardet
import os
import numpy as np
import re
from datetime import date
from datetime import datetime, timedelta
from dateutil import parser

In [18]:
def fix_dates(date_str):
    """
    Parse a free-form date string into a pandas Timestamp.

    :param date_str: Value to parse. Only ``str`` inputs are parsed.
    :return: ``pd.Timestamp`` for a parseable string; ``pd.NaT`` when the
             input is not a string (e.g. NaN) or cannot be parsed.
    """
    if not isinstance(date_str, str):
        return pd.NaT  # non-string inputs (likely NaN) have no date to parse
    try:
        # dateutil infers the format; a date-only string already comes back
        # with a 00:00:00 time, so no extra normalisation is needed.
        return pd.Timestamp(parser.parse(date_str))
    except (ValueError, OverflowError):
        # ValueError: unparseable text or a date outside the Timestamp range.
        # OverflowError: dateutil raises it for oversized numeric components.
        return pd.NaT



# DEFINE THE LANGUAGE TRANSLATION DICTIONARY

In [19]:
# Translation table passed to dff.rename(columns=...) and dff.replace(...) further down.
# NOTE(review): it is empty here, so those two calls change nothing until entries are added.
language_map = {
    
}

# define the folder directory
# (assigned again with a different value in later cells; the last assignment executed wins)
base_directory = r'D:\Khoa Tran\Work\04 Xpert results Python tool\Converting'


# PROCESS ENGLISH FILES

In [52]:
# Function to detect the encoding of a file
def detect_encoding(file_path):
    """Return the encoding name chardet guesses for the whole content of ``file_path``."""
    with open(file_path, "rb") as raw_file:
        raw_bytes = raw_file.read()
    guess = chardet.detect(raw_bytes)
    return guess['encoding']

# Updated function for extraction focusing on the Detail section
def refined_extraction_with_detail(file_path, columns_of_interest, analyte_result_columns, encoding):
    """
    Stream key/value fragments out of one English-language export file.

    This is a generator. It yields an empty dict every time a line containing
    "RESULT TABLE" is read (a record boundary marker) and a one-item dict
    ``{key: value}`` for each wanted line, where key and value are the text
    before and after the first comma of that line.

    :param file_path: Path of the file to read.
    :param columns_of_interest: Line prefixes captured while inside a result table.
    :param analyte_result_columns: Line prefixes captured while inside an "Analyte Result" section.
    :param encoding: Text encoding passed to ``open``.

    Reads the module-level ``metadata_sections`` list.
    """
    within_result_table = False
    within_analyte_result_section = False
    within_detail_section = False  # NOTE(review): assigned but never read in this function
    processed_columns = set()              # keys already yielded for the current result table
    analyte_result_columns_found = set()   # keys already yielded for the current analyte section

    with open(file_path, encoding=encoding) as f:      
        for line in f:                                         # run through each line and extract data
            line = line.strip()

            # A new result table starts: forget which keys were already captured
            if "RESULT TABLE" in line:
                processed_columns = set()
                within_analyte_result_section = False

            # Skip placeholder lines and a line that is only a byte-order mark
            if line in ["<Insufficient privilege to access data>", "\ufeff"]:
                continue

            # Check the headers in order; only the first one contained in the line is acted on
            for header in metadata_sections + ["RESULT TABLE", "Analyte Result","Detail"]:       #check the header
                if header in line:
                    if header == "RESULT TABLE":
                        within_result_table = True
                        yield {}  # empty dict = record boundary
                    elif header == "Analyte Result":
                        within_analyte_result_section = True
                        analyte_result_columns_found.clear()
                    elif header == "Detail":                                                # lines after a "Detail" header are not extracted as analyte results
                        within_analyte_result_section = False
                    elif header in metadata_sections:
                        within_result_table = False
                    break

            # Inside the analyte section: keep the first occurrence of each wanted key
            if within_analyte_result_section and line.startswith(tuple(analyte_result_columns)) and "," in line:
                key, value = line.split(",", 1)
                key = key.strip()
                if key in analyte_result_columns_found:
                    continue
                analyte_result_columns_found.add(key)
                yield {key: value.strip()}

            # Inside the result table proper: keep the first occurrence of each wanted key
            elif within_result_table and not within_analyte_result_section:
                if line.startswith(tuple(columns_of_interest)) and "," in line:
                    key, value = line.split(",", 1)
                    key = key.strip()
                    if key in processed_columns:
                        continue
                    processed_columns.add(key)
                    yield {key: value.strip()}

# Function to get all CSV files in a directory and its subdirectories
def get_all_csv_files(directory):
    """Return the paths of all files ending in '.csv' under ``directory``, in os.walk order."""
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        for name in names
        if name.endswith('.csv')
    ]

# Process file function
def process_file(file_path, columns_of_interest, analyte_result_columns):
    """
    Collect the records of one export file into the module-level ``all_df_rows``.

    Fragments from ``refined_extraction_with_detail`` are merged into one dict
    per record; an empty fragment closes the current record. Each stored record
    gets a "file" entry holding the file's base name. A UnicodeDecodeError is
    reported with ``print`` and the rest of the file is abandoned.
    """

    def _store(record):
        # Tag the finished record with its source file and queue it.
        record["file"] = os.path.basename(file_path)
        all_df_rows.append(record)

    try:
        encoding = detect_encoding(file_path)
        current = {}
        fragments = refined_extraction_with_detail(
            file_path, columns_of_interest, analyte_result_columns, encoding
        )
        for fragment in fragments:
            if fragment:
                current.update(fragment)
            elif current:
                _store(current)
                current = {}
        if current:
            _store(current)
    except UnicodeDecodeError:
        print(f"UnicodeDecodeError encountered while processing file: {file_path}")

# List of metadata sections to ignore
# (a line containing one of these marks the end of a result table for the extractor)
metadata_sections = ["GeneXpert Dx System", "ASSAY INFORMATION", "Analysis Settings"]

# Define the folder directory
base_directory = r'D:\Khoa Tran\Work\04 Xpert results Python tool\Converting'

# List of columns of interest
# (used as line prefixes; the text before the first comma becomes the column name)
columns_of_interest = [
    "Cartridge S/N", "Module Name", "Module S/N",
    "Instrument S/N", "S/W Version", "Reagent Lot ID", "Expiration Date",
    "Start Time", "End Time", "Error Status", "Status", "User",
    "Sample ID", "Patient ID", "Assay", "Assay Version",
    "Test Type", "Sample Type", "Notes", "Test Result"
]

# Additional columns to be extracted from the Analyte Result section
analyte_result_columns = ['SPC', 'IS1081-IS6110', 'rpoB1', 'rpoB2', 'rpoB3', 'rpoB4']


# Initialize a list to store all extracted rows
# (process_file appends to this module-level list)
all_df_rows = []

# Process all files in the directory and its subdirectories
csv_files = get_all_csv_files(base_directory)
for file_path in csv_files:
    process_file(file_path, columns_of_interest, analyte_result_columns)

# Create the final DataFrame of English-language results
dfe = pd.DataFrame(all_df_rows)
print(f"Data successfully extracted from {len(csv_files)} files, resulting in {dfe.shape[0]} rows and {dfe.shape[1]} columns.")
# Save the extraction; the workbook is written into the same folder that was scanned for CSV files
dfe.to_excel(r'D:\Khoa Tran\Work\04 Xpert results Python tool\Converting\results.xlsx', index=False)

Data successfully extracted from 1 files, resulting in 4 rows and 27 columns.


# PROCESS THE FRENCH FILES

In [19]:
import os
import pandas as pd
import chardet

# Function to detect the encoding of a file
def detect_encoding(file_path):
    """Return the encoding name chardet guesses for the whole content of ``file_path``."""
    with open(file_path, "rb") as raw_file:
        raw_bytes = raw_file.read()
    guess = chardet.detect(raw_bytes)
    return guess['encoding']

# Function for extraction focusing on the Detail section
def refined_extraction_with_detail(file_path, columns_of_interest, detail_columns, encoding):
    """
    Stream key/value fragments out of one French-language export file.

    This is a generator. It yields an empty dict every time a line containing
    "TABLEAU DE RÉSULTATS" is read (a record boundary marker) and a one-item
    dict ``{key: value}`` for each wanted line, where key and value are the
    text before and after the first comma of that line.

    :param file_path: Path of the file to read.
    :param columns_of_interest: Line prefixes captured inside a result table, before any "Détail" line.
    :param detail_columns: Line prefixes captured after a "Détail" line.
    :param encoding: Text encoding passed to ``open``.

    Reads the module-level ``metadata_sections`` list.
    """
    within_result_table = False
    within_detail_section = False
    processed_columns = set()        # keys already yielded for the current result table
    detail_columns_found = set()     # keys already yielded for the current detail section

    with open(file_path, encoding=encoding) as f:
        for line in f:
            line = line.strip()

            # A new result table starts: forget which keys were already captured
            if "TABLEAU DE RÉSULTATS" in line:
                processed_columns = set()
                within_detail_section = False

            # Skip placeholder lines and a line that is only a byte-order mark
            if line in ["<Privilège insuffisant pour accéder aux données>", "\ufeff"]:
                continue

            # Only the first header contained in the line is acted on
            for header in metadata_sections + ["TABLEAU DE RÉSULTATS"]:
                if header in line:
                    if header == "TABLEAU DE RÉSULTATS":
                        within_result_table = True
                        yield {}  # empty dict = record boundary
                    elif header in metadata_sections:
                        within_result_table = False
                    break

            # Any line containing "Détail" opens the detail section and is itself skipped
            if "Détail" in line:
                within_detail_section = True
                detail_columns_found.clear()
                continue

            # Inside the detail section: keep the first occurrence of each wanted key
            if within_detail_section and line.startswith(tuple(detail_columns)) and "," in line:
                key, value = line.split(",", 1)
                key = key.strip()
                if key in detail_columns_found:
                    continue
                detail_columns_found.add(key)
                yield {key: value.strip()}

            # Inside the result table proper: keep the first occurrence of each wanted key
            elif within_result_table and not within_detail_section:
                if line.startswith(tuple(columns_of_interest)) and "," in line:
                    key, value = line.split(",", 1)
                    key = key.strip()
                    if key in processed_columns:
                        continue
                    processed_columns.add(key)
                    yield {key: value.strip()}

# Function to get all CSV files in a directory and its subdirectories
def get_all_csv_files(directory):
    """Return the paths of all files ending in '.csv' under ``directory``, in os.walk order."""
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        for name in names
        if name.endswith('.csv')
    ]

# Process file function
def process_file(file_path, columns_of_interest, detail_columns):
    """
    Collect the records of one export file into the module-level ``all_df_rows``.

    Fragments from ``refined_extraction_with_detail`` are merged into one dict
    per record; an empty fragment closes the current record. Each stored record
    gets a "file" entry holding the file's base name. A UnicodeDecodeError is
    reported with ``print`` and the rest of the file is abandoned.
    """

    def _store(record):
        # Tag the finished record with its source file and queue it.
        record["file"] = os.path.basename(file_path)
        all_df_rows.append(record)

    try:
        encoding = detect_encoding(file_path)
        current = {}
        fragments = refined_extraction_with_detail(
            file_path, columns_of_interest, detail_columns, encoding
        )
        for fragment in fragments:
            if fragment:
                current.update(fragment)
            elif current:
                _store(current)
                current = {}
        if current:
            _store(current)
    except UnicodeDecodeError:
        print(f"UnicodeDecodeError encountered while processing file: {file_path}")

# List of metadata sections to ignore
# (rebinding this global changes what the extractor defined in this cell looks for)
metadata_sections = ["GeneXpert Dx System", "INFORMATIONS SUR LE TEST", "Paramètres d'analyse"]

# Define the folder directory
# NOTE(review): this is a different folder from the one used for the English files — confirm it is intended
base_directory = r'C:\Users\Admin\Desktop\Xpert data processing'

# List of columns of interest
# (used as line prefixes; the text before the first comma becomes the column name)
columns_of_interest = ['Numéro de série de la cartouche', 'Nom du module', 'N° de série du module', "N° de série de l'instrument",
                        'Version du logiciel', 'N° du lot',  "Date d'expiration", 'Heure de lancement',  'Heure de fin',  "État de l'erreur",
                        'État', 'Utilisateur',  "N° Id de l'échantillon",  'N° Id du patient',  'Test',  'Version du test',  'Type de test',
                        "Type d'échantillon",  'Remarques',  'Résultat du test']

# Additional columns to be extracted from the Detail section
detail_columns = ["SPC", "IS1081-IS6110", "rpoB1", "rpoB2", "rpoB3", "rpoB4"]

# Initialize a list to store all extracted rows
# (a fresh list, so rows from the English run are not carried over)
all_df_rows = []

# Process all files in the directory and its subdirectories
csv_files = get_all_csv_files(base_directory)
for file_path in csv_files:
    process_file(file_path, columns_of_interest, detail_columns)

# Create the final DataFrame of French-language results
dff = pd.DataFrame(all_df_rows)
print(f"Data successfully extracted from {len(csv_files)} files, resulting in {dff.shape[0]} rows and {dff.shape[1]} columns.")

# Translate column names, then cell values, using language_map
dff.rename(columns=language_map, inplace=True)
dff.replace(language_map, inplace=True)



Data successfully extracted from 1 files, resulting in 0 rows and 0 columns.


# COMBINE AND PROCESS XPERT FILES

### Define Lists, Dictionaries for replacement

#### Define list of Pool IDs in Patient ID to be Replaced in Sample ID

In [None]:
# Pool IDs that were typed into the Patient ID field; a row whose stripped
# Patient ID equals one of these gets that value copied into Sample ID.
patient_id_list = [
    "P17 1-1-2-2",


    


]

#### Dictionary of Pool IDs to be Replaced with the Correct Ones

In [None]:
# Dictionary of Pool IDs to replace: passed to Series.replace on 'Sample ID',
# so keys are the values as entered and values are their corrections.
pool_id_dict = {
   


    
}

#### Define a dictionary of Clean Notes to be replaced with the correct counterpart

In [None]:

# Define a dictionary to further clean the notes field: passed to
# Series.replace on 'clean_notes' (key = cleaned note as found, value = corrected note).
clean_notes_replacement_dict = {

 
}


#### qr_code_replacement_dict

In [None]:
# Corrections for mistyped QR codes: passed to Series.replace on the
# 'clean_notes', 'Sample ID' and 'Patient ID' columns after they are upper-cased.
qr_code_replacement_dict = {






}

#### Define other Minor Dictionaries

In [None]:

# List of notes to replace with nothing: any 'clean_notes' value equal to one
# of these entries is replaced with NaN.
notes_to_replace_with_nothing = [




]

### Process Combined Files

In [None]:
# Append the two data frames together. ignore_index gives every row a unique
# label: without it both inputs start counting at 0, and later label-based
# operations (such as DataFrame.drop) would act on rows from both frames.
df = pd.concat([dfe, dff], ignore_index=True)

# French -> English substitutions, applied as regex (substring) replacements
detail_dic = {"RÉUSSITE" : 'PASS',
	"ÉCHEC" : 'FAIL',
	'SO,' : 'NA,',
	"SO" : 'NA'
}
# do partial replacement of the detail columns
special_columns = ['SPC', 'IS1081-IS6110', 'rpoB1', 'rpoB2', 'rpoB3', 'rpoB4']
df[special_columns] = df[special_columns].replace(detail_dic, regex=True)

# Extract only TB samples. .copy() makes the result an independent frame so
# the column assignments that follow do not write to a slice of the old one.
df = df[df['Assay'] == 'Xpert MTB-RIF Ultra'].copy()

# determine pool IDs that are wrongly placed in the Patient ID field

def replace_sample_id_if_match(row, id_list):
    """
    Pick the Sample ID for a row.

    The row's 'Patient ID' is converted to text and stripped; if that text is
    in ``id_list`` it is returned, otherwise the row's 'Sample ID' is returned.
    """
    candidate = str(row['Patient ID']).strip()
    return candidate if candidate in id_list else row['Sample ID']

# Update the Sample ID with the Patient ID value if it matches a pool ID
# (row-wise: rows whose Patient ID is not in patient_id_list keep their Sample ID)
df['Sample ID'] = df.apply(lambda row: replace_sample_id_if_match(row, patient_id_list), axis=1)

# Replace whole Sample ID values using the replacement dictionary

df['Sample ID'] = df['Sample ID'].replace(pool_id_dict)

# define a function to extract the pool ID from the sample ID
def extract_pool_id(sample_id):
    """
    Return the sample ID when it looks like a pool ID, otherwise None.

    - Non-string values (NaN, None, numbers, list-like cells) give None.
    - IDs that start with a digit 1-6 followed by three letters are individual
      samples and give None.
    - IDs that start with 'P'/'p', or with 'S'/'s' followed by three or more
      digits and a dash, are returned (from the start up to the first newline).
    """
    # isinstance alone covers missing values (NaN/None are not str) and, unlike
    # pd.isnull, never produces an array whose truth value is ambiguous.
    if not isinstance(sample_id, str):
        return None

    # Individual-sample IDs are never pool IDs
    if re.match(r'^[1-6][A-Za-z]{3}', sample_id):
        return None

    pool_match = re.match(r'^(?:[Pp].*|[Ss]\d{3,}-.*)', sample_id)
    return pool_match.group(0) if pool_match else None

# Extract potential pool IDs and save in a new column 'extracted_pool_ids'
# (rows whose Sample ID is not pool-like get None)
df['extracted_pool_ids'] = df['Sample ID'].apply(extract_pool_id)

# function to clean the extracted pool IDs
def replace_space_in_extracted_id(extracted_id):
    """
    Normalise an extracted pool ID.

    Null values are returned untouched. Otherwise the prefix is normalised
    (first matching rule only), repeat markers are removed, runs of dashes are
    collapsed, and '=' / ';' / 'PATIENT DE REDCAP' / spaces are cleaned up.
    """
    if pd.isnull(extracted_id):
        return extracted_id

    # (pattern, text to replace once, replacement); only the first match applies
    prefix_rules = (
        (r'^(P|PS)\d{2,}\s', " ", "-"),    # "P17 1" -> "P17-1"
        (r'^(P|p|PS)\s\d{2,}', " ", ""),   # "P 17"  -> "P17"
        (r'^(P|PS)-\d{2,}', "-", ""),      # "P-17"  -> "P17"
    )
    cleaned = extracted_id
    for regex, old, new in prefix_rules:
        if re.match(regex, cleaned):
            cleaned = cleaned.replace(old, new, 1)
            break

    # Remove repeat markers
    for marker in ("-R", "-r", "(R)", "RR"):
        cleaned = cleaned.replace(marker, "")

    # Collapse consecutive dashes, then trim
    cleaned = re.sub(r'-+', '-', cleaned).strip()

    # Order matters: the REDCAP phrase must be removed before spaces are dropped
    for old, new in (("=", "-"), (";", ""), ("PATIENT DE REDCAP", ""), (" ", "")):
        cleaned = cleaned.replace(old, new)

    return cleaned


# Apply the prefix/marker clean-up to the 'extracted_pool_ids' column (null values pass through)
df['extracted_pool_ids'] = df['extracted_pool_ids'].apply(replace_space_in_extracted_id)


# define a function to remove the leftover spaces and other foreign characters
def replace_spaces_and_double_dashes(text):
    """
    Apply a fixed sequence of literal substitutions to a pool ID.

    Parameters:
    - text: Input value; anything that is not a str is returned unchanged.

    Returns:
    - str: The text with spaces turned into dashes, double dashes collapsed,
      every 'R' (and a dash directly before it) removed, '=' turned into '-',
      '-2DJA-' turned into '-', surrounding whitespace stripped and '-()' removed.
    """
    if not isinstance(text, str):
        return text

    # Applied in this exact order; "-R" must go before the bare "R"
    before_strip = (
        (" ", "-"),
        ("--", "-"),
        ("-R", ""),
        ("R", ""),
        ("=", "-"),
        ("-2DJA-", "-"),
    )
    result = text
    for old, new in before_strip:
        result = result.replace(old, new)
    return result.strip().replace("-()", "")

# Second clean-up pass over the pool IDs (this reassigns the column; it is not only a test)
df['extracted_pool_ids'] = df['extracted_pool_ids'].apply(replace_spaces_and_double_dashes)


# DEFINE A FUNCTION FOR CLEANING THE NOTES FIELD
def clean_notes(notes):
    """
    Normalise a free-text Notes value so sample codes are space-separated.

    Null and non-string values are returned unchanged. For strings: everything
    from "user" (any case) onwards is dropped, commas become spaces, a dash is
    inserted between a code prefix (digit 1-6 plus three letters) and what
    follows, whitespace is collapsed, "LS"/"ls" markers are removed and dots
    become dashes.
    """
    if pd.isnull(notes) or not isinstance(notes, str):
        return notes

    # Drop everything from "user" to the end, turn commas into spaces, trim
    text = re.sub(r'user.*$', '', notes, flags=re.IGNORECASE)
    text = text.replace(",", " ").strip()

    # "1ABC12345" -> "1ABC-12345", then "1ABC 12345" -> "1ABC-12345"
    text = re.sub(r'(?<=[1-6][A-Za-z]{3})(?=\d{5})', '-', text)
    text = re.sub(r'(?<=[1-6][A-Za-z]{3})\s', '-', text)

    # Collapse whitespace runs to a single space
    text = re.sub(r'\s+', ' ', text)

    # Literal substitutions, applied in this exact order
    literal_steps = (
        ("LS:", ''),
        ("LS ", ' '),
        (" LS ", ' '),
        ("ls ", ' '),
        (" ls ", ' '),
        ("ls:", ' '),
        (",", ' '),
        ("  ", ' '),
    )
    for old, new in literal_steps:
        text = text.replace(old, new)

    return text.strip().replace(".", '-')

def remove_names_and_dates(text):
    """
    Cut trailing free text that follows the last dash-containing token.

    Looks for a dash, then dash-free text, then a whitespace character after
    which no further dash occurs; the text is cut at that whitespace and
    stripped. Non-strings and strings without such a tail are returned as is.
    """
    if not isinstance(text, str):
        return text

    tail = re.search(r'-(?:[^-]*?)(\s)([^-]*$)', text)
    if tail is None:
        return text
    # group 1 is the whitespace that separates the codes from the trailing text
    return text[:tail.start(1)].strip()

# Clean the raw 'Notes' text into a new 'clean_notes' column
df['clean_notes'] = df['Notes'].apply(clean_notes)

# Strip trailing names/dates from the cleaned notes
df['clean_notes'] = df['clean_notes'].apply(remove_names_and_dates)

# Replace specified notes with NaN
df['clean_notes'] = df['clean_notes'].replace(notes_to_replace_with_nothing, np.nan)

# Clear 'extracted_pool_ids' where the cleaned notes are an empty string
df.loc[(df['clean_notes'] == '') & (df['extracted_pool_ids'].notnull()), 'extracted_pool_ids'] = np.nan

# Replace whole cleaned notes using the correction dictionary
df['clean_notes'] = df['clean_notes'].replace(clean_notes_replacement_dict)

# Drop the rows that have a pool ID but no cleaned notes
condition = (df['extracted_pool_ids'].notna()) & (df['clean_notes'].isna())

# Filter with the boolean mask instead of dropping by index label: a label-based
# drop removes every row that shares a label with a matching row, which deletes
# unrelated rows whenever the index contains duplicates.
df = df[~condition].copy()

#calculate the pool size by counting the number of spaces in the clean notes field
def count_spaces_in_notes(notes, extracted_pool_ids):
    """
    Count the separating spaces in a notes value.

    :param notes: The notes field (string).
    :param extracted_pool_ids: The extracted pool IDs field.
    :return: 0 when either argument is null; otherwise the number of spaces
             left after stripping the notes and collapsing runs of spaces.
    """
    if pd.isnull(notes) or pd.isnull(extracted_pool_ids):
        return 0

    # After stripping, each run of spaces separates two non-empty pieces, so
    # the collapsed space count is the number of pieces minus one.
    pieces = [piece for piece in notes.strip().split(" ") if piece]
    return max(len(pieces) - 1, 0)


# Pool size = number of separating spaces in 'clean_notes' + 1
# (the helper returns 0 when either field is null, which gives a size of 1)
df['pool_size'] = df.apply(lambda x: count_spaces_in_notes(x['clean_notes'], x['extracted_pool_ids']), axis=1) +1

#update the pool size to 1 if the clean notes field is empty
def set_pool_size(row):
    """Return 1 when 'clean_notes' or 'extracted_pool_ids' is null, else the row's 'pool_size'."""
    for field in ('clean_notes', 'extracted_pool_ids'):
        if pd.isnull(row[field]):
            return 1
    return row['pool_size']

# Force the pool size to 1 on rows missing either the cleaned notes or the pool ID
df['pool_size'] = df.apply(set_pool_size, axis=1)

# create a column is_pool if pool size is greater than 1
df['is_pool'] = df['pool_size'] > 1


# classify results into groups
# 'Test Result' values that count as errors
error_results = ['ERROR', 'NO RESULT', 'INVALID']

# 'Test Result' values that count as positive.
# NOTE(review): several entries appear twice; membership tests are unaffected.
# One "Trace" entry starts with '|' and one does not — confirm both spellings occur in the data.
positive_results = [
        'MTB DETECTED MEDIUM||RIF Resistance NOT DETECTED',
        'MTB DETECTED HIGH||RIF Resistance NOT DETECTED', 
        'MTB DETECTED LOW||RIF Resistance NOT DETECTED',
        'MTB DETECTED VERY LOW||RIF Resistance NOT DETECTED',
        'MTB DETECTED MEDIUM||RIF Resistance DETECTED',
        'MTB DETECTED LOW||RIF Resistance DETECTED',
        'MTB DETECTED HIGH||RIF Resistance DETECTED',
        '|MTB Trace DETECTED|RIF Resistance INDETERMINATE', 
        'MTB DETECTED VERY LOW||RIF Resistance DETECTED',
        'MTB DETECTED LOW||RIF Resistance INDETERMINATE',
        'MTB DETECTED HIGH||RIF Resistance INDETERMINATE',
        'MTB DETECTED VERY LOW||RIF Resistance INDETERMINATE',
        'MTB Trace DETECTED|RIF Resistance INDETERMINATE',
        'MTB DETECTED MEDIUM||RIF Resistance INDETERMINATE',
        'MTB DETECTED VERY LOW||RIF Resistance INDETERMINATE',
        'MTB DETECTED HIGH||RIF Resistance INDETERMINATE',
        'MTB DETECTED MEDIUM||RIF Resistance DETECTED',
        'MTB DETECTED LOW||RIF Resistance INDETERMINATE',
       
       
       ]

# 'Test Result' values that count as negative
negative_results = ['MTB NOT DETECTED||']

#function to classify the test results
def classify_test_result(row):
    """
    Label a row as 'individual', 'error', 'positive', 'negative' or 'unknown'.

    Rows whose 'is_pool' equals False are 'individual'. For the others the
    'Test Result' is looked up in error_results, positive_results and
    negative_results, in that order; no hit gives 'unknown'.
    """
    # Equality (not truthiness) is intentional: only a value equal to False counts
    if row['is_pool'] == False:
        return 'individual'

    outcome = row['Test Result']
    groups = (
        ('error', error_results),
        ('positive', positive_results),
        ('negative', negative_results),
    )
    for label, members in groups:
        if outcome in members:
            return label
    return 'unknown'

# Classify every row's result
df['pool_result'] = df.apply(classify_test_result, axis=1)

# Segment the data frame by pools. .copy() makes each part an independent
# frame, so adding a column below does not write to a slice of df.
df_pool = df[df['is_pool']==True].copy()
df_individual = df[df['is_pool']==False].copy()

# Keep the full (unsplit) cleaned notes next to the pooled rows
df_pool['clean_notes_exploded'] = df_pool['clean_notes']

# define a function to explode the dataframe using the Clean Notes field
def explode_dataframe(df_pool):
    """
    Return a copy of ``df_pool`` with one row per space-separated note.

    'clean_notes' is split on single spaces and each piece gets its own row;
    the other columns and the index label are repeated. The input is not modified.
    """
    split_notes = df_pool['clean_notes'].str.split(' ')
    widened = df_pool.assign(clean_notes=split_notes)
    return widened.explode('clean_notes')
# One row per pooled sample code
df_pool = explode_dataframe(df_pool)

# Put pooled and individual rows back together.
# NOTE(review): exploded rows repeat their index label, so df has duplicate labels from here on.
df = pd.concat([df_pool, df_individual])


# define a function to further clean the Notes field to enable the extraction of the QR Code
def further_clean_notes(text):
    """
    Bring sample codes to the form "<digit 1-6><3 letters>-<5 digits>".

    Non-strings are returned unchanged. Three passes run in order: codes with
    1-4 digits and no dash get a dash and zero padding; codes with a dash and
    1-4 digits get zero padding; codes with a dash and six digits starting
    with 0 lose that leading 0.
    """
    if not isinstance(text, str):
        return text

    def _dash_and_pad(found):
        return f"{found.group(1)}-{found.group(2).zfill(5)}"

    def _pad_only(found):
        return found.group(1) + found.group(2).zfill(5)

    # "1ABC123" -> "1ABC-00123" (not when more digits or a dash follow)
    text = re.sub(r'([1-6][A-Za-z]{3})(\d{1,4})(?!\d|-)', _dash_and_pad, text)
    # "1ABC-12" -> "1ABC-00012"
    text = re.sub(r'([1-6][A-Za-z]{3}-)(\d{1,4})(?!\d)', _pad_only, text)
    # "1ABC-012345" -> "1ABC-12345"
    return re.sub(r'([1-6][A-Za-z]{3}-)0(\d{5})', r'\1\2', text)

# Clean the clean_notes field to extract QR codes
# NOTE(review): clean_notes is passed through further_clean_notes twice — confirm the second pass is needed
df['clean_notes'] = df['clean_notes'].apply(further_clean_notes)
df['clean_notes'] = df['clean_notes'].apply(further_clean_notes)
df['Sample ID'] = df['Sample ID'].apply(further_clean_notes)
df['Patient ID'] = df['Patient ID'].apply(further_clean_notes)

# Upper-case the three columns; the QR-code pattern used later only accepts capital letters
df['clean_notes'] = df['clean_notes'].str.upper()
df['Sample ID'] = df['Sample ID'].str.upper()
df['Patient ID'] = df['Patient ID'].str.upper()

# Replace the wrongly typed qr code with the qr_code_replacement_dict
df['clean_notes'] = df['clean_notes'].replace(qr_code_replacement_dict)

# do the replacement in Sample ID field
df['Sample ID'] = df['Sample ID'].replace(qr_code_replacement_dict)

# do the replacement in Patient ID field
df['Patient ID'] = df['Patient ID'].replace(qr_code_replacement_dict)

# define a function to extract the QR Code
def extract_qr_codes(df):
    """
    Add a 'qr_codes' column to ``df`` (in place) and return ``df``.

    For each row the first QR code (digit 1-6, three capital letters, dash,
    five digits) is searched in 'clean_notes' (pooled rows only), then
    'Sample ID', then 'Patient ID'. Rows without a code get None.
    """
    qr_regex = re.compile(r'([1-6][A-Z]{3}-\d{5})')
    always_checked = ('Sample ID', 'Patient ID')

    def _first_code(row):
        # Pooled rows look at their notes first; individual rows skip them
        fields = ('clean_notes',) + always_checked if row['is_pool'] else always_checked
        for field in fields:
            hit = qr_regex.search(str(row[field]))
            if hit:
                return hit.group(0)
        return None

    df['qr_codes'] = df.apply(_first_code, axis=1)
    return df

# Add the 'qr_codes' column (the function also modifies df in place)
df = extract_qr_codes(df)


# Function to extract a QR code from a single row
def get_qr_code2(row):
    """
    Return the first QR code found in 'Sample ID', 'Patient ID' or
    'clean_notes' (checked in that order), or None when none of them has one.
    """
    qr_regex = re.compile(r'([1-6][A-Z]{3}-\d{5})')
    for field in ('Sample ID', 'Patient ID', 'clean_notes'):
        found = qr_regex.search(str(row[field]))
        if found:
            return found.group(0)
    return None

# Update 'qr_codes' where it is currently null, this time searching
# 'Sample ID', 'Patient ID' and 'clean_notes' for every row regardless of is_pool
mask = df['qr_codes'].isnull()
df.loc[mask, 'qr_codes'] = df[mask].apply(get_qr_code2, axis=1)

# Now df contains updated 'qr_codes' values
# define a function to extract the S4A QR Code
def extract_s4a_qr_codes(row, columns_to_search):
    """
    Find the first S4A code ("SC", a digit 1-5, a dash, four digits) in a row.

    :param row: A row of the DataFrame.
    :param columns_to_search: Column names to look in, in priority order.
    :return: The first match from the first string-valued column that has one,
             or NaN when nothing matches.
    """
    s4a_regex = re.compile(r'SC[1-5]-\d{4}')

    # Non-string cells (NaN, numbers) are skipped
    text_cells = (row[name] for name in columns_to_search if isinstance(row[name], str))
    for cell in text_cells:
        hit = s4a_regex.search(cell)
        if hit is not None:
            return hit.group(0)

    return np.nan

# Define columns to search for QR codes (searched in this order; the first match wins)
columns_to_search = ['clean_notes', 'Patient ID', 'Sample ID']

# Apply the function to create a new column with the extracted S4A QR codes (NaN when none is found)
df['s4a'] = df.apply(extract_s4a_qr_codes, args=(columns_to_search,), axis=1)

# Define a function to determine the project based on s4a and qr_codes
def determine_project(row):
    """Return 'S4A' when the row has an s4a code, else 'Inspire' when it has a qr code, else 'Other'."""
    if not pd.isnull(row['s4a']):
        return 'S4A'
    return 'Other' if pd.isnull(row['qr_codes']) else 'Inspire'

# Apply the function to create a new column 'project'
# (computed before the fallback below, so it reflects which kind of code was actually found)
df['project'] = df.apply(determine_project, axis=1)

# Update 'qr_codes' to the value of 's4a' if 'qr_codes' is null
df['qr_codes'] = df.apply(lambda row: row['s4a'] if pd.isnull(row['qr_codes']) else row['qr_codes'], axis=1)

# define a function to update the qr code in place
def update_qr_codes_inplace(df):
    """
    Fill the missing 'qr_codes' values of ``df`` in place; returns nothing.

    A row with a null 'qr_codes' receives its 'clean_notes' when 'is_pool' is
    True and its 'Sample ID' otherwise. Rows that already have a code are left alone.

    Parameters:
    df (pandas.DataFrame): The DataFrame to be updated.
    """
    missing = df['qr_codes'].isnull()

    # Candidate value for every row; only the rows flagged as missing are used
    fallback = np.where(df['is_pool'] & missing, df['clean_notes'], df['Sample ID'])

    df.loc[missing, 'qr_codes'] = fallback[missing.to_numpy()]

# Last fallback: fill qr_codes that are still missing from clean_notes / Sample ID
update_qr_codes_inplace(df)

# Merge the xpert files with the main dataframe
# NOTE(review): xpert_files is not defined in the visible part of this notebook — presumably built in another cell; verify
df = df.merge(xpert_files[['file', 'lab', 'start_date', 'end_date', 'date_created']], on = 'file', how = 'left')

# Convert 'date_created' to datetime format
df['date_created'] = pd.to_datetime(df['date_created'])

# Sort by 'Cartridge S/N' (ascending) and 'date_created' (descending)
df = df.sort_values(by=['Cartridge S/N', 'date_created'], ascending=[True, False])

# Group by 'Cartridge S/N' and keep only the rows with the latest 'date_created' for each group
df = df[df['date_created'] == df.groupby('Cartridge S/N')['date_created'].transform('max')]

# Sort pooled rows first, then by cartridge and file, so drop_duplicates keeps that ordering's first row
df.sort_values(by =['is_pool','Cartridge S/N', 'file'], ascending=[False, True, True], inplace = True)
df = df.drop_duplicates(subset = ['Cartridge S/N','qr_codes'])
df.dropna(subset = ['qr_codes'], inplace = True)

# Define the desired categories order
# NOTE(review): classify_test_result can also return 'unknown', which is not listed here;
# pd.Categorical turns values outside `categories` into NaN — confirm that is intended
categories_order = ['positive', 'negative', 'individual', 'error']

# Convert the 'pool_result' column to categorical with the specified order
df['pool_result'] = pd.Categorical(df['pool_result'], categories=categories_order, ordered=True)

# NOTE(review): exact repeat of the conversion above
df['pool_result'] = pd.Categorical(df['pool_result'], categories=categories_order, ordered=True)

# Drop duplicates (same subset as the drop_duplicates call a few lines up)
df.drop_duplicates(subset = ['Cartridge S/N', 'qr_codes'], inplace = True)

# Newest files first, pools before individuals, so cumcount below numbers rows in that order
df.sort_values(by=['date_created','is_pool', 'pool_result', 'qr_codes',], ascending=[False, False, True, True,], inplace=True)

# Determine the duplicate number for each qr code (1 for its first row in the current order)
df['dup_no'] = df.groupby('qr_codes').cumcount() + 1
# Sort the dataframe
df.sort_values(by=['pool_result', 'qr_codes', 'dup_no'], ascending=[True, True, True], inplace=True)

#Write a function to get the individual results
def get_individual_values(row, df):
    """
    Return the test result that applies to the individual behind ``row``.

    For a positive pooled row this is the 'Test Result' of the first
    non-pooled row in ``df`` with the same 'qr_codes', or "No Ind Test Yet"
    when there is none. Every other row returns its own 'Test Result'.
    """
    is_positive_pool = row['is_pool'] and row['pool_result'] == 'positive'
    if not is_positive_pool:
        return row['Test Result']

    same_code = df['qr_codes'] == row['qr_codes']
    individual_tests = df.loc[same_code & ~df['is_pool'], 'Test Result']
    if individual_tests.empty:
        return "No Ind Test Yet"
    return individual_tests.iloc[0]


# Look up, for every row of df, the individual test result that applies to it
df['individual_results'] = df.apply(lambda row: get_individual_values(row, df), axis=1)

# function to get the individual cartridge SN
def get_individual_cartridge_sn(row):
    """
    Return the 'Cartridge S/N' of the individual test behind a positive pool.

    For a positive pooled row, the module-level ``df`` is searched for the
    first non-pooled row with the same 'qr_codes'. None is returned when there
    is no such row, and for every row that is not a positive pool.
    """
    if not (row['is_pool'] and row['pool_result'] == 'positive'):
        return None

    individual_runs = df.loc[(df['qr_codes'] == row['qr_codes']) & (~df['is_pool']), 'Cartridge S/N']
    return None if individual_runs.empty else individual_runs.iloc[0]

# Row by row, attach the linked individual cartridge serial number (None when there is none)
df['individual_cartridge_sn'] = df.apply(get_individual_cartridge_sn, axis=1)

# Map an individual test result onto a coarse category
def categorize_result(individual_result):
    """Return the category label for one individual test result.

    Membership is checked, in this order, against the module-level
    collections ``positive_results``, ``negative_results`` and
    ``error_results``.

    Returns:
        'positive', 'negative' or 'invalid' for a member of the respective
        collection, and 'unknown' when the value is in none of them.
    """
    if individual_result in positive_results:
        return 'positive'
    if individual_result in negative_results:
        return 'negative'
    if individual_result in error_results:
        return 'invalid'
    return 'unknown'

# Categorise every reported individual result
df['indiv_result_category'] = df['individual_results'].map(categorize_result)


# Create the final pool size: individual tests inherit the size of their pool
def update_pool_size(df):
    """Add a 'final_pool_size' column to ``df`` and return ``df``.

    'final_pool_size' starts as a copy of 'pool_size'. For every QR code that
    has both pooled (``is_pool`` True) and non-pooled rows, the non-pooled
    rows receive the 'pool_size' of the first pooled row with that QR code.
    Rows whose QR code is missing are left unchanged.

    Args:
        df: DataFrame with 'qr_codes', 'is_pool' (boolean) and 'pool_size'
            columns. It is modified in place.

    Returns:
        The same DataFrame object, with 'final_pool_size' added.
    """
    df['final_pool_size'] = df['pool_size']

    is_pool = df['is_pool']

    # One lookup entry per QR code: the pool_size of its first pooled row.
    # Built once instead of re-filtering the whole frame for every QR code.
    pooled_rows = df.loc[is_pool & df['qr_codes'].notna(), ['qr_codes', 'pool_size']]
    first_pool_size = (
        pooled_rows.drop_duplicates(subset='qr_codes')
        .set_index('qr_codes')['pool_size']
    )

    # Non-pooled rows whose QR code also appears on a pooled row
    needs_update = ~is_pool & df['qr_codes'].isin(first_pool_size.index)
    if needs_update.any():
        # to_numpy() makes the assignment positional, so a non-unique row
        # index on df cannot break label alignment.
        df.loc[needs_update, 'final_pool_size'] = (
            df.loc[needs_update, 'qr_codes'].map(first_pool_size).to_numpy()
        )

    return df

# Add the 'final_pool_size' column
df = update_pool_size(df)

# A final pool size of exactly 1 is labelled 'individual'; every other value 'pooled'
df['test_type'] = np.where(df['final_pool_size'].eq(1), 'individual', 'pooled')

# Define the QR code pattern: a digit 1-6, three capital letters, a dash and five digits
qr_pattern = r'([1-6][A-Z]{3}-\d{5})'

# Function to extract the first four characters if qr_code matches the pattern
def extract_site(qr_code):
    """Return the first four characters of ``qr_code`` if it starts with a valid QR code.

    Args:
        qr_code: The value to inspect. Non-string values (None, NaN, numbers)
            are treated as non-matching instead of raising ``TypeError``.

    Returns:
        The first four characters of ``qr_code`` when it matches
        ``qr_pattern`` at the start of the string; otherwise None.
    """
    # re.match only accepts strings; missing values arrive as None/NaN
    if not isinstance(qr_code, str):
        return None
    if re.match(qr_pattern, qr_code):
        return qr_code[:4]
    return None

# Derive the site code from each QR code
df['site'] = df['qr_codes'].map(extract_site)

# Flag rows whose individual result category is 'positive'
df['has_tb'] = df['indiv_result_category'].eq('positive')

# xpert_id is the QR code wherever a site could be extracted, NaN elsewhere
df['xpert_id'] = np.where(df['site'].notna(), df['qr_codes'], np.nan)

# pool_id is extracted_pool_ids wherever both it and Notes are present, NaN elsewhere
df['pool_id'] = np.where(
    df['extracted_pool_ids'].notna() & df['Notes'].notna(),
    df['extracted_pool_ids'],
    np.nan,
)


# Parse the date/time columns with fix_dates
for timestamp_column in ('Expiration Date', 'Start Time', 'End Time'):
    df[timestamp_column] = df[timestamp_column].apply(fix_dates)

# Bounds used to decide whether 'End Time' is used as the date
date_threshold = pd.Timestamp('2023-06-27')
current_date = pd.Timestamp.today()

# Choose the 'date' value:
#   End Time earlier than date_threshold -> start_date
#   End Time later than the current time -> end_date
#   otherwise                            -> End Time
ended_before_threshold = df['End Time'] < date_threshold
ended_in_future = df['End Time'] > current_date
df['date'] = np.where(
    ended_before_threshold,
    df['start_date'],
    np.where(ended_in_future, df['end_date'], df['End Time']),
)

# Rows that still have no date fall back to end_date
df['date'] = df['date'].fillna(df['end_date'])

def classify_pool(group):
    """
    Label a pooled test from its pool result and its individual result categories.

    Args:
        group: DataFrame with 'pool_result' and 'indiv_result_category'
            columns; the pool result is taken from the first row.

    Returns:
        'Error' or 'Negative' when the pool result is 'error' or 'negative'.
        For a 'positive' pool: 'False Positive' when every category is
        'negative', 'True Positive' when any category is 'positive',
        'Incomplete' when every category is 'negative' or 'unknown'.
        'Unknown' in all remaining cases.
    """
    pool_outcome = group['pool_result'].iloc[0]
    if pool_outcome == 'error':
        return 'Error'
    if pool_outcome == 'negative':
        return 'Negative'
    if pool_outcome != 'positive':
        return 'Unknown'

    outcomes = group['indiv_result_category']
    is_negative = outcomes == 'negative'
    if is_negative.all():
        return 'False Positive'
    if (outcomes == 'positive').any():
        return 'True Positive'
    if (is_negative | (outcomes == 'unknown')).all():
        return 'Incomplete'
    return 'Unknown'

# Classify every pooled cartridge. Only the two columns classify_pool reads are
# passed to apply, so the grouping column is not part of each group (pandas
# >= 2.2 deprecates applying over the grouping column).
pooled_df = df[df['is_pool']]
pooled_classifications = (
    pooled_df.groupby('Cartridge S/N')[['pool_result', 'indiv_result_category']]
    .apply(classify_pool)
)

# Map the classifications back to the original DataFrame for pooled samples
df.loc[df['is_pool'], 'pool_classification'] = df['Cartridge S/N'].map(pooled_classifications)

# Set pool_classification to NaN where final_pool_size is 1
df.loc[df['final_pool_size'] == 1, 'pool_classification'] = np.nan

# Drop columns that are not needed in the final output (each label listed once)
df.drop(
    columns=[
        'rpoB1 melt', 'rpoB2 melt', 'rpoB3 melt', 'rpoB4 melt',
        'rpoB1 Mut melt', 'rpoB2 Mut melt', 'rpoB3 Mut melt',
        'rpoB4 Mut melt A', 'rpoB4 Mut melt B',
        'Patient ID 2', 'Assay', 'Assay Version', 'Assay Type', 'Test Type',
        'clean_notes', 'Module Name', 'start_date', 's4a', 'end_date',
        'date_created', 'Sample Type',
    ],
    inplace=True,
)

In [None]:
# Bare expression: as the last line of a notebook cell this shows df as the cell output
df

: 