<a href="https://colab.research.google.com/github/thanh727/rational-primer-design/blob/main/Application_primer_design.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title üõ†Ô∏è Step 1: Install & Setup
# @markdown Run this cell to install dependencies and clone your updated repository.

import os
import sys

# 1. Install System Dependencies (BLAST)
print("‚è≥ Installing NCBI BLAST+...")
!apt-get update -qq > /dev/null
!apt-get install -y ncbi-blast+ > /dev/null

# 2. Clone Repository (Using the standard URL, change if private)
if not os.path.exists("rational-primer-design"):
    print("‚è≥ Cloning Repository...")
    !git clone -q https://github.com/thanh727/rational-primer-design.git
else:
    print("üîÑ Updating Repository...")
    %cd rational-primer-design
    !git pull -q
    %cd ..

# 3. Enter the Directory
try:
    %cd rational-primer-design
except:
    print("‚ùå Error: Could not find the repository folder.")

# 4. Install Python Libraries
print("‚è≥ Installing Python Dependencies...")
!pip install -q primer3-py biopython pandas tqdm

# 5. Create Workspace Folders
os.makedirs("config_files", exist_ok=True)
os.makedirs("results_auto", exist_ok=True)
os.makedirs("results_local", exist_ok=True)

print("\n‚úÖ Setup Complete! The pipeline is ready.")

In [None]:
# @title 📂 Option A: Automatic Design (NCBI Download)
# @markdown Enter your search terms. You can now set specific Size and Count limits for EACH target/background.

import ipywidgets as widgets
from IPython.display import display, clear_output
import json
import os
import sys
import subprocess
import shutil
from google.colab import files, drive

# --- GUI ELEMENTS ---
# Shared styling objects reused by the advanced-parameter widgets below.
style = {'description_width': 'initial'}
layout_half = widgets.Layout(width='48%')

header = widgets.HTML("<h2>üöÄ Automatic Primer Design (NCBI)</h2>")

# General Inputs: e-mail passed to the CLI, project name (used as the Drive
# sub-folder when saving) and the local output folder.
email_input = widgets.Text(
    description="NCBI Email:",
    placeholder="required@example.com",
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='60%'),
)
project_input = widgets.Text(
    description="Project Name:",
    value="Auto_Run_01",
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='60%'),
)
output_input = widgets.Text(
    description="Output Folder:",
    value="results_auto",
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='60%'),
)

# --- ADVANCED PARAMETERS ---
blast_checkbox = widgets.Checkbox(
    value=True,
    description='Enable BLAST Annotation',
    indent=False,
)

# Threshold sliders.
w_min_sens = widgets.FloatSlider(
    value=95.0, min=50.0, max=100.0, step=0.1,
    description='Min Sensitivity (%)', style=style,
)
w_min_cons = widgets.FloatSlider(
    value=0.90, min=0.50, max=1.00, step=0.01,
    description='Min Conservation', style=style,
)
w_max_xr = widgets.FloatSlider(
    value=5.0, min=0.0, max=100.0, step=0.1,
    description='Max Cross-Reactivity', style=style,
)

# Integer fields, laid out two per row.
w_prod_min = widgets.IntText(
    value=100, description='Min Product (bp)', style=style, layout=layout_half,
)
w_prod_max = widgets.IntText(
    value=350, description='Max Product (bp)', style=style, layout=layout_half,
)
w_primer_len = widgets.IntText(
    value=20, description='Primer Length', style=style, layout=layout_half,
)
w_max_mm = widgets.IntText(
    value=2, description='Max Mismatches', style=style, layout=layout_half,
)
w_cpu = widgets.IntText(
    value=0, description='CPU (0=Auto)', style=style, layout=layout_half,
)

# Single-section accordion, collapsed on first display.
_advanced_body = widgets.VBox([
    widgets.HTML("<b>üß¨ Biological Parameters:</b>"),
    w_min_sens,
    w_min_cons,
    w_max_xr,
    widgets.HBox([w_prod_min, w_prod_max]),
    widgets.HBox([w_primer_len, w_max_mm]),
    widgets.HTML("<hr><b>üíª Processing:</b>"),
    widgets.HBox([w_cpu]),
])
advanced_ui = widgets.Accordion(children=[_advanced_body])
advanced_ui.set_title(0, '‚öôÔ∏è Advanced Configuration')
advanced_ui.selected_index = None

