# Robot Language Conversion
This script uses a fine-tuned model to obtain standardized protocols and converts them into robot language.
   
## input files
- **config file** 

    - In the config file, please specify the following four items:

        - **input_file** : saved protocols from paper extraction 
        - **input_content** : manually supplied protocol content, used if the extraction didn't work 
        - **model_selection** : select model among five fine-tuned models from five-fold cross-validation 
        - **output_path** : save the converted robot language file
        
## output files

The obtained protocols are stored in the **"./output_language/"** folder.


In [1]:
import json
import os
import openai
import pandas as pd
import re
import csv
import ast
import spacy
import configparser
import errno
import logging

from word2number import w2n

In [2]:
# Location of the INI file that the final cell reads (inputs, model selection,
# output path, OpenAI key and the list of fine-tuned models).
config_path = "./robot_language_config.ini"

In [3]:
def content_reformat(input_path):
    """Merge consecutive CSV rows that share a protocol index into one text.

    The CSV must provide the columns 'protocol index', 'heading name' and
    'content'. Rows are read in file order; each row contributes
    "<heading>\\n<content>\\n", and a new article starts whenever the
    protocol index differs from the previous row's.

    Args:
        input_path: Path of the CSV file to read.

    Returns:
        List of article strings, one per run of equal protocol indices.
    """
    table = pd.read_csv(input_path)

    articles = []
    buffer = ''
    previous = None  # protocol index of the row handled last

    for _, record in table.iterrows():
        idx = record['protocol index']
        block = f"{record['heading name']}\n{record['content']}\n"

        starts_new_article = previous is not None and idx != previous
        if starts_new_article:
            # Close the article collected so far and begin the next one.
            articles.append(buffer)
            buffer = block
        else:
            buffer += block

        previous = idx

    # Flush whatever is still buffered after the last row.
    if buffer:
        articles.append(buffer)

    return articles

def gpt_extraction(content, openai_api_key, Model):
    """Ask the chat model `Model` about `content` and return its reply text.

    Sets the module-level `openai.api_key`, sends a fixed system prompt plus
    `content` as the user message with temperature 0, and returns the
    'content' of the first choice's message.

    Args:
        content: Text sent as the user message.
        openai_api_key: Key assigned to `openai.api_key` before the call.
        Model: Model identifier passed as `model`.

    Returns:
        The text of the first returned choice.
    """
    openai.api_key = openai_api_key

    conversation = [
        {"role": "system", "content": "You are a biomedical expert"},
        {"role": "user", "content": content},
    ]
    response = openai.ChatCompletion.create(
        model=Model,
        temperature=0,
        messages=conversation,
    )

    first_choice = response.choices[0]
    return first_choice.message['content']

def extract_PCR(text):
    """Pull the thermal-cycling program(s) out of the model's literal output.

    `text` must be a Python-literal dict. Two layouts are accepted:

    * single PCR: the dict itself has a 'Thermal cycling condition' entry;
    * two PCRs: the dict has 'First PCR' and 'Second PCR' entries, each of
      which has a 'Thermal cycling condition' entry.

    The layout is chosen by which keys are present rather than by the number
    of top-level keys, so a single-PCR dict that happens to have exactly two
    keys is no longer mistaken for the two-PCR layout.

    Args:
        text: String representation of the dict.

    Returns:
        Tuple `(programs, machine)`: `programs` lists the 'Program' value of
        each PCR in order; `machine` is the 'PCRmachine' value of the last PCR
        read, or None when it is not given.

    Raises:
        ValueError, SyntaxError: If `text` is not a valid Python literal.
        KeyError: If neither layout's keys are present.
    """
    result_content = ast.literal_eval(text)

    if 'Thermal cycling condition' in result_content:
        sections = [result_content]
    else:
        sections = [result_content[name] for name in ('First PCR', 'Second PCR')]

    content = []
    machine = None
    for section in sections:
        condition = section['Thermal cycling condition'][0]
        content.append(condition.get('Program'))
        # A missing or empty 'Thermal cycler' entry simply yields no machine.
        cyclers = condition.get('Thermal cycler') or [{}]
        machine = cyclers[0].get('PCRmachine')

    return content, machine

