In [3]:
import os

import pandas as pd

In [None]:
gene_info = pd.DataFrame(columns=["gene", "chr", "start", "end", "strand"])


with open("D:/OneDrive/NAS/01.科研相关/01.博士/01.data/03.生信挖掘/20.水稻NLR-pairs/data/gff3/534M.gff3") as f:
    for line in f:
        if line.startswith("#") or not line.strip():
            continue
        parts = line.strip().split("\t")
        # parse gene info
        if parts[2] == "gene":
            gene_id = parts[8].split(";")[0].replace("ID=", "")
            gene_chr = parts[0]
            gene_start = parts[3]
            gene_end = parts[4]
            gene_strand = parts[6]

            data = [[gene_id, gene_chr, gene_start, gene_end, gene_strand]]
            col_name = ["gene", "chr", "start", "end", "strand"]
            gene_info_temp = pd.DataFrame(data=data, columns=col_name)

            gene_info = pd.concat([gene_info, gene_info_temp])
    
    gene_info.head()




In [29]:
import pandas as pd

def parse_gff_genes(gff_path):
    """
    高效解析GFF3文件中的基因信息
    改进点：
    1. 增强属性字段解析能力
    2. 增加错误处理机制
    3. 优化内存管理和执行效率
    4. 规范数据类型转换
    """
    # 预分配内存代替重复concat 
    data_records = []
    
    with open(gff_path) as f:
        for line_num, line in enumerate(f, 1):
            # 跳过注释和空行 
            if line.startswith("#") or not line.strip():
                continue
            
            # 分割字段并进行格式校验 
            parts = line.strip().split("\t")
            if len(parts) < 9:
                print(f"Line {line_num} 格式错误，跳过: {line.strip()}")
                continue
            
            # 仅处理基因记录
            if parts[2].lower() != "gene":
                continue
            
            try:
                # 解析基础字段（强制类型转换）
                seqid = parts[0]
                start = int(parts[3])
                end = int(parts[4])
                strand = parts[6] if parts[6] in ('+', '-', '.') else None
                
                # 增强型属性解析 
                attributes = {}
                for pair in parts[8].split(';'):
                    pair = pair.strip()
                    if not pair:
                        continue
                    if '=' in pair:
                        key, value = pair.split('=', 1)
                        attributes[key.strip()] = value.strip()
                    else:
                        attributes[pair] = None  # 处理无值属性
                
                # 优先获取标准ID，兼容非标准字段 
                gene_id = attributes.get('ID') or attributes.get('gene_id') or f"unknown_gene_{line_num}"
                
                # 收集数据（避免内存碎片）
                data_records.append({
                    "gene": gene_id,
                    "chr": seqid,
                    "start": start,
                    "end": end,
                    "strand": strand,
                    "attributes": str(attributes)  # 保留原始属性备用
                })
                
            except Exception as e:
                print(f"解析第 {line_num} 行失败: {line.strip()}")
                print(f"错误详情: {str(e)}")
    
    # 批量创建DataFrame 
    return pd.DataFrame(data_records, columns=["gene", "chr", "start", "end", "strand", "attributes"])

In [92]:
import pandas as pd