# --- DYNAMIC SEARCH ROWS ---
# Start empty; rows are appended by the "Add ..." button handlers.
target_container = widgets.VBox([])
background_container = widgets.VBox([])

def create_row(placeholder, default_count):
    """Build one search row: [Query Text] [Min Size] [Max Count] [Delete Button].

    Args:
        placeholder: Hint text shown in the empty query box.
        default_count: Initial value of the "Max:" field.

    Returns:
        A ``widgets.HBox`` whose children are, in order, the query ``Text``,
        the "Min MB:" ``FloatText``, the "Max:" ``IntText`` and the delete
        ``Button``.
    """
    t_query = widgets.Text(placeholder=placeholder, layout=widgets.Layout(width='50%'))
    t_size = widgets.FloatText(value=0.0, description='Min MB:', style={'description_width': 'initial'}, layout=widgets.Layout(width='15%'))
    t_count = widgets.IntText(value=default_count, description='Max:', style={'description_width': 'initial'}, layout=widgets.Layout(width='15%'))
    btn_del = widgets.Button(icon='trash', layout=widgets.Layout(width='40px'), button_style='danger')

    row = widgets.HBox([t_query, t_size, t_count, btn_del])

    # Delete logic
    def delete_self(b):
        # close() only takes the row off the screen: nothing removes it from
        # the container's children, and the child widgets keep their values.
        # Blank the query first so code that collects rows by non-empty query
        # text no longer picks up a row the user deleted.
        t_query.value = ""
        row.close()
    btn_del.on_click(delete_self)

    return row

add_target_btn = widgets.Button(
    description="Add Target",
    icon="plus",
    button_style='success',
)
add_bg_btn = widgets.Button(
    description="Add Background",
    icon="plus",
    button_style='warning',
)


def on_add_target(b):
    """Append a new target row (default max count 50) to the target list."""
    new_row = create_row("e.g. Salmonella enterica[Org]...", default_count=50)
    target_container.children = (*target_container.children, new_row)


def on_add_bg(b):
    """Append a new background row (default max count 100) to the background list."""
    new_row = create_row("e.g. Escherichia coli[Org]...", default_count=100)
    background_container.children = (*background_container.children, new_row)


add_target_btn.on_click(on_add_target)
add_bg_btn.on_click(on_add_bg)

# Initialize with one empty row each
for _seed_row in (on_add_target, on_add_bg):
    _seed_row(None)

# Saving
# Both export buttons start disabled.
save_drive_btn = widgets.Button(
    description="üíæ Save to Drive",
    button_style='success',
    disabled=True,
)
download_zip_btn = widgets.Button(
    description="‚¨áÔ∏è Download ZIP",
    button_style='info',
    disabled=True,
)
run_btn = widgets.Button(
    description="‚ñ∂ RUN PIPELINE",
    button_style='danger',
    layout=widgets.Layout(width='100%', margin='20px 0px'),
)
log_output = widgets.Output(
    layout={
        'border': '1px solid #ddd',
        'height': '400px',
        'overflow_y': 'scroll',
        'font_family': 'monospace',
    },
)

# --- LOGIC ---
def run_command_live(command):
    """Run *command* through the shell and echo its output line by line.

    stderr is merged into stdout so both streams appear in the order they
    were produced. Each line is printed with surrounding whitespace stripped.

    Args:
        command: Shell command string (executed with ``shell=True``).

    Returns:
        The exit code of the finished process.
    """
    # The context manager closes the stdout pipe and waits for the child on
    # every exit path, so no file descriptor or zombie process is left behind.
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=True,
        text=True,
        bufsize=1,
    ) as process:
        # Iterating the pipe blocks until a line is available and stops at
        # EOF, so there is no polling loop.
        for line in process.stdout:
            print(line.strip())
    return process.returncode