In [4]:
def split_text_into_clauses(text):
    """Break `text` into clauses on ';', sentence-ending '.', and most ','.

    Three splits are applied in sequence, each to every piece produced by the
    previous one:

    1. ';' together with any whitespace after it;
    2. '.' that is not adjacent to a digit (so decimals stay intact),
       together with any whitespace after it;
    3. ',' unless it is followed by whitespace and then a digit, 'followed'
       or 'and'.

    Args:
        text: The string to split.

    Returns:
        List of clause strings in their original order.
    """
    stages = (
        re.compile(r'[;]\s*'),
        re.compile(r'(?<!\d)[.](?!\d)\s*'),
        re.compile(r'[,](?!\s+\d|\s+followed|\s+and)'),
    )

    pieces = [text]
    for splitter in stages:
        pieces = [part for piece in pieces for part in splitter.split(piece)]

    return pieces

def extract_temp_dura_details(content):
    """Find (temperature, duration) pairs in `content`.

    Temperatures look like '95 °C' or '72.5°C'; durations are an integer
    followed by a seconds/minutes unit. Matching is case-insensitive. Pairs
    written temperature-first are looked for first; only if none are found
    is the duration-first order tried, and those matches are swapped so the
    result is always (temperature, duration).

    Args:
        content: Text to search.

    Returns:
        List of `(temperature, duration)` string tuples; empty if none found.
    """
    temp_rx = r'(\d+(?:\.\d+)?\s*°\s*C)'
    time_rx = r'(\d+\s*(?:s|S|min|Min|MIN|Seconds|seconds|minutes|Minutes))'
    gap = r'\s*.*?'

    pairs = re.findall(temp_rx + gap + time_rx, content, re.I)
    if pairs:
        return pairs

    # Nothing in temperature-first order: try "<duration> ... <temperature>".
    reversed_pairs = re.findall(time_rx + gap + temp_rx, content, re.I)
    return [(temperature, duration) for duration, temperature in reversed_pairs]

def extract_cycle(content):
    """Return every '<number> cycles' / '<word> cycles' mention in `content`.

    Matching is case-insensitive. Each result is a 2-tuple: the first item is
    filled when the count is written in digits, the second when it is any
    other word; the unused item is an empty string.

    Args:
        content: Text to search.

    Returns:
        List of 2-tuples of strings; empty when nothing matches.
    """
    cycle_rx = re.compile(r'(\d+\s*cycles)|(\w+\s*cycles)', re.I)
    return cycle_rx.findall(content)

