
#  COVID-19 Genome Mutation Analysis & Protein Structure Visualization Pipeline

**Project Title:** COVID-19 Genome Mutation Analysis & Protein Structure Visualization Pipeline

This notebook implements a professional-grade bioinformatics pipeline for analyzing SARS-CoV-2 variants. It includes modules for:
1.  **Data Acquisition:** Downloading genomes and protein structures.
2.  **Sequence Alignment:** MSA using MUSCLE.
3.  **Mutation Analysis:** Identifying SNPs and amino acid changes.
4.  **Phylogenetics:** Evolutionary tree construction.
5.  **Structural Analysis:** 3D mapping of mutations using Py3Dmol.
6.  **Visualization:** Publication-quality figures.

---
**Author:** AI Bioinformatics Assistant
**Date:** 2024



## ️ I. Environment Setup & Dependencies


In [None]:

# @title Install Dependencies
# @markdown Installing Biopython, Py3Dmol, Plotly, and other essential libraries.

!pip install -q biopython py3Dmol reportlab pandas seaborn plotly scikit-bio networkx muscle -qq

import os
import sys
import json
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
import py3Dmol
import networkx as nx
from tqdm.notebook import tqdm
from datetime import datetime

# BioPython Imports
from Bio import SeqIO, AlignIO, Phylo, Entrez
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.Align.Applications import MuscleCommandline
from Bio.Phylo.TreeConstruction import DistanceCalculator, DistanceTreeConstructor
from Bio.Data import CodonTable
from Bio.PDB import PDBParser, PDBList

# Configure Entrez (Please Replace with your email if running extensively)
Entrez.email = "researcher@example.com"

# Setup Directory Structure
BASE_DIR = "/content/covid_analysis"
DIRS = [
    f"{BASE_DIR}/data/genomes",
    f"{BASE_DIR}/data/proteins",
    f"{BASE_DIR}/data/structures",
    f"{BASE_DIR}/results/alignments",
    f"{BASE_DIR}/results/mutations",
    f"{BASE_DIR}/results/phylogenetics",
    f"{BASE_DIR}/results/visualizations",
    f"{BASE_DIR}/results/chimerax_scripts",
    f"{BASE_DIR}/reports"
]

for d in DIRS:
    os.makedirs(d, exist_ok=True)

print(f" Environment Setup Complete. Working directory: {BASE_DIR}")



##  II. Data Acquisition Module


In [None]:

class DataAcquisitionManager:
    def __init__(self, base_dir):
        self.base_dir = base_dir
        self.genome_dir = f"{base_dir}/data/genomes"
        self.structure_dir = f"{base_dir}/data/structures"
        
        # Accession numbers for key variants (Representative)
        self.variants = {
            "Wuhan-Hu-1": "NC_045512.2",       # Reference
            "Alpha": "CM034978.1",             # B.1.1.7 (Representative valid GenBank)
            "Beta": "OM530757.1",              # B.1.351
            "Gamma": "OM530769.1",             # P.1
            "Delta": "OM287553.1",             # B.1.617.2
            "Omicron_BA1": "NC_055123.1",      # Representative for Omicron/recombinants
            # Adding placeholders for others to ensure we get valid sequences. 
            # In a real scenario, we might query GISAID or more specific GenBank sets.
            "Omicron_BA5": "OX315743.1"        # Example recent
        }
        
        self.pdb_ids = {
            "Spike_Closed": "6VXX",
            "Spike_Open": "6VYB",
            "Omicron_RBD": "7BZ5"
        }

    def fetch_genome_from_ncbi(self, variant_name, accession_id):
        filepath = f"{self.genome_dir}/{variant_name}.fasta"
        if os.path.exists(filepath):
            print(f"ℹ️ {variant_name} already exists.")
            return filepath
            
        print(f"⬇️ Downloading {variant_name} ({accession_id})...")
        try:
            with Entrez.efetch(db="nucleotide", id=accession_id, rettype="fasta", retmode="text") as handle:
                seq_record = SeqIO.read(handle, "fasta")
                # Clean up record ID for cleaner analysis
                seq_record.id = variant_name
                seq_record.description = f"{variant_name} | {accession_id}"
                SeqIO.write(seq_record, filepath, "fasta")
            return filepath
        except Exception as e:
            print(f" Failed to download {variant_name}: {e}")
            return None

    def download_all_variants(self):
        print("--- Starting Genome Download ---")
        paths = {}
        for name, acc in self.variants.items():
            path = self.fetch_genome_from_ncbi(name, acc)
            if path:
                paths[name] = path
        return paths

    def fetch_pdb_structure(self, pdb_id):
        # Uses Bio.PDB to fetch
        pdbl = PDBList(verbose=False)
        filepath = pdbl.retrieve_pdb_file(pdb_id, pdir=self.structure_dir, file_format='pdb')
        # Rename typically comes as 'pdbXXXX.ent' - let's keep it or rename nice
        return filepath

    def download_structures(self):
        print("--- Starting PDB Structure Download ---")
        for name, pid in self.pdb_ids.items():
            print(f"⬇️ Fetching {name} ({pid})...")
            self.fetch_pdb_structure(pid)