def parse_gff_features(gff_path):
    """
    增强版GFF3解析器，同时提取基因和mRNA信息并建立关联
    改进点：
    1. 同步解析gene和mRNA信息
    2. 建立基因-mRNA的层级关系
    3. 增强属性解析兼容性
    4. 增加跨特征校验
    """
    # 预分配双缓存结构
    gene_records = []
    mrna_records = []
    gene_id_map = {}  # 用于校验基因存在性
    
    with open(gff_path) as f:
        for line_num, line in enumerate(f, 1):
            # 跳过注释和空行
            if line.startswith("#") or not line.strip():
                continue

            # 分割字段并进行格式校验
            parts = line.strip().split("\t")
            if len(parts) < 9:
                print(f"Line {line_num} 格式错误，跳过: {line.strip()}")
                continue

            feature_type = parts[2].lower()
            if feature_type not in ['gene', 'mrna']:  # 根据GFF3标准处理关键特征
                continue

            try:
                # 解析通用字段
                seqid = parts[0]
                start = int(parts[3])
                end = int(parts[4])
                strand = parts[6] if parts[6] in ('+', '-', '.') else None
                
                # 增强型属性解析（兼容不同格式）
                attributes = {}
                for pair in parts[8].replace("%20", " ").split(';'):  # 处理URL编码
                    pair = pair.strip()
                    if not pair:
                        continue
                    if '=' in pair:
                        key, value = pair.split('=', 1)
                        attributes[key.strip().lower()] = value.strip()  # 统一小写处理
                    else:
                        attributes[pair] = None

                # 特征类型分处理
                if feature_type == 'gene':
                    # 基因ID解析（兼容NCBI/Ensembl不同格式）
                    gene_id = attributes.get('id') or attributes.get('geneid') or f"GENE_{line_num}"
                    gene_id_map[gene_id] = True  # 注册基因ID
                    
                    gene_records.append({
                        "gene_id": gene_id,
                        "chr": seqid,
                        "gene_start": start,
                        "gene_end": end,
                        "strand": strand,
                        "gene_type": attributes.get('biotype') or attributes.get('type'),
                        "gene_name": attributes.get('name') or attributes.get('gene_name')
                        # "attributes": str(attributes)
                    })
                    
                elif feature_type == 'mrna':
                    # 关联基因ID（处理不同Parent格式）
                    parent_id = attributes.get('parent') or attributes.get('gene_id')
                    if not parent_id:
                        print(f"Line {line_num} mRNA缺少Parent属性: {line.strip()}")
                        continue
                    
                    # 标准化父ID（处理ID:XXXX格式）
                    parent_id = parent_id.split(':')[-1].replace("gene_", "")  # 兼容NCBI的gene:XXX格式
                    
                    # 校验父基因存在性
                    if parent_id not in gene_id_map:
                        print(f"Line {line_num} mRNA的Parent基因不存在: {parent_id}")
                        continue
                        
                    mrna_records.append({
                        "gene_id": parent_id,
                        "mrna_id": attributes.get('id') or f"mRNA_{line_num}",
                        "mrna_start": start,
                        "mrna_end": end,
                        "exon_count": len([k for k in attributes if 'exon' in k]),
                        "transcript_type": attributes.get('transcript_type') or attributes.get('biotype'),
                        "product": attributes.get('product') or attributes.get('description')
                    })

            except Exception as e:
                print(f"解析第 {line_num} 行失败: {line.strip()}")
                print(f"错误详情: {str(e)}")
                continue

    # 创建DataFrame并关联
    gene_df = pd.DataFrame(gene_records)
    gene_df['index_temp'] =  gene_df.groupby('chr').cumcount() + 1

    mrna_df = pd.DataFrame(mrna_records)
    
    # 左连接合并（保留无mRNA的基因）
    merged_df = pd.merge(gene_df, mrna_df, on='gene_id', how='left')
    
    # 后处理
    merged_df.sort_values(["chr", "gene_start", "mrna_start"], inplace=True)
    merged_df.reset_index(drop=True, inplace=True)
    # merged_df.sort_values(by=['chr', 'gene_start'], ascending=False, inplace=True)
    # merged_df['index'] = merged_df.groupby(['chr','gene_name']).ngroup() + 1

    return merged_df

In [93]:
result = parse_gff_features("D:/OneDrive/NAS/01.科研相关/01.博士/01.data/03.生信挖掘/20.水稻NLR-pairs/data/gff3/Nip_hifi.gff3")

# result.to_csv("D:/OneDrive/NAS/01.科研相关/01.博士/01.data/03.生信挖掘/20.水稻NLR-pairs/test.csv")

result.head()

result[result.index_temp == 1]

Unnamed: 0,gene_id,chr,gene_start,gene_end,strand,gene_type,gene_name,index_temp,mrna_id,mrna_start,mrna_end,exon_count,transcript_type,product
0,AGIS_Os01g000010,Chr1,7305,15219,+,,AGIS_Os01g000010,1,AGIS_Os01g000010.mRNA1,7305,15219,0,,
1,AGIS_Os01g000010,Chr1,7305,15219,+,,AGIS_Os01g000010,1,AGIS_Os01g000010.mRNA2,7386,14964,0,,
8080,AGIS_Os10g000010,Chr10,8582,10243,-,,AGIS_Os10g000010,1,AGIS_Os10g000010.mRNA1,8582,10243,0,,
12222,AGIS_Os11g000010,Chr11,14845,18301,-,,AGIS_Os11g000010,1,AGIS_Os11g000010.mRNA1,14845,16967,0,,
12223,AGIS_Os11g000010,Chr11,14845,18301,-,,AGIS_Os11g000010,1,AGIS_Os11g000010.mRNA2,14845,18301,0,,
16990,AGIS_Os12g000010,Chr12,11015,22326,-,,AGIS_Os12g000010,1,AGIS_Os12g000010.mRNA1,11015,22326,0,,
16991,AGIS_Os12g000010,Chr12,11015,22326,-,,AGIS_Os12g000010,1,AGIS_Os12g000010.mRNA2,11015,22326,0,,
16992,AGIS_Os12g000010,Chr12,11015,22326,-,,AGIS_Os12g000010,1,AGIS_Os12g000010.mRNA3,11015,22326,0,,
16993,AGIS_Os12g000010,Chr12,11015,22326,-,,AGIS_Os12g000010,1,AGIS_Os12g000010.mRNA4,11015,22326,0,,
16994,AGIS_Os12g000010,Chr12,11015,22326,-,,AGIS_Os12g000010,1,AGIS_Os12g000010.mRNA5,11015,22326,0,,