def allcycle(content, cycles):
    """Build an Eppendorf program that puts all steps into a single CYCLE.

    Every (temperature, duration) pair found in `content` becomes a `TempN=`
    line of one CYCLE section, except pairs from sentences that mention
    "room temperature": those are written as stand-alone TEMP sections,
    which are placed before the CYCLE section.

    Changes from the previous version:

    * The numeric part of each temperature/duration is read with a regular
      expression. The old code called a global `nlp` that is never defined
      at module level, so it raised NameError.
    * The minutes test is case-insensitive, matching the case-insensitive
      duration pattern ("5 Min" used to be written as 5 seconds).
    * Section labels are zero-padded to three digits, like the "[000]"
      header, instead of becoming "[0010]" from the tenth section onward.

    NOTE(review): the "∞C" in the emitted text looks like a mis-decoded "°C";
    it is kept unchanged here — confirm against a file exported by the device.

    Args:
        content: Iterable of clause strings describing the thermal program.
        cycles: Cycle count written verbatim after `Cycles=`.

    Returns:
        The full program text as one string.
    """
    fixed_sections = (
            "[Properties]\nName=\nComment=\nDWS-ability=0x00000015\n\n",
            "[EstTime]\nDuration=00:00:00\n\n",
            "[000]\nOpcode=HEADER\nLidTemp=105.0∞C\nWait=YES\nLowBlock=AUTO\nEmuMode=FULL\nTempMode=STANDARD\nControl=SIM_TUBE\nImpuls=NO\nBlocktype=ANY\nSlope=STEADY\nLidTrack=NO\n"
        )
    cyc_file = "".join(fixed_sections)

    cycle_steps = ""  # TempN lines collected for the CYCLE section
    order_num = 1
    temp_num = 1

    room_temperature = re.compile(r'room temperature', re.IGNORECASE)
    # Both the temperature and the duration strings start with their number.
    leading_number = re.compile(r'\d+(?:\.\d+)?')

    for sentence in content:
        outside_cycle = room_temperature.search(sentence) is not None

        for temperature, duration in extract_temp_dura_details(sentence):
            temp_number = leading_number.match(temperature).group()
            amount = leading_number.match(duration).group()

            # A unit containing 'm' means minutes; everything else is seconds.
            if 'm' in duration.lower():
                minutes, seconds = amount, '00'
            else:
                minutes, seconds = '00', amount

            if outside_cycle:
                cyc_file += f"\n[{order_num:03d}]\nOpcode=TEMP\nTemp={temp_number}∞C {temp_number}∞C {minutes}:{seconds} R=100%\n"
                order_num += 1
            else:
                cycle_steps += f"Temp{temp_num}={temp_number}∞C {temp_number}∞C {minutes}:{seconds} +0.0∞C +00:00 R=100%\n"
                temp_num += 1

    cyc_file += f"\n[{order_num:03d}]\nOpcode=CYCLE\nTempNum={temp_num-1}\nCycles={cycles}\n"
    cyc_file += cycle_steps
    order_num += 1
    cyc_file += f"\n[{order_num:03d}]\nOpcode=END\n\n[~CRC32~]\ncrc=0x1D14D6F3"
    return cyc_file

def Biorad_allcycle(content, cycles):
    """Build a Bio-Rad protocol string that loops over all steps.

    Every (temperature, duration) pair found in `content` becomes a
    `TEMP <temperature>,<seconds>;` step. Steps from sentences mentioning
    "room temperature" are emitted first; all others follow and are the
    target of the closing GOTO.

    Changes from the previous version:

    * Numbers are read with a regular expression. The old code called a
      global `nlp` that is never defined at module level (NameError).
    * The minutes test is case-insensitive, matching the case-insensitive
      duration pattern ("5 Min" used to be taken as 5 seconds).
    * The GOTO repeat count is `cycles - 1`, the same convention
      `Biorad_file` uses for its GOTO step. When `cycles` is None it is
      written unchanged, as before.

    Args:
        content: Iterable of clause strings describing the thermal program.
        cycles: Total number of cycles, or None if unknown.

    Returns:
        The protocol text as one string.
    """
    cyc_file = "[ProtocolRunDefinition version 06.00]METHOD CALC;HOTLID 105,30;VOLUME 40;"
    cycle_steps = ""  # steps that belong to the repeated part

    # 1-based position of the first repeated step (GOTO target).
    order_num = 1

    room_temperature = re.compile(r'room temperature', re.IGNORECASE)
    temperature_value = re.compile(r'\d+(?:\.\d+)?')
    duration_value = re.compile(r'\d+')

    for sentence in content:
        outside_cycle = room_temperature.search(sentence) is not None

        for temperature, duration in extract_temp_dura_details(sentence):
            degrees = float(temperature_value.match(temperature).group())
            amount = int(duration_value.match(duration).group())
            # A unit containing 'm' means minutes; convert those to seconds.
            times = amount * 60 if 'm' in duration.lower() else amount
            step = f"TEMP {degrees},{times};"

            if outside_cycle:
                cyc_file += step
                order_num += 1
            else:
                cycle_steps += step

    cyc_file += cycle_steps
    repeats = cycles if cycles is None else int(cycles) - 1
    cyc_file += f"GOTO {order_num},{repeats};"
    cyc_file += "END;"
    return cyc_file