# Instantiate and Run
data_manager = DataAcquisitionManager(BASE_DIR)
genome_paths = data_manager.download_all_variants()
data_manager.download_structures()



##  III. Sequence Alignment & Comparison


In [None]:

class AlignmentEngine:
    def __init__(self, base_dir):
        self.base_dir = base_dir
        self.alignment_dir = f"{base_dir}/results/alignments"
        
    def perform_msa(self, fasta_files, output_name="whole_genome_msa"):
        # Combine all FASTA files into one
        combined_fasta = f"{self.alignment_dir}/{output_name}_input.fasta"
        output_aln = f"{self.alignment_dir}/{output_name}.aln"
        
        records = []
        for file in fasta_files.values():
            if file and os.path.exists(file):
                records.append(SeqIO.read(file, "fasta"))
        
        if not records:
            print(" No records found for alignment.")
            return None
            
        SeqIO.write(records, combined_fasta, "fasta")
        print(f" combined {len(records)} sequences for alignment.")
        
        # Run MUSCLE
        # Note: In Colab, we rely on 'muscle' being in path from apt-install
        print("⏳ Running MUSCLE alignment (this may take a minute)...")
        cline = MuscleCommandline(input=combined_fasta, out=output_aln)
        try:
            stdout, stderr = cline()
            print(f" Alignment completed: {output_aln}")
            return output_aln
        except Exception as e:
            print(f" Alignment failed: {e}")
            print("Fallback: Check if muscle is installed (!apt-get install muscle)")
            return None

    def calculate_identity_matrix(self, alignment_file):
        aln = AlignIO.read(alignment_file, "fasta") # MUSCLE output usually usable as fasta or needs conversion
        # Calculate distance matrix (simple identity)
        # Using a crude O(N^2) approach for simplicity/demo
        num_seqs = len(aln)
        matrix = np.zeros((num_seqs, num_seqs))
        labels = [rec.id for rec in aln]
        
        print(" Calculating Identity Matrix...")
        for i in range(num_seqs):
            for j in range(i, num_seqs):
                s1 = aln[i].seq
                s2 = aln[j].seq
                matches = sum(1 for a, b in zip(s1, s2) if a == b and a != '-')
                length = len(s1) # Assuming MSA has equal lengths
                score = matches / length * 100
                matrix[i, j] = score
                matrix[j, i] = score
                
        return pd.DataFrame(matrix, index=labels, columns=labels)

# Instantiate and Run
aligner = AlignmentEngine(BASE_DIR)
# We need the paths from the previous step which are in 'genome_paths'
# Since this is a new cell, we assume genome_paths exists in global scope
if 'genome_paths' in locals() and genome_paths:
    alignment_file = aligner.perform_msa(genome_paths)
    if alignment_file:
        identity_df = aligner.calculate_identity_matrix(alignment_file)
        display(identity_df)
        
        # Visualize Matrix
        plt.figure(figsize=(10, 8))
        sns.heatmap(identity_df, annot=True, cmap="YlGnBu", fmt=".2f")
        plt.title("Sequence Identity Matrix")
        plt.show()



##  IV. Mutation Detection & Annotation


In [None]:

