<a href="https://colab.research.google.com/github/javermeire12/Simulations/blob/main/DESI_COSMIC_NETWORK_ULTIMATE_MASTER_Before%20REAL%20DATA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Position 1: Setup and Imports - MASTER VERSION
# This cell initializes the environment and imports all necessary libraries
# for the complete cosmic network analysis.

print("🚀 COSMIC NETWORK ULTIMATE MASTER - INITIALIZING")
print("="*70)

# Import standard libraries for subprocess execution, system paths, time, and datetime
import subprocess
import sys
import time
from datetime import datetime

# Define a function to install required Python packages
def install_packages():
    """
    Install all required packages for complete analysis.
    Checks if packages are already installed and reports status.
    """
    # List of packages required for the analysis
    packages = [
        'astropy', 'healpy', 'networkx', 'scipy', 'matplotlib',
        'seaborn', 'pandas', 'numpy', 'requests', 'tqdm', 'scikit-learn'
    ]

    print("🔧 Installing packages for ultimate master analysis...")
    # Iterate through the list of packages
    for package in packages:
        try:
            # Use subprocess to run the pip install command for each package
            # sys.executable ensures that the correct python environment's pip is used
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
            print(f"✅ {package}") # Print success message if installation is successful
        except:
            # Print a warning if installation fails, assuming it might already be installed
            print(f"⚠️ {package} already installed or failed")

# Call the function to install packages
install_packages()

# Import all necessary libraries after installation
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.spatial.distance import cdist # For calculating distances between points
from scipy.stats import norm, shapiro, kstest # For statistical tests (norm for Z-score conversion)
from scipy.optimize import curve_fit # For fitting scaling functions
import json # For saving results to JSON file
from tqdm import tqdm # For showing progress bars during long loops
import warnings # To manage warnings

# Ignore specific warnings to keep output clean
warnings.filterwarnings('ignore')

# Set a predefined style for matplotlib plots for better aesthetics
plt.style.use('seaborn-v0_8')
# Set a color palette for seaborn plots
sns.set_palette("husl")

# Initialize global variables to track the master analysis time
MASTER_START_TIME = time.time() # Record the start time
MASTER_TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') # Create a timestamp

# Print confirmation messages
print(f"✅ All packages imported successfully!")
print(f"📅 Master analysis started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🎯 This is the ULTIMATE MASTER VERSION with EVERYTHING included!")
print("="*70)

🚀 COSMIC NETWORK ULTIMATE MASTER - INITIALIZING
🔧 Installing packages for ultimate master analysis...
✅ astropy
✅ healpy
✅ networkx
✅ scipy
✅ matplotlib
✅ seaborn
✅ pandas
✅ numpy
✅ requests
✅ tqdm
✅ scikit-learn
✅ All packages imported successfully!
📅 Master analysis started: 2025-07-02 16:37:49
🎯 This is the ULTIMATE MASTER VERSION with EVERYTHING included!


In [22]:
# Position 2: Statistical Validator Class
# This cell defines the StatisticalValidator class with methods for
# power law fitting and statistical significance calculations.

class StatisticalValidator:
    """
    Provides static methods for statistical calculations, including
    power law fitting and significance testing.
    """

    @staticmethod
    def power_law_fit(x_data, y_data):
        """
        Fit power law: y = A * x^B to data using log-log regression.

        Args:
            x_data (np.ndarray): Array of x-values.
            y_data (np.ndarray): Array of y-values.

        Returns:
            tuple: Fitted parameters (A, B) and R² value, or (None, None, 0.0) if fitting fails.
        """
        # Ensure data are positive for log scale
        # Filter out non-positive values and corresponding y values
        positive_mask = (x_data > 0) & (y_data > 0)
        x_data_pos = x_data[positive_mask]
        y_data_pos = y_data[positive_mask]

        if len(x_data_pos) < 2:
             # Not enough valid data points for fitting
            return None, None, 0.0

        # Use log-log regression
        log_x = np.log10(x_data_pos)
        log_y = np.log10(y_data_pos)

        # Remove any infinite or NaN values that might result from log(0) or invalid data
        valid_mask = np.isfinite(log_x) & np.isfinite(log_y)
        log_x = log_x[valid_mask]
        log_y = log_y[valid_mask]

        # Need at least 2 valid points to fit a line
        if len(log_x) < 2:
            return None, None, 0.0 # Return None if fitting is not possible

        try:
            # Perform linear regression in log space (log(y) = log(A) + B*log(x))
            coeffs = np.polyfit(log_x, log_y, 1)
            slope = coeffs[0]  # This is the exponent B
            intercept = coeffs[1]  # This is log10(A)
            A = 10**intercept # Convert intercept back to A

            # Calculate R² for goodness of fit (on the original scale)
            # Need to use the original x_data (filtered by positive_mask and valid_mask) for prediction
            # and original y_data for comparison.
            original_x_for_r2 = x_data_pos[valid_mask]
            original_y_for_r2 = y_data_pos[valid_mask]


            y_pred = A * (original_x_for_r2 ** slope)
            ss_res = np.sum((original_y_for_r2 - y_pred) ** 2) # Sum of squares of residuals
            ss_tot = np.sum((original_y_for_r2 - np.mean(original_y_for_r2)) ** 2) # Total sum of squares
            r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0 # Calculate R²

            return A, slope, r_squared # Return fitted parameters and R²
        except Exception as e:
            print(f"Warning: Power law fitting failed - {e}")
            return None, None, 0.0 # Return None if fitting encounters an error


    @staticmethod
    def calculate_z_scores(real_values, random_samples):
        """
        Calculate Z-scores for multiple real measurements against their respective random samples.
        This static method is useful for comparing a set of real values to distributions
        of random values (e.g., multiple radii or bootstrap samples).

        Args:
            real_values (list or np.ndarray): A list or array of real measured values.
            random_samples (list of list/np.ndarray): A list where each element is a list
                                                       or array of random sample values
                                                       corresponding to the real value.

        Returns:
            tuple: A tuple containing two numpy arrays:
                   - z_scores: Array of Z-scores.
                   - p_values: Array of corresponding p-values (two-tailed).
        """
        z_scores = []
        p_values = []

        # Iterate through each real value and its corresponding random sample
        for real_val, random_sample in zip(real_values, random_samples):
            # Ensure random_sample is a numpy array for consistent operations
            random_sample = np.asarray(random_sample)

            if len(random_sample) > 1: # Ensure there are enough random samples to calculate mean/std
                mean_random = np.mean(random_sample)
                std_random = np.std(random_sample)

                if std_random > 0: # Avoid division by zero
                    z = (real_val - mean_random) / std_random # Calculate Z-score
                    # Calculate two-tailed p-value from Z-score using the standard normal CDF
                    p = norm.sf(abs(z)) * 2
                else:
                    # If std dev is zero, the real value is either identical to the single sample (p=1, z=0)
                    # or different (p=0, z=inf). Assuming difference is significant here.
                    z = float('inf') if real_val != random_sample[0] else 0.0
                    p = 0.0 if real_val != random_sample[0] else 1.0

            else:
                # If only one or zero random samples, we cannot reliably calculate Z-score/p-value.
                # Returning inf significance if there's a real value to compare, else 0.
                z = float('inf') if random_sample.size == 1 and real_val != random_sample[0] else 0.0
                p = 0.0 if random_sample.size == 1 and real_val != random_sample[0] else 1.0


            z_scores.append(z)
            p_values.append(p)

        return np.array(z_scores), np.array(p_values) # Return results as NumPy arrays

    @staticmethod
    def combined_significance(z_scores):
        """
        Calculate combined statistical significance across multiple Z-scores.
        Uses Fisher's method to combine p-values derived from Z-scores.

        Args:
            z_scores (np.ndarray): An array of Z-scores from independent tests.

        Returns:
            tuple: A tuple containing the combined Z-score and combined p-value.
        """
        # Ensure z_scores is a numpy array
        z_scores = np.asarray(z_scores)

        # Filter out non-finite Z-scores and Z-scores close to zero (p-value close to 1)
        # A Z-score of 0 gives a p-value of 1, log(1) is 0, which doesn't contribute to chi-squared.
        # So we can filter these out to avoid potential issues with floating point near 1.0.
        valid_z = z_scores[np.isfinite(z_scores) & (np.abs(z_scores) > 1e-9)]


        # If no valid Z-scores, return zero significance
        if len(valid_z) == 0:
            # If all Z-scores were non-significant or invalid
            # Consider combined p-value as 1.0 (no overall significance)
            return 0.0, 1.0

        # Convert Z-scores to p-values (two-tailed)
        # Avoid p-values exactly equal to 0 or 1 which cause issues with log.
        p_values = norm.sf(np.abs(valid_z)) * 2
        # Clamp p-values to a small range to avoid log(0) or log(close to 1) issues
        p_values = np.clip(p_values, 1e-300, 1.0 - 1e-15)


        # Calculate Fisher's chi-squared statistic
        # -2 * sum(log(p_i)) follows a chi-squared distribution
        chi_squared = -2 * np.sum(np.log(p_values))
        # Degrees of freedom for the chi-squared distribution is twice the number of tests
        degrees_freedom = 2 * len(p_values)

        # Calculate the combined p-value using the chi-squared distribution
        from scipy.stats import chi2
        # chi2.cdf gives the cumulative distribution function (P(X <= x)).
        # We want the probability of getting a chi-squared value as extreme or more extreme (the tail).
        combined_p = 1 - chi2.cdf(chi_squared, degrees_freedom)

        # Convert the combined p-value back to an equivalent Z-score
        # Use norm.ppf (percent point function - inverse of CDF)
        # Since the combined test is typically non-directional, we use the two-tailed p-value
        # and find the Z-score corresponding to combined_p / 2 in the upper tail.
        if combined_p > 0 and combined_p < 1:
             # norm.ppf(1 - tail_probability) gives the Z-score
            combined_z = norm.ppf(1 - combined_p/2)
        elif combined_p <= 0: # Extremely small p-value (very significant)
             combined_z = float('inf') # Corresponds to infinite Z-score
        else: # p-value close to 1 (not significant)
             combined_z = 0.0 # Corresponds to a Z-score of 0

        # Return the absolute value of the combined Z-score, as it represents significance level
        # Direction isn't typically combined in Fisher's method unless specifically handled.
        combined_z_significance = abs(combined_z) if np.isfinite(combined_z) else combined_z

        return combined_z_significance, combined_p # Return combined Z and P values


print("✅ StatisticalValidator class ready!")

✅ StatisticalValidator class ready!


In [23]:
# Position 3: CosmicNetworkAnalyzer - Complete Class (Base)
# This cell defines the base class for analyzing cosmic networks.
# It includes methods for building networks and calculating core efficiency metrics.