def convert_text_to_number(input_string):
    """Replace number words in `input_string` with their digit form.

    The string is split on whitespace; each word that `w2n.word_to_num`
    accepts is replaced by `str()` of the returned value, every other word
    is kept as is. The words are then re-joined with single spaces.

    Args:
        input_string: Text to convert.

    Returns:
        The converted text.
    """
    def as_digits(token):
        # Words that w2n rejects with ValueError pass through untouched.
        try:
            return str(w2n.word_to_num(token))
        except ValueError:
            return token

    return ' '.join(as_digits(token) for token in input_string.split())

def Eppendorf_file(content, nlp, temp_dutn):
    """Translate protocol clauses into an Eppendorf program text.

    Clauses are handled in order:

    * a clause without a "... cycles" mention yields one TEMP section per
      (temperature, duration) pair found in it;
    * a clause with a "... cycles" mention and at least one pair yields one
      CYCLE section holding those pairs as `TempN=` lines;
    * a clause with a "... cycles" mention but no pair means the cycle count
      applies to the whole protocol: `allcycle(content, cycles)` is called
      and processing stops.

    Changes from the previous version: the minutes test is case-insensitive
    (the duration pattern is, so "5 Min" used to be written as seconds);
    section labels are zero-padded to three digits like the "[000]" header;
    the bare `except:` around number parsing is narrowed to `Exception`;
    unused locals are removed.

    NOTE(review): the "∞C" in the emitted text looks like a mis-decoded "°C";
    it is kept unchanged here — confirm against a file exported by the device.

    Args:
        content: List of clause strings.
        nlp: Callable that tokenises a string; the first token of each
            temperature/duration string is taken as its number.
        temp_dutn: Ignored. The pairs are re-extracted per clause; the
            parameter is kept so existing callers keep working.

    Returns:
        Tuple `(cyc_file, allcycle_file)`. `allcycle_file` is None unless the
        whole-protocol case described above was hit.
    """
    fixed_sections = (
            "[Properties]\nName=\nComment=\nDWS-ability=0x00000015\n\n",
            "[EstTime]\nDuration=00:00:00\n\n",
            "[000]\nOpcode=HEADER\nLidTemp=105.0∞C\nWait=YES\nLowBlock=AUTO\nEmuMode=FULL\nTempMode=STANDARD\nControl=SIM_TUBE\nImpuls=NO\nBlocktype=ANY\nSlope=STEADY\nLidTrack=NO\n"
        )
    cyc_file = "".join(fixed_sections)
    allcycle_file = None

    order_num = 1
    cycles = None
    cycle_rx = re.compile(r'(\d+\s*cycles)|(\w+\s*cycles)', re.I)

    def step_fields(temperature, duration):
        """Return (temperature token, minutes, seconds) for one step."""
        temp_number = nlp(temperature)[0]
        amount = nlp(duration)[0]
        # A unit containing 'm' means minutes; everything else is seconds.
        if 'm' in duration.lower():
            return temp_number, amount, '00'
        return temp_number, '00', amount

    for sentence in content:
        steps = extract_temp_dura_details(sentence)
        cycles_match = cycle_rx.search(sentence)

        if cycles_match is None:
            # Plain clause: every pair is its own TEMP section.
            for temperature, duration in steps:
                temp_number, minutes, seconds = step_fields(temperature, duration)
                cyc_file += f"\n[{order_num:03d}]\nOpcode=TEMP\nTemp={temp_number}∞C {temp_number}∞C {minutes}:{seconds} R=100%\n"
                order_num += 1
            continue

        # Take the first token of the match that parses as a number; if none
        # does, `cycles` keeps whatever value it had before.
        for word in nlp(cycles_match.group()):
            try:
                cycles = w2n.word_to_num(str(word))
            except Exception:
                continue
            break

        if not steps:
            # Cycle count given without steps: it covers the whole protocol.
            allcycle_file = allcycle(content, cycles)
            break

        cyc_file += f"\n[{order_num:03d}]\nOpcode=CYCLE\nTempNum={len(steps)}\nCycles={cycles}\n"
        for position, (temperature, duration) in enumerate(steps, start=1):
            temp_number, minutes, seconds = step_fields(temperature, duration)
            cyc_file += f"Temp{position}={temp_number}∞C {temp_number}∞C {minutes}:{seconds} +0.0∞C +00:00 R=100%\n"
        order_num += 1

    cyc_file += f"\n[{order_num:03d}]\nOpcode=END\n\n[~CRC32~]\ncrc=0x1D14D6F3"
    return cyc_file, allcycle_file