def extract_row_data(container):
    """Collect ``[query, size, count]`` from every row that has a query.

    Each element of ``container.children`` is expected to expose its own
    ``children`` as [Text, Float, Int, Button]; the first three ``.value``
    attributes are read. Rows whose query is empty after stripping whitespace
    are skipped.
    """
    candidates = (
        [
            hbox.children[0].value.strip(),
            hbox.children[1].value,
            hbox.children[2].value,
        ]
        for hbox in container.children
    )
    return [entry for entry in candidates if entry[0]]

def on_run_click(b):
    """Handle the RUN button: validate inputs, write configs, run the pipeline.

    Steps, all logged into ``log_output``:
      1. Read the e-mail, output folder and the target/background rows.
      2. Write ``config_files/t_conf.json`` and ``b_conf.json`` (one
         ``[query, size, count]`` entry per row) plus ``params.json``.
      3. Launch ``rational_design.cli pipeline`` and stream its output.

    The Save/Download buttons are enabled only after a zero exit code. The RUN
    button is restored on every exit path, including validation failures and
    unexpected exceptions.

    Args:
        b: The clicked button (unused).
    """
    import shlex  # local import: only this handler builds a shell command

    log_output.clear_output()
    run_btn.disabled = True
    save_drive_btn.disabled = True
    download_zip_btn.disabled = True
    run_btn.description = "‚è≥ Running..."

    try:
        with log_output:
            print("‚è≥ Initializing Configuration...")
            email = email_input.value.strip()
            folder = output_input.value.strip()

            # Extract Data using the new helper
            targets_data = extract_row_data(target_container)
            bg_data = extract_row_data(background_container)

            if not email or "@" not in email:
                print("‚ùå Error: Invalid email.")
                return
            if not targets_data:
                print("‚ùå Error: No targets specified.")
                return
            if not bg_data:
                print("‚ùå Error: No background specified.")
                return

            # 1. GENERATE JSON CONFIGS
            # Format: { "t1": [Query, Size, Count], ... }
            t_conf = {f"t{i+1}": item for i, item in enumerate(targets_data)}
            b_conf = {f"b{i+1}": item for i, item in enumerate(bg_data)}

            os.makedirs("config_files", exist_ok=True)
            with open("config_files/t_conf.json", "w") as f:
                json.dump(t_conf, f, indent=4)
            with open("config_files/b_conf.json", "w") as f:
                json.dump(b_conf, f, indent=4)

            # 2. GENERATE PARAMS
            params = {
                "min_sensitivity": w_min_sens.value,
                "design_min_conservation": w_min_cons.value,
                "validation_max_cross_reactivity": w_max_xr.value,
                "product_size_min": w_prod_min.value,
                "product_size_max": w_prod_max.value,
                "primer_length": w_primer_len.value,
                "max_mismatch": w_max_mm.value,
                "cpu_cores": w_cpu.value,
                "design_target_sampling_size": 0,
                "design_background_sampling_size": 100,
                "validation_target_sampling_size": 0,
                "validation_background_sampling_size": 200,
                "design_max_candidates": 50,
                "enable_blast": blast_checkbox.value
            }
            with open("config_files/params.json", "w") as f:
                json.dump(params, f, indent=4)

            # 3. BUILD COMMAND
            # The command goes through a shell, so every user-supplied value
            # is quoted; a quote or space in the folder name or e-mail must
            # not be able to split or extend the command line.
            cmd = " ".join([
                shlex.quote(sys.executable), "-u", "-m", "rational_design.cli", "pipeline",
                "--out", shlex.quote(folder),
                "--email", shlex.quote(email),
                "--target_config", shlex.quote("config_files/t_conf.json"),
                "--bg_config", shlex.quote("config_files/b_conf.json"),
                "--params", shlex.quote("config_files/params.json"),
            ])

            print("üöÄ Launching Pipeline...")
            print(f"   Targets: {len(targets_data)} | Backgrounds: {len(bg_data)}")
            print("-" * 50)

            exit_code = run_command_live(cmd)

            print("-" * 50)
            if exit_code == 0:
                print("‚úÖ‚úÖ PIPELINE FINISHED SUCCESS!")
                save_drive_btn.disabled = False
                download_zip_btn.disabled = False
            else:
                print("‚ùå PIPELINE FAILED. Check logs above.")
    finally:
        # Always hand the button back, whatever happened above.
        run_btn.disabled = False
        run_btn.description = "‚ñ∂ RUN PIPELINE"