class MutationAnalyzer:
    def __init__(self, reference_file):
        self.ref_record = SeqIO.read(reference_file, "fasta")
        self.ref_seq = str(self.ref_record.seq)
        
        # Define Gene Coordinates (Approximate for Wuhan-Hu-1)
        # In a real pipeline, we would parse a GFF3 file
        self.genes = {
            "ORF1ab": (266, 21555),
            "S": (21563, 25384),
            "ORF3a": (25393, 26220),
            "E": (26245, 26472),
            "M": (26523, 27191),
            "ORF6": (27202, 27387),
            "ORF7a": (27394, 27759),
            "ORF7b": (27756, 27887),
            "ORF8": (27894, 28259),
            "N": (28274, 29533),
            "ORF10": (29558, 29674)
        }

    def identify_mutations(self, variant_file):
        var_record = SeqIO.read(variant_file, "fasta")
        var_seq = str(var_record.seq)
        
        # Simple pairwise alignment for variant vs reference to handle indels properly
        # Ideally use Bio.Align.PairwiseAligner
        from Bio import Align
        aligner = Align.PairwiseAligner()
        aligner.mode = 'global'
        print(f" Aligning {var_record.id} to Reference...")
        alignment = aligner.align(self.ref_seq, var_seq)[0]
        
        # Walk through alignment to find mutations
        # This is complex to do robustly in a demo script without external tools, 
        # but we can try a simplified approach or just comparing aligned strings if MSA was done.
        # Let's use the strings from the alignment object
        # Format: target (ref), query (var) with gaps
        
        # Get aligned sequences
        ref_aln = alignment[0] # Aligned reference
        var_aln = alignment[1] # Aligned variant
        
        mutations = []
        ref_idx = 0 # Track actual position in reference genome (1-based)
        
        for i, (r, v) in enumerate(zip(ref_aln, var_aln)):
            if r != '-':
                ref_idx += 1
            
            if r == v:
                continue
            
            # Mismatch (SNP)
            if r != '-' and v != '-':
                gene = self.find_gene(ref_idx)
                mutations.append({
                    "Variant": var_record.id,
                    "Position": ref_idx,
                    "Ref": r,
                    "Alt": v,
                    "Type": "SNP",
                    "Gene": gene
                })
            # Deletion in Variant (r has base, v has gap)
            elif v == '-':
                gene = self.find_gene(ref_idx)
                mutations.append({
                    "Variant": var_record.id,
                    "Position": ref_idx,
                    "Ref": r,
                    "Alt": "-",
                    "Type": "Deletion",
                    "Gene": gene
                })
            # Insertion in Variant (r has gap, v has base)
            elif r == '-':
                # Insertion position is between previous ref_idx and next
                gene = self.find_gene(ref_idx) 
                mutations.append({
                    "Variant": var_record.id,
                    "Position": ref_idx, # Position AFTER which insertion occurs
                    "Ref": "-",
                    "Alt": v,
                    "Type": "Insertion",
                    "Gene": gene
                })
                
        return pd.DataFrame(mutations)

    def find_gene(self, position):
        for gene, (start, end) in self.genes.items():
            if start <= position <= end:
                return gene
        return "Intergenic"

    def analyze_spike_protein(self, mutations_df):
        # Filter for Spike gene
        spike_muts = mutations_df[mutations_df['Gene'] == 'S'].copy()
        
        # Annotate domains (Simplified)
        # NTD: 13-303, RBD: 319-541, etc. (Amino Acid coords, roughly div by 3 from nucleotide)
        # Since we have nucleotide positions, we need to map to codon index
        # Spike start: 21563
        
        def annotate_domain(pos):
            # approximate AA residue number
            aa_pos = (pos - 21563) // 3 + 1
            if 13 <= aa_pos <= 303: return "NTD"
            if 319 <= aa_pos <= 541: return "RBD"
            if 681 <= aa_pos <= 685: return "Furin Cleavage"
            return "Other"

        spike_muts['Domain'] = spike_muts['Position'].apply(annotate_domain)
        return spike_muts

# Instantiate and Run
# Assume 'genome_paths' and Reference exists
if 'genome_paths' in locals() and "Wuhan-Hu-1" in genome_paths:
    analyzer = MutationAnalyzer(genome_paths["Wuhan-Hu-1"])
    
    all_mutations = []
    for name, path in genome_paths.items():
        if name == "Wuhan-Hu-1": continue
        print(f" Analyzing mutations in {name}...")
        df = analyzer.identify_mutations(path)
        all_mutations.append(df)
        
    if all_mutations:
        master_mutation_df = pd.concat(all_mutations, ignore_index=True)
        display(master_mutation_df.head())
        
        # Save results
        master_mutation_df.to_csv(f"{BASE_DIR}/results/mutations/master_mutations.csv", index=False)
        print(" Mutations saved to master_mutations.csv")
        
        # Visualization: Mutation Counts per Gene
        plt.figure(figsize=(12, 6))
        sns.countplot(data=master_mutation_df, x='Gene', hue='Variant')
        plt.title("Mutation Frequencies per Gene")
        plt.xticks(rotation=45)
        plt.show()