In [94]:
import pandas as pd

def parse_gff_features(gff_path):
    """
    Enhanced GFF3 parser that extracts both gene and mRNA information while establishing their relationships
    Improvements:
    1. Simultaneously parses gene and mRNA information
    2. Establishes gene-mRNA hierarchical relationships
    3. Enhanced attribute parsing compatibility
    4. Added cross-feature validation
    """
    # Pre-allocate dual cache structure
    gene_records = []
    mrna_records = []
    gene_id_map = {}  # For gene existence validation
    
    with open(gff_path) as f:
        for line_num, line in enumerate(f, 1):
            # Skip comments and empty lines
            if line.startswith("#") or not line.strip():
                continue

            # Split fields and validate format
            parts = line.strip().split("\t")
            if len(parts) < 9:
                print(f"Line {line_num} format error, skipping: {line.strip()}")
                continue

            feature_type = parts[2].lower()
            if feature_type not in ['gene', 'mrna']:  # Process key features according to GFF3 standard
                continue

            try:
                # Parse common fields
                seqid = parts[0]
                start = int(parts[3])
                end = int(parts[4])
                strand = parts[6] if parts[6] in ('+', '-', '.') else None
                
                # Enhanced attribute parsing (compatible with different formats)
                attributes = {}
                for pair in parts[8].replace("%20", " ").split(';'):  # Handle URL encoding
                    pair = pair.strip()
                    if not pair:
                        continue
                    if '=' in pair:
                        key, value = pair.split('=', 1)
                        attributes[key.strip().lower()] = value.strip()  # Unified lowercase processing
                    else:
                        attributes[pair] = None

                # Feature type specific processing
                if feature_type == 'gene':
                    # Gene ID parsing (compatible with NCBI/Ensembl different formats)
                    gene_id = attributes.get('id') or attributes.get('geneid') or f"GENE_{line_num}"
                    gene_id_map[gene_id] = True  # Register gene ID
                    
                    gene_records.append({
                        "gene_id": gene_id,
                        "chr": seqid,
                        "gene_start": start,
                        "gene_end": end,
                        "strand": strand,
                        "gene_type": attributes.get('biotype') or attributes.get('type'),
                        "gene_name": attributes.get('name') or attributes.get('gene_name')
                        # "attributes": str(attributes)
                    })
                    
                elif feature_type == 'mrna':
                    # Associate gene ID (handle different Parent formats)
                    parent_id = attributes.get('parent') or attributes.get('gene_id')
                    if not parent_id:
                        print(f"Line {line_num} mRNA missing Parent attribute: {line.strip()}")
                        continue
                    
                    # Standardize parent ID (handle ID:XXXX format)
                    parent_id = parent_id.split(':')[-1].replace("gene_", "")  # Compatible with NCBI's gene:XXX format
                    
                    # Validate parent gene existence
                    if parent_id not in gene_id_map:
                        print(f"Line {line_num} mRNA's parent gene doesn't exist: {parent_id}")
                        continue
                        
                    mrna_records.append({
                        "gene_id": parent_id,
                        "mrna_id": attributes.get('id') or f"mRNA_{line_num}",
                        "mrna_start": start,
                        "mrna_end": end,
                        "exon_count": len([k for k in attributes if 'exon' in k]),
                        "transcript_type": attributes.get('transcript_type') or attributes.get('biotype'),
                        "product": attributes.get('product') or attributes.get('description')
                    })

            except Exception as e:
                print(f"Failed to parse line {line_num}: {line.strip()}")
                print(f"Error details: {str(e)}")
                continue

    # Create DataFrames and establish relationships
    gene_df = pd.DataFrame(gene_records)
    gene_df['index_temp'] =  gene_df.groupby('chr').cumcount() + 1

    mrna_df = pd.DataFrame(mrna_records)
    
    # Left join merge (keep genes without mRNAs)
    merged_df = pd.merge(gene_df, mrna_df, on='gene_id', how='left')
    
    # Post-processing
    merged_df.sort_values(["chr", "gene_start", "mrna_start"], inplace=True)
    merged_df.reset_index(drop=True, inplace=True)
    # merged_df.sort_values(by=['chr', 'gene_start'], ascending=False, inplace=True)
    # merged_df['index'] = merged_df.groupby(['chr','gene_name']).ngroup() + 1

    return merged_df