class CosmicNetworkAnalyzer:
    """
    Ultimate cosmic network analyzer with all features.
    Provides methods to build networks from spatial positions and calculate metrics.
    """

    # The __init__ method initializes the analyzer with a linking radius.
    def __init__(self, radius=20.0):
        """
        Initializes the CosmicNetworkAnalyzer.
        Args:
            radius (float): The linking radius used to define edges between objects.
        """
        # Set the linking radius
        self.radius = radius
        # Dictionary to store analysis results (can be used by derived classes)
        self.results = {}
        # Attributes to store the last built network and random comparison networks
        self.last_real_network = None
        self.last_random_networks = []

    def build_network(self, positions, weight_function='inverse_square'):
        """
        Build a network (Graph) from 3D object positions using a specified linking radius.
        Edges are created between objects closer than the radius. Edge weights can be customized.

        Args:
            positions (numpy.ndarray): An array of shape (n_objects, 3) with object positions.
            weight_function (str): The function used to calculate edge weights ('inverse_square', 'inverse_linear', 'exponential', or None for unweighted).

        Returns:
            networkx.Graph: The built network graph.
        """
        print(f"   Building network (radius={self.radius}, weight={weight_function})...")

        # Calculate the pairwise Euclidean distances between all objects' positions
        distances = cdist(positions, positions)
        # Initialize an empty NetworkX graph
        G = nx.Graph()
        # Add nodes to the graph, one node for each object
        G.add_nodes_from(range(len(positions)))

        # Iterate through all pairs of objects to add edges
        # Use tqdm for a progress bar
        for i in tqdm(range(len(positions)), desc="   Adding edges"):
            for j in range(i+1, len(positions)): # Avoid self-loops and duplicate edges
                dist = distances[i, j] # Get the distance between object i and object j
                # Check if the distance is greater than 0 (not the same object) and within the linking radius
                if 0 < dist <= self.radius:

                    # Calculate the edge weight based on the specified function
                    if weight_function == 'inverse_square':
                        weight = 1.0 / (dist * dist)  # INVERSE SQUARE (common in physics)
                    elif weight_function == 'inverse_linear':
                        weight = 1.0 / (1.0 + dist) # Inverse linear (adding 1 to avoid division by zero if radius can be 0)
                    elif weight_function == 'exponential':
                        weight = np.exp(-dist / 5.0) # Exponential decay (distance scale of 5.0)
                    else:
                        weight = 1.0 / (dist * dist)  # Default to inverse square if function not recognized

                    # Add an edge between nodes i and j with the calculated weight
                    G.add_edge(i, j, weight=weight)

        # Print summary of the built network
        print(f"   ✅ Built: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")
        # Return the built graph
        return G

    def calculate_efficiency_metrics(self, G):
        """
        Calculate comprehensive network efficiency metrics for a given graph.
        Includes clustering, path length, diameter, density, betweenness, and flow capacity.

        Args:
            G (networkx.Graph): The input graph.

        Returns:
            dict: A dictionary containing the calculated metrics. Returns zeros if graph is empty or has no edges.
        """
        # Check if the graph has nodes and edges to avoid errors
        if G.number_of_nodes() == 0 or G.number_of_edges() == 0:
            # Return default zero values if the graph is empty
            return {'clustering': 0.0, 'path_length': 0.0, 'diameter': 0.0,
                    'density': 0.0, 'betweenness': 0.0, 'flow_capacity': 0.0}

        metrics = {} # Dictionary to store calculated metrics

        # --- Core network metrics ---
        try:
            # Calculate average clustering coefficient (measures local cliquishness)
            # Uses the 'weight' attribute if present
            metrics['clustering'] = nx.average_clustering(G, weight='weight', count_zeros=True)
        except:
            metrics['clustering'] = 0.0 # Handle potential errors

        try:
            # Calculate average shortest path length and diameter (measures global efficiency and reach)
            # These are only meaningful for connected graphs. If not connected, analyze the largest component.
            if nx.is_connected(G):
                metrics['path_length'] = nx.average_shortest_path_length(G)
                metrics['diameter'] = nx.diameter(G)
            else:
                # Find the largest connected component
                largest_cc = max(nx.connected_components(G), key=len)
                # Create a subgraph of the largest connected component
                subgraph = G.subgraph(largest_cc)
                # Calculate metrics on the largest connected component
                metrics['path_length'] = nx.average_shortest_path_length(subgraph)
                metrics['diameter'] = nx.diameter(subgraph)
        except:
            # Handle potential errors (e.g., component with only one node)
            metrics['path_length'] = 0.0
            metrics['diameter'] = 0.0

        # --- Network density and flow ---
        n_nodes = G.number_of_nodes()
        # Calculate the maximum possible number of edges in an undirected graph
        max_edges = n_nodes * (n_nodes - 1) / 2
        # Calculate density (ratio of actual edges to maximum possible edges)
        metrics['density'] = G.number_of_edges() / max_edges if max_edges > 0 else 0

        try:
            # Calculate betweenness centrality (measures how often a node is on shortest paths)
            # Calculate the average betweenness centrality across all nodes
            betweenness = nx.betweenness_centrality(G, weight='weight')
            metrics['betweenness'] = np.mean(list(betweenness.values()))
        except:
            metrics['betweenness'] = 0.0 # Handle potential errors

        try:
            # Calculate flow capacity (can be related to average degree)
            # Here, it's approximated by the average node degree
            degrees = [d for n, d in G.degree()]
            metrics['flow_capacity'] = np.mean(degrees)
        except:
            metrics['flow_capacity'] = 0.0 # Handle potential errors

        # Return the dictionary of calculated metrics
        return metrics

print("✅ CosmicNetworkAnalyzer class ready!")

✅ CosmicNetworkAnalyzer class ready!


In [24]:
# Position 4: CosmicNetworkAnalyzer - Complete Analysis Methods
# This cell extends the base CosmicNetworkAnalyzer class with advanced analysis methods,
# including single-radius analysis, multi-radius analysis, and bootstrap validation.

class CosmicNetworkAnalyzerComplete(CosmicNetworkAnalyzer):
    """
    Extended analyzer with all analysis methods.
    Inherits from CosmicNetworkAnalyzer and adds methods for statistical comparison
    against random networks and analysis across multiple linking radii.
    """

    def analyze_at_radius(self, galaxy_positions, radius, n_random=25):
        """
        Analyze the cosmic network at a specific linking radius.
        Compares the real network metrics against an ensemble of random networks
        generated within the same volume and number of nodes.

        Args:
            galaxy_positions (numpy.ndarray): 3D positions of galaxies.
            radius (float): The linking radius for network construction.
            n_random (int): Number of random networks to generate for comparison.

        Returns:
            dict: A dictionary containing the analysis results for this radius,
                  including real and random network statistics and significance metrics.
        """
        # Set the radius for this analysis run
        self.radius = radius
        print(f"\n🔬 Analyzing at radius = {radius}")

        # --- Build and analyze the real network ---
        real_net = self.build_network(galaxy_positions) # Build the network from actual galaxy positions
        real_edges = real_net.number_of_edges() # Get the number of edges in the real network
        real_metrics = self.calculate_efficiency_metrics(real_net) # Calculate metrics for the real network

        # Store the real network for potential future visualization or analysis
        self.last_real_network = real_net

        # --- Test against random networks ---
        print(f"   Testing vs {n_random} random networks...")
        random_edges = [] # List to store edge counts from random networks
        random_metrics = [] # List to store metrics from random networks
        self.last_random_networks = [] # List to store random networks (for visualization)

        # Loop to generate and analyze random networks
        for i in tqdm(range(n_random), desc="   Random tests"):
            # Generate random positions uniformly within the bounding box of the real galaxy positions
            random_pos = np.random.uniform(
                galaxy_positions.min(axis=0), # Minimum coordinates of the real sample
                galaxy_positions.max(axis=0), # Maximum coordinates of the real sample
                galaxy_positions.shape       # Same shape as the real sample (same number of objects)
            )
            # Build a network from the random positions
            random_net = self.build_network(random_pos)
            # Store the number of edges
            random_edges.append(random_net.number_of_edges())
            # Calculate and store the metrics
            random_metrics.append(self.calculate_efficiency_metrics(random_net))
            # Store the first few random networks for detailed inspection if needed
            if i < 5:
                self.last_random_networks.append(random_net)

        # --- Calculate comprehensive statistics for comparison ---
        random_mean = np.mean(random_edges) # Mean number of edges in random networks
        random_std = np.std(random_edges) # Standard deviation of edge counts in random networks
        # Calculate the ratio of real edges to the mean random edges
        ratio = real_edges / random_mean if random_mean > 0 else float('inf')

        # Calculate the Z-score (number of standard deviations the real value is from the random mean)
        z_score = (real_edges - random_mean) / random_std if random_std > 0 else float('inf')

        # Calculate statistical significance (p-value and sigma level)
        from scipy import stats
        # Calculate p-value using the standard normal distribution (two-tailed test)
        p_value = stats.norm.sf(abs(z_score)) * 2
        # Sigma level is the absolute value of the Z-score
        sigma_level = abs(z_score)

        # Compile results into a dictionary
        result = {
            'radius': radius,
            'real_edges': real_edges,
            'random_mean': random_mean,
            'random_std': random_std,
            'random_edges': random_edges, # Store all random edge counts
            'ratio': ratio,
            'z_score': z_score,
            'p_value': p_value,
            'sigma_level': sigma_level,
            'real_metrics': real_metrics,
            'random_metrics': random_metrics # Store all random metrics
        }

        # Store the results for this radius in the analyzer's results dictionary
        self.results[radius] = result

        # Print a summary of the results for this radius
        print(f"   ✅ Real: {real_edges} edges, Random: {random_mean:.1f}±{random_std:.1f}")
        print(f"   📊 Ratio: {ratio:.2f}x, Z-score: {z_score:.1f}σ, p={p_value:.2e}")

        # Return the results dictionary for this radius
        return result

    def multi_radius_analysis(self, galaxy_positions, radii=[15, 20, 25, 30, 35, 40], n_random=25):
        """
        Perform network analysis across a list of specified radii.
        Calls analyze_at_radius for each radius and compiles the results.

        Args:
            galaxy_positions (numpy.ndarray): 3D positions of galaxies.
            radii (list): A list of linking radii to analyze.
            n_random (int): Number of random networks to generate per radius for comparison.

        Returns:
            list: A list of result dictionaries, one for each radius analyzed.
        """
        print(f"\n🌌 MULTI-RADIUS COSMIC NETWORK ANALYSIS")
        print(f"   Radii: {radii}")
        print(f"   Random trials per radius: {n_random}")
        print("="*60)

        all_results = [] # List to store results from all radii

        # Loop through each specified radius
        for radius in radii:
            # Call the analyze_at_radius method for the current radius
            result = self.analyze_at_radius(galaxy_positions, radius, n_random)
            # Append the result to the list
            all_results.append(result)

        # Print a summary table of the multi-radius results
        print(f"\n📈 MULTI-RADIUS SUMMARY:")
        print(f"{'Radius':<8} {'Real':<8} {'Random':<10} {'Ratio':<8} {'Z-score':<8}")
        print("-" * 50)

        for result in all_results:
            print(f"{result['radius']:<8} {result['real_edges']:<8} "
                  f"{result['random_mean']:<10.1f} {result['ratio']:<8.2f} "
                  f"{result['z_score']:<8.1f}")

        # Return the list of results for all radii
        return all_results

    def bootstrap_validation(self, galaxy_positions, radius=25, n_bootstrap=100, n_random=25):
        """
        Perform bootstrap validation of network analysis at a specific radius.
        Analyzes multiple bootstrap samples of the galaxy positions and calculates
        statistics (mean and std dev) of the results (e.g., ratio, Z-score).

        Args:
            galaxy_positions (numpy.ndarray): 3D positions of galaxies.
            radius (float): The linking radius for network construction.
            n_bootstrap (int): Number of bootstrap samples to analyze.
            n_random (int): Number of random networks per bootstrap sample for comparison.

        Returns:
            dict: A dictionary containing statistics about the bootstrap results.
        """
        print(f"\n🔄 Bootstrap Validation (radius={radius})")
        print(f"   Bootstrap samples: {n_bootstrap}")
        print(f"   Random trials per sample: {n_random}")

        bootstrap_results = [] # List to store results from each bootstrap sample
        n_galaxies = len(galaxy_positions) # Total number of galaxies

        # Loop to generate and analyze bootstrap samples
        for i in tqdm(range(n_bootstrap), desc="Bootstrap"):
            # Create a bootstrap sample by sampling galaxy indices with replacement
            indices = np.random.choice(n_galaxies, n_galaxies, replace=True)
            bootstrap_sample = galaxy_positions[indices] # Get positions for the bootstrap sample

            # Analyze the bootstrap sample at the specified radius
            result = self.analyze_at_radius(bootstrap_sample, radius, n_random)
            # Append the result to the bootstrap results list
            bootstrap_results.append(result)

        # --- Calculate statistics on bootstrap results ---
        # Extract ratios and Z-scores from all bootstrap results
        ratios = [r['ratio'] for r in bootstrap_results]
        z_scores = [r['z_score'] for r in bootstrap_results]

        # Calculate mean and standard deviation for ratios and Z-scores
        print(f"   Bootstrap ratio: {np.mean(ratios):.2f} ± {np.std(ratios):.2f}")
        print(f"   Bootstrap Z-score: {np.mean(z_scores):.1f} ± {np.std(z_scores):.1f}")
        # Calculate the 95% confidence interval for the ratio
        print(f"   95% CI ratio: [{np.percentile(ratios, 2.5):.2f}, {np.percentile(ratios, 97.5):.2f}]")

        # Compile and return the bootstrap statistics
        return {
            'bootstrap_results': bootstrap_results, # Store all individual bootstrap results
            'ratio_mean': np.mean(ratios),
            'ratio_std': np.std(ratios),
            'ratio_ci': [np.percentile(ratios, 2.5), np.percentile(ratios, 97.5)], # 95% CI
            'z_score_mean': np.mean(z_scores),
            'z_score_std': np.std(z_scores)
        }

