In [2]:
import sk8s
import os
import re

In [3]:
build_image = False
populate_reference = False

## Set up the environment

### Construct the docker image for our NGS jobs

In [4]:
if build_image:
    image = sk8s.docker_build(image_name="ngs-1",
                              conda=["bwa", "gatk4", "samtools", "google-cloud-sdk", "bedtools"],
                              channels=["conda-forge", "bioconda"],
                              pip=["numpy", "scipy", "matplotlib", "pandas"],
                              additional_config="RUN apt-get install -y gcc python3-dev python3-setuptools && pip3 uninstall crcmod && pip3 install --no-cache-dir -U crcmod")
else:    
    image = "gcr.io/jared-genome-analysis/ngs-1"
    #image = "ngs-1"

image

'gcr.io/jared-genome-analysis/ngs-1'

### Create a volume to hold the reference genome

In [5]:
# Create a volume to store the reference
if populate_reference:
    reference_volume = sk8s.create_volume("100Gi", name="reference-volume")
else:
    reference_volume = "reference-volume"

# Note the default volumes that jobs should mount
default_volumes={reference_volume: f"/mnt/{reference_volume}"}
#default_volumes["gcs-creds"] = f"/root/.config/gcloud"

def populate_reference_volume(volume):
    import subprocess

    def run_silent(cmd):
        return subprocess.run(cmd, check=True, shell=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)

    #result = run_silent(f"""mkdir -p /mnt/{volume}/hg38/ && gsutil -m rsync -r gs://genomics-public-data/references/hg38/v0/ /mnt/{volume}/hg38/""")
    result = run_silent(f"""mkdir -p /mnt/{volume}/hg38/""")
    result = run_silent(f"""gsutil -m rsync -r  gs://jared-genome/ref/GRCh38/ /mnt/{volume}/GRCh38/""")
    return "OK"

if populate_reference:
    sk8s.run(populate_reference_volume, reference_volume,
             volumes=default_volumes, image=image, asynchro=False)

reference_volume

'reference-volume'

### Define a very helpful utility function

In [6]:
def run(cmd):
    import subprocess

    result = subprocess.run(cmd,
                            check=False,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            encoding="utf-8")
    if result.returncode == 0:
        return result.stdout
    else:
        print("Error Running Command.")
        print("Command:", cmd)
        print("Log:")
        print(result.stdout)
        raise subprocess.CalledProcessError(result.returncode, cmd=cmd, output=result.stdout)


## Prepare input data (not really part of the pipeline)

In [7]:
def generate_fq_chunk(bam, region, output_fq):
    import os

    token = run("gcloud auth application-default print-access-token").strip()
    os.environ["GCS_OAUTH_TOKEN"] = token

    run(f"samtools view -b {bam} {region} > ./in.bam")
    run(f"samtools index in.bam")
    run(f"samtools collate in.bam out")
    run(f"samtools fastq -o out.fq out.bam")
    run(f"gsutil cp out.fq {output_fq}")

    return output_fq


def concatenate(input_files, output_file):
    import os
    run("mkdir ./data/")
    run(f"gsutil -m cp {' '.join(input_files)} ./data/")
    local_input_files = ["./data/" + os.path.basename(f) for f in input_files]
    run(f"cat {' '.join(local_input_files)} > out")
    run(f"gsutil cp out {output_file}")
    return output_file


def generate_fq(bam, region_size, output_file):
    import subprocess
    cmd = f'bedtools makewindows -g /mnt/{reference_volume}/GRCh38/Homo_sapiens_assembly38.fasta.genome -w {region_size} | grep -vP "_|HLA"'

    regions = ["{}:{}-{}".format(*(r.split("\t")))
               for r in
               run(cmd).strip().split("\n")]

    jobs = [sk8s.run(generate_fq_chunk, bam, region, f"gs://jared-genome/sk8s/pipeline_test_1/fastq/chunk_{idx}.fq",
                     image=image, volumes=default_volumes, asynchro=True)
            for idx, region in enumerate(regions[0:3])]

    fastqs = list(map(sk8s.wait, jobs))

    return sk8s.run(concatenate, fastqs, output_file,
                    image=image, volumes=default_volumes,
                    asynchro=False)


results = sk8s.run(generate_fq, "gs://jared-genome/jared.bam", int(10e6), "gs://jared-genome/jared_interleaved.fq",
                   image=image,
                   volumes=default_volumes,
                   asynchro=False)
results


## Define pipeline tasks