In [162]:
result = parse_gff_features("D:/OneDrive/NAS/01.科研相关/01.博士/01.data/03.生信挖掘/20.水稻NLR-pairs/data/gff3/534M.gff3")

result.head()

Unnamed: 0,gene_id,chr,gene_start,gene_end,strand,gene_type,gene_name,index_temp,mrna_id,mrna_start,mrna_end,exon_count,transcript_type,product
0,534M_042.1,534M_042,9208,9486,+,,534M_042.1,1,534M_042.1,9208,9486,0,,
1,534M_042.2,534M_042,33865,36975,+,,534M_042.2,2,534M_042.2,33865,36975,0,,
2,534M_042.3,534M_042,57347,58144,+,,534M_042.3,3,534M_042.3,57347,58144,0,,
3,534M_042.4,534M_042,114074,114424,-,,534M_042.4,4,534M_042.4,114074,114424,0,,
4,534M_042.5,534M_042,117691,118542,-,,534M_042.5,5,534M_042.5,117691,118542,0,,


In [None]:
with open("D:/OneDrive/NAS/01.科研相关/01.博士/01.data/03.生信挖掘/20.水稻NLR-pairs/result/03.nlr-pairs/534M.nlr.id.txt") as f:
    for line in f:
        id_temp = line.strip().split("\t")[0]
        
        # 检查列名是否存在
        if 'mrna_id' not in result.columns:
            raise KeyError("Column 'mrna_id' not found in DataFrame.")
        
        info_temp = result[result['mrna_id'] == id_temp]
        
        # 处理未找到的情况
        if info_temp.empty:
            print(f"Warning: {id_temp} not found. Skipping...")
            continue
        
        # 提取具体值（取第一行）
        try:
            current_chr = info_temp['chr'].iloc[0]
            current_index = info_temp['index_temp'].iloc[0]
        except KeyError as e:
            print(f"Column error: {e}")
            break
        
        # 计算范围
        min_index = current_index - 3
        max_index = current_index + 3
        
        # 筛选逻辑（确保数值类型）
        pairs_temp = result[
            (result['chr'] == current_chr) & 
            (result['index_temp'].astype(int) > min_index) & 
            (result['index_temp'].astype(int) < max_index) &
            (result['index_temp'].astype(int) != current_index)
        ]
        
        # 组合结果
        for row in pairs_temp.itertuples():
        
            pairs = pd.DataFrame({"chr":[info_temp["chr"].iloc[0]],
                                    "gene_id_1":[info_temp["gene_id"].iloc[0]],
                                    "gene_start_1":[info_temp["gene_start"].iloc[0]],
                                    "gene_end_1":[info_temp["gene_end"].iloc[0]],
                                    "gene_id_2":[row.gene_id], 
                                    "gene_start_2":[row.gene_start], 
                                    "gene_end_2":[row.gene_end]})

    chr     gene_id_1  gene_start_1  gene_end_1     gene_id_2  gene_start_2  \
0  chr2  534M_001.798       5488790     5490322  534M_001.800       5482582   

   gene_end_2  
0     5483568  
    chr     gene_id_1  gene_start_1  gene_end_1     gene_id_2  gene_start_2  \
0  chr2  534M_001.798       5488790     5490322  534M_001.799       5484617   

   gene_end_2  
0     5485198  
    chr     gene_id_1  gene_start_1  gene_end_1     gene_id_2  gene_start_2  \
0  chr2  534M_001.798       5488790     5490322  534M_001.797       5495131   

   gene_end_2  
0     5496195  
    chr     gene_id_1  gene_start_1  gene_end_1     gene_id_2  gene_start_2  \
0  chr2  534M_001.798       5488790     5490322  534M_001.796       5500301   

   gene_end_2  
0     5501848  
    chr     gene_id_1  gene_start_1  gene_end_1     gene_id_2  gene_start_2  \
0  chr2  534M_001.796       5500301     5501848  534M_001.798       5488790   

   gene_end_2  