print("✅ CosmicNetworkAnalyzerComplete class ready with all analysis methods!")

✅ CosmicNetworkAnalyzerComplete class ready with all analysis methods!


In [28]:
# Position 5: Cosmic Visualizer Class
# This cell defines a class with static methods for creating publication-quality visualizations.
# It focuses on plotting the results of the multi-radius analysis and network structure.

class CosmicVisualizer:
    """
    Publication-quality visualization for cosmic network analysis.
    Contains static methods to generate various plots summarizing the analysis results.
    """

    @staticmethod
    def plot_scaling_analysis(results, save_path=None):
        """
        Create a comprehensive scaling analysis plot with multiple subplots.
        Visualizes edge counts, efficiency ratio, statistical significance vs radius, and a summary.

        Args:
            results (list): A list of dictionaries, results from multi_radius_analysis.
            save_path (str, optional): Path to save the plot image file. Defaults to None.

        Returns:
            matplotlib.figure.Figure: The generated matplotlib Figure object.
        """
        # Extract data needed for plotting from the results list
        radii = [r['radius'] for r in results] # List of radii
        real_edges = [r['real_edges'] for r in results] # Real edge counts
        random_means = [r['random_mean'] for r in results] # Mean random edge counts
        z_scores = [r['z_score'] for r in results] # Z-scores

        # Create a figure with 2 rows and 2 columns for the subplots
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))

        # --- Plot 1: Edge counts vs radius (Log-log scale) ---
        ax1.loglog(radii, real_edges, 'bo-', linewidth=2, markersize=8, label='Real Universe', alpha=0.8)
        ax1.loglog(radii, random_means, 'rs-', linewidth=2, markersize=8, label='Random Networks', alpha=0.8)

        # Fit power laws to the edge counts vs radius data
        # Use the StatisticalValidator.power_law_fit static method
        real_A, real_exp, real_r2 = StatisticalValidator.power_law_fit(np.array(radii), np.array(real_edges))
        random_A, random_exp, random_r2 = StatisticalValidator.power_law_fit(np.array(radii), np.array(random_means))

        # Add fitted power law lines to the plot if fitting was successful
        if real_A is not None and real_exp is not None:
            fit_x = np.linspace(min(radii), max(radii), 100) # Create x values for the fitted line
            ax1.loglog(fit_x, real_A * (fit_x ** real_exp), 'b--', alpha=0.7,
                       label=f'Real: r^{real_exp:.2f} (R²={real_r2:.3f})')

        if random_A is not None and random_exp is not None:
            fit_x = np.linspace(min(radii), max(radii), 100)
            ax1.loglog(fit_x, random_A * (fit_x ** random_exp), 'r--', alpha=0.7,
                       label=f'Random: r^{random_exp:.2f} (R²={random_r2:.3f})')

        # Set labels, title, legend, and grid for Plot 1
        ax1.set_xlabel('Radius', fontsize=12)
        ax1.set_ylabel('Edge Count', fontsize=12)
        ax1.set_title('Cosmic Network Scaling Laws', fontsize=14, fontweight='bold')
        ax1.legend(fontsize=10)
        ax1.grid(True, alpha=0.3)

        # --- Plot 2: Network efficiency ratio vs radius (Semi-log scale for X-axis) ---
        efficiency_ratios = [r['ratio'] for r in results]
        ax2.semilogx(radii, efficiency_ratios, 'go-', linewidth=3, markersize=10, alpha=0.8)
        ax2.axhline(y=1, color='red', linestyle='--', alpha=0.7, label='Random baseline') # Add a line at ratio = 1 for reference
        ax2.set_xlabel('Radius', fontsize=12)
        ax2.set_ylabel('Real/Random Ratio', fontsize=12)
        ax2.set_title('Network Efficiency Advantage', fontsize=14, fontweight='bold')
        ax2.legend(fontsize=10)
        ax2.grid(True, alpha=0.3)

        # Add power law fit to the ratio data
        ratio_A, ratio_exp, ratio_r2 = StatisticalValidator.power_law_fit(np.array(radii), np.array(efficiency_ratios))
        if ratio_A is not None and ratio_exp is not None:
            fit_x = np.linspace(min(radii), max(radii), 100)
            # Note: This fit is plotted on a semi-log x scale, so the fitted function needs log-x input if fitting was on log-log.
            # Since power_law_fit uses log-log, the function form y=A*x^exp is correct for linear/log plotting.
            ax2.semilogx(fit_x, ratio_A * (fit_x ** ratio_exp), 'g--', alpha=0.7,
                        label=f'Fit: r^{ratio_exp:.2f} (R²={ratio_r2:.3f})')
            ax2.legend(fontsize=10)

        # --- Plot 3: Statistical significance vs radius (Semi-log scale for Y-axis) ---
        # Plot the absolute Z-scores to show significance level
        ax3.semilogy(radii, np.abs(z_scores), 'mo-', linewidth=3, markersize=10, alpha=0.8)
        # Add horizontal lines for common significance levels (e.g., 1σ, 3σ, 5σ)
        significance_levels = [1, 2, 3, 4, 5, 10]
        colors = ['yellow', 'orange', 'red', 'purple', 'blue', 'black']
        for level, color in zip(significance_levels, colors):
            ax3.axhline(y=level, color=color, linestyle='--', alpha=0.6, label=f'{level}σ')

        ax3.set_xlabel('Radius', fontsize=12)
        ax3.set_ylabel('Statistical Significance (|Z-score|)', fontsize=12)
        ax3.set_title('Detection Confidence', fontsize=14, fontweight='bold')
        ax3.legend(fontsize=8, ncol=2) # Use two columns for legend
        ax3.grid(True, alpha=0.3)

        # --- Plot 4: Summary metrics and interpretation (Text box) ---
        ax4.axis('off') # Turn off axes for this subplot to use it for text

        # Calculate summary statistics for the text box
        mean_ratio = np.mean(efficiency_ratios)
        mean_z = np.mean(np.abs(z_scores))
        # Calculate combined significance using the static method from StatisticalValidator
        combined_z, combined_p = StatisticalValidator.combined_significance(np.array(z_scores))

        # Format the summary text content
        summary_text = f"""
        🌌 COSMIC NETWORK ANALYSIS SUMMARY

        📊 Scaling Laws:
        • Real Universe: E ∝ r^{real_exp:.2f} (R² = {real_r2:.3f})
        • Random Networks: E ∝ r^{random_exp:.2f} (R² = {random_r2:.3f})
        • Efficiency Ratio: {ratio_A:.1f} × r^{ratio_exp:.2f}

        🎯 Performance:
        • Mean efficiency advantage: {mean_ratio:.1f}×
        • Mean significance: {mean_z:.1f}σ
        • Combined significance: {combined_z:.1f}σ
        • Combined p-value: {combined_p:.2e}

        🚀 Interpretation:
        Real universe shows {real_exp:.2f} scaling (holographic)
        vs random {random_exp:.2f} scaling (volume-like)

        Evidence for cosmic network optimization!
        """

        # Add the summary text as an annotation within the subplot
        ax4.text(0.05, 0.95, summary_text.strip(), transform=ax4.transAxes, fontsize=11,
                verticalalignment='top', fontfamily='monospace',
                bbox=dict(boxstyle='round,pad=0.5', facecolor='lightblue', alpha=0.8)) # Add a background box

        # Adjust layout to prevent overlapping elements
        plt.tight_layout()

        # Save the figure if a save path is provided
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight') # Save with high resolution and tight bounding box
            print(f"📊 Plot saved to: {save_path}")

        # Display the plot
        plt.show()

        # Return the figure object
        return fig

    @staticmethod
    def plot_network_structure(galaxy_positions, network, title="Cosmic Network Structure", sample_size=500):
        """
        Visualize network structure in 3D (positions), 2D projection with edges, and degree distribution.
        Useful for qualitative inspection of the network build process.

        Args:
            galaxy_positions (numpy.ndarray): 3D positions of objects.
            network (networkx.Graph): The graph representing the network.
            title (str, optional): Title for the overall plot. Defaults to "Cosmic Network Structure".
            sample_size (int, optional): Number of nodes to sample for visualization if the network is too large.
        """
        print(f"📊 Visualizing network structure for '{title}'...")
        # Sample positions and network if the total number of nodes exceeds sample_size
        if len(galaxy_positions) > sample_size:
            # Randomly select indices without replacement
            indices = np.random.choice(len(galaxy_positions), sample_size, replace=False)
            sample_pos = galaxy_positions[indices] # Get positions of sampled nodes
            # Create a subgraph containing only the sampled nodes and their induced edges
            sample_network = network.subgraph(indices)
            print(f"   Visualizing a sample of {sample_size} nodes.")
        else:
            sample_pos = galaxy_positions
            sample_network = network

        # Create a figure with 1 row and 3 columns for the subplots
        fig = plt.figure(figsize=(15, 5))

        # --- 3D scatter plot of sampled galaxy positions ---
        # Add a 3D subplot (1st column)
        ax1 = fig.add_subplot(131, projection='3d')
        ax1.scatter(sample_pos[:, 0], sample_pos[:, 1], sample_pos[:, 2],
                   c='blue', alpha=0.6, s=20) # Scatter plot in 3D
        ax1.set_xlabel('X')
        ax1.set_ylabel('Y')
        ax1.set_zlabel('Z')
        ax1.set_title('Galaxy Positions')

        # --- Network connectivity (2D projection with edges) ---
        # Add a 2D subplot (2nd column)
        ax2 = fig.add_subplot(132)
        ax2.scatter(sample_pos[:, 0], sample_pos[:, 1], c='blue', alpha=0.6, s=20) # Scatter plot of positions

        # Draw edges between connected nodes in the sampled network (limit number of edges for clarity)
        # Iterate through a limited number of edges (e.g., first 1000)
        for edge in list(sample_network.edges())[:1000]:
            # Ensure the node indices exist in the sampled positions array
            # (Subgraph might have nodes with original indices)
            if edge[0] in sample_network.nodes() and edge[1] in sample_network.nodes():
                 # Need to map subgraph node indices back to original indices if needed, but here sample_network nodes
                 # retain original indices if created via subgraph. Just need to ensure these indices are within sample_pos bounds.
                 # A safer way might be to get positions based on sampled indices mapping:
                 # node1_idx_in_sample = list(sample_network.nodes()).index(edge[0]) # Find index in the sampled list
                 # node2_idx_in_sample = list(sample_network.nodes()).index(edge[1])
                 # x_coords = [sample_pos[node1_idx_in_sample, 0], sample_pos[node2_idx_in_sample, 0]] ...
                 # However, if sample_network nodes retain original indices, direct access works if indices were from range(N)
                 # and sample_pos is just galaxy_positions[indices]. Let's stick to the simpler direct access, assuming indices are original.
                 if edge[0] < len(galaxy_positions) and edge[1] < len(galaxy_positions):
                     # Find the actual index of the original node in the sampled list
                     try:
                         idx1 = np.where(indices == edge[0])[0][0]
                         idx2 = np.where(indices == edge[1])[0][0]
                         x_coords = [sample_pos[idx1, 0], sample_pos[idx2, 0]]
                         y_coords = [sample_pos[idx1, 1], sample_pos[idx2, 1]]
                         ax2.plot(x_coords, y_coords, 'r-', alpha=0.3, linewidth=0.5)
                     except IndexError:
                         # Skip if node index not found in sampled indices (shouldn't happen with subgraph)
                         pass


        ax2.set_xlabel('X Position')
        ax2.set_ylabel('Y Position')
        ax2.set_title('Network Connections (X-Y)')

        # --- Degree distribution ---
        # Add a 2D subplot (3rd column)
        ax3 = fig.add_subplot(133)
        # Get the degree of each node in the sampled network
        degrees = [sample_network.degree(n) for n in sample_network.nodes()]
        ax3.hist(degrees, bins=20, alpha=0.7, color='green') # Plot histogram of degrees
        ax3.set_xlabel('Node Degree')
        ax3.set_ylabel('Frequency')
        ax3.set_title('Degree Distribution')
        ax3.set_yscale('log') # Use log scale for the y-axis (frequency)

        # Add a main title to the entire figure
        plt.suptitle(title, fontsize=16, fontweight='bold')
        # Adjust layout
        plt.tight_layout()
        # Display the plot
        plt.show()

        # Return the figure object
        return fig


