In [1]:
import os
import typing

# Dask Configuration

In [2]:
from dask_jobqueue import PBSCluster
from pathlib import Path

# Define the working directory path
working_directory = str(Path.cwd())

# Launch a scheduler and workers on HPC via PBS
cluster = PBSCluster(
     cores=4,
     memory="8GB",
     processes=1,
     queue="tamirQ",
     walltime="05:30:00",
     scheduler_options={"dashboard_address": ":12435"},
     # Additional custom options
     log_directory="dask-logs",
     #worker_extra_args=["--lifetime", "25m", "--lifetime-stagger", "4m"],  # for walltime="00:30:00"
     job_script_prologue=[f"cd {working_directory}"]
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43470 instead


In [3]:
cluster

0,1
Dashboard: http://132.66.112.146:43470/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://132.66.112.146:35580,Workers: 0
Dashboard: http://132.66.112.146:43470/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [4]:
cluster.adapt(minimum=30, maximum=60)
print(cluster.job_script())

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q tamirQ
#PBS -l select=1:ncpus=4:mem=7630MB
#PBS -l walltime=02:30:00
#PBS -e dask-logs/
#PBS -o dask-logs/
cd /tamir2/moranb/microbiome/Igem_TAU_2021
/tamir2/moranb/microbiome/Igem_TAU_2021/venv/bin/python -m distributed.cli.dask_worker tcp://132.66.112.146:35580 --nthreads 4 --memory-limit 7.45GiB --name dummy-name --nanny --death-timeout 60



In [5]:
from dask.distributed import Client, progress, wait, get_client, get_worker
client = Client(cluster)

In [6]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://132.66.112.146:43470/status,

0,1
Dashboard: http://132.66.112.146:43470/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://132.66.112.146:35580,Workers: 0
Dashboard: http://132.66.112.146:43470/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [7]:
import dask.bag as db
import dask.dataframe as dd
from dask import delayed, compute, persist
import json
from collections import defaultdict
import matplotlib
import numpy as np
import pandas as pd
import re




# Analysis for homo sapiens genome

In [8]:
from Bio import SeqIO
from analysis.input_testing_data.generate_input_testing_data_for_modules import generate_testing_data
from analysis.input_testing_data.generate_input_testing_data_for_modules import generate_testing_data_for_ecoli_and_bacillus

from modules.main import run_modules

In [9]:
output_path = "/tamir2/moranb/microbiome/Igem_TAU_2021/analysis/results/homo_sapiens"

In [10]:
def get_orf_summary(summary: typing.Dict[str, typing.Any], evaluation_method: str = "average_distance_score") -> typing.Dict[str, typing.Any]:
    if len(summary["evaluation"]) == 1:
        return summary["orf"]
    final_evaluation = summary["final_evaluation"]
    for i, evaluation_summary in enumerate(summary["evaluation"]):
        if evaluation_summary[evaluation_method] == final_evaluation[evaluation_method]:
            return summary["orf"][i]

In [11]:
def get_total_run_time(summary: typing.Dict[str, typing.Any]) -> float:
     if len(summary["evaluation"]) == 1:
        return summary["orf"]["run_time"]
     run_time = 0
     for orf_summary in summary["orf"]:
         run_time += orf_summary["run_time"]
     return run_time

In [12]:
def convert_to_json_result(x):
    gene_name = x[0]
    result = run_modules(x[1], should_run_output_module=False)
    orf = get_orf_summary(result)
    if "orf" not in result or orf is None: # probably thrown in case of an error
        return {"error": result}
    run_time = get_total_run_time(result)
    iterations_count = orf.get("iterations_count", 1)
    return {
        "initial_optimization_score": orf.get("initial_sequence_optimization_score"),
        "final_optimization_score": orf.get("final_sequence_optimization_score"),
        "average_distance_score": result["final_evaluation"].get("average_distance_score"),
        "average_distance_non_normalized_score": result["final_evaluation"].get("average_distance_non_normalized_score"),
        "weakest_link_score": result["final_evaluation"].get("weakest_link_score"),
        "ratio_score": result["final_evaluation"].get("ratio_score"),
        "gene_name": gene_name,
        "orf_optimization_cub_index": result["module_input"].get("orf_optimization_cub_index"),
        "evaluation_score": result["module_input"].get("evaluation_score"),
        "run_time": run_time,
        "iterations_count": iterations_count,
    }

In [13]:
def extract_ncbi_sequences_for_analysis(fasta_file_path: str) -> None:
    with open(fasta_file_path, "r") as fasta_handle:
        genome_dict = SeqIO.to_dict(SeqIO.parse(fasta_handle, "fasta"))

    print(f"Total number of sequences in file: {len(genome_dict)}")
    missing_genes = []
    gene_mapping = defaultdict(list)
    description_to_gene_mapping = {}
    genes_with_wrong_lenght = []
    genes_with_invalid_chars = []

    for record, value in genome_dict.items():
        # TODO - understand if this filtering is needed
        if "NC" not in record: # or "XP" in record:
            continue
        parameters = re.findall("gene=.*]", value.description)
        if not parameters:
            missing_genes.append(record)
            continue
        gene_parameter = parameters[0].split("]")[0]
        gene_name = gene_parameter.strip("gene=")

        if len(value.seq) % 3 != 0:
            genes_with_wrong_lenght.append(record)
            continue

        if any(x not in ["A", "C", "T", "G"] for x in str(value.seq)):
            print(F"Skip sequence {record} for {gene_name} because it contains an invalid character for CDS")
            genes_with_invalid_chars.append(record)
            continue

        gene_mapping[gene_name].append(record)
        description_to_gene_mapping[record] = gene_name

    print(f"Skipped {len(genes_with_wrong_lenght)} genes with length not divisible by 3")
    with open("gene_mapping.json", "w") as genes_file:
        json.dump(gene_mapping, genes_file)
    with open("description_to_gene_mapping.json", "w") as description_to_genes_file:
        json.dump(description_to_gene_mapping, description_to_genes_file)
    with open("missing_genes.txt", "w") as missing_genes_file:
        for gene in missing_genes:
            missing_genes_file.write(gene)
    
    gene_to_longest_sequence = {
        key: max(value, key=lambda x: len(genome_dict[x].seq)) for key, value in gene_mapping.items()
    }
    with open("gene_to_longest_sequence.json", "w") as genes_file:
        json.dump(gene_to_longest_sequence, genes_file)
    print(f"Total number of selected genes: {len(gene_to_longest_sequence)}")

In [14]:
homo_sapiens_fasta = "/tamir2/moranb/microbiome/homo_sapiens_genome/cds_from_genomic.fna"
selected_homo_sapiens_genes = "gene_to_longest_sequence.json"

In [15]:
extract_ncbi_sequences_for_analysis(homo_sapiens_fasta)

Total number of sequences in file: 145289
Skipped 391 genes with length not divisible by 3
Total number of selected genes: 20061


In [16]:
with open(selected_homo_sapiens_genes, "r") as selected_genes_file:
    selected_genes = json.load(selected_genes_file)

In [17]:
with open(homo_sapiens_fasta, "r") as fasta_handle:
    genome_dict = SeqIO.to_dict(SeqIO.parse(fasta_handle, "fasta"))

## Bacillus and E.coli

In [None]:
optimization_cub_index = "CAI"
is_ecoli_optimized = False
batch_size = 500

for optimization_method in [
    # "single_codon_diff", 
    # "single_codon_ratio", 
    # "zscore_bulk_aa_diff",
    # "zscore_bulk_aa_ratio",
    "zscore_single_aa_diff",
    # "zscore_single_aa_ratio",
]:
    configuration = f"e_coli_optimized_{is_ecoli_optimized}_bacillus_optimized_{not is_ecoli_optimized}"
    configuration_output_path = os.path.join(output_path, configuration)
    
    inputs = [(gene_name, 
               generate_testing_data_for_ecoli_and_bacillus(
                   orf_optimization_method=optimization_method,
                   orf_optimization_cub_index=optimization_cub_index,
                   is_ecoli_optimized=is_ecoli_optimized,
                   tuning_param=0.5, 
                   sequence=str(genome_dict[gene_name].seq),
                   output_path=os.path.join(configuration_output_path,gene_name),
                   evaluation_score="average_distance",
                   initiation_optimization_method="original",
               )) for gene_name in selected_genes.values()]
    for batch_index, batch_start_index in enumerate(range(0, len(inputs), batch_size)):
        if optimization_method == "zscore_single_aa_diff" and batch_index < 4:
            continue
        inputs_batch = inputs[batch_start_index: batch_start_index+batch_size]
        inputs_db = db.from_sequence(inputs_batch)
        results_db = inputs_db.map(convert_to_json_result)
        batch_file_path = os.path.join(configuration_output_path,optimization_method, f"batch-{batch_index}")
        batch_file_path = batch_file_path+"-debug"
        results_db.map(json.dumps).to_textfiles(os.path.join(batch_file_path, '*.json'))
        os.mknod(os.path.join(batch_file_path, "done"))

Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, initializing it to select=1:ncpus=4:mem=7630MB
Resource specification for PBS not set, ini