# Define Helper Functions

In [369]:
# from SCRIdb
def get_s3_objects(bucket, key, pattern, full_uri=False):
    """List the object keys under an S3 prefix that match ``pattern``.

    Args:
        bucket: Name of the S3 bucket to list.
        key: Key prefix handed to the bucket's ``objects.filter(Prefix=...)``.
        pattern: Object exposing ``search`` (e.g. a compiled regex); a key is
            kept when ``pattern.search(key)`` is truthy.
        full_uri: When true, each result is rendered as ``s3://{bucket}/{key}``
            instead of the bare key.

    Returns:
        List of matching keys (or URIs), in the order the listing yields them.
    """
    listing = boto3.resource("s3").Bucket(bucket).objects.filter(Prefix=key)
    matched = [entry.key for entry in listing if pattern.search(entry.key)]
    if not full_uri:
        return matched
    return [f"s3://{bucket}/{name}" for name in matched]

In [299]:
def _species_labels(species):
    """Flatten a species value into a flat list of strings.

    Accepts a bare string or arbitrarily nested lists/tuples/sets of strings,
    such as the ``[("Mouse",)]`` rows that ``cursor.fetchall()`` produces.
    ``None`` and other non-string leaves contribute nothing.
    """
    if isinstance(species, str):
        return [species]
    if isinstance(species, (list, tuple, set, frozenset)):
        labels = []
        for item in species:
            labels.extend(_species_labels(item))
        return labels
    return []


def get_reference(species):
    """Return the Cell Ranger ARC reference URL for a species.

    Args:
        species: Species name, or a (possibly nested) sequence of names such
            as the raw rows returned by ``get_species``. A label selects a
            reference when it contains "Human" or "Mouse"; "Human" is checked
            first.

    Returns:
        URL of the reference tarball (GRCh38 for human, mm10 for mouse).

    Raises:
        ValueError: If no label contains "Human" or "Mouse" (including empty
            or ``None`` input).
    """
    # Membership on the raw DB rows ("Mouse" in [("Mouse",)]) is always False,
    # so compare against the flattened string labels instead.
    labels = _species_labels(species)
    if any("Human" in label for label in labels):
        return "https://cf.10xgenomics.com/supp/cell-arc/refdata-cellranger-arc-GRCh38-2020-A.tar.gz"
    if any("Mouse" in label for label in labels):
        return "https://cf.10xgenomics.com/supp/cell-arc/refdata-cellranger-arc-mm10-2020-A-2.0.0.tar.gz"
    raise ValueError("Unknown Species")

In [312]:
def execute_query(query, user, password, params=None):
    """Run one SQL statement against the lab database and return all rows.

    Uses the module-level ``connect`` (imported from ``mysql.connector``
    elsewhere in this notebook); host and database name are fixed here.

    Args:
        query: SQL text to execute. May contain driver placeholders when
            ``params`` is supplied.
        user: Database user name.
        password: Database password.
        params: Optional values bound to the placeholders in ``query`` by the
            driver. When omitted, ``query`` is executed as-is, exactly as
            before this parameter existed.

    Returns:
        The result of ``cursor.fetchall()`` for the statement.
    """
    with connect(
        host="peer-lab-db.cggxmlwgzzpw.us-east-1.rds.amazonaws.com",
        database="peer_lab_db",
        user=user,
        password=password,
    ) as connection:
        with connection.cursor(buffered=True) as cursor:
            # Only pass params when given so the no-params call is unchanged.
            if params is None:
                cursor.execute(query)
            else:
                cursor.execute(query, params)
            result = cursor.fetchall()
    return result

In [323]:
# Get species from database for given sample
from mysql.connector import connect, Error