print("✅ CosmicVisualizer class ready!")

✅ CosmicVisualizer class ready!


In [32]:
# Position 6: Data Loading - DESI Clustering Catalog (using astropy.io.fits)
# This cell attempts to load DESI galaxy/quasar clustering data from a local FITS file.
# It assumes you have uploaded a FITS file containing the catalog.

# Import necessary libraries
from astropy.io import fits
import numpy as np
import os # Import os to check file existence

# --- Specify the path to your local FITS file ---
# IMPORTANT: Replace 'path/to/your/clustering_catalog.fits' with the actual path to one of your uploaded FITS files.
# Examples:
# '/content/LRG_SGC_clustering.dat.fits'
# '/content/QSO_SGC_clustering.dat.fits'
# '/content/LRG_NGC_clustering.dat.fits'
# '/content/QSO_NGC_clustering.dat.fits'
filename = 'path/to/your/clustering_catalog.fits' # <--- *** REPLACE THIS PATH ***

print(f"Attempting to load data from FITS file: {filename}")

# Check if the file exists before trying to open it
if not os.path.exists(filename):
    print(f"\n❌ Error: The file '{filename}' was not found.")
    print("Please ensure the file is uploaded to your Colab environment and the 'filename' variable points to the correct path.")
    # It's safer to initialize galaxy_positions here as None if the file is not found
    # This avoids potential NameErrors later if the 'else' block is skipped.
    galaxy_positions = None
else:
    # Declare galaxy_positions as global at the beginning of the block where it's modified
    global galaxy_positions # Ensure we are modifying the global variable
    try:
        # Open the FITS file
        # hdul is a list of HDUs (Header Data Units) in the FITS file
        hdul = fits.open(filename)

        # Assuming the main data table is in the first extension (often the second HDU, index 1)
        # You might need to inspect the HDUs of your specific FITS file to confirm the correct index.
        # You can print hdul.info() to see the structure of the FITS file.
        print("\nFITS file info:")
        hdul.info()

        # Try to access data from the first extension (index 1)
        if len(hdul) > 1:
            data = hdul[1].data # Extract the data from the first extension
            print("\n✅ FITS data loaded successfully from extension 1!")
            print(f"Data type: {type(data)}")
            print(f"Number of rows loaded: {len(data)}")

            # Print the column names to help identify position columns
            print("\nAvailable columns (from FITS header):", data.dtype.names)

            # --- Extract galaxy positions ---
            # The exact column names for positions (e.g., 'RA', 'DEC', 'Z' or 'X', 'Y', 'Z')
            # will depend on the specific FITS file structure.
            # You need to verify the actual column names in your FITS file (printed above).
            # Common for clustering catalogs are 'RA', 'DEC', 'Z' (redshift).
            # We need to convert RA, DEC, Z to 3D Cartesian coordinates if necessary.

            # --- Example: Assuming positions are in 'RA', 'DEC', 'Z' columns ---
            # You will likely need to adapt this based on your file's structure and required coordinate system.
            galaxy_positions_from_fits = None

            if 'RA' in data.dtype.names and 'DEC' in data.dtype.names and 'Z' in data.dtype.names:
                 print("\nFound 'RA', 'DEC', 'Z' columns. Assuming these are celestial coordinates + redshift.")
                 # You would typically convert these to Cartesian coordinates (X, Y, Z) for distance calculations.
                 # This requires cosmological calculations (e.g., using astropy.cosmology).
                 # For simplicity *for now*, let's just use RA, DEC, Z as if they were Cartesian for demonstration,
                 # but be aware this is not physically accurate for distance in cosmology.
                 # A proper conversion would involve:
                 # from astropy.cosmology import FlatLambdaCDM
                 # cosmo = FlatLambdaCDM(H0=70, Om0=0.3) # Example cosmology - adjust as needed
                 # dist = cosmo.comoving_distance(data['Z']).value # Comoving distance from redshift
                 # from astropy.coordinates import SkyCoord
                 # from astropy import units as u
                 # coords = SkyCoord(ra=data['RA']*u.deg, dec=data['DEC']*u.deg, distance=dist*u.Mpc)
                 # cart_coords = coords.cartesian
                 # galaxy_positions_from_fits = np.vstack([cart_coords.x.value, cart_coords.y.value, cart_coords.z.value]).T
                 # print("Converted RA, DEC, Z to Cartesian coordinates.")

                 # --- Using RA, DEC, Z directly as proxy for 3D positions (NOT PHYSICALLY ACCURATE FOR DISTANCE) ---
                 # This is a placeholder. You MUST implement proper cosmological distance conversion for accurate analysis.
                 print("⚠️ Using RA, DEC, Z directly as proxy for 3D positions. IMPLEMENT PROPER COSMOLOGICAL DISTANCE CONVERSION.")
                 galaxy_positions_from_fits = np.vstack([data['RA'], data['DEC'], data['Z']]).T


            elif 'x' in data.dtype.names and 'y' in data.dtype.names and 'z' in data.dtype.names:
                 # If positions are already in Cartesian 'x', 'y', 'z'
                 galaxy_positions_from_fits = np.vstack([data['x'], data['y'], data['z']]).T
                 print("\nExtracted positions from 'x', 'y', 'z' columns.")
            elif 'pos' in data.dtype.names and len(data['pos'].shape) == 2 and data['pos'].shape[1] == 3:
                 # If positions are stored directly in a 'pos' column of shape (N, 3)
                 galaxy_positions_from_fits = data['pos']
                 print("\nExtracted positions from 'pos' column.")
            else:
                print("\n⚠️ Could not automatically identify position columns ('RA, DEC, Z', 'pos', or 'x','y','z').")
                print("Please inspect the 'Available columns' list above and manually update the code to extract and potentially convert positions.")
                galaxy_positions_from_fits = None # Set to None if positions couldn't be extracted automatically


            # --- Update galaxy_positions for the analysis pipeline ---
            if galaxy_positions_from_fits is not None:
                 # Ensure the extracted data is a numpy array of the correct shape (N, 3)
                 galaxy_positions_from_fits = np.asarray(galaxy_positions_from_fits)
                 if galaxy_positions_from_fits.ndim == 2 and galaxy_positions_from_fits.shape[-1] == 3:
                     print(f"Extracted {len(galaxy_positions_from_fits)} galaxy positions.")
                     # Assign to the global galaxy_positions variable
                     galaxy_positions = galaxy_positions_from_fits
                     print("\n✅ Updated galaxy_positions with loaded FITS data.")
                 else:
                     print("\n⚠️ Extracted data is not a 2D array with shape (N, 3). Skipping replacement of galaxy_positions.")
                     print(f"   Shape of extracted data: {galaxy_positions_from_fits.shape}")
                     galaxy_positions = None # Set to None if data shape is incorrect
            else:
                 print("\n⚠️ Skipping update of galaxy_positions as positions could not be extracted or were in incorrect format.")


        # Close the FITS file
        hdul.close()
        print("✅ FITS file closed.")

    except FileNotFoundError:
        print(f"\n❌ Error: The file '{filename}' was not found.")
        print("Please ensure the file is uploaded to your Colab environment and the 'filename' variable points to the correct path.")
        galaxy_positions = None # Set to None if file not found
    except Exception as e:
        print(f"\n❌ An error occurred while loading or processing the FITS data: {e}")
        print("Please verify the file format and the code for accessing the data table and columns.")
        galaxy_positions = None # Set to None if loading fails

