# Загрузка пакетов и данных

In [None]:
!pip install biopython matplotlib seaborn pandas numpy

In [None]:
# загрузка рабочих директорий
from Bio import SeqIO
import os
import gzip
from google.colab import drive

# Подключение к Google Drive
drive.mount('/content/drive')

# Путь к рабочей папке
work_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/"
os.chdir(work_dir)


In [None]:
# сверка файлов и таблицы
import os
import pandas as pd

# Пути
work_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/"
rawseq_dir = os.path.join(work_dir, "rawseq")
sequences_file = os.path.join(work_dir, "sequences.csv")

# 1. Получаем список файлов в rawseq (без путей, только имена)
if not os.path.exists(rawseq_dir):
    raise FileNotFoundError(f"Папка 'rawseq' не найдена: {rawseq_dir}")

files_in_rawseq = set(os.listdir(rawseq_dir))

# 2. Загружаем и фильтруем данные из sequences.csv
if not os.path.exists(sequences_file):
    raise FileNotFoundError(f"Файл 'sequences.csv' не найден: {sequences_file}")

df = pd.read_csv(sequences_file)

# Фильтруем: исключаем строки, где seqResult == "contam"
valid_sequences = df[df["seqResult"] != "contam"]
trace_files_in_csv = set(valid_sequences["traceFileName"].dropna().unique())

# 3. Сравниваем
only_in_rawseq = files_in_rawseq - trace_files_in_csv
only_in_csv = trace_files_in_csv - files_in_rawseq

# 4. Выводим результаты
print("Файлы, которые есть в rawseq, но отсутствуют в sequences.csv (исключая 'contam'):")
print(only_in_rawseq)

print("\nФайлы, которые есть в sequences.csv (исключая 'contam'), но отсутствуют в rawseq:")
print(only_in_csv)

# Проверка на полное совпадение
if not only_in_rawseq and not only_in_csv:
    print("\n✅ Все имена файлов совпадают (с учётом фильтрации)!")
else:
    print("\n❌ Есть расхождения (см. выше).")

# (Опционально) Сохранение отчёта в файл
report = [
    "Отчёт о сравнении файлов:",
    "---",
    "Файлы только в rawseq:",
    *only_in_rawseq,
    "",
    "Файлы только в sequences.csv (исключая 'contam'):",
    *only_in_csv,
    "",
    f"Совпадающих файлов: {len(files_in_rawseq & trace_files_in_csv)}",
    f"Файлов только в rawseq: {len(only_in_rawseq)}",
    f"Файлов только в sequences.csv: {len(only_in_csv)}"
]

with open("file_comparison_report.txt", "w") as f:
    f.write("\n".join(report))

# Конвертация исходных файлов и анализ качества

In [None]:
# Конвертация с сохранением полного имени и упаковкой в .gz

import os
import gzip
from Bio import SeqIO

# Указываем рабочую директорию с .ab1 файлами
work_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/rawseq"
os.chdir(work_dir)  # Переходим в нужную папку

print(f"Текущая директория: {os.getcwd()}")
print(f"Файлы в директории: {os.listdir()}")

# Конвертация .ab1 в .fastq.gz
for filename in os.listdir(work_dir):
    if filename.endswith(".ab1"):
        fastq_name = filename.replace(".ab1", ".fastq")
        gz_name = fastq_name + ".gz"
        ab1_path = os.path.join(work_dir, filename)  # Полный путь к .ab1 файлу

        print(f"Обработка файла: {filename}")

        try:
            with gzip.open(gz_name, "wt") as gz_handle:
                for record in SeqIO.parse(ab1_path, "abi"):
                    record.id = filename.replace(".ab1", "")
                    record.description = ""
                    SeqIO.write(record, gz_handle, "fastq")
            print(f"Успешно конвертирован: {filename} → {gz_name}")
        except Exception as e:
            print(f"Ошибка при обработке {filename}: {str(e)}")

# Проверка результатов
print("\nРезультаты конвертации:")
fastq_gz_files = [f for f in os.listdir() if f.endswith('.fastq.gz')]
if fastq_gz_files:
    print("Найдены сжатые FASTQ файлы:")
    for f in fastq_gz_files:
        print(f)
    print("\nПример содержимого первого файла:")
    !zcat {fastq_gz_files[0]} | head -n 4

    # Упаковываем в ZIP для скачивания
    !zip -r converted_fastq_gz.zip *.fastq.gz
    print("\nСкачать архив:")
    from google.colab import files
    files.download("converted_fastq_gz.zip")
else:
    print("Не найдено ни одного .fastq.gz файла. Возможные причины:")
    print("1. В директории нет .ab1 файлов")
    print("2. Возникли ошибки при конвертации")
    print(f"Проверьте содержимое папки: {work_dir}")
    print("Содержимое папки:")
    !ls -la

In [None]:
# оценка качества сиквенсов
import pandas as pd
from Bio import SeqIO
import gzip
import os

# Путь к папке с .fastq.gz файлами
work_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/rawseq"
os.chdir(work_dir)

# Сбор данных по ВСЕМ ридам
all_data = []
for filename in os.listdir():
    if filename.endswith(".fastq.gz"):
        with gzip.open(filename, "rt") as handle:
            for record in SeqIO.parse(handle, "fastq"):
                mean_q = sum(record.letter_annotations["phred_quality"]) / len(record.seq)
                all_data.append({
                    "file": filename,
                    "sequence_id": record.id,
                    "length": len(record.seq),
                    "mean_quality": mean_q
                })

# Создание DataFrame
df = pd.DataFrame(all_data)

# Сохранение данных о качестве (отдельный файл)
df.to_csv("all_sequences_quality.csv", index=False)
print("Данные о качестве последовательностей сохранены в all_sequences_quality.csv")

# Загрузка метаданных
metadata_path = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/sequences.csv"
metadata_df = pd.read_csv(metadata_path)

# Создаем ключ для слияния - удаляем расширения
df['merge_key'] = df['file'].str.replace('.fastq.gz', '')
metadata_df['merge_key'] = metadata_df['traceFileName'].str.replace('.ab1', '')

# Добавляем колонки о качестве в метаданные
for col in ['length', 'mean_quality']:
    if col in metadata_df.columns:
        metadata_df.drop(col, axis=1, inplace=True)  # Удаляем если уже есть

# Слияние данных
metadata_df = metadata_df.merge(
    df[['merge_key', 'length', 'mean_quality']],
    on='merge_key',
    how='left'
)

