In [1]:
#=== Installs and imports ===#
import pipeline_utils
import os
import gzip
import requests

## 
import pandas as pd

In [2]:
#=== rewrite this using pandas ===#

##
genome_fasta_url = "http://ftp.ensembl.org/pub/release-111/fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz"
genome_gff3_url = "http://ftp.ensembl.org/pub/release-111/gff3/homo_sapiens/Homo_sapiens.GRCh38.111.gff3.gz"
output_dir = "./genome_data"
max_proximity_bp = 1000

#=== ===#
# Note, proximity is measured by 
def identify_converging_genes(fasta_url, gff3_url, output_dir = "../0.local/generic-single-cell-pipeline/genome_data", gene_proximity = 1000, verbose=True):

    ## Log the function and params
    if verbose:
        print("Called identify_converging_genes function")
        print(f"Target fasta url: {fasta_url}")
        print(f"Target gff3 url: {gff3_url}")
        print(f"Output directory for genome files: {output_dir}")
        print(f"Minimum distance between gene pairs: {gene_proximity}")

    ## Instantiate the dataframe
    df_gene = pd.DataFrame(columns=["gene", "chromosome", "start", "end", "strand"])

    ## Download the required files, skip if already present
    os.makedirs(output_dir, exist_ok=True)
    fasta_path = os.path.join(output_dir, os.path.basename(genome_fasta_url))
    gff3_path = os.path.join(output_dir, os.path.basename(genome_gff3_url))

    ##
    if verbose:
        print(f"Destination filepath for fasta: {fasta_path}")
        print(f"Destination filepath for gff3: {gff3_url}")

    ## Download the required genome and annotation files ##
    if verbose:
        print("Attempting file downloads")
    ##
    download_file(genome_fasta_url, fasta_path)
    download_file(genome_gff3_url, gff3_path)

# --- PARSE GFF3 ---
def parse_gff3_genes(gff3_file):
    genes = []
    with gzip.open(gff3_file, 'rt') as f:
        for line in f:
            if line.startswith("#"):
                continue
            fields = line.strip().split("\t")
            if len(fields) < 9:
                continue
            chrom, source, feature, start, end, score, strand, phase, attributes = fields
            if feature != "gene":
                continue
            attr_dict = {kv.split("=")[0]: kv.split("=")[1] for kv in attributes.split(";") if "=" in kv}
            gene_name = attr_dict.get("Name", "unknown")
            genes.append(Gene(chrom, int(start), int(end), strand, gene_name))
    return genes

genes = parse_gff3_genes(gff3_path)
print(f"Parsed {len(genes)} genes.")

# --- FIND CLOSE OPPOSITE GENE PAIRS ---
def find_facing_gene_pairs(genes, max_distance):
    genes_by_chrom = {}
    for gene in genes:
        genes_by_chrom.setdefault(gene.chrom, []).append(gene)

    close_pairs = []
    for chrom, chrom_genes in genes_by_chrom.items():
        sorted_genes = sorted(chrom_genes, key=lambda g: g.start)
        for i in range(len(sorted_genes) - 1):
            g1 = sorted_genes[i]
            g2 = sorted_genes[i + 1]
            distance = g2.start - g1.end
            if 0 <= distance <= max_distance:
                # Check if they face each other
                if g1.strand == '+' and g2.strand == '-':
                    close_pairs.append((g1, g2))
                elif g1.strand == '-' and g2.strand == '+':
                    close_pairs.append((g2, g1))
    return close_pairs

facing_pairs = find_facing_gene_pairs(genes, max_proximity_bp)
print(f"Found {len(facing_pairs)} facing gene pairs within {max_proximity_bp} bp.")

# --- OUTPUT RESULTS ---
for g1, g2 in facing_pairs:
    print(f"{g1.name} ({g1.chrom}:{g1.start}-{g1.end} {g1.strand}) ↔ {g2.name} ({g2.chrom}:{g2.start}-{g2.end} {g2.strand})")


SyntaxError: unterminated string literal (detected at line 32) (2019479794.py, line 32)

In [None]:
gene_pairs = [
    # (gene1_name, chr1, start1, end1, strand1, gene2_name, chr2, start2, end2, strand2)
    ("GeneA", "chr1", 100, 300, "+", "GeneB", "chr1", 500, 700, "-"),
    ("GeneC", "chr2", 1000, 1200, "+", "GeneD", "chr2", 1400, 1600, "+"),
    ("GeneE", "chr3", 2000, 2200, "-", "GeneF", "chr3", 2400, 2600, "-"),
]

import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Sample input
gene_pairs = [
    ("GeneA", "chr1", 100, 300, "+", "GeneB", "chr1", 500, 700, "-"),
    ("GeneC", "chr2", 1000, 1200, "+", "GeneD", "chr2", 1400, 1600, "+"),
    ("GeneE", "chr3", 2000, 2200, "-", "GeneF", "chr3", 2400, 2600, "-"),
]

def plot_gene(ax, name, start, end, strand, y, color="skyblue"):
    width = end - start
    arrowprops = dict(facecolor=color, edgecolor="black", width=1, head_width=4, head_length=10)
    if strand == "+":
        ax.add_patch(patches.FancyArrow(start, y, width, 0, **arrowprops))
    else:
        ax.add_patch(patches.FancyArrow(end, y, -width, 0, **arrowprops))
    ax.text((start + end) / 2, y + 1.5, name, ha="center", fontsize=8)

# Set up figure
fig, ax = plt.subplots(figsize=(10, len(gene_pairs)*2))
ax.set_xlim(0, 3000)  # Adjust based on max coordinate in your data
ax.set_ylim(-1, len(gene_pairs)*5)

# Draw each gene pair
for i, (g1, chr1, s1, e1, strand1, g2, chr2, s2, e2, strand2) in enumerate(gene_pairs):
    y = i * 5
    ax.text(10, y + 2, f"{chr1}", fontsize=8, ha="left", color="gray")
    plot_gene(ax, g1, s1, e1, strand1, y)
    plot_gene(ax, g2, s2, e2, strand2, y)
    ax.plot([e1, s2], [y, y], "k--", linewidth=0.5)  # optional dashed line between genes

# Formatting
ax.set_xlabel("Genomic Coordinate")
ax.set_yticks([])
ax.set_title("Gene Pairs")
plt.tight_layout()
plt.show()