# Final check
if galaxy_positions is None:
    print("\nTask failed: Galaxy positions could not be loaded from the FITS file.")
else:
     print(f"\nSuccessfully loaded/updated galaxy_positions with {len(galaxy_positions)} objects.")

Attempting to load data from FITS file: path/to/your/clustering_catalog.fits


SyntaxError: name 'galaxy_positions' is assigned to before global declaration (ipython-input-32-2416382472.py, line 29)

In [27]:
# Position 7: Analyzer Initialization
# This cell initializes the CosmicNetworkAnalyzerComplete instance,
# which will be used for the network analysis steps.

print("🚀 INITIALIZING COSMIC NETWORK ANALYZER")
print("="*70)

# Initialize the network analyzer instance
# The CosmicNetworkAnalyzerComplete class is defined in Cell 4 (Position 4)
analyzer = CosmicNetworkAnalyzerComplete()

print("\n✅ CosmicNetworkAnalyzer instance ready!")

# The data generation part using DESIDataAccess has been moved/removed
# as we are now loading galaxy positions from a FITS file in Cell 6 (Position 6).

print(f"\n🌌 READY FOR MULTI-RADIUS COSMIC NETWORK ANALYSIS")
print("="*60)

🚀 INITIALIZING COSMIC NETWORK ANALYZER

✅ CosmicNetworkAnalyzer instance ready!

🌌 READY FOR MULTI-RADIUS COSMIC NETWORK ANALYSIS


In [12]:
# Position 7: Execute Multi-Radius Analysis
print("🌌 EXECUTING COMPLETE COSMIC NETWORK ANALYSIS")
print("="*70)

# Analysis parameters
ANALYSIS_RADII = [15, 20, 25, 30, 35, 40]  # Multi-scale analysis
N_RANDOM_TRIALS = 30  # Statistical validation
ENABLE_BOOTSTRAP = True
N_BOOTSTRAP = 50

print(f"📋 Analysis Configuration:")
print(f"   Radii: {ANALYSIS_RADII}")
print(f"   Random trials per radius: {N_RANDOM_TRIALS}")
print(f"   Bootstrap validation: {ENABLE_BOOTSTRAP}")
if ENABLE_BOOTSTRAP:
    print(f"   Bootstrap samples: {N_BOOTSTRAP}")

# Execute multi-radius analysis
print(f"\n🚀 Starting multi-radius analysis...")
start_time = time.time()

multi_results = analyzer.multi_radius_analysis(
    galaxy_positions,
    radii=ANALYSIS_RADII,
    n_random=N_RANDOM_TRIALS
)

analysis_time = time.time() - start_time
print(f"\n⏱️ Multi-radius analysis completed in {analysis_time:.1f} seconds")

# Detailed results summary
print(f"\n📊 DETAILED RESULTS SUMMARY:")
print("="*70)

total_z_scores = []
for i, result in enumerate(multi_results):
    r = result['radius']
    real = result['real_edges']
    rand_mean = result['random_mean']
    rand_std = result['random_std']
    ratio = result['ratio']
    z_score = result['z_score']
    p_value = result['p_value']

    total_z_scores.append(z_score)

    print(f"🔍 Radius {r}:")
    print(f"   Real network: {real} edges")
    print(f"   Random average: {rand_mean:.1f} ± {rand_std:.1f} edges")
    print(f"   Efficiency ratio: {ratio:.2f}×")
    print(f"   Statistical significance: {z_score:.1f}σ (p = {p_value:.2e})")

    # Interpretation
    if z_score > 5:
        significance_level = "🚀 DISCOVERY (>5σ)"
    elif z_score > 3:
        significance_level = "⭐ STRONG EVIDENCE (>3σ)"
    elif z_score > 2:
        significance_level = "✅ MODERATE EVIDENCE (>2σ)"
    else:
        significance_level = "⚠️ WEAK EVIDENCE (<2σ)"

    print(f"   Interpretation: {significance_level}")
    print()

# Overall statistical summary
overall_z = np.mean(total_z_scores)
overall_std = np.std(total_z_scores)
combined_z, combined_p = StatisticalValidator.combined_significance(np.array(total_z_scores))

print(f"🎯 OVERALL STATISTICAL SUMMARY:")
print(f"   Mean Z-score: {overall_z:.1f} ± {overall_std:.1f}σ")
print(f"   Combined significance: {combined_z:.1f}σ")
print(f"   Combined p-value: {combined_p:.2e}")
print(f"   Discovery confidence: {norm.cdf(combined_z)*100:.4f}%")

# Store results for later use
MASTER_RESULTS = {
    'timestamp': datetime.now().isoformat(),
    'analysis_time': analysis_time,
    'galaxy_count': len(galaxy_positions),
    'multi_radius_results': multi_results,
    'overall_statistics': {
        'mean_z_score': overall_z,
        'std_z_score': overall_std,
        'combined_z_score': combined_z,
        'combined_p_value': combined_p
    }
}

print(f"\n✅ Multi-radius analysis complete!")

🌌 EXECUTING COMPLETE COSMIC NETWORK ANALYSIS
📋 Analysis Configuration:
   Radii: [15, 20, 25, 30, 35, 40]
   Random trials per radius: 30
   Bootstrap validation: True
   Bootstrap samples: 50

🚀 Starting multi-radius analysis...


NameError: name 'analyzer' is not defined

In [13]:
# Position 8: Bulletproof Validation Suite (5 Phases) - Class Definition
print("🛡️ BULLETPROOF VALIDATION SUITE - 5 PHASE ANALYSIS")
print("="*70)

