In [None]:
import os
import pandas as pd
import re
import matplotlib.pyplot as plt
import torch
import cv2
import numpy as np
from graphviz import Digraph


In [None]:
def count_numeric_subfolders(folder_path):
    if not os.path.exists(folder_path):
        raise FileNotFoundError(f"エラー: 指定されたパスが見つかりません: {folder_path}")
    if not os.path.isdir(folder_path):
        raise NotADirectoryError(f"エラー: 指定されたパスはフォルダではありません: {folder_path}")
    numeric_folder_count = 0
    for item in os.listdir(folder_path):
        item_path = os.path.join(folder_path, item)
        if os.path.isdir(item_path) and item.isdigit():
            numeric_folder_count += 1
    return numeric_folder_count

In [None]:
def filter_co_occur(data, sample_name, data_len, max_co_occur, out_num):
    filted_data = []
    filted_sample_name = []
    filted_data_len = []
    for i in range(len(data)):
        compare = 0
        for j in range(len(data[i])):
            mutation = data[i][j].split(',')
            if compare < len(mutation):
                compare = len(mutation)
        if compare <= max_co_occur:
            filted_data.append(data[i])
            filted_sample_name.append(sample_name[i])
            filted_data_len.append(data_len[i])
        if len(filted_data) >= out_num:
            break
    return filted_data, filted_sample_name, filted_data_len

In [None]:
def import_mutation_paths(base_dir, strain):
    """
    指定されたstrainディレクトリからmutation_paths_"strain".tsvを読み込む。

    Parameters:
        base_dir (str): ベースディレクトリのパス。
        strain (str): 読み込み対象のstrain名。

    Returns:
        list: 読み込んだTSVファイルのパスのリスト。
    """
    # ホームディレクトリを展開
    base_dir = os.path.expanduser(base_dir)
    strain_dir = os.path.join(base_dir, strain)

    # strain直下のファイルパスを確認
    file_paths = []
    file_path = os.path.join(strain_dir, f"mutation_paths_{strain}.tsv")
    if os.path.exists(file_path):
        file_paths.append(file_path)
    
    # strain/numサブディレクトリを探索
    else:
        if os.path.exists(strain_dir) and os.path.isdir(strain_dir):
            num_dirs = [d for d in os.listdir(strain_dir) if d.isdigit()]
            num_dirs.sort(key=int)  # 数字順にソート

            for num in num_dirs:
                file_path = os.path.join(strain_dir, num, f"mutation_paths_{strain}.tsv")
                if os.path.exists(file_path):
                    file_paths.append(file_path)

    if not file_paths:
        raise FileNotFoundError(f"mutation_paths_{strain}.tsvが{strain_dir}内に見つかりませんでした。")

    return file_paths


In [None]:
if __name__ == "__main__":
    # --- データ読み込み・前処理 ---
    #strains = ['B.1.1.7','P.1','BA.2','BA.1.1','BA.1','B.1.617.2','B.1.351','B.1.1.529']
    strains = ['P.1']
    out_num = 1000000
    dir = '~/usher_output/'
    max_co_occur = 5

    # 全件データの読み込み
    names = []
    lengths = []
    paths = []
    for strain in strains:
        file_paths = import_mutation_paths(dir,strain)
        for file_path in file_paths:
            print(f"[INFO]import: {file_path}")
            f = open(file_path, 'r',encoding="utf-8_sig")
            datalist = f.readlines()
            f.close()
            for i in range(1,len(datalist)):
                data = datalist[i].split('\t')
                names.append(data[0])
                lengths.append(int(data[1]))
                paths.append(data[2].rstrip().split('>'))
        
    print(f"[INFO] 全件読み込み完了: {len(paths)} サンプル")
    filtered_paths, filtered_name, filtered_length = filter_co_occur(paths, names, lengths, max_co_occur, out_num)
    print(f"[INFO] 共起数フィルタリング完了: {len(filtered_paths)} サンプル")
    max_timestep=max(lengths)

In [None]:
def build_graphviz_tree(mutation_paths):
    dot = Digraph(format="png")
    dot.attr(rankdir="LR") 
    dot.node("root", "root (timestep: 0)")  # ルートノードを作成

    # ノードを一意に管理する辞書
    nodes = {"root": "root"}  # ノード名をキー、ノードIDを値として管理
    edges = set()  # エッジの重複を防ぐためのセット

    for path in mutation_paths:
        parent = "root"  # ルートノードから開始
        for timestep, mutation in enumerate(path, start=1):  # タイムステップを明確に管理
            # ノード識別子にタイムステップを含める
            node_identifier = f"{mutation}_{timestep}"
            if node_identifier not in nodes:
                node_label = f"{mutation} (timestep: {timestep})"
                nodes[node_identifier] = node_identifier
                dot.node(node_identifier, node_label)
            # エッジを追加（重複を防ぐ）
            edge = (parent, nodes[node_identifier])
            if edge not in edges:
                dot.edge(*edge)
                edges.add(edge)
            # 現在のノードを親として更新
            parent = nodes[node_identifier]

    return dot

In [None]:
# ツリー構造を構築
dot = build_graphviz_tree(paths[:10])

# ツリーを描画
dot.render("mutation_tree")  # mutation_tree.pngとして保存し、表示