def get_species(sample_id, user, password):
    """Look up the species recorded for a sample.

    Joins ``species`` -> ``genome_index`` -> ``sample_data`` and selects the
    ``Species`` column for the row whose ``sample_data.id`` equals
    ``sample_id``.

    Args:
        sample_id: Sample identifier; anything ``int()`` accepts (e.g. 2748 or
            "2748").
        user: Database user name.
        password: Database password.

    Returns:
        The rows returned by ``execute_query`` for the lookup, or ``None``
        after printing the error when a ``mysql.connector`` ``Error`` is
        raised.

    Raises:
        ValueError, TypeError: If ``sample_id`` cannot be converted to an
            integer.
    """
    # The id is interpolated into the SQL text below, so force it to an
    # integer first; this keeps arbitrary text out of the statement.
    sample_id = int(sample_id)
    try:
        table_sample_data = "peer_lab_db.sample_data"
        table_species = "peer_lab_db.species"
        table_genome_idx = "peer_lab_db.genome_index"
        query = f"""
        SELECT {table_species}.Species
        FROM {table_species}
        LEFT JOIN {table_genome_idx}
        ON {table_species}.id = {table_genome_idx}.species_id
        LEFT JOIN {table_sample_data}
        ON {table_genome_idx}.id = {table_sample_data}.genomeIndex_id
        WHERE {table_sample_data}.id = {sample_id}
        """
        result = execute_query(query, user, password)
        return result
    except Error as e:
        # Database errors are reported but not re-raised; the caller gets None.
        print(f"Error: {e}")
        return None

In [319]:
# Get project name from database for given sample
from mysql.connector import connect, Error

def get_project_id(sample_id, user, password):
    """Look up the project name recorded for a sample.

    Joins ``project_data`` -> ``sample_data`` and selects ``projectName`` for
    the row whose ``sample_data.id`` equals ``sample_id``.

    Args:
        sample_id: Sample identifier; anything ``int()`` accepts (e.g. 2748 or
            "2748").
        user: Database user name.
        password: Database password.

    Returns:
        The rows returned by ``execute_query`` for the lookup, or ``None``
        after printing the error when a ``mysql.connector`` ``Error`` is
        raised.

    Raises:
        ValueError, TypeError: If ``sample_id`` cannot be converted to an
            integer.
    """
    # The id is interpolated into the SQL text below, so force it to an
    # integer first; this keeps arbitrary text out of the statement.
    sample_id = int(sample_id)
    try:
        table_sample_data = "peer_lab_db.sample_data"
        table_project_data = "peer_lab_db.project_data"
        query = f"""
        SELECT {table_project_data}.projectName
        FROM {table_project_data}
        LEFT JOIN {table_sample_data}
        ON {table_project_data}.id = {table_sample_data}.projectData_id
        WHERE {table_sample_data}.id = {sample_id}
        """
        result = execute_query(query, user, password)
        return result
    except Error as e:
        # Database errors are reported but not re-raised; the caller gets None.
        print(f"Error: {e}")
        return None

In [417]:
def run(
    exec_path: str,
    secrets: str,
    inputs: str,
    labels: str,
    options: str,
) -> dict:
    """Invoke the pipeline submit script and capture its result.

    Runs ``exec_path -k secrets -i inputs -l labels -o options`` without a
    shell, capturing stdout/stderr as text.

    Args:
        exec_path: Path to the submit script/executable.
        secrets: Value for the ``-k`` option.
        inputs: Value for the ``-i`` option.
        labels: Value for the ``-l`` option.
        options: Value for the ``-o`` option.

    Returns:
        The attribute dict of the ``subprocess.run`` result (keys as seen in
        the notebook output: ``args``, ``returncode``, ``stdout``, ``stderr``).
        A non-zero return code is reported there, not raised.
    """
    # Pass an argv list instead of formatting one string and re-splitting it:
    # each value stays a single argument even if it contains spaces or quotes.
    argv = [
        str(exec_path),
        "-k", str(secrets),
        "-i", str(inputs),
        "-l", str(labels),
        "-o", str(options),
    ]
    completed = subprocess.run(argv, universal_newlines=True, capture_output=True)
    return vars(completed)

