<a href="https://colab.research.google.com/github/jamessutton600613-png/GC/blob/main/Untitled173.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install qiskit-aer qiskit

Collecting qiskit-aer
  Downloading qiskit_aer-0.17.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.3 kB)
Collecting qiskit
  Downloading qiskit-2.1.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting rustworkx>=0.15.0 (from qiskit)
  Downloading rustworkx-0.16.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting stevedore>=3.0.0 (from qiskit)
  Downloading stevedore-5.4.1-py3-none-any.whl.metadata (2.3 kB)
Collecting pbr>=2.0.0 (from stevedore>=3.0.0->qiskit)
  Downloading pbr-6.1.1-py2.py3-none-any.whl.metadata (3.4 kB)
Downloading qiskit_aer-0.17.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m128.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading qiskit-2.1.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m

In [None]:
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random
import pickle
from datetime import datetime
from google.colab import output
from tqdm.notebook import tqdm
from qiskit import QuantumCircuit, transpile
from qiskit_aer import AerSimulator
from google.colab import drive
drive.mount('/content/drive')

class QuantumRandomGenerator:
    """Produces integer seeds by measuring qubits on a local ``AerSimulator``.

    Every qubit is put through a Hadamard gate and then measured; the
    resulting bitstring of a single shot is interpreted as a base-2 integer.
    """

    def __init__(self, num_bits_precision=64):
        """Create the simulator backend and remember how many bits to measure.

        Args:
            num_bits_precision: Number of qubits (and classical bits) in the
                circuit, i.e. the width of the returned seed in bits.
        """
        self.simulator = AerSimulator()
        self.num_bits = num_bits_precision

    def get_seed(self):
        """Run the Hadamard-and-measure circuit once and return the outcome.

        Returns:
            int: The measured bitstring parsed as a binary number.
        """
        width = self.num_bits
        wires = range(width)

        circuit = QuantumCircuit(width, width)
        circuit.h(wires)
        circuit.measure(wires, wires)

        compiled = transpile(circuit, self.simulator)
        counts = self.simulator.run(compiled, shots=1).result().get_counts(0)
        # The original code reads the first key of the counts mapping; with
        # shots=1 that key is the single measured bitstring.
        bitstring = next(iter(counts.keys()))
        return int(bitstring, 2)