# Drive & Download Logic
def on_save_drive(b):
    """Copy the output folder to Drive under ``Rational_Design/<project name>``.

    An existing copy at the destination is replaced. Nothing is deleted
    unless the project name resolves to a path strictly inside the
    ``Rational_Design`` folder and the local output folder exists.

    Args:
        b: The clicked button (unused).
    """
    drive_root = "/content/drive/MyDrive/Rational_Design"
    src = output_input.value.strip()
    dest = os.path.normpath(os.path.join(drive_root, project_input.value.strip()))
    with log_output:
        # An empty name resolves to drive_root itself and ".." to its parent;
        # the rmtree below would then wipe every saved project (or more).
        if not dest.startswith(drive_root + "/"):
            print("‚ùå Error: Invalid project name.")
            return
        # Check the source before touching the destination so a wrong folder
        # name cannot destroy the previous copy and then fail to replace it.
        if not os.path.isdir(src):
            print(f"‚ùå Error: Output folder '{src}' not found.")
            return

        print(f"\nüíæ Saving to Drive: {dest}...")
        if not os.path.exists('/content/drive'):
            drive.mount('/content/drive')
        if os.path.exists(dest):
            shutil.rmtree(dest)
        shutil.copytree(src, dest)
        print("‚úÖ Saved!")

def on_download_zip(b):
    """Zip the output folder and start a browser download of the archive.

    Args:
        b: The clicked button (unused).
    """
    requested = output_input.value.strip()
    with log_output:
        if not requested or not os.path.isdir(requested):
            print(f"‚ùå Error: Output folder '{requested}' not found.")
            return
        # normpath drops a trailing slash; with one, the archive base name
        # would point inside the very folder that is being zipped.
        src = os.path.normpath(requested)
        print(f"\nüì¶ Zipping {src}...")
        # make_archive returns the path of the file it created.
        archive_path = shutil.make_archive(src, 'zip', src)
        files.download(archive_path)

# Connect the buttons to their handlers.
save_drive_btn.on_click(on_save_drive)
download_zip_btn.on_click(on_download_zip)
run_btn.on_click(on_run_click)

# Page layout, top to bottom. blast_checkbox is part of the page because the
# run handler reads its value into params["enable_blast"]; a widget that is
# never displayed cannot be changed by the user.
ui = widgets.VBox([
    header, widgets.HBox([email_input, project_input, output_input]),
    widgets.HTML("<hr>"), blast_checkbox, advanced_ui,
    widgets.HTML("<h4>üéØ Targets (Query | Min Size MB | Max Count)</h4>"), target_container, add_target_btn,
    widgets.HTML("<h4>üå´Ô∏è Background (Query | Min Size MB | Max Count)</h4>"), background_container, add_bg_btn,
    widgets.HTML("<hr>"), run_btn,
    widgets.HBox([save_drive_btn, download_zip_btn]),
    log_output
])
display(ui)

In [None]:
# @title 📂 Option B: Local Files & Drive (Optimized)
# @markdown Upload your own .fasta files directly or pull them from Google Drive.

import ipywidgets as widgets
from IPython.display import display
import os
import shutil
import json
import sys
import subprocess
from google.colab import files, drive

# --- GUI SETUP ---
header = widgets.HTML("<h2>üìÇ Local & Drive Pipeline Manager</h2>")

# General Settings
project_input = widgets.Text(
    description="Project Name:",
    value="Local_Run_01",
)
output_input = widgets.Text(
    description="Output Folder:",
    value="results_local",
)

# Advanced Settings
style = {'description_width': 'initial'}
layout_half = widgets.Layout(width='48%')
blast_checkbox = widgets.Checkbox(
    value=True,
    description='Enable BLAST Annotation',
    indent=False,
)