In [None]:
def split_fastq(fastq, n_chunks, output_prefix):
    import os

    run(f"gsutil cp {fastq} ./input.fq")

    os.mkdir("./output/")
    output_files = [f"./output/chunk_{i}.fq" for i in range(n_chunks)]
    output_fds = [open(f, "w") for f in output_files]
    current_output = 0
    
    with open("input.fq") as fp:
        while True:
            name = fp.readline()
            seq = fp.readline()
            strand = fp.readline()
            qual = fp.readline()
            if name == "": break
            print(name, seq, strand, qual, sep="\n", end="", file=output_fds[current_output])
            current_output = (current_output + 1) % n_chunks
    
    [f.close() for f in output_fds]

    fastqs = []
    for idx, file in enumerate(output_files):
        fastqs.append(f"{output_prefix}{idx}.fq")
        run(f"gsutil cp {file} {output_prefix}{idx}.fq")

    return fastqs

In [None]:
def align(fq, output_bam, reference, read_group):
    run(f"gsutil -m cp {fq} ./fq.fq")
    run(f'bwa mem -p -R "{read_group}" {reference} fq.fq | samtools sort > out.bam')
    run(f"samtools index out.bam")
    run(f"gsutil -m cp out.bam {output_bam}")
    run(f"gsutil -m cp out.bam.bai {output_bam}.bai")
    return output_bam

In [None]:
def merge_bams(bams, output_bam):
    import os
    import dill as pickle

    os.mkdir("./bams/")

    bams_str = " ".join(bams)
    bais_str = " ".join([f"{b}.bai" for b in bams])
    run(f"gsutil -m cp {bais_str} ./bams/")
    run(f"gsutil -m cp {bams_str} ./bams/")

    run(f"samtools merge ./out.bam ./bams/*.bam")
    run(f"samtools index out.bam")
    run(f"gsutil cp out.bam {output_bam}")
    run(f"gsutil cp out.bam.bai {output_bam}.bai")
    return output_bam

In [None]:
def call_snps(reference, bam, roi, output_vcf):
    import subprocess

    #def run_and_log(cmd):
    #    proc = subprocess.run(cmd, check=True, shell=True, encoding="utf-8",
    #                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    #    return dict(cmd=cmd,
    #                returncode=proc.returncode,
    #                stdout=proc.stdout,
    #                stderr=proc.stderr)

    region = "%s:%d-%d" % roi

    run(f"gsutil cp {bam} ./in.bam")
    run(f"gsutil cp {bam}.bai ./in.bam.bai")
    run(f"gatk HaplotypeCaller -R {reference} -I in.bam -O out.vcf -L {region}")
    run(f"gsutil cp out.vcf {output_vcf}")

    return output_vcf

## Execute pipeline

### Define the inputs and outputs

In [None]:
reference=f"/mnt/{reference_volume}/GRCh38/Homo_sapiens_assembly38.fasta"
sample_name="jared"
read_group=f'@RG\\tID:{sample_name}\\tSM:{sample_name}\\tPL:ILLUMINA'
#ROI = ("chr1", 14674463, 14697776)
#chr1:925,893-935,453
ROI = ("chr1", 925893, 935453)
#fq = "gs://jared-genome/jared_interleaved.fq"
fq = "gs://jared-genome/tiny_interleaved.fq"
output_prefix = "gs://jared-genome/sk8s/pipeline_test_1/"
output_bam = f"{output_prefix}bams/{sample_name}.bam"
output_vcf = f"{output_prefix}snps/{sample_name}.vcf"

### Divide the unaligned reads into chunks

In [None]:
fastqs = sk8s.run(split_fastq, fq, 3, f"{output_prefix}fastq_chunks/chunk_",
                  image=image, volumes=default_volumes, asynchro=False)
fastqs

### Align each chunk

In [None]:
bams = sk8s.map(lambda fq: align(fq, output_prefix + "bam_chunks/" + re.sub("\.fq$", ".bam", os.path.basename(fq)), reference, read_group), fastqs,
                image=image, volumes=default_volumes,
                requests={"memory": "8Gi", "ephemeral-storage": "10Gi", "cpu": "1"},
                limits={"memory": "8Gi", "ephemeral-storage": "10Gi"},
                asynchro=False)

bams

### Merge the aligned chunks

In [None]:
bam = sk8s.run(merge_bams, bams, output_bam,
               image=image, volumes=default_volumes,
               requests={"memory": "8Gi", "ephemeral-storage": "10Gi", "cpu": "1"},
               limits={"memory": "8Gi", "ephemeral-storage": "10Gi"},
               asynchro=False)
bam


### Call SNPs

In [None]:
snp_result = sk8s.run(call_snps, reference, bam, ROI, output_vcf,
                       image=image, asynchro=False, volumes=default_volumes,
                       requests={"memory": "8Gi", "ephemeral-storage": "10Gi", "cpu": "2"},
                       limits={"memory": "8Gi", "ephemeral-storage": "10Gi"})
snp_result

### Have a look 🧬👀

In [None]:
import subprocess
print(subprocess.run(f"gsutil cat {snp_result} | grep -v '^#' | head -n10",
               shell=True, stdout=subprocess.PIPE, encoding="utf-8").stdout)