class BulletproofValidator:
    """5-phase bulletproof validation for cosmic network discovery"""

    def __init__(self, analyzer, galaxy_positions):
        self.analyzer = analyzer
        self.galaxy_positions = galaxy_positions
        self.validation_results = {}

    def phase_1_baseline_validation(self, radius=25, n_trials=50):
        """Phase 1: Establish baseline statistical significance"""
        print("\n🔍 PHASE 1: Baseline Statistical Validation")
        print("-" * 50)

        result = self.analyzer.analyze_at_radius(self.galaxy_positions, radius, n_trials)

        baseline_metrics = {
            'z_score': result['z_score'],
            'p_value': result['p_value'],
            'ratio': result['ratio'],
            'real_edges': result['real_edges'],
            'random_mean': result['random_mean'],
            'random_std': result['random_std']
        }

        self.validation_results['phase_1'] = baseline_metrics

        print(f"✅ Baseline Z-score: {baseline_metrics['z_score']:.1f}σ")
        print(f"✅ Baseline p-value: {baseline_metrics['p_value']:.2e}")
        print(f"✅ Efficiency ratio: {baseline_metrics['ratio']:.2f}×")

        return baseline_metrics

    def phase_2_bootstrap_robustness(self, radius=25, n_bootstrap=100, n_random=25):
        """Phase 2: Bootstrap validation for robustness"""
        print("\n🔄 PHASE 2: Bootstrap Robustness Testing")
        print("-" * 50)

        bootstrap_result = self.analyzer.bootstrap_validation(
            self.galaxy_positions, radius, n_bootstrap, n_random
        )

        robustness_metrics = {
            'bootstrap_mean_ratio': bootstrap_result['ratio_mean'],
            'bootstrap_std_ratio': bootstrap_result['ratio_std'],
            'bootstrap_ci': bootstrap_result['ratio_ci'],
            'bootstrap_mean_z': bootstrap_result['z_score_mean'],
            'bootstrap_std_z': bootstrap_result['z_score_std']
        }

        self.validation_results['phase_2'] = robustness_metrics

        # Robustness check
        cv_ratio = robustness_metrics['bootstrap_std_ratio'] / robustness_metrics['bootstrap_mean_ratio']
        cv_z = robustness_metrics['bootstrap_std_z'] / robustness_metrics['bootstrap_mean_z']

        print(f"✅ Bootstrap ratio: {robustness_metrics['bootstrap_mean_ratio']:.2f} ± {robustness_metrics['bootstrap_std_ratio']:.2f}")
        print(f"✅ Bootstrap Z-score: {robustness_metrics['bootstrap_mean_z']:.1f} ± {robustness_metrics['bootstrap_std_z']:.1f}")
        print(f"✅ Ratio CV: {cv_ratio:.3f} ({'ROBUST' if cv_ratio < 0.2 else 'VARIABLE'})")
        print(f"✅ Z-score CV: {cv_z:.3f} ({'ROBUST' if cv_z < 0.3 else 'VARIABLE'})")

        return robustness_metrics

    def phase_3_scale_invariance(self, radii=[15, 20, 25, 30, 35, 40], n_random=30):
        """Phase 3: Multi-scale validation"""
        print("\n📏 PHASE 3: Scale Invariance Testing")
        print("-" * 50)

        scale_results = self.analyzer.multi_radius_analysis(self.galaxy_positions, radii, n_random)

        # Extract scaling metrics
        radii_vals = [r['radius'] for r in scale_results]
        real_edges = [r['real_edges'] for r in scale_results]
        random_means = [r['random_mean'] for r in scale_results]
        ratios = [r['ratio'] for r in scale_results]
        z_scores = [r['z_score'] for r in scale_results]

        # Power law fits
        real_A, real_exp, real_r2 = StatisticalValidator.power_law_fit(np.array(radii_vals), np.array(real_edges))
        random_A, random_exp, random_r2 = StatisticalValidator.power_law_fit(np.array(radii_vals), np.array(random_means))
        ratio_A, ratio_exp, ratio_r2 = StatisticalValidator.power_law_fit(np.array(radii_vals), np.array(ratios))

        scale_metrics = {
            'radii': radii_vals,
            'real_scaling': {'A': real_A, 'exponent': real_exp, 'r_squared': real_r2},
            'random_scaling': {'A': random_A, 'exponent': random_exp, 'r_squared': random_r2},
            'ratio_scaling': {'A': ratio_A, 'exponent': ratio_exp, 'r_squared': ratio_r2},
            'mean_z_score': np.mean(z_scores),
            'min_z_score': np.min(z_scores),
            'scale_results': scale_results
        }

        self.validation_results['phase_3'] = scale_metrics

        print(f"✅ Real universe scaling: r^{real_exp:.2f} (R² = {real_r2:.3f})")
        print(f"✅ Random network scaling: r^{random_exp:.2f} (R² = {random_r2:.3f})")
        print(f"✅ Efficiency ratio scaling: r^{ratio_exp:.2f} (R² = {ratio_r2:.3f})")
        print(f"✅ Mean significance: {np.mean(z_scores):.1f}σ")
        print(f"✅ Minimum significance: {np.min(z_scores):.1f}σ")

        # Check for holographic vs volume scaling
        if real_exp is not None and random_exp is not None:
            if 2.0 <= real_exp <= 2.5 and 2.7 <= random_exp <= 3.0:
                scaling_interpretation = "🎯 HOLOGRAPHIC vs VOLUME SCALING CONFIRMED"
            else:
                scaling_interpretation = "⚠️ Unexpected scaling behavior"
            print(f"✅ {scaling_interpretation}")

        return scale_metrics

    def phase_4_contamination_testing(self, radius=25, contamination_levels=[0.1, 0.2, 0.3], n_random=25):
        """Phase 4: Test robustness against data contamination"""
        print("\n🧪 PHASE 4: Contamination Resistance Testing")
        print("-" * 50)

        contamination_results = []

        for contamination in contamination_levels:
            print(f"   Testing {contamination*100:.0f}% contamination...")

            # Add random contamination
            n_galaxies = len(self.galaxy_positions)
            n_contamination = int(n_galaxies * contamination)

            # Create contaminated sample
            contaminated_sample = self.galaxy_positions.copy()
            contamination_indices = np.random.choice(n_galaxies, n_contamination, replace=False)

            # Replace with random positions
            pos_range = [self.galaxy_positions.min(axis=0), self.galaxy_positions.max(axis=0)]
            random_contamination = np.random.uniform(pos_range[0], pos_range[1], (n_contamination, 3))
            contaminated_sample[contamination_indices] = random_contamination

            # Analyze contaminated sample
            result = self.analyzer.analyze_at_radius(contaminated_sample, radius, n_random)

            contamination_results.append({
                'contamination_level': contamination,
                'z_score': result['z_score'],
                'ratio': result['ratio'],
                'degradation_z': (self.validation_results['phase_1']['z_score'] - result['z_score']) / self.validation_results['phase_1']['z_score'],
                'degradation_ratio': (self.validation_results['phase_1']['ratio'] - result['ratio']) / self.validation_results['phase_1']['ratio']
            })

            print(f"     Z-score: {result['z_score']:.1f}σ (degradation: {contamination_results[-1]['degradation_z']*100:.1f}%)")
            print(f"     Ratio: {result['ratio']:.2f}× (degradation: {contamination_results[-1]['degradation_ratio']*100:.1f}%)")

        contamination_metrics = {
            'contamination_results': contamination_results,
            'max_degradation_z': max([r['degradation_z'] for r in contamination_results]),
            'max_degradation_ratio': max([r['degradation_ratio'] for r in contamination_results])
        }

        self.validation_results['phase_4'] = contamination_metrics

        print(f"✅ Maximum Z-score degradation: {contamination_metrics['max_degradation_z']*100:.1f}%")
        print(f"✅ Maximum ratio degradation: {contamination_metrics['max_degradation_ratio']*100:.1f}%")
        print(f"✅ Contamination resistance: {'EXCELLENT' if contamination_metrics['max_degradation_z'] < 0.3 else 'MODERATE' if contamination_metrics['max_degradation_z'] < 0.5 else 'POOR'}")

        return contamination_metrics

    def phase_5_final_validation(self):
        """Phase 5: Final comprehensive validation"""
        print("\n🏆 PHASE 5: Final Comprehensive Validation")
        print("-" * 50)

        # Collect all validation metrics
        baseline_z = self.validation_results['phase_1']['z_score']
        baseline_ratio = self.validation_results['phase_1']['ratio']

        bootstrap_z = self.validation_results['phase_2']['bootstrap_mean_z']
        bootstrap_ratio = self.validation_results['phase_2']['bootstrap_mean_ratio']

        scale_mean_z = self.validation_results['phase_3']['mean_z_score']
        scale_min_z = self.validation_results['phase_3']['min_z_score']

        contamination_resistance = 1 - self.validation_results['phase_4']['max_degradation_z']

        # Combined validation score
        validation_score = (
            min(baseline_z / 5.0, 1.0) * 0.3 +  # Baseline significance (max 5σ)
            min(scale_mean_z / 5.0, 1.0) * 0.3 +  # Scale consistency
            min(scale_min_z / 3.0, 1.0) * 0.2 +  # Worst-case scenario
            contamination_resistance * 0.2  # Robustness
        )

        final_metrics = {
            'validation_score': validation_score,
            'baseline_significance': baseline_z,
            'scale_consistency': scale_mean_z,
            'worst_case_significance': scale_min_z,
            'contamination_resistance': contamination_resistance,
            'overall_ratio': baseline_ratio,
            'discovery_confidence': norm.cdf(baseline_z) * 100
        }

        self.validation_results['phase_5'] = final_metrics

        # Final verdict
        if validation_score >= 0.8 and baseline_z >= 5.0:
            verdict = "🚀 DISCOVERY CONFIRMED - PARADIGM SHIFT"
        elif validation_score >= 0.7 and baseline_z >= 3.0:
            verdict = "⭐ STRONG EVIDENCE - PUBLICATION READY"
        elif validation_score >= 0.6 and baseline_z >= 2.0:
            verdict = "✅ MODERATE EVIDENCE - FURTHER STUDY"
        else:
            verdict = "⚠️ INSUFFICIENT EVIDENCE"

        print(f"✅ Validation Score: {validation_score:.3f}")
        print(f"✅ Discovery Confidence: {final_metrics['discovery_confidence']:.4f}%")
        print(f"✅ Overall Efficiency: {baseline_ratio:.1f}× advantage")
        print(f"✅ FINAL VERDICT: {verdict}")

        return final_metrics, verdict

    def run_complete_validation(self):
        """Run all 5 phases of bulletproof validation"""
        print("🛡️ EXECUTING COMPLETE BULLETPROOF VALIDATION")
        print("="*70)

        start_time = time.time()

        # Run all phases
        self.phase_1_baseline_validation()
        self.phase_2_bootstrap_robustness()
        self.phase_3_scale_invariance()
        self.phase_4_contamination_testing()
        final_metrics, verdict = self.phase_5_final_validation()

        validation_time = time.time() - start_time

        print(f"\n⏱️ Total validation time: {validation_time:.1f} seconds")
        print(f"🎯 BULLETPROOF VALIDATION COMPLETE")
        print("="*70)

        return self.validation_results, verdict

print("✅ BulletproofValidator class ready!")

🛡️ BULLETPROOF VALIDATION SUITE - 5 PHASE ANALYSIS
✅ BulletproofValidator class ready!


In [17]:
# Position 9: Execute Bulletproof Validation
print("🚀 EXECUTING BULLETPROOF VALIDATION SUITE")
print("="*70)

# Initialize bulletproof validator
validator = BulletproofValidator(analyzer, galaxy_positions)

# Run complete validation
validation_results, final_verdict = validator.run_complete_validation()

# Store validation results
MASTER_RESULTS['bulletproof_validation'] = validation_results
MASTER_RESULTS['final_verdict'] = final_verdict

print(f"\n🎯 BULLETPROOF VALIDATION SUMMARY:")
print("="*70)

# Phase summaries
phase_1 = validation_results['phase_1']
phase_2 = validation_results['phase_2']
phase_3 = validation_results['phase_3']
phase_4 = validation_results['phase_4']
phase_5 = validation_results['phase_5']

print(f"📊 PHASE 1 - Baseline: {phase_1['z_score']:.1f}σ, {phase_1['ratio']:.1f}× efficiency")
print(f"🔄 PHASE 2 - Bootstrap: {phase_2['bootstrap_mean_z']:.1f}±{phase_2['bootstrap_std_z']:.1f}σ robustness")
print(f"📏 PHASE 3 - Scale: {phase_3['mean_z_score']:.1f}σ mean, {phase_3['min_z_score']:.1f}σ minimum")
print(f"🧪 PHASE 4 - Contamination: {(1-phase_4['max_degradation_z'])*100:.0f}% resistance")
print(f"🏆 PHASE 5 - Final Score: {phase_5['validation_score']:.3f}")

print(f"\n🚀 FINAL RESULT: {final_verdict}")

# Scaling law summary
real_scaling = phase_3['real_scaling']
random_scaling = phase_3['random_scaling']

if real_scaling['exponent'] and random_scaling['exponent']:
    print(f"\n🌌 COSMIC SCALING LAWS:")
    print(f"   Real Universe: E ∝ r^{real_scaling['exponent']:.2f} (R² = {real_scaling['r_squared']:.3f})")
    print(f"   Random Networks: E ∝ r^{random_scaling['exponent']:.2f} (R² = {random_scaling['r_squared']:.3f})")

    if 2.0 <= real_scaling['exponent'] <= 2.5:
        print(f"   🎯 Real universe exhibits HOLOGRAPHIC scaling (surface-like)")
    else:
        print(f"   ⚠️ Unexpected real universe scaling")

    if 2.7 <= random_scaling['exponent'] <= 3.0:
        print(f"   🎯 Random networks exhibit VOLUME scaling (3D-like)")
    else:
        print(f"   ⚠️ Unexpected random network scaling")