# Обновляем поле seqResult
metadata_df['seqResult'] = metadata_df.apply(
    lambda row: 'seq failed' if pd.notna(row['mean_quality']) and row['mean_quality'] < 20 else row['seqResult'],
    axis=1
)

# Удаляем временный столбец merge_key
metadata_df.drop('merge_key', axis=1, inplace=True)

# Перезаписываем исходный файл метаданных (сначала делаем backup)
backup_path = metadata_path.replace('.csv', '_backup.csv')
metadata_df.to_csv(backup_path, index=False)
print(f"\nСоздана резервная копия метаданных: {backup_path}")

metadata_df.to_csv(metadata_path, index=False)
print(f"Исходные метаданные обновлены: {metadata_path}")

# Проверка
print("\nПервые 5 строк обновленных метаданных:")
display(metadata_df.head())

In [None]:
# визуализация качества исходных сиквенсов
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import numpy as np
import os
from Bio import SeqIO
import gzip
from tqdm.notebook import tqdm

# Убедитесь, что df и low_q_df определены перед использованием
# df = ... ваш код загрузки данных ...
# low_q_df = df[df["mean_quality"] < 20]

# Создаем фигуру с тремя областями графиков
plt.figure(figsize=(16, 12))

# =============================================
# График 1: Распределение качества (верхний левый)
# =============================================
plt.subplot(2, 2, 1)

# Гистограмма качества
n_qual, bins_qual, patches_qual = plt.hist(df["mean_quality"], bins=50, color="steelblue")

# Закрашиваем столбцы с Q < 20
for patch, bin_edge in zip(patches_qual, bins_qual):
    if bin_edge < 20:
        patch.set_facecolor('firebrick')
    else:
        patch.set_facecolor('steelblue')

plt.axvline(x=20, color="black", linestyle="--", linewidth=1.5)
plt.xlabel("Средний Phred score качества", fontsize=12)
plt.ylabel("Количество ридов", fontsize=12)
plt.title("Распределение качества ридов", fontsize=14, pad=20)
plt.grid(True, alpha=0.3)

# Легенда для качества
legend_elements_qual = [
    Patch(facecolor='steelblue', label='Q ≥ 20 (хорошее)'),
    Patch(facecolor='firebrick', label='Q < 20 (низкое)'),
    Patch(facecolor='none', edgecolor='black', linestyle='--',
          label='Порог качества (Q=20)')
]
plt.legend(handles=legend_elements_qual, fontsize=10)

# =============================================
# График 2: Распределение длин (верхний правый)
# =============================================
plt.subplot(2, 2, 2)

# Автоматическое определение разумных бинов для длины
len_bins = np.linspace(df["length"].min(), df["length"].max(), 50)

# Гистограмма длин
n_len, bins_len, patches_len = plt.hist(df["length"], bins=len_bins, color="forestgreen")

# Вычисляем 5-й и 95-й перцентили для выделения
len_5th = df["length"].quantile(0.05)
len_95th = df["length"].quantile(0.95)

# Закрашиваем выбросы
for patch, bin_edge in zip(patches_len, bins_len):
    if bin_edge < len_5th or bin_edge > len_95th:
        patch.set_facecolor('goldenrod')
    else:
        patch.set_facecolor('forestgreen')

plt.axvline(x=len_5th, color="black", linestyle=":", linewidth=1)
plt.axvline(x=len_95th, color="black", linestyle=":", linewidth=1)
plt.xlabel("Длина рида (нуклеотиды)", fontsize=12)
plt.ylabel("Количество ридов", fontsize=12)
plt.title("Распределение длин ридов", fontsize=14, pad=20)
plt.grid(True, alpha=0.3)

# Легенда для длин
legend_elements_len = [
    Patch(facecolor='forestgreen', label='Основной диапазон'),
    Patch(facecolor='goldenrod', label='5%/95% перцентили'),
    Patch(facecolor='none', edgecolor='black', linestyle=':',
          label='Границы перцентилей')
]
plt.legend(handles=legend_elements_len, fontsize=10)

# =============================================
# График 3: Профиль качества (нижний, на всю ширину)
# =============================================
plt.subplot(2, 1, 2)

# Определяем путь к директории с fastq файлами
work_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/rawseq"

# Сбор данных о качестве
quality_profiles = []
max_observed_q = 0

for filename in tqdm([f for f in os.listdir(work_dir) if f.endswith(".fastq.gz")],
                   desc="Анализ качества"):
    with gzip.open(os.path.join(work_dir, filename), "rt") as handle:
        for record in SeqIO.parse(handle, "fastq"):
            quals = record.letter_annotations["phred_quality"]
            quality_profiles.append(quals)
            current_max = max(quals)
            if current_max > max_observed_q:
                max_observed_q = current_max

# Определение параметров графика
y_max = max(60, max_observed_q + 5)  # Минимум до 60, с запасом +5
x_max = max(len(q) for q in quality_profiles)

# Усреднение по позициям (с обработкой разных длин)
avg_quality = []
for pos in range(x_max):
    pos_qualities = [q[pos] for q in quality_profiles if pos < len(q)]
    avg_quality.append(np.mean(pos_qualities) if pos_qualities else np.nan)

# Построение графика
plt.plot(avg_quality, color='royalblue', linewidth=2.5, label='Среднее качество')

# Горизонтальные линии порогов
thresholds = {
    'Отлично (Q≥40)': {'value': 40, 'color': 'darkgreen', 'linestyle': '--'},
    'Хорошо (Q≥30)': {'value': 30, 'color': 'limegreen', 'linestyle': ':'},
    'Приемлемо (Q≥20)': {'value': 20, 'color': 'gold', 'linestyle': '-.'},
    'Плохо (Q<20)': {'value': 10, 'color': 'red', 'linestyle': '--'}
}

for label, params in thresholds.items():
    plt.axhline(y=params['value'], color=params['color'],
               linestyle=params['linestyle'], alpha=0.7, label=label)

# Настройки графика
plt.title('Профиль качества сиквенсов', fontsize=16, pad=20)
plt.xlabel('Позиция в риде', fontsize=14)
plt.ylabel('Phred score', fontsize=14)
plt.ylim(0, y_max)
plt.xlim(0, x_max)

# Легенда и сетка
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(alpha=0.2)

# Добавление аннотации с максимальным качеством
plt.annotate(f'Макс. качество: Q{max_observed_q}',
            xy=(0.02, 0.95), xycoords='axes fraction',
            fontsize=12, bbox=dict(boxstyle='round', alpha=0.1))

# =============================================
# Общие настройки
# =============================================
plt.tight_layout(pad=3.0)