# Threshold sliders.
w_min_sens = widgets.FloatSlider(
    value=95.0, min=50.0, max=100.0, step=0.1,
    description='Min Sensitivity', style=style,
)
w_min_cons = widgets.FloatSlider(
    value=0.90, min=0.50, max=1.00, step=0.01,
    description='Min Conservation', style=style,
)
w_max_xr = widgets.FloatSlider(
    value=5.0, min=0.0, max=100.0, step=0.1,
    description='Max Cross-Reactivity', style=style,
)
w_cpu = widgets.IntText(
    value=0, description='CPU (0=Auto)', style=style, layout=layout_half,
)

# Single-section accordion, collapsed on first display.
_advanced_body = widgets.VBox([
    widgets.HTML("<b>üß¨ Biological Parameters:</b>"),
    w_min_sens,
    w_min_cons,
    w_max_xr,
    widgets.HTML("<hr><b>üíª System:</b>"),
    w_cpu,
])
advanced_ui = widgets.Accordion(children=[_advanced_body])
advanced_ui.set_title(0, '‚öôÔ∏è Advanced Configuration')
advanced_ui.selected_index = None

# Tabs for Input Source

# Tab 1: Upload
upload_target = widgets.FileUpload(
    accept='.fasta,.fa',
    multiple=True,
    description='Select Targets',
    button_style='success',
)
upload_bg = widgets.FileUpload(
    accept='.fasta,.fa',
    multiple=True,
    description='Select Background',
    button_style='warning',
)
tab1 = widgets.VBox([
    widgets.HTML("<b>Method 1: Direct Upload</b> (Good for small batches)"),
    widgets.HBox([upload_target, upload_bg]),
])

# Tab 2: Drive
btn_mount = widgets.Button(description="Mount Drive", icon="google", button_style='info')
path_target = widgets.Text(
    placeholder="/content/drive/MyDrive/Target_Folder",
    layout=widgets.Layout(width='80%'),
)
path_bg = widgets.Text(
    placeholder="/content/drive/MyDrive/Bg_Folder",
    layout=widgets.Layout(width='80%'),
)


def on_mount(b):
    """Mount Google Drive at /content/drive, then relabel and disable the button."""
    drive.mount('/content/drive')
    btn_mount.description = "Drive Mounted"
    btn_mount.disabled = True


btn_mount.on_click(on_mount)
tab2 = widgets.VBox([
    widgets.HTML("<b>Method 2: Google Drive</b> (Best for large datasets)"),
    btn_mount,
    widgets.HTML("<i>Enter the full path to your folders on Drive:</i>"),
    path_target,
    path_bg,
])

# Tab index 0 is the upload method, index 1 the Drive method.
tab_nest = widgets.Tab()
tab_nest.children = [tab1, tab2]
for _tab_index, _tab_title in enumerate(('üíª Upload', '‚òÅÔ∏è Google Drive')):
    tab_nest.set_title(_tab_index, _tab_title)

# Run Controls
run_btn = widgets.Button(
    description="‚ñ∂ RUN PIPELINE",
    button_style='danger',
    layout=widgets.Layout(width='100%', margin='20px 0px'),
)
save_drive_btn = widgets.Button(
    description="üíæ Save to Drive",
    button_style='success',
    disabled=True,
)
download_zip_btn = widgets.Button(
    description="‚¨áÔ∏è Download ZIP",
    button_style='info',
    disabled=True,
)
log_output = widgets.Output(
    layout={
        'border': '1px solid #ddd',
        'height': '400px',
        'overflow_y': 'scroll',
        'font_family': 'monospace',
    },
)

# --- LOGIC ---
def run_command_live(command):
    """Run *command* through the shell and echo its output line by line.

    stderr is merged into stdout so both streams appear in the order they
    were produced. Each line is printed with surrounding whitespace stripped.

    Args:
        command: Shell command string (executed with ``shell=True``).

    Returns:
        The exit code of the finished process.
    """
    # The context manager closes the stdout pipe and waits for the child on
    # every exit path, so no file descriptor or zombie process is left behind.
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=True,
        text=True,
        bufsize=1,
    ) as process:
        # Iterating the pipe blocks until a line is available and stops at
        # EOF, so there is no polling loop.
        for line in process.stdout:
            print(line.strip())
    return process.returncode