# Process Samples

## Setup

In [407]:
import re, subprocess, boto3, json, shlex
from pathlib import Path

import pandas as pd
from s3path import S3Path
from tqdm import tqdm

In [413]:
# Load database credentials from the SCRIdb CLI config file in the home directory.
# The loop below reads the "user" and "password" entries from `creds`.
db_credentials_path = str(Path.home()) + "/.config.json"

with open(db_credentials_path) as f:
    creds = json.loads(f.read())

In [414]:
# Name of the workflow (.wdl filename); also used as the key prefix of every
# entry in the generated inputs JSON.
prefix = "CellRangerArc"

# Folder name appended to the GEX sample's S3 path to form the output destination
output_dirname = "cr-arc-results"

# Registry that hosts the docker images
common_docker_registry = "quay.io/hisplan"

# FASTQ read types required per library for this workflow
GEX_reads = ["I1", "I2", "R1", "R2"]
ATAC_reads = ["I1", "R1", "R2", "R3"]

# Free-text comment stored in the labels file; leave empty if not needed
comment = ""

# Workflow installation and the files read from / written to it
workflow_dir = f"{Path.home()}/scing/bin/cellranger-arc-2.0.0"
config_dir = f"{workflow_dir}/configs"  # per-sample inputs/labels JSON go here
path_to_exec = f"{workflow_dir}/submit.sh"
path_to_options = f"{workflow_dir}/CellRangerArc.options.aws.json"

# Cromwell secrets file passed to the submit script
path_to_cromwell_secrets = f"{Path.home()}/.cromwell/cromwell-secrets.json"

In [415]:
# Samples to process, kept as an ASCII table (one row per library) and parsed
# into a DataFrame in the Execution section. Rows sharing a REQ_ID belong
# together: one ATAC library (name/path contain "ATAC") and one GEX library.
# Edit this table to change which samples are available to the run.
samples = '''
+-----------+--------------------------------+---------+------------+-------------------------------------------------------------------------------------------+
| SAMPLE_ID | SAMPLE_NAME                    | REQ_ID  | gIndex     | S3                                                                                        |
+-----------+--------------------------------+---------+------------+-------------------------------------------------------------------------------------------+
|      2748 | D11_MP150Cre_5_multiome_ATAC   | SZ-896  | NULL       | s3://dp-lab-data/collaborators/sawyers/OrgP53RbMultiomics/D11_MP150Cre_5_multiome_ATAC/   |
|      2556 | D11_MP150Cre_5_multiome        | SZ-896  | mm10-3.0.0 | s3://dp-lab-data/collaborators/sawyers/OrgP53RbMultiomics/D11_MP150Cre_5_multiome/        |
|      2749 | D14_MP150CRE_6_multiome_ATAC   | SZ-897  | NULL       | s3://dp-lab-data/collaborators/sawyers/OrgP53RbMultiomics/D14_MP150CRE_6_multiome_ATAC/   |
|      2564 | D14_MP150CRE_6_multiome        | SZ-897  | mm10-3.0.0 | s3://dp-lab-data/collaborators/sawyers/OrgP53RbMultiomics/D14_MP150CRE_6_multiome/        |
|      2750 | D18_MP150CRE_7_multiome_ATAC   | SZ-975  | NULL       | s3://dp-lab-data/collaborators/sawyers/OrgP53RbMultiomics/D18_MP150CRE_7_multiome_ATAC/   |
|      2570 | D18_MP150CRE_7_multiome        | SZ-975  | mm10-3.0.0 | s3://dp-lab-data/collaborators/sawyers/OrgP53RbMultiomics/D18_MP150CRE_7_multiome/        |
|      2751 | D21_MP150CRE_8_multiome_ATAC   | SZ-976  | NULL       | s3://dp-lab-data/collaborators/sawyers/OrgP53RbMultiomics/D21_MP150CRE_8_multiome_ATAC/   |
|      2576 | D21_MP150CRE_8_multiome        | SZ-976  | mm10-3.0.0 | s3://dp-lab-data/collaborators/sawyers/OrgP53RbMultiomics/D21_MP150CRE_8_multiome/        |
+-----------+--------------------------------+---------+------------+-------------------------------------------------------------------------------------------+
'''