# Добавляем общий заголовок
plt.suptitle("Анализ качества и длины последовательностей",
             fontsize=16, y=1.02)

plt.show()

# Вывод статистики
print("\nСтатистика по качеству:")
print(f"• Среднее качество: {df['mean_quality'].mean():.1f}")
print(f"• Доля ридов с Q < 20: {len(low_q_df)/len(df)*100:.1f}%")

print("\nСтатистика по длинам:")
print(f"• Средняя длина: {df['length'].mean():.1f} нт")
print(f"• 5-й перцентиль: {len_5th:.1f} нт")
print(f"• 95-й перцентиль: {len_95th:.1f} нт")

print(f"\nСтатистика профиля качества:\nМаксимальное качество: Q{max_observed_q}\n"
      f"Среднее качество: {np.nanmean(avg_quality):.1f}\n"
      f"Длина ридов: {x_max} п.н.")

# Обработка сиквенсов

In [None]:
# Обработка сиквенсов (обрезка, слияние) и экспорт всех фаста файлов по отдельности и вместе и запись последовательностей в таблицу метаданных

import os
import gzip
import shutil
import pandas as pd
import numpy as np
from Bio import SeqIO, pairwise2
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from collections import defaultdict
from tqdm import tqdm
from google.colab import files as colab_files
from difflib import SequenceMatcher

# ======================
# КОНФИГУРАЦИЯ
# ======================
QUALITY_PROFILES = {
    'strict': {
        'min_length': 50,
        'min_avg_quality': 20,
        'min_end_quality': 15,
        'min_overlap': 20,
        'window_size': 5
    },
    'moderate': {
        'min_length': 40,
        'min_avg_quality': 15,
        'min_end_quality': 10,
        'min_overlap': 15,
        'window_size': 5
    },
    'loose': {
        'min_length': 30,
        'min_avg_quality': 10,
        'min_end_quality': 7,
        'min_overlap': 10,
        'window_size': 5
    }
}

CURRENT_PROFILE = QUALITY_PROFILES['strict']

input_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/rawseq"
output_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/"
os.makedirs(output_dir, exist_ok=True)
original_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB"

# ======================
# ОСНОВНЫЕ ФУНКЦИИ
# ======================
def load_filtered_metadata(metadata_path):
    """Загружает и фильтрует метаданные по seqResult"""
    df = pd.read_csv(metadata_path)
    print(f"Всего записей в метаданных: {len(df)}")

def load_filtered_metadata(metadata_path):
    """Загружает и фильтрует метаданные по seqResult и pcrResult"""
    df = pd.read_csv(metadata_path)
    print(f"Всего записей в метаданных: {len(df)}")

    # Фильтрация по seqResult и pcrResult
    filtered_df = df[
        (~df['seqResult'].isin(['seq failed', 'pcr failed']))
    ]
    print(f"Осталось после фильтрации: {len(filtered_df)} (исключены seq failed и pcr failed)")

    return filtered_df

def calculate_sequence_quality(sequence):
    """Вычисляет характеристики качества последовательности"""
    if not sequence or not isinstance(sequence, str):
        return {
            'seq_length': 0,
            'n_count': 0,
            'n_percentage': 100.0,
            'quality_score': 0.0
        }

    seq_length = len(sequence)
    n_count = sequence.upper().count('N')
    n_percentage = (n_count / seq_length) * 100 if seq_length > 0 else 100.0
    quality_score = (seq_length * 0.4) - (n_percentage * 0.6)

    return {
        'seq_length': seq_length,
        'n_count': n_count,
        'n_percentage': round(n_percentage, 2),
        'quality_score': round(quality_score, 2)
    }

def process_sanger_read(filepath, params):
    """Обработка рида с учетом параметров качества"""
    try:
        with gzip.open(filepath, 'rt') as f:
            for record in SeqIO.parse(f, 'fastq'):
                quals = record.letter_annotations["phred_quality"]

                # Проверка среднего качества
                if np.mean(quals) < params['min_avg_quality']:
                    continue

                # Определение границ хорошего качества
                start = next((i for i in range(len(quals)-params['window_size'])
                            if np.mean(quals[i:i+params['window_size']]) >= params['min_end_quality']), 0)
                end = next((i for i in range(len(quals)-1, params['window_size'], -1)
                          if np.mean(quals[i-params['window_size']:i]) >= params['min_end_quality']), len(quals))

                # Обрезка и проверка длины
                trimmed = record[start:end]
                if len(trimmed) >= params['min_length']:
                    return trimmed
    except Exception as e:
        print(f"Ошибка обработки {filepath}: {str(e)}")
    return None

def assemble_contigs(f_read, r_read, sample_id, params):
    """Собирает контиги из прямого и обратного ридов"""
    try:
        f_seq = str(f_read.seq)
        r_seq = str(r_read.reverse_complement().seq)

        # Выравнивание последовательностей
        alignments = pairwise2.align.globalms(
            f_seq, r_seq, 2, -3, -5, -2, one_alignment_only=True
        )

        if not alignments:
            return None

        # Построение консенсусной последовательности
        aligned_f, aligned_r = alignments[0][0], alignments[0][1]
        consensus = []
        overlap_len = 0

        for f_base, r_base in zip(aligned_f, aligned_r):
            if f_base == '-' or r_base == '-':
                continue
            overlap_len += 1
            consensus.append(f_base if f_base == r_base else f_base)

        # Проверка минимального перекрытия
        if overlap_len >= params['min_overlap']:
            return SeqRecord(
                Seq(''.join(consensus)),
                id=f"contig_{sample_id}",
                description=f"len={len(consensus)}"
            )
    except Exception as e:
        print(f"Ошибка сборки {sample_id}: {str(e)}")
    return None

