# Spidroin curation
Setting up the required Python packages

In [None]:
import os
import pandas as pd
from Bio import SeqIO
import subprocess
import time
from dataclasses import dataclass
from collections import defaultdict

from spider_silkome_module import (
    PROJ_ROOT,
    RAW_DATA_DIR,
    INTERIM_DATA_DIR,
    PROCESSED_DATA_DIR,
    EXTERNAL_DATA_DIR,
    MODELS_DIR,
    FIGURES_DIR
)
from spider_silkome_module import (
    Position,
    GFFData
)
from spider_silkome_module import (
    positions_export
)

[32m2025-10-08 17:21:19.261[0m | [1mINFO    [0m | [36mspider_silkome_module.config[0m:[36m<module>[0m:[36m11[0m - [1mPROJ_ROOT path is: /home/gyk/project/spider_silkome[0m


Definition of the path for relevant data

In [None]:
spider_species_file = f"{EXTERNAL_DATA_DIR}/organisms.csv"
mechanical_properties_file = f"{EXTERNAL_DATA_DIR}/mechanical_properties.csv"
spidroin_fasta_file = f"{EXTERNAL_DATA_DIR}/spider-silkome-database.v1.prot.fixed.fasta"
spider_genome_path = f"{RAW_DATA_DIR}/spider_genome"
spidroin_path = f"{INTERIM_DATA_DIR}/spidroin"

Use miniprot to align the C-terminal and N-terminal sequences of the spidroin gene sequences to the genome of the new species.

In [None]:
spidroin_files = os.listdir(spidroin_path)
spider_genomes = [f for f in os.listdir(spider_genome_path) if f.endswith(".fa.gz")]
gnome_mpi_path = f"{interim_data_path}/genome_mpi"
miniprot_output_path = f"{interim_data_path}/miniprot"
os.makedirs(gnome_mpi_path, exist_ok=True)
os.makedirs(miniprot_output_path, exist_ok=True)
for spider_genome in spider_genomes:
    spider = spider_genome.split(".")[0]
    index_cmd = f"miniprot -t70 -d {gnome_mpi_path}/{spider}.mpi {spider_genome_path}/{spider_genome}"
    if not os.path.exists(f"{gnome_mpi_path}/{spider}.mpi"):
        indext_start_time = time.time()
        subprocess.run(index_cmd, shell=True)
        indext_end_time = time.time()
        print(f"Indexing {spider} takes {indext_end_time - indext_start_time} seconds")
    else:
        print(f"Indexing {spider} already exists")

    output_dir = f"{miniprot_output_path}/{spider}_all"
    os.makedirs(output_dir, exist_ok=True)
    align_cmd1 = f"miniprot -t 70 -I --gff {gnome_mpi_path}/{spider}.mpi {spidroin_fasta_file} > {output_dir}/{spider}.gff"
    align_cmd2 = f"miniprot -t 70 -I --aln {gnome_mpi_path}/{spider}.mpi {spidroin_fasta_file} > {output_dir}/{spider}.aln"
    if not os.path.exists(f"{output_dir}/{spider}.gff"):
        align_start_time = time.time()
        subprocess.run(align_cmd1, shell=True)
        align_end_time = time.time()
        print(f"Alignment {spider} takes {align_end_time - align_start_time} seconds")
    else:
        print(f"Alignment {spider} already exists")

    if not os.path.exists(f"{output_dir}/{spider}.mRNA.gff"):
        grep_cmd = f"grep 'mRNA' {output_dir}/{spider}.gff > {output_dir}/{spider}.mRNA.gff"
        subprocess.run(grep_cmd, shell=True)
    else:
        print(f"mRNA gff file already exists")

    if os.path.exists(f"{output_dir}/{spider}.mRNA.gff"):
        mRNA_gff = pd.read_csv(f"{output_dir}/{spider}.mRNA.gff", sep='\t', header=None)
        gff_header = ["seqid", "source", "type", "start", "end", "score", "strand", "frame", "attribute"]
        mRNA_gff.columns = gff_header
        spidroins = list(set([row["attribute"].split(';')[-1].split('|')[-2] for index, row in mRNA_gff.iterrows()]))
        print(f"Total number of spidroins in {spider}: {len(spidroins)}\n{spidroins}")
        for spidroin in spidroins:
            gff_spidroin_output = f"{output_dir}/{spider}.mRNA.{spidroin}.gff"
            if not os.path.exists(gff_spidroin_output):
                grep_cmd = f"grep '|{spidroin}|' {output_dir}/{spider}.mRNA.gff > {gff_spidroin_output}"
                subprocess.run(grep_cmd, shell=True)
            else:
                print(f"{gff_spidroin_output} already exists")

In [None]:
# for spidroin in spidroins:
spidroin = "MiSp"
spider = "Trichonephila_clavata"
output_dir = f"{miniprot_output_path}/{spider}_all"
spidroin_gff = f"{output_dir}/{spider}.mRNA.{spidroin}.gff"
attr_dict = {}
spidroin_gff_data = []
with open(spidroin_gff, 'r') as f:
    for line in f:
        if line.startswith('#'):
            continue

        fields = line.strip().split('\t')

        # Analysis of the attributes field
        for attr in fields[8].split(';'):
            if '=' in attr:
                key, value = attr.split('=', 1)
                attr_dict[key] = value

        # Create attributes object
        attr_obj = attributes(
            ID=attr_dict['ID'],
            Rank=int(attr_dict['Rank']),
            Identity=float(attr_dict['Identity']),
            Positive=float(attr_dict['Positive']),
            Target=attr_dict['Target'].split('|')
        )

        # Create gff_data object and add to list
        spidroin_gff_data.append(GFFData(
            seqid=fields[0],
            source=fields[1],
            type=fields[2],
            start=int(fields[3]),
            end=int(fields[4]),
            score=float(fields[5]),
            strand=fields[6],
            frame=fields[7],
            attributes=attr_obj
        ))

# Sort by positive with descending order
spidroin_gff_data.sort(key=lambda x: x.attributes.Positive, reverse=True)
spidroin_gff_data
# # Select the alignment with positive score > 0.99 if sepcies is same as spider
# spidroin_gff_data_C = []
# spidroin_gff_data_N = []
# spidroin_gff_data_filtered_C = []
# spidroin_gff_data_filtered_N = []
# for aln in spidroin_gff_data:
#     if aln.attributes.Target[-1].split(" ")[0] == 'CTD':
#         spidroin_gff_data_C.append(aln)
#         if aln.attributes.Positive > 0.9:
#             spidroin_gff_data_filtered_C.append(aln)
#     elif aln.attributes.Target[-1].split(" ")[0] == 'NTD':
#         spidroin_gff_data_N.append(aln)
#         if aln.attributes.Positive > 0.9:
#             spidroin_gff_data_filtered_N.append(aln)

In [None]:
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class pos:
    chr: str
    strand: str
    start: Dict[int, int] = field(default_factory=dict)  # {position: count/score}
    end: Dict[int, int] = field(default_factory=dict)    # {position: count/score}

优化版本（带质量过滤），如果想根据比对质量进行加权或过滤：

In [None]:
# 用于存储每个染色体+链的位置信息
positions_dict = defaultdict(lambda: {'start': defaultdict(int), 'end': defaultdict(int)})

# 设置质量阈值
POSITIVE_THRESHOLD = 0.85

for aln in spidroin_gff_data:
    # 过滤低质量比对
    if aln.attributes.Positive < POSITIVE_THRESHOLD:
        continue

    chr_id = aln.seqid
    strand = aln.strand
    key = (chr_id, strand)

    # 判断是C端还是N端
    domain_type = aln.attributes.Target[-1].split(" ")[0]

    if domain_type == 'CTD':  # C-terminal domain
        if strand == '+':  # 正向：C端在后面，记录end位置
            pos_value = aln.end
            positions_dict[key]['end'][pos_value] += 1
        else:  # 反向：C端在前面（基因组坐标），记录start位置
            pos_value = aln.start
            positions_dict[key]['start'][pos_value] += 1

    elif domain_type == 'NTD':  # N-terminal domain
        if strand == '+':  # 正向：N端在前面，记录start位置
            pos_value = aln.start
            positions_dict[key]['start'][pos_value] += 1
        else:  # 反向：N端在后面（基因组坐标），记录end位置
            pos_value = aln.end
            positions_dict[key]['end'][pos_value] += 1

# 转换为 pos 对象列表
positions = []
for (chr_id, strand), pos_data in positions_dict.items():
    positions.append(pos(
        chr=chr_id,
        strand=strand,
        start=dict(pos_data['start']),
        end=dict(pos_data['end'])
    ))

# 按染色体和链排序
positions.sort(key=lambda x: (int(x.chr.replace('Chr', '')), x.strand))
positions

核心逻辑说明
关键理解：

正向链 (+)：基因从5'到3'，N端在前（start），C端在后（end）
反向链 (-)：基因在反向互补链上，基因组坐标中start < end，但生物学意义上C端对应较小的坐标
位置记录规则：

|类型|链方向|记录位置|原因|
|---|---|---|---|
|CTD|+|end|C端在基因末尾，向后延伸|
|CTD|-|start|C端在基因起始（基因组坐标小），向前延伸|
|NTD|+|start|N端在基因起始，向前延伸|
|NTD|-|end|N端在基因末尾（基因组坐标大），向后延伸|

In [None]:
# 导出数据
# 输出CSV
csv_output = spidroin_gff.replace('.gff', '.csv')
df = positions_export(positions, csv_output, format='csv')

# 输出GFF
gff_output = spidroin_gff.replace('.gff', '.combined.gff')
gff_records = positions_export(positions, gff_output, format='gff', spidroin=spidroin)

# 查看统计信息
print(f"\n统计信息-{spidroin}:")
print(f"总共 {len(positions)} 个染色体-链组合")
print(f"有效组合（同时有start和end）: {sum(1 for p in positions if p.start and p.end)}")
print(f"预测的基因数量: {len(gff_records)}")

代码说明：

1. CSV输出：
- 每行包含：染色体、链、类型（start/end）、位置、计数
- 跳过空的start和end
- 按染色体、链、位置排序
2. GFF输出：
- 只输出同时有start和end的记录
- 配对所有可能的start-end组合
- 过滤长度超过100kbp的基因
- 包含计数信息在attributes字段中
- GFF格式符合标准（version 3）
3. 配对逻辑：
- 正向链：start < end
- 反向链：start < end（基因组坐标，但生物学上C端在前）
- 计算基因长度并过滤