def Biorad_file(content, nlp, temp_dutn):
    """Translate protocol clauses into a Bio-Rad protocol string.

    Clauses are handled in order:

    * a clause without a "... cycles" mention yields one
      `TEMP <temperature>,<seconds>;` step per pair found in it;
    * a clause with a "... cycles" mention and at least one pair yields those
      TEMP steps followed by `GOTO <first step of the clause>,<cycles-1>;`;
    * a clause with a "... cycles" mention but no pair means the cycle count
      applies to the whole protocol: `Biorad_allcycle(content, cycles)` is
      called and processing stops.

    Changes from the previous version:

    * The minutes test is case-insensitive (the duration pattern is, so
      "5 Min" used to be taken as 5 seconds).
    * The bare `except:` around number parsing is narrowed to `Exception`.
    * A cycle clause whose count cannot be parsed raises a descriptive
      ValueError instead of a TypeError from `int(None)`.
    * After a cycle clause the step counter advances by the number of
      statements emitted (its TEMP steps plus the GOTO) rather than by 1, so
      the GOTO of a later cycle clause points at its own first step.
      NOTE(review): assumes GOTO occupies a step number of its own — confirm.

    Args:
        content: List of clause strings.
        nlp: Callable that tokenises a string; the `.text` of the first token
            of each temperature/duration string is taken as its number.
        temp_dutn: Ignored. The pairs are re-extracted per clause; the
            parameter is kept so existing callers keep working.

    Returns:
        Tuple `(cyc_file, allcycle_file)`. `allcycle_file` is None unless the
        whole-protocol case described above was hit.

    Raises:
        ValueError: If a cycle clause with steps has no readable cycle count,
            or a temperature/duration token is not numeric.
    """
    cyc_file = "[ProtocolRunDefinition version 06.00]METHOD CALC;HOTLID 105,30;VOLUME 40;"
    allcycle_file = None
    order_num = 1  # 1-based number of the next step to be emitted
    cycles = None
    cycle_rx = re.compile(r'(\d+\s*cycles)|(\w+\s*cycles)', re.I)

    def temp_step(temperature, duration):
        """Render one pair as a 'TEMP <degrees>,<seconds>;' statement."""
        temp_number = nlp(temperature)[0]
        amount = int(nlp(duration)[0].text)
        # A unit containing 'm' means minutes; convert those to seconds.
        times = amount * 60 if 'm' in duration.lower() else amount
        return f"TEMP {float(temp_number.text)},{times};"

    for sentence in content:
        steps = extract_temp_dura_details(sentence)
        cycles_match = cycle_rx.search(sentence)

        if cycles_match is None:
            for temperature, duration in steps:
                cyc_file += temp_step(temperature, duration)
                order_num += 1
            continue

        # Take the first token of the match that parses as a number; if none
        # does, `cycles` keeps whatever value it had before.
        for word in nlp(cycles_match.group()):
            try:
                cycles = w2n.word_to_num(str(word))
            except Exception:
                continue
            break

        if not steps:
            # Cycle count given without steps: it covers the whole protocol.
            allcycle_file = Biorad_allcycle(content, cycles)
            break

        if cycles is None:
            raise ValueError(f"No cycle count could be read from {cycles_match.group()!r}")

        first_cycle_step = order_num
        for temperature, duration in steps:
            cyc_file += temp_step(temperature, duration)
        # The steps have already run once, hence one repeat fewer.
        cyc_file += f"GOTO {first_cycle_step},{int(cycles)-1};"
        order_num += len(steps) + 1

    cyc_file += "END;"
    return cyc_file, allcycle_file