def _iter_uploads(upload_value):
    """Yield ``(filename, content)`` pairs from a FileUpload widget value.

    Two layouts are accepted: a dict keyed by filename whose entries hold a
    ``'content'`` item (the ipywidgets 7 layout), and a sequence of per-file
    records exposing ``name`` and ``content`` attributes (ipywidgets 8).
    """
    if isinstance(upload_value, dict):
        for name, record in upload_value.items():
            yield name, record["content"]
    else:
        for record in upload_value:
            yield record.name, record.content


def on_run(b):
    """Handle the RUN button: stage FASTA files, write params, run the pipeline.

    Input files come from the selected tab: index 0 writes the uploaded files,
    any other index copies ``.fa``/``.fasta`` files from the two Drive folders.
    They are staged into ``temp_local_target`` / ``temp_local_bg``, which are
    recreated empty on every run. Parameters are written to
    ``config_files/local_params.json`` and ``rational_design.cli pipeline`` is
    launched with its output streamed into ``log_output``.

    Errors are printed to the log. The RUN button is re-enabled on every exit
    path; the Save/Download buttons are enabled only after a zero exit code.

    Args:
        b: The clicked button (unused).
    """
    import shlex  # local import: only this handler builds a shell command

    log_output.clear_output()
    run_btn.disabled = True
    temp_t = "temp_local_target"
    temp_b = "temp_local_bg"
    folder_out = output_input.value.strip()

    try:
        with log_output:
            try:
                # Reset Temps (inside the try so a failure here is reported
                # in the log instead of leaving the button disabled).
                for staging_dir in (temp_t, temp_b):
                    if os.path.exists(staging_dir):
                        shutil.rmtree(staging_dir)
                    os.makedirs(staging_dir)

                print("‚è≥ Staging Data...")
                # 1. PREPARE DATA
                if tab_nest.selected_index == 0:  # UPLOAD
                    print("üíª Mode: Direct Upload")
                    if not upload_target.value:
                        raise Exception("Missing Target files.")
                    if not upload_bg.value:
                        raise Exception("Missing Background files.")

                    # Save uploaded files
                    for staging_dir, uploads in ((temp_t, upload_target.value), (temp_b, upload_bg.value)):
                        for name, content in _iter_uploads(uploads):
                            # basename() keeps the file inside the staging
                            # folder whatever path the browser reports.
                            with open(os.path.join(staging_dir, os.path.basename(name)), "wb") as w:
                                w.write(content)
                    print(f"   ‚úÖ Saved {len(upload_target.value)} Targets, {len(upload_bg.value)} Backgrounds.")

                else:  # DRIVE
                    print("‚òÅÔ∏è Mode: Google Drive Copy")
                    src_t = path_target.value.strip()
                    src_b = path_bg.value.strip()
                    if not os.path.exists(src_t) or not os.path.exists(src_b):
                        raise Exception("Invalid Drive Paths. Did you mount Drive?")

                    # Copy from Drive
                    t_files = [f for f in os.listdir(src_t) if f.endswith(('.fa', '.fasta'))]
                    b_files = [f for f in os.listdir(src_b) if f.endswith(('.fa', '.fasta'))]
                    # Same rule as upload mode: do not start a run without inputs.
                    if not t_files:
                        raise Exception("Missing Target files.")
                    if not b_files:
                        raise Exception("Missing Background files.")

                    for f in t_files:
                        shutil.copy2(os.path.join(src_t, f), temp_t)
                    for f in b_files:
                        shutil.copy2(os.path.join(src_b, f), temp_b)
                    print(f"   ‚úÖ Copied {len(t_files)} Targets, {len(b_files)} Backgrounds.")

                # 2. GENERATE PARAMS
                os.makedirs("config_files", exist_ok=True)
                params = {
                    "min_sensitivity": w_min_sens.value,
                    "design_min_conservation": w_min_cons.value,
                    "validation_max_cross_reactivity": w_max_xr.value,
                    "product_size_min": 100, "product_size_max": 350,
                    "primer_length": 20, "max_mismatch": 2, "cpu_cores": w_cpu.value,
                    "design_target_sampling_size": 0, "design_background_sampling_size": 100,
                    "validation_target_sampling_size": 0, "validation_background_sampling_size": 200,
                    "design_max_candidates": 50, "enable_blast": blast_checkbox.value
                }
                with open("config_files/local_params.json", "w") as f:
                    json.dump(params, f, indent=4)

                # 3. RUN CLI
                # The command goes through a shell, so each value is quoted;
                # a quote or space in the output folder name must not be able
                # to split or extend the command line.
                cmd = " ".join([
                    shlex.quote(sys.executable), "-u", "-m", "rational_design.cli", "pipeline",
                    "--out", shlex.quote(folder_out),
                    "--local_target", shlex.quote(temp_t),
                    "--local_bg", shlex.quote(temp_b),
                    "--params", shlex.quote("config_files/local_params.json"),
                ])

                print("-" * 50)
                print("üöÄ Launching Pipeline...")
                exit_code = run_command_live(cmd)
                print("-" * 50)

                if exit_code == 0:
                    print("‚úÖ‚úÖ PIPELINE FINISHED SUCCESS!")
                    save_drive_btn.disabled = False
                    download_zip_btn.disabled = False
                else:
                    print("‚ùå PIPELINE FAILED. Check logs above.")

            except Exception as e:
                print(f"‚ùå Error: {e}")
    finally:
        # Always hand the button back, whatever happened above.
        run_btn.disabled = False