class Environment:
    """Pre-generated UV and temperature time series plus a step counter.

    Both series are built once in ``__init__`` (50 years' worth) and are read
    back modulo their length, so the clock may run past the end.
    """

    def __init__(self, rng, days_per_cycle=50):
        """Store the generator and build the UV / temperature series.

        Args:
            rng: Random source providing ``standard_normal(n)`` and
                ``uniform(low, high)``.
            days_per_cycle: Number of simulation steps that make up one daily
                cycle; a year is ``365 * days_per_cycle`` steps.
        """
        self.days_per_cycle = days_per_cycle
        self.time = 0
        self.rng = rng
        self.steps_per_year = 365 * self.days_per_cycle
        self.uv_intensity, self.temperature = self._generate_cycles(num_years=50)

    def _generate_cycles(self, num_years):
        """Build the UV and temperature arrays for ``num_years`` years.

        Random draws happen in a fixed order: one block of normal noise for
        the whole run, then per year a UV severity, and per step a cloud
        factor (preceded by a temperature anomaly at the start of each day).

        Returns:
            tuple: ``(uv, temperature)`` arrays, one entry per step.
        """
        steps_in_day = self.days_per_cycle
        steps_in_year = self.steps_per_year
        total_steps = num_years * steps_in_year

        # Within-day shapes: UV is the positive half of a sine, temperature
        # swings by +/-4 around the seasonal value.
        day_phase = np.linspace(0, 2 * np.pi, steps_in_day)
        daily_uv_cycle = np.sin(day_phase)
        daily_uv_cycle[daily_uv_cycle < 0] = 0
        daily_temp_swing = -4 * np.cos(day_phase)

        # Weather: white noise averaged over a 14-day window, scaled by 5.
        base_weather_noise = self.rng.standard_normal(total_steps)
        window_len = 14 * steps_in_day
        smoothing_window = np.ones(window_len) / window_len
        weather_pattern = np.convolve(base_weather_noise, smoothing_window, 'same') * 5.0

        # Seasonal shapes are the same every year, so compute them once.
        year_phase = np.linspace(0, 2 * np.pi, steps_in_year)
        seasonal_temp_base = 15 - 10 * np.cos(year_phase)
        seasonal_shape = 0.225 * np.sin(year_phase) + 0.725

        uv_series = np.empty(total_steps)
        temp_series = np.empty(total_steps)
        for year in range(num_years):
            offset = year * steps_in_year
            uv_severity = self.rng.uniform(0.6, 1.4)
            seasonal_amplitude_mod = seasonal_shape * uv_severity
            daily_temp_anomaly = 0
            for i in range(steps_in_year):
                slot = i % steps_in_day
                if slot == 0:
                    # A new day starts: draw that day's temperature offset.
                    daily_temp_anomaly = self.rng.uniform(-1.5, 1.5)
                cloud_cover_factor = self.rng.uniform(0.7, 1.0)
                uv_series[offset + i] = daily_uv_cycle[slot] * seasonal_amplitude_mod[i] * cloud_cover_factor
                solar_temp = seasonal_temp_base[i] + daily_temp_swing[slot] + weather_pattern[offset + i] + daily_temp_anomaly
                # Temperature is floored at 4.0.
                temp_series[offset + i] = max(4.0, solar_temp)
        return uv_series, temp_series

    def get_current_uv(self):
        """Return the UV value at the current time (wrapping around the series)."""
        series = self.uv_intensity
        return series[self.time % len(series)]

    def get_current_temperature(self):
        """Return the temperature at the current time (wrapping around the series)."""
        series = self.temperature
        return series[self.time % len(series)]

    def step(self):
        """Advance the clock by one step."""
        self.time += 1