def process_sample(sample_id, f_files, r_files, params, df_meta):
    """Обрабатывает один образец"""
    # Получаем ручную последовательность (если есть)
    manual_seq = df_meta[df_meta['catNumber'] == sample_id]['Sequence'].iloc[0] if not df_meta[df_meta['catNumber'] == sample_id].empty else None

    # Получаем лучшие риды для каждого направления
    best_f = None
    best_r = None

    for file in f_files:
        read = process_sanger_read(os.path.join(input_dir, file.replace('.ab1', '.fastq.gz')), params)
        if read and (best_f is None or np.mean(read.letter_annotations["phred_quality"]) >
                     np.mean(best_f.letter_annotations["phred_quality"])):
            best_f = read

    for file in r_files:
        read = process_sanger_read(os.path.join(input_dir, file.replace('.ab1', '.fastq.gz')), params)
        if read and (best_r is None or np.mean(read.letter_annotations["phred_quality"]) >
                     np.mean(best_r.letter_annotations["phred_quality"])):
            best_r = read

    # Сборка контига или использование одиночных ридов
    result = {
        'sample_id': sample_id,
        'result_type': 'failed',
        'auto_sequence': None,
        'identity_percent': None,
        'sequence_preview': None
    }

    if best_f and best_r:
        contig = assemble_contigs(best_f, best_r, sample_id, params)
        if contig:
            result['auto_sequence'] = str(contig.seq)
            result['result_type'] = 'contig'
            SeqIO.write(contig, os.path.join(output_dir, f"{sample_id}_contig.fasta"), "fasta")

    if not result['auto_sequence']:
        if best_f:
            result['auto_sequence'] = str(best_f.seq)
            result['result_type'] = 'F_only'
            SeqIO.write(best_f, os.path.join(output_dir, f"{sample_id}_F.fasta"), "fasta")
        elif best_r:
            result['auto_sequence'] = str(best_r.seq)
            result['result_type'] = 'R_only'
            SeqIO.write(best_r, os.path.join(output_dir, f"{sample_id}_R.fasta"), "fasta")

    # Сравнение с ручной последовательностью
    if result['auto_sequence'] and manual_seq:
        identity, preview = compare_sequences(result['auto_sequence'], manual_seq)
        result['identity_percent'] = identity
        result['sequence_preview'] = preview

    # Расчет характеристик качества
    quality_metrics = calculate_sequence_quality(result['auto_sequence'])
    result.update(quality_metrics)

    return result

def compare_sequences(auto_seq, manual_seq):
    """Сравнение последовательностей и расчет идентичности"""
    if pd.isna(manual_seq) or not manual_seq or not isinstance(manual_seq, str):
        return None, None

    if not auto_seq:
        return 0.0, None

    manual_seq = manual_seq.upper().replace(' ', '').replace('\n', '')
    auto_seq = auto_seq.upper().replace(' ', '').replace('\n', '')

    if not manual_seq or not auto_seq:
        return 0.0, None

    aligner = pairwise2.align.globalxx(auto_seq, manual_seq)
    if not aligner:
        return 0.0, None

    best_aln = aligner[0]
    matches = sum(a == b for a, b in zip(best_aln[0], best_aln[1]))
    identity = matches / max(len(best_aln[0]), len(best_aln[1]))

    preview = f"Auto: {auto_seq[:50]}...\nManual: {manual_seq[:50]}..."
    return round(identity * 100, 2), preview

def main(metadata_path):
    """Основная функция обработки"""
    # Загрузка и фильтрация метаданных
    df_meta = load_filtered_metadata(metadata_path)

    # Группировка файлов по образцам и направлениям
    samples = defaultdict(lambda: {'F': [], 'R': []})

    for _, row in df_meta.iterrows():
        # Проверяем наличие направления и catNumber
        if pd.isna(row['direction']) or pd.isna(row['catNumber']):
            print(f"Пропуск записи с пустым direction или catNumber: {row}")
            continue

        try:
            samples[row['catNumber']][row['direction']].append(row['traceFileName'])
        except KeyError as e:
            print(f"Ошибка обработки строки: {row}")
            print(f"Некорректное значение направления: {row['direction']}")
            continue

    # Остальной код остается без изменений...
    # Обработка каждого образца
    results = []
    all_sequences = []

    for sample_id, files in tqdm(samples.items(), desc="Обработка образцов"):
        result = process_sample(sample_id, files['F'], files['R'], CURRENT_PROFILE, df_meta)
        results.append(result)

        if result['auto_sequence']:
            record = SeqRecord(
                Seq(result['auto_sequence']),
                id=f"{sample_id}_{result['result_type']}",
                description=f"identity={result['identity_percent']}%"
            )
            all_sequences.append(record)

    # Сохранение результатов
    save_results(results, all_sequences, df_meta)
def save_results(results, all_sequences, original_meta):
    """Сохраняет все результаты обработки"""
    # Сохранение объединенного FASTA
    combined_fasta = os.path.join(output_dir, "all_sequences.fasta")
    with open(combined_fasta, "w") as output_handle:
        SeqIO.write(all_sequences, output_handle, "fasta")

    # Создание итоговой таблицы
    df_results = pd.DataFrame(results)
    df_final = pd.merge(
        original_meta,
        df_results,
        left_on='catNumber',
        right_on='sample_id',
        how='left'
    )

    # Категоризация качества
    df_final['quality_category'] = pd.cut(
        df_final['quality_score'],
        bins=[-float('inf'), 0, 150, 200, 250, float('inf')],
        labels=['Failed', 'Low', 'Medium', 'Good', 'Excellent']
    )

    # Сохранение CSV
    output_csv = os.path.join(output_dir, "comparison_results_with_quality.csv")
    original_output_csv = os.path.join(original_dir, "comparison_results_with_quality.csv")
    df_final.to_csv(output_csv, index=False)
    df_final.to_csv(original_output_csv, index=False)

    # Архивирование
    shutil.make_archive("sanger_results", 'zip', output_dir)
    colab_files.download("sanger_results.zip")

    # Вывод статистики
    print_statistics(df_final)

def print_statistics(df):
    """Выводит статистику обработки"""
    print("\n=== СТАТИСТИКА ОБРАБОТКИ ===")
    print(f"Всего образцов: {len(df)}")
    print(f"Успешно собрано контигов: {len(df[df['result_type'] == 'contig'])}")
    print(f"Только прямой рид: {len(df[df['result_type'] == 'F_only'])}")
    print(f"Только обратный рид: {len(df[df['result_type'] == 'R_only'])}")
    print(f"Не удалось обработать: {len(df[df['result_type'] == 'failed'])}")

    print("\n=== КАЧЕСТВО ПОСЛЕДОВАТЕЛЬНОСТЕЙ ===")
    print(f"Средняя длина: {df['seq_length'].mean():.1f} bp")
    print(f"Среднее количество N: {df['n_count'].mean():.1f}")
    print(f"Средний процент N: {df['n_percentage'].mean():.1f}%")
    print("\nРаспределение по категориям качества:")
    print(df['quality_category'].value_counts())

    if 'identity_percent' in df.columns:
        valid_comparisons = df['identity_percent'].notna()
        print(f"\nСредняя идентичность с ручными данными: {df[valid_comparisons]['identity_percent'].mean():.1f}%")