0     5490322  
    chr     gene_id_1  gene_start_1  gene_end

In [None]:
# 示例使用方式：
# 假设输入的id列表存储在input_ids.txt中，每行一个id
with open('input_ids.txt', 'r') as f:
    id_list = [line.strip() for line in f]

# 假设GFF数据已读取为gff_df
gff_df = pd.read_csv('your_gff_file.tsv', sep='\t')

# 调用函数
result_df = find_gene_pairs(id_list, gff_df, distance=3)

# 写入文件
result_df.to_csv('gene_pairs_output.tsv', sep='\t', index=False)

In [None]:
import pandas as pd

def find_gene_pairs(gene_info: pd.DataFrame, gene_id_input: list) -> pd.DataFrame:
    """
    根据基因ID在染色体上的索引范围筛选邻近基因对
    
    参数：
        gene_info : 包含基因信息的DataFrame，必须包含列：
                   mrna_id, chr, index_temp, gene_id, gene_start, gene_end
        gene_id_input : 要查询的基因ID列表或文件路径
    
    返回：
        包含所有符合条件的基因对DataFrame
    """
    # 检查必要列是否存在 
    required_cols = {'mrna_id','chr','index_temp','gene_id','gene_start','gene_end'}
    if not required_cols.issubset(gene_info.columns):
        missing = required_cols - set(gene_info.columns)
        raise KeyError(f"缺失必要列: {missing}")

    # 初始化结果容器
    all_pairs = []
    
    # 处理输入类型（支持文件路径或ID列表）
    if isinstance(gene_id_input, str):
        with open(gene_id_input) as f:
            gene_ids = [line.strip().split("\t")[0] for line in f]
    else:
        gene_ids = gene_id_input

    # 主处理逻辑
    for id_temp in gene_ids:
        # 查找目标基因信息
        target_gene = gene_info[gene_info['mrna_id'] == id_temp]
        
        if target_gene.empty:
            print(f"警告: 未找到基因 {id_temp}")
            continue
        
        # 提取坐标参数 
        try:
            current_data = target_gene.iloc[0]
            chrom = current_data['chr']
            idx = int(current_data['index_temp'])
        except (KeyError, ValueError) as e:
            print(f"数据格式错误: {e}")
            continue

        # 计算索引范围
        search_range = (
            (gene_info['chr'] == chrom) &
            (gene_info['index_temp'].astype(int).between(idx-2, idx+2)) &  # 包含边界
            (gene_info['index_temp'] != idx)
        )
        
        # 筛选邻近基因 
        nearby_genes = gene_info[search_range]
        
        # 生成配对信息
        for _, neighbor in nearby_genes.iterrows():
            pair = {
                "chr": chrom,
                "gene_id_1": current_data['gene_id'],
                "gene_start_1": current_data['gene_start'],
                "gene_end_1": current_data['gene_end'],
                "gene_id_2": neighbor['gene_id'],
                "gene_start_2": neighbor['gene_start'],
                "gene_end_2": neighbor['gene_end']
            }
            all_pairs.append(pair)

    # 将col1和col2的值排序后合并为新列
    all_pairs = pd.DataFrame(all_pairs) if all_pairs else pd.DataFrame()
    all_pairs["sorted_cols"] = all_pairs.apply(lambda row: "_".join(sorted([row["gene_id_1"], row["gene_id_2"]])), axis=1)
    df_clean = all_pairs.drop_duplicates(subset=["chr", "sorted_cols"], keep="first")
    df_clean = df_clean.drop(columns=["sorted_cols"])
    df_clean = df_clean.sort_values(by=['chr', 'gene_start'], ascending=[True, True])
    
    return df_clean

In [181]:
# 使用示例
gene_df = pd.read_csv("D:/OneDrive/NAS/01.科研相关/01.博士/01.data/03.生信挖掘/20.水稻NLR-pairs/result/03.nlr-pairs/534M.nlr.id.txt", header=None, names=["temp_id"]) 
gene_df.head()

Unnamed: 0,temp_id
0,534M_001.798
1,534M_001.796
2,534M_001.413
3,534M_002.28
4,534M_002.336


In [182]:
 # 加载基因信息
result_final = find_gene_pairs(
    gene_info=result,
    gene_id_input=gene_df.temp_id  # 或传入文件路径
)

# # 保存结果
result_final.to_csv("D:/OneDrive/NAS/01.科研相关/01.博士/01.data/03.生信挖掘/20.水稻NLR-pairs/gene_pairs.csv", index=False)

# print(result_final)