class Protoribosome:
    """One replicating RNA agent with nucleotide pools, UV damage and a stop-codon policy.

    ``strategy`` is either ``'cautious'`` or anything else (the notebook uses
    ``'readthrough'``); it selects the UV damage rate and what happens when
    the sequence contains a stop codon.
    """

    # Codons treated as stop signals when the sequence is read in frame.
    _STOP_CODONS = frozenset({'UAA', 'UAG', 'UGA'})
    # Per-codon contribution to the protection score; unlisted codons add 0.
    _CODON_WEIGHTS = {'UGG': 5.0, 'UAU': 3.0, 'UAC': 3.0, 'UUU': 1.5, 'UUC': 1.5,
                      'UGU': 1.0, 'UGC': 1.0, 'AUG': 0.75, 'CAU': 0.5, 'CAC': 0.5}

    def __init__(self, env, strategy, initial_sequence, rng, initial_mass=100.0):
        """Set up sequence, pools and rate constants.

        Args:
            env: Environment reference (stored, not read by this class).
            strategy: ``'cautious'`` or another label such as ``'readthrough'``.
            initial_sequence: Iterable of bases; stored as a list.
            rng: Random source providing ``choice(sequence)``.
            initial_mass: Starting RNA mass.
        """
        self.env = env
        self.strategy = strategy
        self.rna_sequence = list(initial_sequence)
        self.rng = rng
        self.rna_mass = initial_mass
        self.uv_protection_pool = 50.0
        self.rna_damage_level = 0.0
        self.location = 'shadow_zone'  # stored only; not read in this class
        self.status = 'ACTIVE'
        self.nmp_pool = dict.fromkeys('AUGC', 100)
        self.ndp_pool = dict.fromkeys('AUGC', 50)
        self.ntp_pool = dict.fromkeys('AUGC', 20)

        # Non-cautious agents take 40% more damage per unit of UV.
        self.uv_damage_rate_per_uv = 1.0 if self.strategy == 'cautious' else 1.4
        self.repair_rate = 0.5
        self.uv_protection_factor = 0.01
        self.damage_tolerance_threshold = 3.0
        # replication_rate and mutation_prob are stored but not read in this class.
        self.replication_rate = 0.05
        self.mutation_prob = 0.004

    def __setstate__(self, state):
        """Restore from a pickle, filling attributes that older pickles lack."""
        self.__dict__.update(state)
        for attr, amount in (('nmp_pool', 100), ('ndp_pool', 50), ('ntp_pool', 20)):
            if attr not in state:
                setattr(self, attr, dict.fromkeys('AUGC', amount))
        if 'rna_damage_level' not in state:
            self.rna_damage_level = 0.0

    def _codons(self):
        """Yield the sequence in frame as 3-base strings (the last may be shorter)."""
        seq = self.rna_sequence
        for start in range(0, len(seq), 3):
            yield "".join(seq[start:start + 3])

    def calculate_protection_score(self):
        """Return 0.1 per whole codon plus the weights of the codons present."""
        score = (len(self.rna_sequence) // 3) * 0.1
        bonus = 0
        for codon in self._codons():
            bonus += self._CODON_WEIGHTS.get(codon, 0)
        score += bonus
        return score

    def _forage_and_phosphorylate(self, current_uv):
        """Take up NMPs, convert NMP->NDP, and spend UV energy on NDP->NTP."""
        for base in self.nmp_pool:
            self.nmp_pool[base] += 5
            if self.nmp_pool[base] > 0:
                self.nmp_pool[base] -= 1
                self.ndp_pool[base] += 1

        # Every 5 units of energy pays for one attempted NDP->NTP conversion
        # of a randomly chosen base; an empty NDP pool wastes the attempt.
        energy = self.uv_protection_pool * current_uv * 0.1
        conversions = int(energy / 5)
        candidates = list(self.ndp_pool.keys())
        for _ in range(conversions):
            base = self.rng.choice(candidates)
            if self.ndp_pool[base] > 0:
                self.ndp_pool[base] -= 1
                self.ntp_pool[base] += 1

    def _replicate_rna(self):
        """Split off an offspring if active, heavy enough and stocked with NTPs.

        Returns:
            Protoribosome or None: A copy carrying half of the parent's mass,
            or ``None`` when replication is not possible.
        """
        if self.status != 'ACTIVE':
            return None
        if self.rna_mass < 80:
            return None

        required_ntps = {base: self.rna_sequence.count(base) for base in 'AUGC'}
        for base, count in required_ntps.items():
            if self.ntp_pool[base] < count:
                return None

        for base, count in required_ntps.items():
            self.ntp_pool[base] -= count
        offspring_mass = self.rna_mass * 0.5
        self.rna_mass -= offspring_mass
        return Protoribosome(self.env, self.strategy, "".join(self.rna_sequence), self.rng, offspring_mass)

    def step(self, current_uv):
        """Advance this agent by one step under the given UV level.

        Returns:
            Protoribosome or None: An offspring if the agent replicated.
        """
        if self.status == 'INACTIVE':
            return None
        self._forage_and_phosphorylate(current_uv)

        # Shielding removes up to 99% of the incoming UV; repair then undoes
        # a fixed amount of damage, never below zero.
        protection = self.uv_protection_pool * self.uv_protection_factor
        effective_uv = current_uv * max(0.01, 1 - protection)
        self.rna_damage_level += effective_uv * self.uv_damage_rate_per_uv
        self.rna_damage_level = max(0, self.rna_damage_level - self.repair_rate)

        has_stop = any(codon in self._STOP_CODONS for codon in self._codons())
        if has_stop:
            if self.strategy == 'cautious':
                self.status = 'ARRESTED'
            elif self.strategy == 'readthrough':
                self.status = 'INACTIVE'

        if self.rna_damage_level > self.damage_tolerance_threshold:
            self.status = 'INACTIVE'
        if self.status == 'INACTIVE':
            return None

        self.uv_protection_pool += 0.20 * self.calculate_protection_score()
        return self._replicate_rna()

class Colony:
    """A capped population of Protoribosome agents sharing one environment."""

    def __init__(self, env, dna_template, initial_pop_size, steps_per_day, rng, shuffle_rng):
        """Create the founding population, alternating the two strategies.

        Args:
            env: Environment queried for the current UV level.
            dna_template: Sequence given to every founding agent.
            initial_pop_size: Number of founding agents.
            steps_per_day: Stored on the colony; not read by this class.
            rng: Random source shared with every agent; also used to cull
                the population via ``choice(..., replace=False)``.
            shuffle_rng: Stored on the colony; not read by this class.
        """
        self.env = env
        self.dna_template = dna_template
        self.steps_per_day = steps_per_day
        self.max_population = 10000
        self.rng = rng
        self.shuffle_rng = shuffle_rng
        # Even indices start 'cautious', odd indices start 'readthrough'.
        strategies = ('cautious', 'readthrough')
        self.active_population = [
            Protoribosome(env, strategies[i % 2], dna_template, rng)
            for i in range(initial_pop_size)
        ]

    @staticmethod
    def _carries_stop_codon(sequence):
        """Return True if any in-frame triplet of ``sequence`` is UAA, UAG or UGA."""
        stop_codons = {'UAA', 'UAG', 'UGA'}
        for start in range(0, len(sequence), 3):
            if "".join(sequence[start:start + 3]) in stop_codons:
                return True
        return False

    def step(self, current_step):
        """Step every agent once at the environment's current UV level.

        Agents left INACTIVE are dropped, offspring are appended, and a
        population above ``max_population`` is randomly sampled down to it.
        ``current_step`` is accepted but not used, and the environment clock
        is not advanced here.
        """
        uv_now = self.env.get_current_uv()
        newborns = []
        for organism in self.active_population:
            child = organism.step(uv_now)
            if child is not None:
                newborns.append(child)

        survivors = [o for o in self.active_population if o.status != 'INACTIVE']
        self.active_population = survivors + newborns
        if len(self.active_population) > self.max_population:
            self.active_population = self.rng.choice(self.active_population, self.max_population, replace=False).tolist()

    def get_aggregated_data(self):
        """Summarise the population per strategy.

        Returns:
            dict: Head counts, mean damage, and summed NMP/NDP/NTP pools for
            'cautious' and 'readthrough'; all values are 0.0 when the
            population is empty.
        """
        data = {
            'cautious': 0.0,
            'readthrough': 0.0,
            'cautious_avg_damage': 0.0,
            'readthrough_avg_damage': 0.0,
        }
        for strategy in ('cautious', 'readthrough'):
            for pool in ('nmp', 'ndp', 'ntp'):
                data[f'{strategy}_{pool}_total'] = 0.0
        if not self.active_population:
            return data

        for organism in self.active_population:
            prefix = organism.strategy
            data[prefix] += 1
            # Accumulate the damage sum here; it becomes a mean below.
            data[f'{prefix}_avg_damage'] += organism.rna_damage_level
            for pool in ('nmp', 'ndp', 'ntp'):
                data[f'{prefix}_{pool}_total'] += sum(getattr(organism, f'{pool}_pool').values())

        for strategy in ('cautious', 'readthrough'):
            head_count = data[strategy]
            if head_count > 0:
                data[f'{strategy}_avg_damage'] /= head_count
        return data

    def get_genetic_load_analysis(self):
        """Count agents, and agents carrying a stop codon, per strategy.

        Returns:
            tuple: ``(cautious_total, cautious_with_stop, other_total,
            other_with_stop)``; every non-'cautious' agent counts as "other".
        """
        cautious_tally = [0, 0]
        other_tally = [0, 0]
        for organism in self.active_population:
            tally = cautious_tally if organism.strategy == 'cautious' else other_tally
            tally[0] += 1
            if self._carries_stop_codon(organism.rna_sequence):
                tally[1] += 1
        return cautious_tally[0], cautious_tally[1], other_tally[0], other_tally[1]

def run_single_simulation(steps_per_day, rng, shuffle_rng, save_path, dna_template):
    """Run (or resume) one replica for up to three simulated years.

    Args:
        steps_per_day: Simulation steps per day; forwarded to ``Environment``
            and ``Colony`` when a fresh colony is created.
        rng: Random source for a fresh environment and colony. A resumed
            colony keeps whatever generator was pickled with it.
        shuffle_rng: Forwarded to ``Colony``.
        save_path: Checkpoint file path, or a falsy value to disable
            checkpointing. The final log is written next to it with the
            ``.pkl`` suffix replaced by ``.df.pkl``.
        dna_template: Founding RNA sequence for a fresh colony.

    Returns:
        tuple: ``(summary, log_df)`` where ``summary`` has the keys 'Winner',
        'Duration', 'Final Cautious' and 'Final Reckless', and ``log_df`` has
        one row per logged step.
    """
    colony, start_step, data_log = None, 0, []
    if save_path and os.path.exists(save_path):
        try:
            # NOTE(security): pickle.load can execute arbitrary code. Only
            # point save_path at checkpoints written by this notebook.
            with open(save_path, 'rb') as f: state = pickle.load(f)
            colony, start_step, data_log = state['colony'], state['step_count'], state['log']
            print(f"--- Resuming simulation from step {start_step} ---")
        except Exception as e: print(f"--- Save file corrupt ({e}). Starting fresh. ---")
    if colony is None:
        colony = Colony(Environment(rng, steps_per_day), dna_template, 5000, steps_per_day, rng, shuffle_rng)
    else:
        # Checkpoints written before the clock was advanced each step carry a
        # frozen env.time; realign it with the step the run continues from.
        colony.env.time = start_step

    max_run_steps = 3 * colony.env.steps_per_year
    # If the checkpoint is already at or past the end, the loop below does not
    # run and the summary is still built from the loaded colony.
    step_count = start_step

    progress_bar = tqdm(desc="Simulating Replica", initial=start_step, total=max_run_steps, leave=False)
    try:
        for step_count in range(start_step, max_run_steps):
            colony.step(step_count)
            if len(colony.active_population) == 0: break

            agg_data = None
            if step_count % 10 == 0:
                agg_data = colony.get_aggregated_data()
                c_tot, c_stop, r_tot, r_stop = colony.get_genetic_load_analysis()
                # time/uv/temp describe the step the colony just experienced;
                # the clock is advanced only after logging.
                log_entry = {'time':colony.env.time, 'uv': colony.env.get_current_uv(), 'temp':colony.env.get_current_temperature(), **agg_data,
                             'cautious_stop_pct': (c_stop/c_tot*100 if c_tot>0 else 0),
                             'reckless_stop_pct': (r_stop/r_tot*100 if r_tot>0 else 0)}
                data_log.append(log_entry)

            progress_bar.update(1)
            if step_count%25==0:
                if agg_data is None:
                    agg_data = colony.get_aggregated_data()
                c_pools = f"{agg_data.get('cautious_nmp_total', 0)/1000:.1f}k/{agg_data.get('cautious_ndp_total', 0)/1000:.1f}k/{agg_data.get('cautious_ntp_total', 0)/1000:.1f}k"
                r_pools = f"{agg_data.get('readthrough_nmp_total', 0)/1000:.1f}k/{agg_data.get('readthrough_ndp_total', 0)/1000:.1f}k/{agg_data.get('readthrough_ntp_total', 0)/1000:.1f}k"
                progress_bar.set_postfix_str(f"Cautious (NMP/NDP/NTP): {c_pools} | Reckless: {r_pools}")

            # Advance the environment clock. Without this every step reads
            # index 0 of the UV/temperature series and logs time == 0.
            colony.env.step()

            if save_path and step_count > 0 and step_count % 500 == 0:
                # Store the NEXT step to run so a resume does not repeat (and
                # re-log) the step that was just completed. Write to a temp
                # file first so an interrupted write cannot corrupt the
                # existing checkpoint.
                with open(save_path + ".tmp", 'wb') as f: pickle.dump({'step_count':step_count + 1,'colony':colony,'log':data_log}, f)
                os.replace(save_path + ".tmp", save_path)
    finally:
        # Close the bar even when the loop is interrupted or raises.
        progress_bar.close()

    final_agg = colony.get_aggregated_data()
    final_c, final_r = final_agg.get('cautious',0), final_agg.get('readthrough',0)
    winner = "Cautious" if final_r == 0 else "Reckless" if final_c == 0 else "Tie/Limit"
    summary = {'Winner':winner,'Duration':step_count,'Final Cautious':final_c,'Final Reckless':final_r}
    results_df = pd.DataFrame(data_log)
    if save_path: results_df.to_pickle(save_path.replace('.pkl', '.df.pkl'))
    return summary, results_df

def plot_simulation_details(results_df, replica_title=""):
    """Draw the four-panel overview of one replica's log.

    Panels, top to bottom: environment (UV and temperature with rolling
    means), population counts on a log axis, RNA damage with stop-codon
    percentages, and total nucleotide reserves.

    Args:
        results_df: Log DataFrame with the columns written by the simulation
            loop ('time', 'uv', 'temp', per-strategy counts, damage, pools
            and stop-codon percentages).
        replica_title: Text appended to the figure title.

    Returns:
        None. Prints a message and returns early when ``results_df`` is empty.
    """
    if results_df.empty:
        print("No data to plot.")
        return

    fig, axs = plt.subplots(4, 1, figsize=(15, 22), sharex=True, gridspec_kw={'hspace': 0.5})
    fig.suptitle(f'Simulation Detailed Results: {replica_title}', fontsize=16)
    colors = {'cautious': 'orange', 'readthrough': 'purple'}
    rolling_window = 100
    t = results_df['time']

    def trend(column):
        # Rolling mean over the last `rolling_window` log rows.
        return results_df[column].rolling(window=rolling_window, min_periods=1).mean()

    env_ax, pop_ax, dmg_ax, pool_ax = axs

    # Panel 1: UV on the left axis, temperature on a twin right axis.
    env_ax.set_title('Environmental Conditions')
    env_ax.plot(t, results_df['uv'], color='gray', alpha=0.5, label='Daily UV')
    env_ax.plot(t, trend('uv'), color='black', label='UV Trend')
    env_ax.set_ylabel('UV Intensity')
    env_ax.legend(loc='upper left')
    temp_ax = env_ax.twinx()
    temp_ax.plot(t, results_df['temp'], color='lightcoral', alpha=0.5, label='Daily Temp')
    temp_ax.plot(t, trend('temp'), color='red', label='Temp Trend')
    temp_ax.set_ylabel('Temperature (°C)', color='r')
    temp_ax.legend(loc='upper right')

    # Panel 2: head counts per strategy.
    pop_ax.set_title('Population Dynamics (Log Scale)')
    pop_ax.plot(t, results_df['cautious'], label='Cautious', color='orange')
    pop_ax.plot(t, results_df['readthrough'], label='Reckless', color='purple')
    pop_ax.set_yscale('log')
    pop_ax.legend()
    pop_ax.set_ylabel('Count')

    # Panel 3: mean damage (left axis) and stop-codon share (right axis).
    dmg_ax.set_title('RNA Damage & Genetic Load')
    dmg_ax.plot(t, results_df['cautious_avg_damage'], label='Cautious Avg. Damage', color=colors['cautious'])
    dmg_ax.plot(t, results_df['readthrough_avg_damage'], label='Reckless Avg. Damage', color=colors['readthrough'], alpha=0.8)
    dmg_ax.set_ylabel('Average RNA Damage Level')
    dmg_ax.legend(loc='upper left')
    load_ax = dmg_ax.twinx()
    load_ax.plot(t, results_df['cautious_stop_pct'], label='Cautious Stop %', color=colors['cautious'], linestyle='--')
    load_ax.plot(t, results_df['reckless_stop_pct'], label='Reckless Stop %', color=colors['readthrough'], linestyle='--')
    load_ax.set_ylabel('Population with Stop Codon (%)')
    load_ax.legend(loc='upper right')

    # Panel 4: NMP/NDP/NTP totals; the second strategy is drawn dashed.
    pool_ax.set_title('Total Nucleotide Reserves')
    pool_series = (
        ('cautious', 'Cautious', 'orange', {}),
        ('readthrough', 'Reckless', 'purple', {'linestyle': '--'}),
    )
    for strategy, label, colour, extra in pool_series:
        pool_ax.plot(t, results_df[f'{strategy}_nmp_total'], label=f'{label} NMP', color=colour, alpha=0.5, **extra)
        pool_ax.plot(t, results_df[f'{strategy}_ndp_total'], label=f'{label} NDP', color=colour, alpha=0.7, **extra)
        pool_ax.plot(t, results_df[f'{strategy}_ntp_total'], label=f'{label} NTP', color=colour, alpha=1.0, lw=2, **extra)
    pool_ax.set_ylabel('Total Nucleotide Units')
    pool_ax.legend(ncol=2)

    for panel in axs:
        panel.grid(True, linestyle=':', linewidth='0.5', color='gray')
    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()
    plt.close(fig)

def main_orchestrator(action, num_replicas, steps_per_day, base_save_dir=None):
    """Create or pick an experiment folder, then run or plot its replicas.

    Args:
        action: 'fresh_start' creates a new experiment; 'resume' continues the
            most recent one (or starts fresh if none exists); 'plot_only' and
            'monitor' only plot finished replicas of the most recent one.
        num_replicas: Number of ``Replica_<i>`` runs to process.
        steps_per_day: Simulation steps per day, passed to each replica.
        base_save_dir: Folder holding the ``Experiment_*`` directories.
            Defaults to the notebook's Google Drive location.

    Returns:
        None. Results are printed, plotted and written under the experiment
        folder.
    """
    if base_save_dir is None:
        base_save_dir = '/content/drive/My Drive/Colab Notebooks/Sim_Experiments/'
    BASE_SAVE_DIR = base_save_dir
    os.makedirs(BASE_SAVE_DIR, exist_ok=True)
    all_runs = sorted([d for d in os.listdir(BASE_SAVE_DIR) if d.startswith('Experiment_')])

    view_only = action in ('plot_only', 'monitor')
    if view_only and not all_runs:
        # Previously this fell through to all_runs[-1] and raised IndexError.
        print("No past experiment found; nothing to plot.")
        return

    experiment_dir = None
    if action == 'fresh_start' or not all_runs:
        if action != 'fresh_start': print("No past experiment found, starting fresh.")
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        experiment_dir = os.path.join(BASE_SAVE_DIR, f"Experiment_{timestamp}")
        os.makedirs(experiment_dir, exist_ok=True)
    else:
        # Timestamped names sort chronologically, so the last one is the newest.
        experiment_dir = os.path.join(BASE_SAVE_DIR, all_runs[-1])
    print(f"--- Using experiment: {os.path.basename(experiment_dir)} ---")

    if view_only:
        # Only Replica_<number> folders are considered, so a stray folder such
        # as "Replica_old" cannot break the numeric sort.
        replica_names = [d for d in os.listdir(experiment_dir)
                         if d.startswith('Replica_') and d.split('_')[-1].isdigit()]
        replica_names.sort(key=lambda d: int(d.split('_')[-1]))
        replica_dirs = [os.path.join(experiment_dir, d) for d in replica_names]
        if not replica_dirs: print("No replica data in this experiment."); return
        print(f"--- Plotting all completed replicas in {os.path.basename(experiment_dir)} ---")
        for replica_dir in replica_dirs:
            final_path = os.path.join(replica_dir, "simulation.df.pkl")
            if os.path.exists(final_path):
                print(f"\n--- Plotting {os.path.basename(replica_dir)} ---")
                results_df = pd.read_pickle(final_path)
                plot_simulation_details(results_df, replica_title=f"{os.path.basename(replica_dir)} (Completed)")
            else:
                print(f"\n--- Skipping {os.path.basename(replica_dir)} (Not complete) ---")
        return

    all_summaries = []
    DNA_TEMPLATE = "AUGUGUUACUGG"
    for i in range(1, num_replicas + 1):
        replica_dir = os.path.join(experiment_dir, f"Replica_{i}")
        os.makedirs(replica_dir, exist_ok=True)
        replica_save_base = os.path.join(replica_dir, "simulation.pkl")

        # A replica counts as finished once its final log DataFrame exists.
        final_df_path = replica_save_base.replace('.pkl', '.df.pkl')
        if os.path.exists(final_df_path):
            try:
                results_df = pd.read_pickle(final_df_path)
                last_row = results_df.iloc[-1]
                final_c, final_r = last_row.get('cautious', 0), last_row.get('readthrough', 0)
                winner = "Cautious" if final_r == 0 else "Reckless" if final_c == 0 else "Tie/Limit"
                summary = {'Winner': winner, 'Duration': last_row['time'], 'Final Cautious': final_c, 'Final Reckless': final_r, 'Replica': i}
                all_summaries.append(summary)
                print(f"--- Replica {i} already completed. Loaded summary. ---")
                continue
            # An unreadable or empty log falls through and the replica is re-run.
            except Exception as e: print(f"Warning: Could not load summary for replica {i}: {e}")

        print(f"\n--- Processing Replica {i}/{num_replicas} ---")
        # One 29-bit seed per replica feeds both generators.
        q_rng = QuantumRandomGenerator(29)
        quantum_seed = q_rng.get_seed()
        sim_rng = np.random.default_rng(seed=quantum_seed)
        shuffle_rng = random.Random(quantum_seed)

        summary, details_df = run_single_simulation(steps_per_day, sim_rng, shuffle_rng, replica_save_base, DNA_TEMPLATE)
        summary['Replica'] = i
        all_summaries.append(summary)
        if not details_df.empty:
            plot_simulation_details(details_df, replica_title=f"Replica {i}")

    print("\n\n" + "="*80 + "\n" + " EXPERIMENT SUMMARY ".center(80, "="))
    if all_summaries:
        summary_df = pd.DataFrame(all_summaries).set_index('Replica')
        print(summary_df.to_string())
    print("="*80)

# Notebook entry point: choose the run settings and hand them to main_orchestrator.
if __name__ == "__main__":
    # One of 'fresh_start', 'resume', 'plot_only' or 'monitor' (the values main_orchestrator checks for).
    CHOSEN_ACTION = 'resume'
    # Replicas are processed as Replica_1 .. Replica_<NUM_REPLICAS> inside the experiment folder.
    NUM_REPLICAS = 10
    # Simulation steps per day; a simulated year is 365 * STEPS_PER_DAY steps.
    STEPS_PER_DAY = 10
    main_orchestrator(CHOSEN_ACTION, NUM_REPLICAS, STEPS_PER_DAY)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
--- Using experiment: Experiment_2025-07-04_12-05-44 ---

--- Processing Replica 1/10 ---
--- Resuming simulation from step 500 ---


Simulating Replica:   5%|4         | 500/10950 [00:00<?, ?it/s]

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipython-input-32-2941301135.py", line 341, in <cell line: 0>
    main_orchestrator(CHOSEN_ACTION, NUM_REPLICAS, STEPS_PER_DAY)
  File "/tmp/ipython-input-32-2941301135.py", line 325, in main_orchestrator
    summary, details_df = run_single_simulation(steps_per_day, sim_rng, shuffle_rng, replica_save_base, DNA_TEMPLATE)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-32-2941301135.py", line 193, in run_single_simulation
    colony.step(step_count)
  File "/tmp/ipython-input-32-2941301135.py", line 138, in step
    next_generation = [offspring for p in self.active_population if (offspring := p.step(current_uv)) is not None]
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^