# Motif Detection of Every Gene Detected in Experimental Data

Teague McCracken 
03/30/2025

In [11]:
# Initialize the notebook, set environment to transcriptomics
import pandas as pd
import subprocess
import os
import multiprocessing
from pathlib import Path

## Step 1 - Extract promoter sequences for each gene

Inputs needed: <br>
1. gtf_file (reference genome information) <br>
2. gene_list_file (detected genes from experiment) <br>
3. genome.fa <br>
4. genomes size file Arabidopsis.genome (contains genome size information) <br>

Outputs: all outputs go to project checkpoint folder <br>
1. filtered_gtf (reduced version of Arabidopsis gtf file) <br>
2. tss.bed (bed format with transcription start sites) <br>
3. promoters.bed (bed format with 1000 bp upstream regions (promoters) of genes) <br>
4. promoters.fasta (the final output, sequences of promoter regions of each gene of interest) <br>

In [12]:
# File paths for step 1
gtf = "/home/temccrac/Programs/data/genomes/Arabidopsis_thaliana.TAIR10.54.gtf"
gene_list = "/home/temccrac/Programs/git_clones/ECE759_Project/cellular_clarity/genes_of_interest.txt"
genome_fa = "/home/temccrac/Programs/data/genomes/Arabidopsis_thaliana.TAIR10.dna.toplevel.fa"
genome_sizes = "/home/temccrac/Programs/data/genomes/Arabidopsis.genome"

checkpoint_path = '/home/temccrac/Programs/cellular_clarity_project/checkpoints'

In [13]:
# Core functions for promoter extraction
def filter_gtf_by_genes(gtf_file, gene_list_file, output_gtf):
    with open(gene_list_file, 'r') as f:
        genes = set(f.read().splitlines())

    with open(gtf_file, 'r') as gtf_in, open(output_gtf, 'w') as gtf_out:
        for line in gtf_in:
            if line.startswith('#'):
                continue
            if any(gene in line for gene in genes):
                gtf_out.write(line)

def extract_tss_bed(filtered_gtf, output_bed):
    awk_cmd = (
        '''awk '$3 == "transcript" {
            match($0, /gene_id "([^"]+)"/, m);
            gene = m[1];
            if ($7 == "+") {
                print $1 "\\t" $4-1 "\\t" $4 "\\t" gene "\\t.\\t" $7;
            } else {
                print $1 "\\t" $5-1 "\\t" $5 "\\t" gene "\\t.\\t" $7;
            }
        }' '''
    )
    full_cmd = f"{awk_cmd} {filtered_gtf} > {output_bed}"
    subprocess.run(full_cmd, shell=True, check=True)

def create_promoter_bed(tss_bed, genome_sizes, output_bed, length=1000):
    with open(output_bed, 'w') as out_f:
        subprocess.run([
            "bedtools", "flank",
            "-i", tss_bed,
            "-g", genome_sizes,
            "-l", str(length),
            "-r", "0",
            "-s"
        ], stdout=out_f, check=True)

def extract_promoter_fasta(genome_fa, promoter_bed, output_fasta):
    subprocess.run([
        "bedtools", "getfasta",
        "-fi", genome_fa,
        "-bed", promoter_bed,
        "-fo", output_fasta,
        "-s",
        "-name"  # preserves gene IDs in FASTA headers
    ], check=True)

# Functions to execute the above commands in parallel
def run_promoter_pipeline(chunk_id, gene_list_chunk, gtf, genome_fa, genome_sizes, outdir):
    filtered_gtf = f"{outdir}/filtered_{chunk_id}.gtf"
    tss_bed = f"{outdir}/tss_{chunk_id}.bed"
    promoter_bed = f"{outdir}/promoters_{chunk_id}.bed"
    fasta_out = f"{outdir}/promoters_{chunk_id}.fa"

    # Write chunked gene list
    gene_list_file = f"{outdir}/genes_{chunk_id}.txt"
    with open(gene_list_file, 'w') as f:
        f.write('\n'.join(gene_list_chunk))

    # Run sub-steps
    filter_gtf_by_genes(gtf, gene_list_file, filtered_gtf)
    extract_tss_bed(filtered_gtf, tss_bed)
    create_promoter_bed(tss_bed, genome_sizes, promoter_bed)
    extract_promoter_fasta(genome_fa, promoter_bed, fasta_out)

def parallel_promoter_extraction(genes_file, gtf, genome_fa, genome_sizes, outdir, num_cpus=10):
    with open(genes_file) as f:
        genes = f.read().splitlines()

    # Split gene list into chunks
    chunk_size = len(genes) // num_cpus
    chunks = [genes[i:i + chunk_size] for i in range(0, len(genes), chunk_size)]

    os.makedirs(outdir, exist_ok=True)

    args = [(i, chunks[i], gtf, genome_fa, genome_sizes, outdir) for i in range(len(chunks))]
    with multiprocessing.Pool(num_cpus) as pool:
        pool.starmap(run_promoter_pipeline, args)

    # Merge all .fa files
    merged_fasta = os.path.join(outdir, "promoters_merged.fa")
    with open(merged_fasta, 'w') as outfile:
        for i in range(len(chunks)):
            chunk_fasta = os.path.join(outdir, f"promoters_{i}.fa")
            with open(chunk_fasta, 'r') as infile:
                outfile.write(infile.read())

    print("✅ All promoters extracted and merged.")

In [14]:
# Run steps for Step 1
parallel_promoter_extraction(
    genes_file=gene_list,
    gtf=gtf,
    genome_fa=genome_fa,
    genome_sizes=genome_sizes,
    outdir=checkpoint_path,
    num_cpus=10
)

print("✅ Promoter FASTA extraction complete.")

✅ All promoters extracted and merged.
✅ Promoter FASTA extraction complete.


After running, I deleted all the unneccessary intermediate files and kept only the promoter_merged.fa file

Now, I check that all the genes of interests have promoters that were saved in the merged promoter file

In [None]:
def check_promoter_coverage(gene_list_file, promoters_fasta):
    # Read expected gene list
    with open(gene_list_file) as f:
        expected_genes = set(f.read().splitlines())

    # Read FASTA headers and extract gene IDs from lines like "AT5G23180::5:..."
    with open(promoters_fasta) as f:
        found_genes = set(
            line[1:].strip().split("::")[0]
            for line in f if line.startswith(">")
        )

    missing = expected_genes - found_genes
    extra = found_genes - expected_genes

    print(f"✅ Found promoters for {len(found_genes)} / {len(expected_genes)} genes.")
    if missing:
        print(f"❌ Missing {len(missing)} genes:")
        for gene in list(missing)[:10]:
            print("  ", gene)
    else:
        print("🎉 No missing genes.")

    if extra:
        print(f"⚠️ {len(extra)} unexpected gene(s) in output (not in your original list):")
        for gene in list(extra)[:5]:
            print("  ", gene)


# Example usage:
check_promoter_coverage(
    gene_list_file=gene_list,
    promoters_fasta="/home/temccrac/Programs/cellular_clarity_project/checkpoints/promoters_merged.fa"
)

✅ Found promoters for 32833 / 32833 genes.
🎉 No missing genes.


## Step # - Scan promoter sequences from each DPGP cluster for enriched motifs by comparing to a random 