if __name__ == "__main__":
    metadata_path = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/sequences.csv"
    main(metadata_path)

In [None]:
# визуализация качества полученных фаста файлов

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
from time import time

# Конфигурация
INPUT_CSV = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/comparison_results_with_quality.csv"
OUTPUT_DIR = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/quality_plots/"
os.makedirs(OUTPUT_DIR, exist_ok=True)

start_time = time()

# 1. Загрузка данных
df = pd.read_csv(INPUT_CSV, usecols=[
    'seq_length', 'n_percentage',
    'quality_score', 'quality_category',
    'result_type'
])

# 2. Настройка стиля
sns.set_theme(style="whitegrid")
plt.figure(figsize=(16, 12))

# --- ВЕРХНИЙ РЯД: ГИСТОГРАММЫ ---

# График 1: Распределение длин
plt.subplot(2, 2, 1)
sns.histplot(df['seq_length'], bins=30, kde=False, color='royalblue')
plt.title(f"Распределение длин последовательностей\n(Среднее: {df['seq_length'].mean():.1f} bp)")
plt.xlabel("Длина (bp)")
plt.ylabel("Количество")

# График 2: Распределение Quality Score
plt.subplot(2, 2, 2)
sns.histplot(df['quality_score'], bins=30, kde=True, color='purple')
plt.title(f"Распределение Quality Score\n(Среднее: {df['quality_score'].mean():.1f})")
plt.xlabel("Quality Score")
plt.ylabel("Частота")

# --- НИЖНИЙ РЯД: КРУГОВЫЕ ДИАГРАММЫ ---

# График 3: Категории качества
plt.subplot(2, 2, 3)
quality_counts = df['quality_category'].value_counts()
plt.pie(quality_counts,
        labels=quality_counts.index,
        autopct='%1.1f%%',
        startangle=90,
        colors=sns.color_palette("husl", len(quality_counts)),
        textprops={'fontsize': 10})
plt.title("Категории качества", pad=20)

# График 4: Типы результатов сборки
plt.subplot(2, 2, 4)
if 'result_type' in df.columns:
    result_counts = df['result_type'].value_counts()
    plt.pie(result_counts,
            labels=result_counts.index,
            autopct='%1.1f%%',
            startangle=90,
            colors=sns.color_palette("Set2", len(result_counts)),
            textprops={'fontsize': 10})
    plt.title("Типы результатов сборки", pad=20)
else:
    plt.text(0.5, 0.5, 'Данные о типах сборки отсутствуют',
             ha='center', va='center', fontsize=10)
    plt.title("Типы сборки недоступны", pad=20)

# Общие настройки
plt.tight_layout(pad=3.0)
plt.savefig(os.path.join(OUTPUT_DIR, "final_quality_metrics.png"),
            dpi=150, bbox_inches='tight')
plt.show()

# 3. Генерация отчета
execution_time = time() - start_time
stats_report = f"""
=== ОТЧЕТ ПО КАЧЕСТВУ ===
Время генерации: {execution_time:.2f} сек
Всего образцов: {len(df)}
--- Основные метрики ---
Средняя длина: {df['seq_length'].mean():.1f} ± {df['seq_length'].std():.1f} bp
Средний процент N: {df['n_percentage'].mean():.2f}% ± {df['n_percentage'].std():.2f}%
Средний Quality Score: {df['quality_score'].mean():.2f} ± {df['quality_score'].std():.2f}
--- Распределение по категориям ---
{df['quality_category'].value_counts().to_string()}
"""

print(stats_report)
with open(os.path.join(OUTPUT_DIR, "quality_report.txt"), 'w') as f:
    f.write(stats_report)

# Работа с данными классификации BLASTN

In [None]:
# парсинг html файла результатов BLASTN из PlutoF|UNITE

import pandas as pd
from bs4 import BeautifulSoup
import os