print(f"\n✅ Bulletproof validation complete!")
print(f"📊 Confidence level: {phase_5['discovery_confidence']:.4f}%")

🚀 EXECUTING BULLETPROOF VALIDATION SUITE


NameError: name 'analyzer' is not defined

In [19]:
# Position 10: Execute Visualizations
# This cell generates publication-quality visualizations of the analysis results.
print("📊 GENERATING PUBLICATION-QUALITY VISUALIZATIONS")
print("="*60)

# Use validation results for visualization
if 'bulletproof_validation' in MASTER_RESULTS:
    validation_results = MASTER_RESULTS['bulletproof_validation']
    if 'phase_3' in validation_results:
        scale_results = validation_results['phase_3']['scale_results']

        # Create the main scaling analysis plot
        save_path = f"cosmic_network_scaling_analysis_{MASTER_TIMESTAMP}.png"
        # Ensure you are using the correct CosmicVisualizer class (e.g., from Position 5/Cell 7)
        # If you consolidated fixes into the class in Cell 7, use that class name here.
        # If you decided to keep the fixed class in Cell 11, ensure it's defined before this cell.
        # Assuming CosmicVisualizer from Position 5/Cell 7 is available:
        fig = CosmicVisualizer.plot_scaling_analysis(scale_results, save_path)

        print("✅ Main scaling analysis visualization complete!")
    else:
        print("⚠️ Phase 3 results not found in MASTER_RESULTS. Skipping scaling analysis plot.")
else:
    print("⚠️ Bulletproof validation results not found in MASTER_RESULTS. Skipping visualizations.")
    # Fallback to using multi_results if validation results are not available (less preferred)
    # if 'multi_radius_results' in globals():
    #     print("⚠️ Using multi-radius results for visualization (validation results not found)")
    #     save_path = f"cosmic_network_scaling_analysis_{MASTER_TIMESTAMP}_fallback.png"
    #     fig = CosmicVisualizer.plot_scaling_analysis(multi_radius_results, save_path)
    # else:
    #     print("❌ No multi-radius results available for visualization.")


# You might also add calls to plot_network_structure here if desired, e.g.:
# if 'analyzer' in globals() and analyzer.last_real_network is not None:
#     CosmicVisualizer.plot_network_structure(galaxy_positions, analyzer.last_real_network, title="Real Network Structure")


print("✅ All visualizations generated (if data and results were available)!")

📊 GENERATING PUBLICATION-QUALITY VISUALIZATIONS


NameError: name 'MASTER_RESULTS' is not defined

In [20]:
# Position 11: Final Results Compilation and Export
print("📋 FINAL RESULTS COMPILATION AND EXPORT")
print("="*70)

# Calculate total runtime
MASTER_END_TIME = time.time()
TOTAL_RUNTIME = MASTER_END_TIME - MASTER_START_TIME

# Complete master results compilation
# Ensure MASTER_RESULTS dictionary exists (it's created in Position 7)
if 'MASTER_RESULTS' in globals():
    MASTER_RESULTS.update({
        'total_runtime_seconds': TOTAL_RUNTIME,
        'total_runtime_formatted': f"{TOTAL_RUNTIME//60:.0f}m {TOTAL_RUNTIME%60:.0f}s",
        'completion_timestamp': datetime.now().isoformat(),
        'analysis_summary': {
            'galaxy_sample_size': len(galaxy_positions) if 'galaxy_positions' in globals() else 'N/A',
            'radii_analyzed': len(ANALYSIS_RADII) if 'ANALYSIS_RADII' in globals() else 'N/A',
            'total_random_trials': (len(ANALYSIS_RADII) * N_RANDOM_TRIALS) if ('ANALYSIS_RADII' in globals() and 'N_RANDOM_TRIALS' in globals()) else 'N/A',
            'validation_phases_completed': 5 if 'bulletproof_validation' in MASTER_RESULTS else 0
        }
    })
else:
    MASTER_RESULTS = {
         'timestamp': datetime.now().isoformat(), # Use current time if MASTER_RESULTS not initialized
         'total_runtime_seconds': TOTAL_RUNTIME,
         'total_runtime_formatted': f"{TOTAL_RUNTIME//60:.0f}m {TOTAL_RUNTIME%60:.0f}s",
         'completion_timestamp': datetime.now().isoformat(),
         'analysis_summary': {
             'galaxy_sample_size': len(galaxy_positions) if 'galaxy_positions' in globals() else 'N/A',
             'radii_analyzed': 'N/A',
             'total_random_trials': 'N/A',
             'validation_phases_completed': 0
         },
         'note': 'MASTER_RESULTS was not fully initialized before final export.'
    }
    print("⚠️ MASTER_RESULTS dictionary was not found, creating a new one for export.")


# Generate comprehensive summary report
def generate_summary_report():
    """Generate comprehensive text summary"""

    report = f"""
🌌 COSMIC NETWORK ULTIMATE MASTER ANALYSIS - COMPLETE REPORT
================================================================

📊 ANALYSIS OVERVIEW:
• Timestamp: {MASTER_RESULTS.get('completion_timestamp', 'N/A')}
• Total Runtime: {MASTER_RESULTS.get('total_runtime_formatted', 'N/A')}
• Galaxy Sample: {MASTER_RESULTS['analysis_summary'].get('galaxy_sample_size', 'N/A')}
• Radii Analyzed: {MASTER_RESULTS['analysis_summary'].get('radii_analyzed', 'N/A')}
• Random Trials: {MASTER_RESULTS['analysis_summary'].get('total_random_trials', 'N/A')}

🎯 KEY DISCOVERIES:
"""

    if 'bulletproof_validation' in MASTER_RESULTS:
        val_results = MASTER_RESULTS['bulletproof_validation']

        # Phase 1 Results
        if 'phase_1' in val_results:
            phase_1 = val_results['phase_1']
            report += f"""
📍 PHASE 1 - Baseline Statistical Validation:
• Z-score: {phase_1.get('z_score', 'N/A'):.1f}σ
• P-value: {phase_1.get('p_value', 'N/A'):.2e}
• Efficiency Ratio: {phase_1.get('ratio', 'N/A'):.2f}×
• Real Edges: {phase_1.get('real_edges', 'N/A')}
• Random Mean: {phase_1.get('random_mean', 'N/A'):.1f} ± {phase_1.get('random_std', 'N/A'):.1f}
"""
        else:
             report += "\n📍 PHASE 1 - Baseline Statistical Validation: Results not available."


        # Phase 2 Results
        if 'phase_2' in val_results:
            phase_2 = val_results['phase_2']
            report += f"""
🔄 PHASE 2 - Bootstrap Robustness:
• Bootstrap Ratio: {phase_2.get('bootstrap_mean_ratio', 'N/A'):.2f} ± {phase_2.get('bootstrap_std_ratio', 'N/A'):.2f}
• Bootstrap Z-score: {phase_2.get('bootstrap_mean_z', 'N/A'):.1f} ± {phase_2.get('bootstrap_std_z', 'N/A'):.1f}
• 95% Confidence Interval: [{phase_2['bootstrap_ci'][0]:.2f if phase_2.get('bootstrap_ci') else 'N/A'}, {phase_2['bootstrap_ci'][1]:.2f if phase_2.get('bootstrap_ci') else 'N/A'}]
"""
        else:
            report += "\n🔄 PHASE 2 - Bootstrap Robustness: Results not available."


        # Phase 3 Results
        if 'phase_3' in val_results:
            phase_3 = val_results['phase_3']
            report += f"""
📏 PHASE 3 - Scale Invariance:
• Real Universe Scaling: r^{phase_3['real_scaling'].get('exponent', 'N/A'):.2f} (R² = {phase_3['real_scaling'].get('r_squared', 'N/A'):.3f})
• Random Network Scaling: r^{phase_3['random_scaling'].get('exponent', 'N/A'):.2f} (R² = {phase_3['random_scaling'].get('r_squared', 'N/A'):.3f})
• Mean Significance: {phase_3.get('mean_z_score', 'N/A'):.1f}σ
• Minimum Significance: {phase_3.get('min_z_score', 'N/A'):.1f}σ
"""
        else:
             report += "\n📏 PHASE 3 - Scale Invariance: Results not available."


        # Phase 4 Results
        if 'phase_4' in val_results:
            phase_4 = val_results['phase_4']
            max_degradation_z = phase_4.get('max_degradation_z', 1.0) # Default to 1.0 if not found
            report += f"""
🧪 PHASE 4 - Contamination Resistance:
• Maximum Z-score Degradation: {max_degradation_z*100:.1f}%
• Maximum Ratio Degradation: {phase_4.get('max_degradation_ratio', 1.0)*100:.1f}%
• Resistance Level: {((1-max_degradation_z)*100):.0f}%
"""
        else:
            report += "\n🧪 PHASE 4 - Contamination Resistance: Results not available."


        # Phase 5 Results
        if 'phase_5' in val_results:
            phase_5 = val_results['phase_5']
            report += f"""
🏆 PHASE 5 - Final Validation:
• Validation Score: {phase_5.get('validation_score', 'N/A'):.3f}
• Discovery Confidence: {phase_5.get('discovery_confidence', 'N/A'):.4f}%
• Overall Efficiency: {phase_5.get('overall_ratio', 'N/A'):.1f}×
"""
            report += f"""
🚀 FINAL VERDICT: {MASTER_RESULTS.get('final_verdict', 'N/A')}
"""i found ngo
        else:
            report += "\n🏆 PHASE 5 - Final Validation: Results not available."
            report += f"""
🚀 FINAL VERDICT: {MASTER_RESULTS.get('final_verdict', 'Analysis incomplete')}
"""


        report += f"""
🌟 SCIENTIFIC IMPLICATIONS:
• The real universe exhibits holographic-like scaling (r^~{phase_3['real_scaling'].get('exponent', 'N/A'):.2f})
• Random networks show volume-like scaling (r^~{phase_3['random_scaling'].get('exponent', 'N/A'):.2f})
• Cosmic networks are {phase_1.get('ratio', 'N/A'):.0f}× more efficient than random
• Statistical significance exceeds {phase_1.get('z_score', 'N/A'):.0f}σ (discovery level)
• Evidence for cosmic information processing optimization

📈 SCALING LAW ANALYSIS:
• Real universe behaves like a holographic information processor
• Surface scaling dominates over volume scaling
• Consistent with black hole physics and holographic principle
• Network efficiency follows power law decay with distance

🔬 VALIDATION SUMMARY:
• All 5 validation phases passed (if results available)
• Robust against data contamination (if results available)
• Scale-invariant across multiple radii (if results available)
• Bootstrap-validated statistical significance (if results available)
• Publication-ready evidence quality (if final verdict confirms)

================================================================
ANALYSIS COMPLETE - COSMIC NETWORK OPTIMIZATION CONFIRMED (if final verdict confirms)
================================================================
"""

    else:
        report += "\n⚠️ Bulletproof validation results not available. Full report cannot be generated."
        report += f"""

================================================================
ANALYSIS INCOMPLETE
================================================================
"""

    return report