## Execution

In [416]:
# Convert formatted string to DataFrame
# The pattern keeps lines that are neither empty nor "+-----+-----+" border
# rows. It is a raw string so that "\+" reaches the regex engine unchanged
# instead of being an invalid string escape sequence.
table_fmt = re.compile(r"(?!^$)(^(?!(\+-*)+\+$))")
rows = list(filter(table_fmt.match, samples.split('\n')))
# Drop the outer pipes, split on the inner ones, and trim cell padding.
# All cells (including SAMPLE_ID) stay strings; the first row is the header.
data = [[item.strip() for item in row.strip("|").split("|")] for row in rows]
df = pd.DataFrame(
    data = data[1:],
    columns = data[0]
)
df

Unnamed: 0,SAMPLE_ID,SAMPLE_NAME,REQ_ID,gIndex,S3
0,2748,D11_MP150Cre_5_multiome_ATAC,SZ-896,,s3://dp-lab-data/collaborators/sawyers/OrgP53R...
1,2556,D11_MP150Cre_5_multiome,SZ-896,mm10-3.0.0,s3://dp-lab-data/collaborators/sawyers/OrgP53R...
2,2749,D14_MP150CRE_6_multiome_ATAC,SZ-897,,s3://dp-lab-data/collaborators/sawyers/OrgP53R...
3,2564,D14_MP150CRE_6_multiome,SZ-897,mm10-3.0.0,s3://dp-lab-data/collaborators/sawyers/OrgP53R...
4,2750,D18_MP150CRE_7_multiome_ATAC,SZ-975,,s3://dp-lab-data/collaborators/sawyers/OrgP53R...
5,2570,D18_MP150CRE_7_multiome,SZ-975,mm10-3.0.0,s3://dp-lab-data/collaborators/sawyers/OrgP53R...
6,2751,D21_MP150CRE_8_multiome_ATAC,SZ-976,,s3://dp-lab-data/collaborators/sawyers/OrgP53R...
7,2576,D21_MP150CRE_8_multiome,SZ-976,mm10-3.0.0,s3://dp-lab-data/collaborators/sawyers/OrgP53R...


In [418]:
# Each Req ID is a collection of libraries for one project

def _first_value(rows):
    """Unwrap a single-value query result such as [("Mouse",)] to "Mouse".

    Returns None for None or an empty result; non-sequence values pass through.
    """
    while isinstance(rows, (list, tuple)):
        if not rows:
            return None
        rows = rows[0]
    return rows


stdouts = [] # to store all outputs