def cyc_generator(content, results_save_path, openai_api_key, model, logger, i=None):
    """Run one protocol text through the model and write the device files.

    Steps: load the spaCy model, query the fine-tuned model, save its raw
    answer, extract the thermal-cycling program(s), and for each program
    write one Eppendorf and one Bio-Rad file. Output names are formed by
    appending to `results_save_path` directly, so the path is expected to
    end with a separator.

    Changes from the previous version:

    * `i` is tested with `is not None`, so protocol index 0 gets
      'gpt_out_0.txt' instead of sharing 'gpt_out.txt' with manual input.
    * The four results are initialised to None, so an empty program list no
      longer ends in UnboundLocalError at the `return`.
    * Re-raised errors keep the original exception as their cause.

    Args:
        content: Protocol text sent to the model.
        results_save_path: Prefix for every output file name.
        openai_api_key: Key passed on to `gpt_extraction`.
        model: Model identifier passed on to `gpt_extraction`.
        logger: Logger that receives an error entry before each re-raise.
        i: Index of the protocol when several are processed; None otherwise.

    Returns:
        `(cyc_file, allcyc, biocyc_file, bioallcyc)` for the LAST program
        handled; all None if no program was found.

    Raises:
        ValueError: If any stage fails; the message names the stage.
    """
    try:
        # Load the English language model
        nlp = spacy.load("en_core_web_sm")
    except Exception as e:
        logger.error(f"Error occured in loading spacy en_core_web_sm:{str(e)}")
        raise ValueError(f"Error occured in loading spacy en_core_web_sm:{str(e)}") from e

    try:
        # Obtain protocol using fine-tuned model
        gpt_content = gpt_extraction(content, openai_api_key, model)
    except Exception as e:
        logger.error(f"Error occured in extraction from fine-tuned model:{str(e)}")
        raise ValueError(f"Error occured in extraction from fine-tuned model:{str(e)}") from e

    # `i` may legitimately be 0, so compare against None explicitly.
    gpt_name = "gpt_out.txt" if i is None else f"gpt_out_{i}.txt"
    try:
        with open(f"{results_save_path}{gpt_name}", "w", encoding="utf-8") as file:
            file.write(gpt_content)
    except Exception as e:
        logger.error(f"Error occured when save gpt output:{str(e)}")
        raise ValueError(f"Error occured when save gpt output:{str(e)}") from e

    try:
        # Obtain thermal cycler part
        pcr_content, _ = extract_PCR(gpt_content)
    except Exception as e:
        logger.error(f"Error occured in extracting thermal cycler part:{str(e)}")
        raise ValueError(f"Error occured in extracting thermal cycler part:{str(e)}") from e

    cyc_file = allcyc = biocyc_file = bioallcyc = None

    for j, program in enumerate(pcr_content):
        try:
            # Split into clauses and turn number words into digits
            thermalcyc = [
                convert_text_to_number(clause)
                for clause in split_text_into_clauses(program)
            ]
        except Exception as e:
            logger.error(f"Error occured in split/convert text:{str(e)}")
            raise ValueError(f"Error occured in split/convert text:{str(e)}") from e

        try:
            # Extract temperature and duration time
            temp_dutn = extract_temp_dura_details(str(thermalcyc))
        except Exception as e:
            logger.error(f"Error occured in extract temperature and duration time:{str(e)}")
            raise ValueError(f"Error occured in extract temperature and duration time:{str(e)}") from e

        try:
            # Obtain CYC and TEMP section
            cyc_file, allcyc = Eppendorf_file(thermalcyc, nlp, temp_dutn)

            path = f"{results_save_path}Eppendorf_file_{i}_{j}.txt"
            # The all-in-one-cycle variant, when produced, takes precedence.
            with open(path, "w", encoding="utf-8") as file:
                file.write(allcyc if allcyc is not None else cyc_file)
        except Exception as e:
            logger.error(f"Error occured in obtain Eppendorf CYC and TEMP section:{str(e)}")
            raise ValueError(f"Error occured in obtain Eppendorf CYC and TEMP section:{str(e)}") from e

        try:
            # Obtain CYC and TEMP section
            biocyc_file, bioallcyc = Biorad_file(thermalcyc, nlp, temp_dutn)

            biopath = f"{results_save_path}BioRad_file{i}_{j}.txt"
            with open(biopath, "w", encoding="utf-8") as file:
                file.write(bioallcyc if bioallcyc is not None else biocyc_file)
        except Exception as e:
            logger.error(f"Error occured in obtain BioRad CYC and TEMP section:{str(e)}")
            raise ValueError(f"Error occured in obtain BioRad CYC and TEMP section:{str(e)}") from e

    return cyc_file, allcyc, biocyc_file, bioallcyc
    