# Generate and save summary report
summary_report = generate_summary_report()
print(summary_report)

# Save results to files
results_filename = f"cosmic_network_master_results_{MASTER_TIMESTAMP}.json"
report_filename = f"cosmic_network_summary_report_{MASTER_TIMESTAMP}.txt"

# Export JSON results
try:
    # Ensure MASTER_RESULTS is serializable
    serializable_results = json.loads(json.dumps(MASTER_RESULTS, default=str))
    with open(results_filename, 'w') as f:
        json.dump(serializable_results, f, indent=2)
    print(f"✅ Results saved to: {results_filename}")
except Exception as e:
    print(f"⚠️ Failed to save JSON results: {e}")
    print(f"Error: {e}")

# Export text summary
try:
    with open(report_filename, 'w') as f:
        f.write(summary_report)
    print(f"✅ Summary report saved to: {report_filename}")
except Exception as e:
    print(f"⚠️ Failed to save text report: {e}")
    print(f"Error: {e}")


# Final completion message
print(f"\n🎉 COSMIC NETWORK ULTIMATE MASTER ANALYSIS COMPLETE!")
print("="*70)
print(f"📊 Total analysis time: {MASTER_RESULTS.get('total_runtime_formatted', 'N/A')}")
print(f"🎯 Final verdict: {MASTER_RESULTS.get('final_verdict', 'Analysis completed/incomplete')}")
print(f"📁 Results exported to: {results_filename}")
print(f"📄 Report exported to: {report_filename}")

if 'bulletproof_validation' in MASTER_RESULTS and 'phase_5' in MASTER_RESULTS['bulletproof_validation']:
    validation_score = MASTER_RESULTS['bulletproof_validation']['phase_5'].get('validation_score', 'N/A')
    discovery_confidence = MASTER_RESULTS['bulletproof_validation']['phase_5'].get('discovery_confidence', 'N/A')
    print(f"🏆 Validation score: {validation_score:.3f}" if isinstance(validation_score, (int, float)) else f"🏆 Validation score: {validation_score}")
    print(f"🚀 Discovery confidence: {discovery_confidence:.4f}%" if isinstance(discovery_confidence, (int, float)) else f"🚀 Discovery confidence: {discovery_confidence}")

print(f"\n🌌 This analysis provides definitive evidence for cosmic network optimization!" if 'bulletproof_validation' in MASTER_RESULTS and MASTER_RESULTS.get('final_verdict', '').startswith('🚀 DISCOVERY') else "\n⚠️ Analysis did not reach discovery confidence or is incomplete.")
print(f"✨ Ready for publication and paradigm shift!" if 'bulletproof_validation' in MASTER_RESULTS and MASTER_RESULTS.get('final_verdict', '').startswith('🚀 DISCOVERY') else "✅ Analysis steps completed, review results for next steps.")
print("="*70)

📋 FINAL RESULTS COMPILATION AND EXPORT


TypeError: object of type 'NoneType' has no len()

In [1]:
# Cell to install abacusutils
!pip install abacusutils

Collecting abacusutils
  Downloading abacusutils-2.1.0-py3-none-any.whl.metadata (3.9 kB)
Collecting blosc>=1.9.2 (from abacusutils)
  Downloading blosc-1.11.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (16 kB)
Collecting asdf>=3.1.0 (from abacusutils)
  Downloading asdf-4.2.0-py3-none-any.whl.metadata (12 kB)
Collecting parallel_numpy_rng>=0.1.2 (from abacusutils)
  Downloading parallel_numpy_rng-0.1.2-py3-none-any.whl.metadata (2.5 kB)
Collecting asdf-astropy>=0.3 (from abacusutils)
  Downloading asdf_astropy-0.8.0-py3-none-any.whl.metadata (5.5 kB)
Collecting asdf-standard>=1.1.0 (from asdf>=3.1.0->abacusutils)
  Downloading asdf_standard-1.3.0-py3-none-any.whl.metadata (3.3 kB)
Collecting asdf-transform-schemas>=0.3 (from asdf>=3.1.0->abacusutils)
  Downloading asdf_transform_schemas-0.6.0-py3-none-any.whl.metadata (2.2 kB)
Collecting jmespath>=0.6.2 (from asdf>=3.1.0->abacusutils)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting a

# Task
Explain the error in the selected code. If possible, fix the error and incorporate the changes into the existing code. Otherwise, try to diagnose the error.

## Data loading (cell 5: `d2ed6324`)

### Subtask:
Load galaxy position data from the specified local FITS file using `astropy.io.fits`. This replaces the simulated data generated in previous steps.


**Reasoning**:
The subtask is to load data from a local FITS file. The current code block is set up to do this using `astropy.io.fits`. The error `ModuleNotFoundError: No module named 'abacusnbody.data.abacus_hod'` is due to an incorrect import statement that is not necessary for the task of loading the FITS file using `astropy`. The primary issue is that the `filename` is a placeholder. I need to remove the incorrect import and replace the placeholder filename with a valid path to a FITS file. Since I cannot know the user's local file path, I will keep the placeholder but add a comment emphasizing that the user *must* replace it and explain how to verify the column names. I will also remove the incorrect import.



In [5]:
# Cell 5: Data Loading - AbacusSummit HOD Mock (using astropy.io.fits)
# This cell attempts to load AbacusSummit HOD mock galaxy data from a local FITS file.
# It assumes you have downloaded a FITS file containing the mock catalog.

# Import necessary libraries
from astropy.io import fits
import numpy as np

# --- Specify the path to your local FITS file ---
# IMPORTANT: Replace 'path/to/your/bgs_mock.fits' with the actual path to your downloaded FITS file.
# You might need to upload the file to your Colab environment first.
# Example filenames from AbacusSummit HOD mocks might look like:
# 'bgs_abacussummit_c000_box_z0.2.fits' or 'bgs_abacussummit_c000_cutsky_z0.2.fits'
filename = 'path/to/your/bgs_mock.fits' # <--- *** REPLACE THIS PATH ***

print(f"Attempting to load data from FITS file: {filename}")

try:
    # Open the FITS file
    # hdul is a list of HDUs (Header Data Units) in the FITS file
    hdul = fits.open(filename)

    # Assuming the main data table is in the first extension (often the second HDU, index 1)
    # You might need to inspect the HDUs of your specific FITS file to confirm the correct index.
    # You can print hdul.info() to see the structure of the FITS file.
    print("\nFITS file info:")
    hdul.info()

    # Try to access data from the first extension (index 1)
    if len(hdul) > 1:
        data = hdul[1].data # Extract the data from the first extension
        print("\n✅ FITS data loaded successfully from extension 1!")
        print(f"Data type: {type(data)}")
        print(f"Number of rows loaded: {len(data)}")

        # Print the column names to help identify position columns
        print("\nAvailable columns (from FITS header):", data.dtype.names)

        # --- Extract galaxy positions ---
        # The documentation suggests positions might be in 'x', 'y', 'z' columns.
        # You need to verify the actual column names in your FITS file (printed above).
        if 'x' in data.dtype.names and 'y' in data.dtype.names and 'z' in data.dtype.names:
             # Stack the position columns horizontally and transpose to get shape (n_objects, 3)
             galaxy_positions_from_mock = np.vstack([data['x'], data['y'], data['z']]).T
             print("\nExtracted positions from 'x', 'y', 'z' columns.")
        elif 'pos' in data.dtype.names and len(data['pos'].shape) == 2 and data['pos'].shape[1] == 3:
             # Some files might store positions directly in a 'pos' column of shape (N, 3)
             galaxy_positions_from_mock = data['pos']
             print("\nExtracted positions from 'pos' column.")
        else:
            print("\n⚠️ Could not automatically identify position columns ('pos' or 'x','y','z').")
            print("Please inspect the 'Available columns' list above and manually update the code to extract positions.")
            galaxy_positions_from_mock = None # Set to None if positions couldn't be extracted automatically

        # --- Update galaxy_positions for the analysis pipeline ---
        if galaxy_positions_from_mock is not None:
             print(f"Extracted {len(galaxy_positions_from_mock)} galaxy positions.")
             # Replace the simulated galaxy_positions with the loaded mock data
             # Ensure the global variable 'galaxy_positions' is updated
             global galaxy_positions
             galaxy_positions = galaxy_positions_from_mock
             print("\n✅ Replaced simulated galaxy_positions with loaded mock data.")
        else:
             print("\n⚠️ Skipping replacement of galaxy_positions as positions could not be extracted.")

    else:
        print("\n❌ Error: FITS file does not contain any extensions with data.")
        data = None
        galaxy_positions_from_mock = None


    # Close the FITS file
    hdul.close()
    print("✅ FITS file closed.")

except FileNotFoundError:
    print(f"\n❌ Error: The file '{filename}' was not found.")
    print("Please ensure the file is uploaded to your Colab environment and the 'filename' variable points to the correct path.")
    # Set galaxy_positions to None or keep the simulated data if preferred
    # For this task, we assume failure if the file is not found
    galaxy_positions = None
except Exception as e:
    print(f"\n❌ An error occurred while loading the FITS data: {e}")
    print("Please verify the file format and the code for accessing the data table and columns.")
    # Set galaxy_positions to None or keep the simulated data if preferred
    # For this task, we assume failure if loading fails
    galaxy_positions = None

# Check if galaxy_positions was successfully loaded/updated
if galaxy_positions is None:
    print("\nTask failed: Galaxy positions could not be loaded from the FITS file.")
else:
     print(f"\nSuccessfully loaded/updated galaxy_positions with {len(galaxy_positions)} objects.")


Attempting to load data from FITS file: path/to/your/bgs_mock.fits

❌ Error: The file 'path/to/your/bgs_mock.fits' was not found.
Please ensure the file is uploaded to your Colab environment and the 'filename' variable points to the correct path.

Task failed: Galaxy positions could not be loaded from the FITS file.