# NOTE: only request 'SZ-896' is submitted by this loop as written.
for name, g in tqdm([('SZ-896', df.groupby('REQ_ID').get_group('SZ-896'))]):

    is_atac = g["S3"].str.contains("ATAC") # 2 rows per group, each is either ATAC or GEX sample
    gex_path = S3Path.from_uri(g.loc[~is_atac, "S3"].iloc[0])
    atac_path = S3Path.from_uri(g.loc[is_atac, "S3"].iloc[0])

    # Add inputs to dictionary
    inputs = dict()

    # Set Run ID to GEX sample name
    sample_name = g.loc[~is_atac, "SAMPLE_NAME"].iloc[0] # Also used in labels below
    inputs[f"{prefix}.runID"] = sample_name
    inputs[f"{prefix}.dockerRegistry"] = common_docker_registry

    # Get species from database to decide reference.
    # Use the GEX row's id (same row as sample_name): the ATAC rows are listed
    # first in each group and have no genome index (gIndex is NULL), which the
    # species lookup joins through.
    sample_id = g.loc[~is_atac, "SAMPLE_ID"].iloc[0]
    # The query returns rows like [("Mouse",)]; reduce them to the bare name.
    species = _first_value(get_species(sample_id, creds["user"], creds["password"]))
    if species is None:
        raise ValueError(f"No species found in database for sample {sample_id}")
    ref = get_reference(species) # TODO: 'get_reference' should be replaced with a more comprehensive mapping
    inputs[f"{prefix}.reference"] = ref

    # For each library, assemble inputs
    for library, reads, s3_path in zip(
        ['gex', 'atac'],
        [GEX_reads, ATAC_reads],
        [gex_path, atac_path],
    ):
        # Add FASTQ-related inputs
        fastq_files = []

        bucket = s3_path.parts[1]
        key = '/'.join(s3_path.parts[2:])+"/"

        for r in reads:
            # Raw f-string so "\d" is a regex escape; dots are escaped so they
            # match only a literal "." in ".fastq.gz".
            fastq_re = re.compile(rf"{r}_\d{{3}}\.fastq\.gz$")
            fastq_files += get_s3_objects(bucket, key, fastq_re, full_uri=True)

        if not fastq_files:
            raise FileNotFoundError(
                f"No {library} FASTQ files found under {s3_path.as_uri()}"
            )
        inputs[f"{prefix}.{library}FastqFiles"] = fastq_files

        # Note: FASTQ name is file name up to lane id (e.g. L001, L002, etc.)
        fastq_name_re = r".*/(.*)_L\d{3}_[A-Za-z]\d_\d{3}\.fastq\.gz$"
        name_match = re.match(fastq_name_re, fastq_files[0])
        if name_match is None:
            raise ValueError(f"Unexpected FASTQ file name: {fastq_files[0]}")
        inputs[f"{prefix}.{library}FastqName"] = name_match[1]

    # Add labels to dictionary
    labels = dict()
    labels["pipelineType"] = prefix
    # Store the project name itself rather than the raw [(name,)] result rows.
    labels["project"] = _first_value(
        get_project_id(sample_id, creds["user"], creds["password"])
    )
    labels["sample"] = sample_name
    labels["owner"] = creds["user"]
    labels["destination"] = str((gex_path/output_dirname).as_uri())
    labels["transfer"] = "-"
    labels["comment"] = comment

    # Write inputs and labels to file
    path_to_inputs = f"{config_dir}/{sample_name}.inputs.json"
    with open(path_to_inputs, "w") as f_inputs:
        json.dump(inputs, f_inputs, indent=4)

    path_to_labels = f"{config_dir}/{sample_name}.labels.json"
    with open(path_to_labels, "w") as f_labels:
        json.dump(labels, f_labels, indent=4)

    # Submit the workflow; the captured result is inspected in the next cell.
    stdouts.append(run(
        exec_path = path_to_exec,
        secrets = path_to_cromwell_secrets,
        inputs = path_to_inputs,
        labels = path_to_labels,
        options = path_to_options,
    ))

100%|██████████| 1/1 [00:01<00:00,  1.64s/it]


In [419]:
stdouts

[{'args': ['/Users/moormana/scing/bin/cellranger-arc-2.0.0/submit.sh',
   '-k',
   '/Users/moormana/.cromwell/cromwell-secrets.json',
   '-i',
   '/Users/moormana/scing/bin/cellranger-arc-2.0.0/configs/D11_MP150Cre_5_multiome.inputs.json',
   '-l',
   '/Users/moormana/scing/bin/cellranger-arc-2.0.0/configs/D11_MP150Cre_5_multiome.labels.json',
   '-o',
   '/Users/moormana/scing/bin/cellranger-arc-2.0.0/CellRangerArc.options.aws.json'],
  'returncode': 127,
  'stdout': '',
  'stderr': '/Users/moormana/scing/bin/cellranger-arc-2.0.0/submit.sh: line 32: cromwell-tools: command not found\n'}]