# Save/Download Logic
def on_save_drive(b):
    """Copy the output folder to Drive under ``Rational_Design/<project name>``.

    An existing copy at the destination is replaced. Nothing is deleted
    unless the project name resolves to a path strictly inside the
    ``Rational_Design`` folder and the local output folder exists.

    Args:
        b: The clicked button (unused).
    """
    drive_root = "/content/drive/MyDrive/Rational_Design"
    src = output_input.value.strip()
    dest = os.path.normpath(os.path.join(drive_root, project_input.value.strip()))
    with log_output:
        # An empty name resolves to drive_root itself and ".." to its parent;
        # the rmtree below would then wipe every saved project (or more).
        if not dest.startswith(drive_root + "/"):
            print("‚ùå Error: Invalid project name.")
            return
        # Check the source before touching the destination so a wrong folder
        # name cannot destroy the previous copy and then fail to replace it.
        if not os.path.isdir(src):
            print(f"‚ùå Error: Output folder '{src}' not found.")
            return

        print(f"\nüíæ Saving to Drive: {dest}...")
        if not os.path.exists('/content/drive'):
            drive.mount('/content/drive')
        if os.path.exists(dest):
            shutil.rmtree(dest)
        shutil.copytree(src, dest)
        print("‚úÖ Saved!")

def on_download_zip(b):
    """Zip the output folder and start a browser download of the archive.

    Args:
        b: The clicked button (unused).
    """
    requested = output_input.value.strip()
    with log_output:
        if not requested or not os.path.isdir(requested):
            print(f"‚ùå Error: Output folder '{requested}' not found.")
            return
        # normpath drops a trailing slash; with one, the archive base name
        # would point inside the very folder that is being zipped.
        src = os.path.normpath(requested)
        print(f"\nüì¶ Zipping {src}...")
        # make_archive returns the path of the file it created.
        archive_path = shutil.make_archive(src, 'zip', src)
        files.download(archive_path)

# Connect each button to its handler.
for _button, _handler in (
    (run_btn, on_run),
    (save_drive_btn, on_save_drive),
    (download_zip_btn, on_download_zip),
):
    _button.on_click(_handler)

# Page layout, top to bottom.
_page_rows = [
    header,
    widgets.HBox([project_input, output_input]),
    blast_checkbox,
    advanced_ui,
    widgets.HTML("<hr>"),
    tab_nest,
    widgets.HTML("<hr>"),
    run_btn,
    widgets.HBox([save_drive_btn, download_zip_btn]),
    log_output,
]
ui = widgets.VBox(_page_rows)
display(ui)