In [5]:
def main(input_path, manu_content, results_save_path, openai_api_key, model, logger):
    """Convert either the manual content or every protocol in the CSV.

    If `manu_content` is non-empty it is converted on its own. Otherwise the
    protocols are read from `input_path` with `content_reformat` and each is
    converted in turn, its position in the list being passed on as the
    protocol index.

    Changes from the previous version: a CSV that yields no protocols now
    returns four None values instead of failing with UnboundLocalError at the
    `return`; the try/except around a plain assignment, which could never
    fire, is removed.

    Args:
        input_path: CSV file with the extracted protocols.
        manu_content: Manually supplied protocol text; takes precedence.
        results_save_path: Passed on to `cyc_generator`.
        openai_api_key: Passed on to `cyc_generator`.
        model: Passed on to `cyc_generator`.
        logger: Logger used for error reporting.

    Returns:
        The 4-tuple returned by the last `cyc_generator` call, or
        `(None, None, None, None)` if nothing was processed.

    Raises:
        ValueError: If the CSV cannot be loaded or a conversion fails.
    """
    # if use manual input
    if manu_content:
        return cyc_generator(manu_content, results_save_path, openai_api_key, model, logger)

    try:
        input_content = content_reformat(input_path)
    except Exception as e:
        logger.error(f"Error occured in loading input content:{str(e)}")
        raise ValueError(f"Error occured in loading input content:{str(e)}") from e

    results = (None, None, None, None)
    for i, content in enumerate(input_content):
        results = cyc_generator(content, results_save_path, openai_api_key, model, logger, i)
    return results

In [6]:
### read ini file
# Reads the settings from `config_path`, sets up file logging and runs `main`.
config = configparser.ConfigParser()

if not os.path.exists(config_path):
    raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), config_path)

try:
    config.read(config_path, encoding="utf-8")

    ## input link
    input_path = config.get("input_file", "input_path")

    ## input content
    manu_content = config.get("input_content", "content")

    ## model selection
    model_num = config.getint("model_selection", "model_num")

    ## output dir
    results_save_path = config.get("output_path", "output_path")

    ## openai key
    openai_api_key = config.get("openai_key", "key")

    ## fine-tuned models
    # The option holds the string form of a list; parse it into a real list.
    models = ast.literal_eval(config.get("finetuned_models", "models"))

    # model_num is 1-based. Reject values outside the list explicitly:
    # 0 would otherwise silently select the last model via index -1.
    if not 1 <= model_num <= len(models):
        raise ValueError(f"model_num must be between 1 and {len(models)}, got {model_num}")
    model = models[model_num-1]

    # Set the file path for the log file
    log_file = "./error.log"

    # Configure the logging settings (INFO was the level that took effect
    # before, since it was set after ERROR)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Create a FileHandler to save logs to the specified file path
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.INFO)

    # Create a formatter to customize the log message format
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    # Add the FileHandler to the logger
    logger.addHandler(file_handler)

except Exception as e:
    print(f"Error occured in reading {config_path}:", e)
    # Stop here: continuing would only fail later with a NameError on the
    # settings that could not be read.
    raise

Eppendorf_file1, Eppendorf_file2, BioRad_file1, BioRad_file2 = main(input_path, manu_content, results_save_path, openai_api_key, model, logger)