def parse_blastn_html(html_file_path):
    try:
        with open(html_file_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')

        queries = []

        # Находим все блоки с запросами
        query_blocks = soup.find_all('div', style=lambda x: x and 'margin-top: 20px' in x)

        for block in query_blocks:
            # Извлекаем информацию о запросе
            query_text = block.get_text(strip=True)
            if 'Query' in query_text:
                query_num = query_text.split('Query')[1].split('of')[0].strip()
                query_name = query_text.split(':')[1].split('identity=')[0].strip()
            else:
                continue

            # Находим следующую таблицу после блока запроса
            table = block.find_next('table')
            if not table:
                continue

            hits = []
            # Обрабатываем строки таблицы (пропускаем заголовок)
            for row in table.find_all('tr')[1:]:
                cols = row.find_all('td')
                if len(cols) >= 13:  # Проверяем, что есть все необходимые колонки
                    hit = {
                        'Reference': cols[0].get_text(strip=True),
                        'SH:0.5%': cols[1].get_text(strip=True),
                        'SH:1.0%': cols[2].get_text(strip=True),
                        'SH:1.5%': cols[3].get_text(strip=True),
                        'Taxon name': cols[4].get_text(strip=True),
                        'Score': cols[5].get_text(strip=True),
                        'E-value': cols[6].get_text(strip=True),
                        'Prcnt': cols[7].get_text(strip=True),
                        'MisM': cols[8].get_text(strip=True),
                        'Qstart': cols[9].get_text(strip=True),
                        'Qend': cols[10].get_text(strip=True),
                        'Rstart': cols[11].get_text(strip=True),
                        'Rend': cols[12].get_text(strip=True)
                    }
                    hits.append(hit)

            if hits:
                queries.append({
                    'query_num': query_num,
                    'query_name': query_name,
                    'hits': hits
                })

        # Преобразуем в плоскую таблицу
        rows = []
        for query in queries:
            for hit in query['hits']:
                row = {
                    'Query': query['query_name'],
                    'Reference': hit['Reference'],
                    'SH:0.5%': hit['SH:0.5%'],
                    'SH:1.0%': hit['SH:1.0%'],
                    'SH:1.5%': hit['SH:1.5%'],
                    'Taxon name': hit['Taxon name'],
                    'Score': hit['Score'],
                    'E-value': hit['E-value'],
                    'Prcnt': hit['Prcnt'],
                    'MisM': hit['MisM'],
                    'Qstart': hit['Qstart'],
                    'Qend': hit['Qend'],
                    'Rstart': hit['Rstart'],
                    'Rend': hit['Rend']
                }
                rows.append(row)

        if not rows:
            raise ValueError("Не найдено ни одного результата BLAST в файле")

        return pd.DataFrame(rows)

    except Exception as e:
        print(f"Ошибка при разборе файла: {str(e)}")
        raise

# Основной код выполнения
try:
    blast_html_path = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/BlastN.html"
    print(f"Обработка файла: {blast_html_path}")

    if not os.path.exists(blast_html_path):
        raise FileNotFoundError(f"Файл не найден: {blast_html_path}")

    df = parse_blastn_html(blast_html_path)

    # Сохранение результатов
    output_csv_path = os.path.join(os.path.dirname(blast_html_path), "blast_results.csv")
    df.to_csv(output_csv_path, index=False)

    print(f"\nУспешно обработано:")
    print(f"Всего запросов: {df['Query'].nunique()}")
    print(f"Всего результатов: {len(df)}")
    print(f"CSV сохранен: {output_csv_path}")

    # Проверка первых строк
    print("\nПервые 3 строки результатов:")
    print(df.head(3))

except Exception as e:
    print(f"\nКритическая ошибка: {str(e)}")
    print("Рекомендации:")
    print("1. Проверьте формат входного файла")
    print("2. Убедитесь, что файл содержит результаты BLAST в HTML-таблицах")
    print("3. Если проблема сохраняется, предоставьте полный файл для анализа")

In [None]:
# выгрузка типовых образцов паутинников из NCBI

from Bio import Entrez
import time
import pandas as pd
from urllib.error import HTTPError

Entrez.email = "filippova.courlee.nina@gmail.com"  # Критически важно указать реальный email
NCBI_DELAY = 3  # Задержка между запросами (сек)
MAX_RETRIES = 5  # Максимальное число попыток

def safe_entrez_request(func, *args, **kwargs):
    """Обертка для запросов к Entrez с повторами при ошибках"""
    for attempt in range(MAX_RETRIES):
        try:
            time.sleep(NCBI_DELAY)
            return func(*args, **kwargs)
        except HTTPError as e:
            if e.code == 429:
                wait = (attempt + 1) * 10  # Увеличиваем задержку
                print(f"Слишком много запросов. Ждем {wait} сек...")
                time.sleep(wait)
            else:
                raise
    raise Exception(f"Не удалось выполнить запрос после {MAX_RETRIES} попыток")

# 1. Поиск типовых образцов Cortinarius
try:
    print("Поиск типовых последовательностей...")
    handle = safe_entrez_request(
        Entrez.esearch,
        db="nucleotide",
        term='"Cortinarius"[Organism] AND "type material"[Filter]',
        retmax=1000,
        usehistory="y"
    )
    search_results = Entrez.read(handle)
    handle.close()

    # Получаем ID через History (для больших наборов)
    webenv = search_results["WebEnv"]
    query_key = search_results["QueryKey"]
    count = int(search_results["Count"])
    print(f"Найдено {count} типовых образцов")

    # 2. Получение метаданных порциями
    batch_size = 100
    all_records = []

    for start in range(0, count, batch_size):
        print(f"Обработка записей {start+1}-{min(start+batch_size, count)}")

        handle = safe_entrez_request(
            Entrez.efetch,
            db="nucleotide",
            rettype="gb",
            retmode="xml",
            retstart=start,
            retmax=batch_size,
            webenv=webenv,
            query_key=query_key
        )
        records = Entrez.read(handle)
        all_records.extend(records)
        handle.close()

    # 3. Извлечение accession numbers
    type_accessions = []
    for record in all_records:
        acc = record['GBSeq_primary-accession']
        organism = record['GBSeq_organism']

        # Проверка типа материала
        is_type = False
        for feature in record.get('GBSeq_feature-table', []):
            if feature.get('GBFeature_key') == 'source':
                for qual in feature.get('GBFeature_quals', []):
                    if qual.get('GBQualifier_name') == 'type_material':
                        is_type = True
                        break

        if is_type:
            type_accessions.append(acc)
            print(f"{acc}: {organism} (ТИПОВОЙ)")

    # 4. Сохранение результатов
    df = pd.DataFrame({'Accession': type_accessions})
    df.to_csv('Cortinarius_type_materials.csv', index=False)
    print(f"\nСохранено {len(type_accessions)} типовых accession numbers в Cortinarius_type_materials.csv")

except Exception as e:
    print(f"Ошибка: {str(e)}")

In [None]:
# выбор лучшей идентицикации (best hit) по приоритету типовой образец > лучшей score
import pandas as pd
from tqdm.auto import tqdm
from collections import defaultdict

# ======================
# НАСТРОЙКИ
# ======================
INPUT_PATH = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/blast_results.csv"
TYPES_DB_PATH = "Cortinarius_type_materials.csv"
SKIP_PREFIXES = ['UDB', 'MZ', 'KU', 'HQ', 'KM', 'MT', 'MH', 'DQ']

# ======================
# ФУНКЦИИ
# ======================
def load_type_database():
    """Загружает базу типовых образцов и возвращает set"""
    type_db = pd.read_csv(TYPES_DB_PATH)
    return set(type_db['Accession'].astype(str).str.strip().values)

def is_species_level(taxon_name):
    """Проверяет, является ли таксон бинарным названием (видом)"""
    if pd.isna(taxon_name):
        return False
    parts = str(taxon_name).strip().split()
    return len(parts) >= 2 and parts[1] not in ['sp.', 'sp', 'cf.', 'aff.']

def select_best_hits(df):
    """
    Выбирает лучшие хиты для каждого Query с приоритетом:
    1. Типовые образцы с определением до вида
    2. Максимальный Score среди видовых определений
    """
    # Фильтруем только видовые определения
    species_hits = df[df['Taxon name'].apply(is_species_level)].copy()

    # Сортировка по приоритетам
    species_hits.sort_values(['Query', 'is_type', 'Score'],
                           ascending=[True, False, False], inplace=True)

    # Выбор лучшего хита для каждого Query
    best_hits = species_hits.groupby('Query').first().reset_index()

    # Добавляем список всех уникальных видовых таксонов для каждого Query
    unique_taxa = species_hits.groupby('Query')['Taxon name'].unique().apply(
        lambda x: '|'.join(sorted(set(x), key=str)))
    best_hits['All_Taxa'] = best_hits['Query'].map(unique_taxa)

    return best_hits

# ======================
# ОСНОВНОЙ БЛОК
# ======================
def main():
    try:
        # 1. Загрузка данных
        print("Загрузка данных BLAST...")
        blast_df = pd.read_csv(INPUT_PATH)
        print(f"Загружено строк: {len(blast_df)}")

        # 2. Загрузка базы типовых образцов
        print("Загрузка базы типовых образцов...")
        type_accessions = load_type_database()
        print(f"Загружено {len(type_accessions)} типовых последовательностей")

        # 3. Фильтрация проблемных accession
        blast_df = blast_df[~blast_df['Reference'].astype(str).str.startswith(tuple(SKIP_PREFIXES))]
        print(f"Осталось после фильтрации: {len(blast_df)}")

        # 4. Добавление меток типовых образцов
        blast_df['is_type'] = blast_df['Reference'].astype(str).isin(type_accessions)

        # 5. Выбор лучших хитов (только видовые определения)
        print("Выбор лучших хитов (только видовые определения)...")
        best_hits_df = select_best_hits(blast_df)

        # 6. Сохранение результатов
        output_path = INPUT_PATH.replace(".csv", "_best_species_hits.csv")
        best_hits_df.to_csv(output_path, index=False)

        # 7. Статистика
        print("\n" + "="*50)
        print("ФИНАЛЬНЫЕ РЕЗУЛЬТАТЫ")
        print(f"Всего запросов: {len(best_hits_df)}")
        print(f"Найдено типовых образцов: {best_hits_df['is_type'].sum()}")
        print(f"Уникальных видов: {best_hits_df['All_Taxa'].nunique()}")
        print(f"Сохранено в: {output_path}")
        print("="*50)

        # Примеры типовых образцов
        type_samples = best_hits_df[best_hits_df['is_type']].head(3)
        if not type_samples.empty:
            print("\nПримеры типовых образцов (определенных до вида):")
            print(type_samples[['Query', 'Reference', 'Taxon name', 'Score']])

    except Exception as e:
        print(f"\n!!! КРИТИЧЕСКАЯ ОШИБКА: {str(e)}")

if __name__ == "__main__":
    main()

In [None]:
# слияние best hit с исходной таблицей сиквенсов

import pandas as pd
import os

# Загрузка данных
output_dir = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB"  # Замените на ваш путь
blast_best_path = os.path.join(output_dir, "blast_results_best_species_hits.csv")
comparison_path = os.path.join(output_dir, "comparison_results_with_quality.csv")

# 1. Обработка blast_best_hits.csv - создание catNumber
blast_df = pd.read_csv(blast_best_path)

# Извлекаем catNumber из Query (все до первого подчеркивания)
blast_df['catNumber'] = blast_df['Query'].str.split('_').str[0]

# Проверка
print("Уникальные catNumber в blast_best_hits:")
print(blast_df['catNumber'].unique()[:10])  # Покажем первые 10 для проверки

# 2. Загрузка comparison_results.csv
comparison_df = pd.read_csv(comparison_path)

# Проверка
print("\nУникальные catNumber в comparison_results:")
print(comparison_df['catNumber'].unique()[:10])

# 3. Объединение таблиц
# Левое соединение, чтобы сохранить все строки из comparison_results
merged_df = pd.merge(
    comparison_df,
    blast_df,
    on='catNumber',
    how='left',
    suffixes=('', '_blast')
)

# 4. Сохранение результата
merged_path = os.path.join(output_dir, "final_merged_results.csv")
merged_df.to_csv(merged_path, index=False)

print(f"\nОбъединение завершено. Результат сохранен в: {merged_path}")
print(f"Исходные размеры: comparison_results - {comparison_df.shape}, blast_best_hits - {blast_df.shape}")
print(f"Итоговый размер: {merged_df.shape}")

In [None]:
# сравнение машинной и ручной идентификации

import pandas as pd
import re
from difflib import SequenceMatcher
from tqdm import tqdm

# Настройки путей
INPUT_PATH = "/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/final_merged_results.csv"
OUTPUT_PATH = INPUT_PATH.replace(".csv", "_with_comparison_final_v2.csv")

def normalize_name(name):
    """Нормализация названий таксонов (без рода и окончаний)"""
    if pd.isna(name):
        return ''
    name = str(name).lower().strip()
    name = re.sub(r'\s+', ' ', name)
    name = re.sub(r'[^a-z ]', '', name)
    return name

def extract_species(name):
    """Извлечение видового эпитета (второго слова)"""
    if pd.isna(name):
        return ''
    parts = str(name).split()
    return parts[1] if len(parts) > 1 else ''

def compare_species(manual, auto):
    """Сравнение видовых эпитетов с нормализацией"""
    manual_species = extract_species(manual)
    auto_species = extract_species(auto)

    if not manual_species or not auto_species:
        return 0

    # Удаляем окончания (us, a, um и т.д.)
    manual_stem = re.sub(r'(us|a|um|is|es|i|ae|orum)$', '', normalize_name(manual_species))
    auto_stem = re.sub(r'(us|a|um|is|es|i|ae|orum)$', '', normalize_name(auto_species))

    return 1 if (
        manual_stem == auto_stem or
        manual_stem in auto_stem or
        auto_stem in manual_stem or
        SequenceMatcher(None, manual_stem, auto_stem).ratio() > 0.85
    ) else 0

def main():
    # Загрузка данных
    print("Загрузка данных...")
    df = pd.read_csv(INPUT_PATH)

    # Уникальные образцы с ручной идентификацией
    manual_samples = df[df['NameBasedOnSequence'].notna()].drop_duplicates('catNumber')
    print(f"Уникальных образцов с ручной ID: {len(manual_samples)}")

    # Сравнение только для уникальных catNumber
    comparison_results = {}
    for _, row in tqdm(manual_samples.iterrows(), total=len(manual_samples), desc="Сравнение"):
        manual_id = row['NameBasedOnSequence']
        auto_id = row['Taxon name'] if not pd.isna(row['Taxon name']) else ''
        comparison_results[row['catNumber']] = compare_species(manual_id, auto_id)

    # Добавляем результаты в исходный DataFrame
    df['identification_match'] = df['catNumber'].map(comparison_results)

    # Сохраняем
    df.to_csv(OUTPUT_PATH, index=False)
    print(f"\nРезультаты сохранены в: {OUTPUT_PATH}")

    # Статистика по УНИКАЛЬНЫМ образцам
    total = len(comparison_results)
    matches = sum(comparison_results.values())

    print("\nФинальная статистика (по уникальным образцам):")
    print(f"Всего образцов: {total}")
    print(f"Совпадений: {matches} ({matches/total:.1%})")
    print(f"Расхождений: {total - matches} ({(total - matches)/total:.1%})")

    # Примеры расхождений
    discrepancies = [k for k, v in comparison_results.items() if v == 0]
    if discrepancies:
        print("\nПримеры расхождений (первые 3):")
        for cat in discrepancies[:3]:
            sample = manual_samples[manual_samples['catNumber'] == cat].iloc[0]
            print(f"catNumber: {cat}")
            print(f"Ручная ID: {sample['NameBasedOnSequence']}")
            print(f"Авто ID: {sample['Taxon name']}")
            print(f"Сравнивались: '{extract_species(sample['NameBasedOnSequence'])}' vs '{extract_species(sample['Taxon name'])}'\n")

if __name__ == "__main__":
    main()

# Другие анализы

In [None]:
# загрузка ID номеров из базы NCBI для выгрузки в ENA

from Bio import Entrez
import pandas as pd
import time  # Добавлен импорт модуля time
from tqdm.notebook import tqdm

# --- Настройки ---
Entrez.email = "filippova.courlee.nina@gmail.com"  # Обязательно замените на реальный email!
API_DELAY = 0.34  # Задержка между запросами (в секундах)

# --- Проверка таблицы ---
if not isinstance(taxa, pd.DataFrame):
    raise ValueError("'taxa' должен быть pandas DataFrame!")
if 'name' not in taxa.columns:
    raise ValueError("Таблица 'taxa' должна содержать колонку 'name'!")

# --- Функция для запроса tax_id с обработкой ошибок ---
def get_taxid(species_name):
    try:
        handle = Entrez.esearch(db="taxonomy", term=species_name, retmode="xml")
        record = Entrez.read(handle)
        handle.close()
        return record["IdList"][0] if record["IdList"] else pd.NA
    except Exception as e:
        print(f"Ошибка для '{species_name}': {str(e)}")
        return pd.NA

# --- Прогресс-бар и обработка ---
tqdm.pandas(desc="Запрос tax_id из NCBI")

# Применяем функцию с задержкой
taxa['tax_id'] = taxa['name'].progress_apply(
    lambda x: (time.sleep(API_DELAY), get_taxid(x))[1]
)

# --- Результаты ---
print("\nРезультаты:")
print(f"Найдено tax_id: {taxa['tax_id'].notna().sum()}/{len(taxa)}")
display(taxa.head())

# --- Сохранение ---
output_filename = "taxa_with_taxids.csv"
taxa.to_csv(output_filename, index=False)
print(f"\nРезультаты сохранены в файл: {output_filename}")

# Для Google Colab (если нужно скачать)
try:
    from google.colab import files
    files.download(output_filename)
except ImportError:
    pass

In [None]:
# небольшой географический анализ
import pandas as pd
import numpy as np
import os
from sklearn.cluster import DBSCAN
from geopy.distance import great_circle

# Загрузка данных
# Укажите правильный путь к вашей рабочей директории
work_dir = '/content/drive/MyDrive/Colab Notebooks/IT_expedition/CortDB/'  # Для Google Colab обычно /content/
file_name = "geography1.csv"  # Убедитесь, что имя файла правильное (возможно опечатка "gepgraphy" вместо "geography")

# Полный путь к файлу
file_path = os.path.join(work_dir, file_name)

# Проверка существования файла
if not os.path.exists(file_path):
    raise FileNotFoundError(f"Файл не найден: {file_path}. Проверьте путь и имя файла.")

# Чтение данных
df = pd.read_csv(file_path)

# Проверка наличия нужных колонок
required_columns = ['decimalLatitude', 'decimalLongitude', 'sample_id']
for col in required_columns:
    if col not in df.columns:
        raise ValueError(f"В данных отсутствует обязательная колонка: {col}")

# Преобразуем координаты в радианы для вычислений
coords = np.radians(df[['decimalLatitude', 'decimalLongitude']].values)

# Параметры для DBSCAN
epsilon = 5 / 6371.0088  # 5 км в радианах (6371.0088 - радиус Земли в км)
min_samples = 1  # Минимум 1 точка в кластере

# Создаем модель DBSCAN
db = DBSCAN(eps=epsilon, min_samples=min_samples, metric='haversine', algorithm='ball_tree')
labels = db.fit_predict(coords)

# Добавляем метки кластеров в DataFrame
df['cluster'] = labels

# Функция для нахождения центра кластера
def get_center(cluster_df):
    return (cluster_df['decimalLatitude'].mean(), cluster_df['decimalLongitude'].mean())

# Создаем DataFrame с информацией о кластерах
clusters_info = []
for cluster_id in df['cluster'].unique():
    if cluster_id != -1:  # Игнорируем шум (если есть)
        cluster_df = df[df['cluster'] == cluster_id]
        center = get_center(cluster_df)
        clusters_info.append({
            'cluster_id': cluster_id,
            'center_latitude': center[0],
            'center_longitude': center[1],
            'num_samples': len(cluster_df),
            'sample_ids': ', '.join(map(str, cluster_df['sample_id'].tolist()))
        })

clusters_df = pd.DataFrame(clusters_info)

# Выводим результаты
print(f"Всего кластеров: {len(clusters_df)}")
print(f"Всего образцов: {len(df)}")
print("\nИнформация о кластерах:")
print(clusters_df)

# Визуализация (опционально)
try:
    import folium

    # Создаем карту с центром в среднем положении всех точек
    map_center = [df['decimalLatitude'].mean(), df['decimalLongitude'].mean()]
    m = folium.Map(location=map_center, zoom_start=10)

    # Добавляем точки на карту
    for _, row in df.iterrows():
        folium.CircleMarker(
            location=[row['decimalLatitude'], row['decimalLongitude']],  # Исправлено: было 'decimalLatitudede'
            radius=5,
            color='blue' if row['cluster'] != -1 else 'red',
            fill=True,
            fill_opacity=0.7,
            popup=f"Sample: {row['sample_id']}, Cluster: {row['cluster']}"
        ).add_to(m)

    # Добавляем центры кластеров
    for _, cluster in clusters_df.iterrows():
        folium.Circle(
            location=[cluster['center_latitude'], cluster['center_longitude']],
            radius=5000,  # 5 км
            color='green',
            fill=True,
            fill_opacity=0.2,
            popup=f"Cluster {cluster['cluster_id']}: {cluster['num_samples']} samples"
        ).add_to(m)

    # Сохраняем карту
    m.save('clusters_map.html')
    print("\nКарта сохранена как 'clusters_map.html'")
except ImportError:
    print("\nДля визуализации установите folium: !pip install folium")

# Сохраняем результаты
df.to_csv('samples_with_clusters.csv', index=False)
clusters_df.to_csv('clusters_info.csv', index=False)