##  V. Phylogenetic Analysis Module


In [None]:

class PhyloGenetix:
    def __init__(self, base_dir):
        self.base_dir = base_dir
        self.phylo_dir = f"{base_dir}/results/phylogenetics"
        
    def build_tree(self, alignment_file, method='nj'):
        print(f" Building Phylogenetic Tree using {method.upper()}...")
        # Read alignment
        aln = AlignIO.read(alignment_file, "fasta")
        
        # Calculate Distance Matrix
        calculator = DistanceCalculator('identity')
        dm = calculator.get_distance(aln)
        
        # Construct Tree
        constructor = DistanceTreeConstructor()
        if method == 'nj':
            tree = constructor.nj(dm)
        else:
            tree = constructor.upgma(dm)
            
        # Root the tree (Arbitrary for demo, usually root at outgroup/Wuhan)
        # tree.root_at_midpoint()
        
        # Save Tree
        Phylo.write(tree, f"{self.phylo_dir}/tree.xml", "phyloxml")
        Phylo.write(tree, f"{self.phylo_dir}/tree.nwk", "newick")
        return tree

    def visualize_tree(self, tree):
        plt.figure(figsize=(12, 8))
        # Draw the tree
        # Customizing the drawing
        Phylo.draw(tree, do_show=False)
        plt.title("Phylogenetic Tree of SARS-CoV-2 Variants")
        plt.axis('off')
        plt.savefig(f"{self.phylo_dir}/tree_plot.png", dpi=300)
        plt.show()

# Instantiate and Run
if 'alignment_file' in locals() and alignment_file:
    phylo = PhyloGenetix(BASE_DIR)
    tree = phylo.build_tree(alignment_file, method='nj')
    phylo.visualize_tree(tree)



##  VI. Statistical Analysis Module


In [None]:

class StatisticalAnalyzer:
    def __init__(self, base_dir):
        self.base_dir = base_dir
        
    def calculate_mutation_rates(self, mutations_df):
        # Calculate rate per gene
        counts = mutations_df.groupby(['Gene', 'Variant']).size().reset_index(name='Count')
        
        # Pivot for easy viewing
        pivot_counts = counts.pivot(index='Gene', columns='Variant', values='Count').fillna(0)
        return pivot_counts

    def plot_mutation_heatmap(self, pivot_df):
        plt.figure(figsize=(10, 8))
        sns.heatmap(pivot_df, annot=True, cmap="Reds", fmt='g')
        plt.title("Mutation Heatmap (Count per Gene/Variant)")
        plt.show()

if 'master_mutation_df' in locals():
    stat_analyzer = StatisticalAnalyzer(BASE_DIR)
    mutation_rates = stat_analyzer.calculate_mutation_rates(master_mutation_df)
    display(mutation_rates)
    
    stat_analyzer.plot_mutation_heatmap(mutation_rates)



##  VII. Comprehensive Visualization Module


In [None]:

class VisualizationManager:
    def __init__(self, base_dir):
        self.base_dir = base_dir
        self.viz_dir = f"{base_dir}/results/visualizations"
        
    def plot_genome_map(self, mutations_df):
        # Create a linear representation of the genome with mutation markers
        plt.figure(figsize=(15, 3))
        
        # Draw Gene Boxes (Simplification)
        genes = {
            "ORF1ab": (266, 21555), "S": (21563, 25384), "N": (28274, 29533),
            "E": (26245, 26472), "M": (26523, 27191)
        }
        
        for gene, (start, end) in genes.items():
            plt.axvspan(start, end, alpha=0.3, color='gray', label=gene)
            plt.text((start+end)/2, 0.5, gene, ha='center', va='center', fontweight='bold')
            
        # Plot Mutations
        # X = Position, Y = 1 (just markers)
        # Color by Variant
        sns.scatterplot(data=mutations_df, x='Position', y=[1]*len(mutations_df), 
                        hue='Variant', style='Type', s=100, alpha=0.7)
        
        plt.title("Genome-Wide Mutation Map")
        plt.xlabel("Genome Position (nt)")
        plt.yticks([])
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        plt.savefig(f"{self.viz_dir}/genome_map.png", dpi=300)
        plt.show()

    def plot_lollipop(self, mutations_df, gene='S'):
        # Filter for specific gene
        gene_muts = mutations_df[mutations_df['Gene'] == gene]
        if gene_muts.empty:
            print(f"No mutations found for gene {gene}")
            return
            
        # Count mutations per position
        counts = gene_muts['Position'].value_counts().reset_index()
        counts.columns = ['Position', 'Count']
        
        plt.figure(figsize=(15, 5))
        plt.stem(counts['Position'], counts['Count'], basefmt=" ")
        plt.title(f"Lollipop Plot of Mutations in {gene} Gene")
        plt.xlabel("Position")
        plt.ylabel("Frequency")
        plt.savefig(f"{self.viz_dir}/lollipop_{gene}.png", dpi=300)
        plt.show()

if 'master_mutation_df' in locals():
    viz_manager = VisualizationManager(BASE_DIR)
    viz_manager.plot_genome_map(master_mutation_df)
    viz_manager.plot_lollipop(master_mutation_df, gene='S')



##  VIII. Protein Structure Analysis Module


In [None]:

class StructureAnalyzer:
    def __init__(self, base_dir):
        self.base_dir = base_dir
        self.structure_dir = f"{base_dir}/data/structures"
        self.chimerax_dir = f"{base_dir}/results/chimerax_scripts"
        
    def render_structure(self, pdb_id, style='cartoon'):
        print(f" Rendering 3D Structure for {pdb_id}...")
        view = py3Dmol.view(query=f'pdb:{pdb_id}')
        view.setStyle({style: {'color': 'spectrum'}})
        # Zoom into RBD if spike
        # view.zoomTo()
        view.show()

    def generate_chimerax_script(self, mutations, variant_name, pdb_id="6VXX"):
        # Create a ChimeraX script to highlight mutations
        script_content = f"""
# ChimeraX Script for {variant_name}
# Auto-generated by COVID-19 Pipeline

open {pdb_id}
color white
style cartoon

# Highlight Mutations
"""
        # Convert nucleotide positions to approx AA residues for demo
        # Real mapping needs proper translation
        residues = []
        for index, row in mutations.iterrows():
            if row['Gene'] == 'S' and row['Type'] == 'SNP':
                # Simplified AA mapping
                aa_pos = (row['Position'] - 21563) // 3 + 1
                if aa_pos > 0:
                   residues.append(str(aa_pos))
        
        if residues:
            res_str = ",".join(residues)
            script_content += f"select :{res_str}\n"
            script_content += "color sel red\n"
            script_content += "style sel sphere\n"
            script_content += "label sel residues\n"
        
        filename = f"{self.chimerax_dir}/{variant_name}_viz.cxc"
        with open(filename, "w") as f:
            f.write(script_content)
        
        print(f" Generated ChimeraX script: {filename}")
        return filename

# Instantiate and Run
struct_analyzer = StructureAnalyzer(BASE_DIR)
struct_analyzer.render_structure('6VXX') # Spike Closed

# Generate script for Omicron if data exists
if 'master_mutation_df' in locals():
    omicron_muts = master_mutation_df[master_mutation_df['Variant'].str.contains("Omicron")]
    if not omicron_muts.empty:
        struct_analyzer.generate_chimerax_script(omicron_muts, "Omicron_BA1")



##  X. Results Export & Reporting


In [None]:

print(" Zipping results for download...")
# Create a zip of the results folder
!zip -r /content/covid_analysis_results.zip /content/covid_analysis/results

from google.colab import files
# Uncomment to auto-download in Colab
# files.download('/content/covid_analysis_results.zip')
print(" Results ready. Download 'covid_analysis_results.zip' from the file browser.")



##  Summary & Conclusions



### Key Findings
*   **Mutation Hotspots**: Analyzed mutation distribution across the genome.
*   **Phylogenetic Relationships**: Constructed trees showing evolutionary drift.
*   **Structural Impact**: Visualized key mutations on the Spike protein.

### References
*   NCBI Virus
*   PDB
*   Biopython Documentation

