<a href="https://colab.research.google.com/github/kumuds4/BCH/blob/master/MLpolar06042025.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Making the Most of your Colab Subscription



## Faster GPUs

Users who have purchased one of Colab's paid plans have access to faster GPUs and more memory. You can upgrade your notebook's GPU settings in `Runtime > Change runtime type` in the menu to select from several accelerator options, subject to availability.

The free of charge version of Colab grants access to Nvidia's T4 GPUs subject to quota restrictions and availability.

You can see what GPU you've been assigned at any time by executing the following cell. If the execution result of running the code cell below is "Not connected to a GPU", you can change the runtime by going to `Runtime > Change runtime type` in the menu to enable a GPU accelerator, and then re-execute the code cell.


In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Thu Jun  5 03:07:02 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA A100-SXM4-40GB          Off |   00000000:00:04.0 Off |                    0 |
| N/A   32C    P0             46W /  400W |       0MiB /  40960MiB |      0%      Default |
|                                         |                        |             Disabled |
+-----------------------------------------+------------------------+----------------------+
                                                

In order to use a GPU with your notebook, select the `Runtime > Change runtime type` menu, and then set the hardware accelerator to the desired option.

## More memory

Users who have purchased one of Colab's paid plans have access to high-memory VMs when they are available. More powerful GPUs are always offered with high-memory VMs.



You can see how much memory you have available at any time by running the following code cell. If the output is "Not using a high-RAM runtime", enable a high-RAM runtime via `Runtime > Change runtime type` in the menu: select High-RAM in the Runtime shape toggle, then re-execute the code cell.


In [None]:
import psutil

ram_gb = psutil.virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

Your runtime has 89.6 gigabytes of available RAM

You are using a high-RAM runtime!


## Longer runtimes

All Colab runtimes are reset after some period of time (which is faster if the runtime isn't executing code). Colab Pro and Pro+ users have access to longer runtimes than those who use Colab free of charge.

## Background execution

Colab Pro+ users have access to background execution, where notebooks will continue executing even after you've closed a browser tab. This is always enabled in Pro+ runtimes as long as you have compute units available.



## Relaxing resource limits in Colab Pro

Your resources are not unlimited in Colab. To make the most of Colab, avoid using resources when you don't need them. For example, only use a GPU when required and close Colab tabs when finished.



If you encounter limitations, you can relax those limitations by purchasing more compute units via Pay As You Go. Anyone can purchase compute units via [Pay As You Go](https://colab.research.google.com/signup); no subscription is required.

## Send us feedback!

If you have any feedback for us, please let us know. The best way to send feedback is by using the Help > 'Send feedback...' menu. If you encounter usage limits in Colab Pro, consider subscribing to Pro+.

If you encounter errors or other issues with billing (payments) for Colab Pro, Pro+, or Pay As You Go, please email [colab-billing@google.com](mailto:colab-billing@google.com).

## More Resources

### Working with Notebooks in Colab
- [Overview of Colab](/notebooks/basic_features_overview.ipynb)
- [Guide to Markdown](/notebooks/markdown_guide.ipynb)
- [Importing libraries and installing dependencies](/notebooks/snippets/importing_libraries.ipynb)
- [Saving and loading notebooks in GitHub](https://colab.research.google.com/github/googlecolab/colabtools/blob/main/notebooks/colab-github-demo.ipynb)
- [Interactive forms](/notebooks/forms.ipynb)
- [Interactive widgets](/notebooks/widgets.ipynb)

<a name="working-with-data"></a>
### Working with Data
- [Loading data: Drive, Sheets, and Google Cloud Storage](/notebooks/io.ipynb)
- [Charts: visualizing data](/notebooks/charts.ipynb)
- [Getting started with BigQuery](/notebooks/bigquery.ipynb)

### Machine Learning Crash Course
These are a few of the notebooks from Google's online Machine Learning course. See the [full course website](https://developers.google.com/machine-learning/crash-course/) for more.
- [Intro to Pandas DataFrame](https://colab.research.google.com/github/google/eng-edu/blob/main/ml/cc/exercises/pandas_dataframe_ultraquick_tutorial.ipynb)
- [Linear regression with tf.keras using synthetic data](https://colab.research.google.com/github/google/eng-edu/blob/main/ml/cc/exercises/linear_regression_with_synthetic_data.ipynb)


<a name="using-accelerated-hardware"></a>
### Using Accelerated Hardware
- [TensorFlow with GPUs](/notebooks/gpu.ipynb)
- [TPUs in Colab](/notebooks/tpu.ipynb)

<a name="machine-learning-examples"></a>

## Machine Learning Examples

To see end-to-end examples of the interactive machine learning analyses that Colab makes possible, check out these tutorials using models from [TensorFlow Hub](https://tfhub.dev).

A few featured examples:

- [Retraining an Image Classifier](https://tensorflow.org/hub/tutorials/tf2_image_retraining): Build a Keras model on top of a pre-trained image classifier to distinguish flowers.
- [Text Classification](https://tensorflow.org/hub/tutorials/tf2_text_classification): Classify IMDB movie reviews as either *positive* or *negative*.
- [Style Transfer](https://tensorflow.org/hub/tutorials/tf2_arbitrary_image_stylization): Use deep learning to transfer style between images.
- [Multilingual Universal Sentence Encoder Q&A](https://tensorflow.org/hub/tutorials/retrieval_with_tf_hub_universal_encoder_qa): Use a machine learning model to answer questions from the SQuAD dataset.
- [Video Interpolation](https://tensorflow.org/hub/tutorials/tweening_conv3d): Predict what happened in a video between the first and the last frame.




In [None]:
# Latest version: 8:14 PM on 06/04/2025, from Gemini
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import logging
import traceback
import pandas as pd

# Configure logging at the DEBUG level
logging.basicConfig(level=logging.DEBUG)

# Configuration
BLOCK_LENGTH = 128
INFO_BITS = 64
LEARNING_RATE = 1e-3
EPOCHS = 25
BATCH_SIZE = 32
NUM_SAMPLES_TRAIN = 1000
NUM_TRIALS_PERF = 500
SNR_RANGE_AWGN = np.linspace(0, 5, 11)
LIST_SIZES = [1, 8, 16]

# Define Classes and Functions
##############################################

def bit_reversal_permutation(n):
    """
    Generates the bit-reversal permutation indices for a given size n (must be a power of 2).
    """
    if not (n and not (n & (n - 1))):
        raise ValueError("n must be a power of 2.")

    m = int(np.log2(n))  # Number of bits in each index
    reversed_indices = np.zeros(n, dtype=int)

    for i in range(n):
        binary = bin(i)[2:].zfill(m)
        reversed_binary = binary[::-1]
        reversed_indices[i] = int(reversed_binary, 2)

    return reversed_indices
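
To make the index mapping concrete, the sketch below computes the same bit-reversal permutation with integer bit operations instead of string manipulation. The helper name `bit_reverse_indices` is hypothetical and not part of the notebook; it uses only the standard library so it can be run on its own.

```python
def bit_reverse_indices(n):
    """Return [rev(0), ..., rev(n-1)], where rev reverses the log2(n)-bit index."""
    if n <= 0 or n & (n - 1):
        raise ValueError("n must be a power of 2")
    m = n.bit_length() - 1  # number of bits per index
    perm = []
    for i in range(n):
        r = 0
        for b in range(m):
            # Move bit b of i to position m-1-b of the reversed index
            r |= ((i >> b) & 1) << (m - 1 - b)
        perm.append(r)
    return perm


perm = bit_reverse_indices(8)
print(perm)  # [0, 4, 2, 6, 1, 5, 3, 7]
```

Bit reversal is its own inverse, so applying the same permutation twice restores the original order. That is why a receiver can undo the encoder's permutation by indexing with the same array.

In [None]: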

def polar_transform(codeword):
    """
    Applies the polar transform x = u * G (mod 2), where G is the n-fold
    Kronecker power of F = [[1, 0], [1, 1]], using a recursive FFT-like
    butterfly structure. The input length must be a power of 2.
    """
    n = len(codeword)
    if not (n and not (n & (n - 1))):
        raise ValueError("Codeword length must be a power of 2")

    if n == 1:
        return codeword

    half_n = n // 2
    c1 = codeword[:half_n]
    c2 = codeword[half_n:]

    transformed_c1 = polar_transform(np.array(c1))
    transformed_c2 = polar_transform(np.array(c2))

    transformed_codeword = np.zeros(n, dtype=int)
    transformed_codeword[:half_n] = (transformed_c1 + transformed_c2) % 2
    transformed_codeword[half_n:] = transformed_c2

    return transformed_codeword
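
The recursion above is equivalent to multiplying the input by the Kronecker power of the 2x2 kernel F = [[1, 0], [1, 1]] over GF(2). The sketch below checks that equivalence on a length-4 input using an iterative butterfly. The names `polar_transform_iter` and `kron_power` are hypothetical helpers, written with plain lists so the block is self-contained.

```python
def polar_transform_iter(u):
    """Iterative butterfly computing the same result as the recursive transform."""
    x = list(u)
    n = len(x)
    if n <= 0 or n & (n - 1):
        raise ValueError("length must be a power of 2")
    step = 1
    while step < n:
        for start in range(0, n, 2 * step):
            for i in range(start, start + step):
                x[i] ^= x[i + step]  # upper branch becomes upper XOR lower
        step *= 2
    return x


def kron_power(stages):
    """Build the Kronecker power of F = [[1, 0], [1, 1]] as a list of rows."""
    g = [[1]]
    for _ in range(stages):
        size = len(g)
        # kron(F, g) has the block structure [[g, 0], [g, g]]
        g = [row + [0] * size for row in g] + [row + row for row in g]
    return g


u = [1, 0, 1, 1]
G = kron_power(2)
x_matrix = [sum(u[i] * G[i][j] for i in range(4)) % 2 for j in range(4)]
x_butterfly = polar_transform_iter(u)
print(x_butterfly)  # [1, 1, 0, 1]
```

Because F squared is the identity over GF(2), applying the transform twice returns the original vector, which is a convenient sanity check for any implementation.

In [None]: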

def calculate_llr_awgn_bpsk(received_signal, snr_db):
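    """
    Calculates LLRs for BPSK modulation in an AWGN channel.

    With the mapping used by bpsk_modulate (0 -> -1, 1 -> +1), the value
    2 * y / sigma^2 equals log(P(y | bit=1) / P(y | bit=0)), so a positive
    LLR favours bit 1. snr_db is treated as Es/N0 in dB for unit-energy
    symbols, giving sigma^2 = 1 / (2 * SNR).
    """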
    snr_linear = 10**(snr_db / 10)
    noise_variance = 1 / (2 * snr_linear)
    llrs = 2 * received_signal / noise_variance
    return llrs
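
The sign convention of this LLR matters once a decoder consumes it. The sketch below, which assumes the notebook's mapping (bit 0 to -1, bit 1 to +1) and unit-energy symbols, compares the closed form 2y/sigma^2 with the log-ratio of the two Gaussian likelihoods. The function names are hypothetical.

```python
def llr_closed_form(y, snr_db):
    """2 * y / sigma^2, with sigma^2 = 1 / (2 * SNR) as in the notebook."""
    sigma2 = 1.0 / (2.0 * 10 ** (snr_db / 10))
    return 2.0 * y / sigma2


def llr_from_likelihoods(y, snr_db):
    """log p(y | x=+1) - log p(y | x=-1) for Gaussian noise of variance sigma^2."""
    sigma2 = 1.0 / (2.0 * 10 ** (snr_db / 10))
    log_p_plus = -((y - 1.0) ** 2) / (2.0 * sigma2)   # x = +1 carries bit 1
    log_p_minus = -((y + 1.0) ** 2) / (2.0 * sigma2)  # x = -1 carries bit 0
    return log_p_plus - log_p_minus


y = 0.5
llr = llr_closed_form(y, snr_db=0.0)
hard_bit = 1 if llr > 0 else 0  # positive LLR favours bit 1 under this mapping
print(llr, hard_bit)  # 2.0 1
```

Many successive-cancellation decoders are written for the opposite convention, log(P(0)/P(1)). If such a decoder is used with these LLRs, negate them first or change the mapping to 0 -> +1, 1 -> -1.

In [None]: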
#############################################################
#latest frozen set
def calculate_frozen_set_ga(N, K_crc):
    """
    Determines the frozen set for a polar code over a Binary-Input AWGN
    (BIAWGN) channel.

    The Gaussian Approximation (GA) is the intended construction method, but
    it is not implemented yet: the function currently ranks channels with a
    placeholder ordering (higher index = more reliable).

    Args:
        N (int): The block length of the polar code (must be a power of 2).
        K_crc (int): The number of information bits including CRC.

    Returns:
        set: A set of indices corresponding to the frozen bits.
    """
    if not (N > 0 and (N & (N - 1)) == 0):
        raise ValueError("N must be a power of 2")

    n = int(np.log2(N))

    # A proper GA construction starts from the mean LLR of the underlying
    # BIAWGN channel at a chosen design SNR. For unit-energy BPSK with noise
    # variance sigma^2, that mean is 2 / sigma^2 = 4 * Es/N0 = 4 * R * Eb/N0.
    # This function takes no design SNR argument, so that initialization is
    # not performed here.
    # Conceptual example (not executed):
    # design_snr_db = 3  # dB, interpreted as Es/N0
    # design_snr_linear = 10**(design_snr_db / 10)
    # initial_reliability = 4 * design_snr_linear  # Mean LLR of the raw channel
    # channel_reliabilities = np.full(N, initial_reliability)

    # WARNING: Placeholder initialization, NOT the GA. Reliability is taken to
    # be the channel index (higher index = more reliable), which is equivalent
    # to the original simplified code. Replace this with GA values computed
    # for a design SNR.
    channel_reliabilities = np.arange(N)


    # --- Placeholder for GA Channel Evolution ---
    # This section needs the core GA iteration logic.
    # The GA evolution updates the reliabilities of the channels across n stages.
    # For stage i from 1 to n:
    #   For each block of 2^(i) channels:
    #     Update the reliabilities of the first half (combined channels)
    #     and the second half (split channels) based on the reliabilities
    #     from the previous stage (2^(i-1)).
    #
    # For the BIAWGN GA, if m is the mean LLR of a channel at stage i-1:
    #   m_minus = phi_inv(1 - (1 - phi(m))**2)   # degraded ("minus") channel
    #   m_plus  = 2 * m                          # upgraded ("plus") channel
    # where phi is the GA function, usually evaluated with a closed-form
    # approximation. These updates, and the nested loops that apply them
    # across all channels and stages, are not implemented here.
    #
    # Example structure (replace with actual GA formulas and loops):
    # for stage in range(n):
    #     block_size = 2**(stage + 1)
    #     half_block_size = block_size // 2
    #     for j in range(0, N, block_size):
    #         # Apply GA update formula for combined and split channels
    #         # using channel_reliabilities[j : j + block_size]
    #         pass # Implement the update here
    # WARNING: The current code does NOT implement the GA evolution.

    # --- End of Placeholder for GA Channel Evolution ---


    # Sort channels from most to least reliable. With real GA output this is
    # descending mean LLR (or ascending variance or error probability); here
    # it is the placeholder ordering defined above.
    reliable_indices = np.argsort(channel_reliabilities)[::-1]


    # Select the K_crc most reliable channels for information bits
    info_set = sorted(reliable_indices[:K_crc])

    # The remaining channels are the frozen channels
    frozen_set = sorted(list(set(range(N)) - set(info_set)))

    logging.info(f"Frozen set size: {len(frozen_set)}")
    logging.info(f"Information set size: {len(info_set)}")

    # WARNING: The frozen set calculated here is based on the placeholder logic, NOT GA.
    # The logging warning is still relevant until you implement the proper GA evolution.
    logging.warning("Using placeholder for GA frozen set calculation. Implement the actual GA logic.")

    return set(frozen_set)
###############################################################
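
The function above leaves the GA evolution as a placeholder. A minimal sketch of what that evolution could look like follows, assuming the widely used two-piece closed-form approximation of the GA function phi (attributed to Chung et al.) and a design SNR interpreted as Es/N0, matching the notebook's channel model. The names `phi`, `phi_inv`, and `ga_frozen_set`, and the default design SNR used in the example, are assumptions for illustration and not part of the notebook.

```python
import math


def phi(x):
    """Two-piece closed-form approximation of the GA function phi."""
    if x <= 0.0:
        return 1.0
    if x < 10.0:
        return math.exp(-0.4527 * x ** 0.86 + 0.0218)
    return math.sqrt(math.pi / x) * math.exp(-x / 4.0) * (1.0 - 10.0 / (7.0 * x))


def phi_inv(y, hi=1e4, iters=100):
    """Invert phi numerically by bisection (phi decreases as x grows)."""
    lo = 0.0
    for _ in range(iters):
        mid = 0.5 * (lo + hi)
        if phi(mid) > y:
            lo = mid
        else:
            hi = mid
    return 0.5 * (lo + hi)


def ga_frozen_set(N, K, design_snr_db):
    """Return (frozen_set, mean_llrs) for block length N with K unfrozen bits."""
    if N <= 0 or N & (N - 1):
        raise ValueError("N must be a power of 2")
    stages = N.bit_length() - 1
    snr = 10 ** (design_snr_db / 10)
    means = [4.0 * snr]  # mean LLR of the raw channel: 2 / sigma^2
    for _ in range(stages):
        nxt = []
        for m in means:
            p = phi(m)
            # "minus" channel: 1 - (1 - p)^2, written as p * (2 - p)
            # to avoid cancellation when p is tiny
            nxt.append(phi_inv(p * (2.0 - p)))
            nxt.append(2.0 * m)  # "plus" channel
        means = nxt
    order = sorted(range(N), key=lambda i: means[i])  # least reliable first
    return set(order[:N - K]), means


frozen, means = ga_frozen_set(N=128, K=71, design_snr_db=3.0)
print(len(frozen), min(frozen), max(means))
```

The choice of design SNR changes which indices end up frozen, so it should be picked near the operating point of interest. The closed-form approximation saturates for very unreliable channels, which affects only the relative order of channels that would be frozen in any case.

In [None]: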



# Polar code generator: CRC attachment and polar encoding (latest version)
class PolarCodeGenerator:
    def __init__(self, N, K, crc_type='CRC-7'):
        self.N = N  # Should be 128
        self.K = K  # Should be 64
        self.R = K / N
        self.crc_type = crc_type

        self.crc_polynomials = {
            # Generator polynomial coefficients, highest degree first:
            # [1, 0, 0, 1, 1, 0, 0, 1] is x^7 + x^4 + x^3 + 1 (7 CRC bits).
            # Several CRC-7 polynomials are in use (for example, MMC/SD cards
            # use x^7 + x^3 + 1), so check this against the standard you follow.
            'CRC-7': (np.array([1, 0, 0, 1, 1, 0, 0, 1]), 7)
        }

        # Determine the number of information bits *after* CRC
        self.K_crc = self.K + (self.crc_polynomials[self.crc_type][1] if crc_type in self.crc_polynomials else 0)

        # Determine the frozen and information sets using GA (placeholder)
        self.frozen_set = calculate_frozen_set_ga(self.N, self.K_crc) # Use K_crc for GA input
        self.info_set = sorted(list(set(range(self.N)) - self.frozen_set))


    def generate_info_bits(self):
        return np.random.randint(2, size=self.K)

    def polar_encode(self, info_bits):
        # Step 1: Compute CRC and append to info bits
        info_bits_with_crc = self.compute_crc(info_bits)

        # Ensure the length of info_bits_with_crc is K_crc
        if len(info_bits_with_crc) != self.K_crc:
             raise ValueError(f"Length of info_bits_with_crc ({len(info_bits_with_crc)}) does not match K_crc ({self.K_crc})")

        # Step 2: Create a placeholder sequence of length N (all zeros initially)
        u = np.zeros(self.N, dtype=int)

        # Step 3: Place the info_bits_with_crc into the "information" indices (determined by GA)
        if len(info_bits_with_crc) > len(self.info_set):
             raise ValueError("Number of info+CRC bits exceeds the allocated info indices.")
        u[self.info_set] = info_bits_with_crc

        # Step 4: Frozen bits (at frozen_set indices) are left as zeros

        # Step 5: Apply the polar transform
        codeword = polar_transform(u)

        # Step 6: Apply the bit-reversal permutation
        permuted_codeword = codeword[bit_reversal_permutation(self.N)]

        return permuted_codeword

    def compute_crc(self, info_bits):
        if self.crc_type not in self.crc_polynomials:
            return info_bits # No CRC applied

        polynomial, length = self.crc_polynomials[self.crc_type]
        # Ensure data is a mutable copy to avoid modifying the original info_bits
        data_for_crc = np.copy(info_bits)
        crc_bits = self.crc(data_for_crc, polynomial, length)
        return np.concatenate((info_bits, crc_bits))

    def crc(self, data, polynomial, length):
        data = np.copy(data)
        data = np.concatenate((data, np.zeros(length, dtype=int)))
        polynomial = np.array(polynomial)

        if len(polynomial) != length + 1:
           raise ValueError("CRC polynomial length mismatch.")

        for i in range(len(data) - length):
            if data[i] == 1:
                data[i:i+length+1] ^= polynomial
        return data[-length:]
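
The `crc` method performs polynomial long division over GF(2). The sketch below reproduces that division with plain lists and shows the round trip: the computed CRC is appended to the message, the extended word then divides with zero remainder, and a single flipped bit is detected. The helper names and the example message are hypothetical; the polynomial is the one used in the notebook.

```python
def crc_remainder(bits, poly):
    """Remainder of (bits followed by deg zeros) divided by poly over GF(2)."""
    deg = len(poly) - 1
    work = list(bits) + [0] * deg
    for i in range(len(bits)):
        if work[i]:
            for j, p in enumerate(poly):
                work[i + j] ^= p  # subtract (XOR) the shifted polynomial
    return work[-deg:]


def crc_passes(word, poly):
    """True if word (message followed by its CRC) leaves a zero remainder."""
    deg = len(poly) - 1
    work = list(word)
    for i in range(len(work) - deg):
        if work[i]:
            for j, p in enumerate(poly):
                work[i + j] ^= p
    return not any(work[-deg:])


poly = [1, 0, 0, 1, 1, 0, 0, 1]  # x^7 + x^4 + x^3 + 1, as in the notebook
message = [1, 0, 1, 1, 0, 0, 1, 0]
word = message + crc_remainder(message, poly)

corrupted = list(word)
corrupted[3] ^= 1  # flip one bit

print(crc_passes(word, poly), crc_passes(corrupted, poly))  # True False
```

In [None]: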


###############################################
# Latest decoder. Despite the "RNN" in its name, this model is a feed-forward
# network (three fully connected layers) with no recurrent layers.
class EnhancedRNNDecoder(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()  # Initialize nn.Module before registering layers

        self.model = nn.Sequential(
            nn.Linear(input_size, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, output_size),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Optional: Keep the unsqueeze/view logic if needed for input shaping
        if x.dim() == 1:
            x = x.unsqueeze(0)
        if x.dim() > 2:
            x = x.view(x.size(0), -1)

        # Pass the input directly to the sequential model
        return self.model(x)
################################################
#AWGN BPSK
class EnhancedChannelSimulator:
    def __init__(self, channel_type='AWGN'):
        self.channel_type = channel_type

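    def simulate(self, signal, snr_db):
        if self.channel_type == 'AWGN':
            # snr_db is Es/N0 in dB for unit-energy BPSK, so sigma^2 = 1 / (2 * SNR)
            snr_linear = 10 ** (snr_db / 10)
            noise_std = np.sqrt(1 / (2 * snr_linear))
            noise = noise_std * np.random.randn(*signal.shape)
            return signal + noise
        # Fail loudly instead of silently returning None for other channel types
        raise ValueError(f"Unsupported channel type: {self.channel_type}")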
###############################################

def bpsk_modulate(bits):
    return 2 * bits - 1  # Convert 0 to -1 and 1 to 1
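
A quick way to validate the modulation and noise model is to compare the uncoded bit error rate with the textbook value for BPSK over AWGN, 0.5 * erfc(sqrt(Es/N0)). The sketch below does this with the standard library only; the function names, sample count, and seed are assumptions chosen for illustration.

```python
import math
import random


def bpsk_awgn_ber_theory(snr_db):
    """Uncoded BPSK bit error rate: Q(sqrt(2 * SNR)) = 0.5 * erfc(sqrt(SNR))."""
    snr = 10 ** (snr_db / 10)
    return 0.5 * math.erfc(math.sqrt(snr))


def bpsk_awgn_ber_sim(snr_db, num_bits=50_000, seed=0):
    """Monte Carlo estimate using the notebook's mapping and noise variance."""
    rng = random.Random(seed)
    snr = 10 ** (snr_db / 10)
    sigma = math.sqrt(1.0 / (2.0 * snr))
    errors = 0
    for _ in range(num_bits):
        bit = rng.getrandbits(1)
        y = (2 * bit - 1) + rng.gauss(0.0, sigma)  # 0 -> -1, 1 -> +1, plus noise
        decided = 1 if y > 0 else 0
        errors += decided != bit
    return errors / num_bits


theory = bpsk_awgn_ber_theory(0.0)
simulated = bpsk_awgn_ber_sim(0.0)
print(f"theory={theory:.4f}")  # theory=0.0786
```

If the simulated value drifts far from the theoretical one, the SNR definition (Es/N0 versus Eb/N0) or the noise scaling is the first thing to check.

In [None]: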

# Dataset preparation function
def prepare_polar_dataset(polar_code_gen, num_samples, snr_db=5, channel_type='AWGN'):
    channel_simulator = EnhancedChannelSimulator(channel_type=channel_type)
    X, y = [], []

    for _ in range(num_samples):
        # Generate original info bits
        info_bits = polar_code_gen.generate_info_bits()
        # Encode the info bits (including CRC)
        encoded_signal = polar_code_gen.polar_encode(info_bits)
        # Modulate and simulate channel
        modulated_signal = bpsk_modulate(encoded_signal)
        received_signal = channel_simulator.simulate(modulated_signal, snr_db)

        # Append the received signal as input X
        X.append(received_signal)
        # Append the *original* info bits as target y
        y.append(info_bits)

    return np.array(X), np.array(y)
######################################################################################

########################################################
#Latest DecoderTrainer

class DecoderTrainer:
    def __init__(self, model, learning_rate):
        self.model = model
        self.criterion = nn.BCELoss()
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    def train(self, X_train, y_train, X_val=None, y_val=None, epochs=50, batch_size=32):
        dataset = torch.utils.data.TensorDataset(X_train, y_train)
        loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

        train_losses = []
        val_losses = []

        for epoch in range(epochs):
            epoch_loss = 0
            self.model.train()

            for X_batch, y_batch in loader:
                X_batch = X_batch.view(-1, BLOCK_LENGTH)  # Ensure correct shape
                # print("X_batch shape:", X_batch.shape)  # Debugging statement

                self.optimizer.zero_grad()
                outputs = self.model(X_batch)
                loss = self.criterion(outputs, y_batch)
                loss.backward()
                self.optimizer.step()
                epoch_loss += loss.item()

            train_loss = epoch_loss / len(loader)
            train_losses.append(train_loss)
            logging.info(f"Epoch {epoch+1}/{epochs}, Training Loss: {train_loss:.4f}")

            if X_val is not None and y_val is not None:
                self.model.eval()
                with torch.no_grad():
                    val_output = self.model(X_val.view(-1, BLOCK_LENGTH))
                    val_loss = self.criterion(val_output, y_val).item()
                    val_losses.append(val_loss)
                    logging.info(f"Epoch {epoch+1}/{epochs}, Validation Loss: {val_loss:.4f}")

        # Parentheses make the precedence explicit: a 2-tuple is always returned
        return train_losses, (val_losses if X_val is not None else None)
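
The trainer reports only the binary cross-entropy loss. Since the model ends in a sigmoid, its outputs can be thresholded at 0.5 to obtain hard bit decisions, from which bit error rate (BER) and block error rate (BLER) follow. The sketch below shows that post-processing on plain lists; `ber_bler` and the example values are hypothetical.

```python
def ber_bler(probabilities, targets, threshold=0.5):
    """Return (BER, BLER) for per-bit probabilities and 0/1 target rows."""
    bit_errors = 0
    block_errors = 0
    total_bits = 0
    for p_row, t_row in zip(probabilities, targets):
        decisions = [1 if p > threshold else 0 for p in p_row]
        errs = sum(d != t for d, t in zip(decisions, t_row))
        bit_errors += errs
        block_errors += 1 if errs > 0 else 0  # a block fails if any bit is wrong
        total_bits += len(t_row)
    return bit_errors / total_bits, block_errors / len(targets)


probs = [[0.9, 0.2, 0.7, 0.1],
         [0.4, 0.6, 0.8, 0.3]]
bits = [[1, 0, 1, 0],
        [1, 1, 1, 0]]
ber, bler = ber_bler(probs, bits)
print(ber, bler)  # 0.125 0.5
```

With PyTorch tensors the same idea is a comparison of `outputs > 0.5` against the targets; the list version here only avoids the dependency.

In [None]: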
########################################################


#########################################################



def save_dataset_to_csv(X, y, filename='dataset.csv'):
    data = np.hstack((X, y))
    columns = [f'received_{i}' for i in range(X.shape[1])] + [f'bit_{j}' for j in range(y.shape[1])]
    df = pd.DataFrame(data, columns=columns)
    df.to_csv(filename, index=False)
    logging.info(f"Dataset saved to {filename}")
##############################################
# SCLD decoder add on
# Add the CRC check logic used in the generator to the decoder
# It's good practice to share this logic or ensure consistency
def check_crc(data, polynomial, length):
    """
    Performs a CRC check on the data. Returns True if the remainder is zero, False otherwise.
    Assumes 'data' already includes the transmitted CRC bits (size K_crc).
    """
    # Work on a copy so the caller's array is not modified
    data = np.copy(data)
    polynomial = np.array(polynomial)

    # The polynomial must have length + 1 coefficients for the XOR below
    if len(polynomial) != length + 1:
        raise ValueError("CRC polynomial length mismatch.")

    # Long division over GF(2), identical to PolarCodeGenerator.crc except
    # that the last 'length' positions hold the received CRC bits, not zeros.
    data_len = len(data)
    for i in range(data_len - length):
        if data[i] == 1:
            data[i:i+length+1] ^= polynomial

    # The remainder is the last 'length' bits
    remainder = data[-length:]

    # Check if the remainder is all zeros
    return np.all(remainder == 0)
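
The CRC check filters candidates produced by list decoding, which in turn extends successive cancellation (SC). The `SCLDecoder` class that follows leaves that tree search as a placeholder, so here is a minimal sketch of plain SC decoding (equivalent to list size 1) for the natural-order transform x = [x1 XOR x2, x2] used by `polar_transform`. It assumes LLRs defined as log(P(0)/P(1)), which is the opposite sign of `calculate_llr_awgn_bpsk`, and it does not include the bit-reversal permutation. All names and the example frozen set are hypothetical.

```python
import math


def f_minsum(a, b):
    """Check-node update (min-sum approximation): LLR of the XOR of two bits."""
    return math.copysign(1.0, a) * math.copysign(1.0, b) * min(abs(a), abs(b))


def g_update(a, b, s):
    """Variable-node update once the partial-sum bit s is known."""
    return b + (1 - 2 * s) * a


def sc_decode(llr, frozen, offset=0):
    """Return (u_hat, x_hat); frozen holds absolute indices forced to 0."""
    n = len(llr)
    if n == 1:
        u = 0 if (offset in frozen or llr[0] >= 0) else 1
        return [u], [u]
    half = n // 2
    a, b = llr[:half], llr[half:]
    u1, x1 = sc_decode([f_minsum(p, q) for p, q in zip(a, b)], frozen, offset)
    u2, x2 = sc_decode([g_update(p, q, s) for p, q, s in zip(a, b, x1)],
                       frozen, offset + half)
    return u1 + u2, [s ^ t for s, t in zip(x1, x2)] + x2


def encode(u):
    """Natural-order polar transform (iterative butterfly)."""
    x = list(u)
    step = 1
    while step < len(x):
        for start in range(0, len(x), 2 * step):
            for i in range(start, start + step):
                x[i] ^= x[i + step]
        step *= 2
    return x


frozen = {0, 1, 2, 4}                # example frozen set for N = 8
u = [0, 0, 0, 1, 0, 1, 1, 0]         # frozen positions carry 0
x = encode(u)
llr = [4.0 if bit == 0 else -4.0 for bit in x]  # noiseless, log(P(0)/P(1))
u_hat, x_hat = sc_decode(llr, frozen)
print(u_hat == u)  # True
```

A list decoder keeps several such decoding paths alive at each unfrozen bit and ranks them by a path metric; the CRC then selects among the survivors. To use this sketch with the notebook's encoder, the received LLRs would first need to be negated and un-permuted with the bit-reversal indices.

In [None]: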


class SCLDecoder:
    def __init__(self, N, K, list_size, crc_type='CRC-7'): # Add crc_type to SCLDecoder init
        self.N = N  # Should be 128
        self.K = K  # Should be 64
        self.list_size = list_size
        self.crc_type = crc_type

        # Get the CRC polynomial and length
        # Use the same definition as in PolarCodeGenerator
        self.crc_polynomials = {
             'CRC-7': (np.array([1, 0, 0, 1, 1, 0, 0, 1]), 7)
        }
        if crc_type not in self.crc_polynomials:
             self.crc_poly_info = (None, 0)
        else:
             self.crc_poly_info = self.crc_polynomials[crc_type]

        self.K_crc = self.K + self.crc_poly_info[1] # Number of info + CRC bits (71 for CRC-7)

        # The decoder must use the same frozen and information sets as the
        # encoder, so it reuses the construction routine that
        # PolarCodeGenerator calls (still a placeholder until GA is implemented).
        self.frozen_set = self._get_frozen_set()
        self.info_set = sorted(list(set(range(N)) - set(self.frozen_set)))


    def _get_frozen_set(self):
        # Delegate to the shared construction so that encoder and decoder agree
        # on which indices are frozen. The encoder's placeholder freezes
        # indices 0..N-K_crc-1; freezing K_crc..N-1 here would not match it.
        return calculate_frozen_set_ga(self.N, self.K_crc)


    def decode(self, received_signal, snr_db):
        """
        SCL decoding process using LLRs.
        This needs the full SCL decoding logic with CRC.
        """
        llrs = calculate_llr_awgn_bpsk(received_signal, snr_db)

        # --- Implement the core SCL decoding algorithm here ---
        # This involves building the decoding tree, propagating LLRs/path metrics,
        # managing the list of active paths, and making bit decisions.
        # At the end of the decoding tree, you will have 'list_size' candidate
        # decoded sequences corresponding to the 'u' vector (size N).

        # --- Extract candidate info+CRC bits ---
        # From each of the 'list_size' candidate 'u' vectors (size N),
        # extract the bits corresponding to the 'info_set' indices.
        # These will be candidate 'K_crc'-length sequences (size 71).

        candidate_info_crc_sequences = [] # List to store the extracted K_crc sequences
        candidate_metrics = [] # List to store metrics for path selection

        # IMPORTANT: Replace this placeholder with your actual SCL tree search
        # and path extraction logic. The loop below simulates having candidate
        # K_crc sequences, but you need to get them from the SCL process.
        # Example Placeholder (you need to replace this):
        # For demonstration, let's simulate getting some random K_crc sequences
        # and metrics. This is NOT the SCL output.
        for _ in range(self.list_size):
            # In a real SCL, this would come from a decoded path
            simulated_k_crc_sequence = np.random.randint(2, size=self.K_crc)
            # Simulated metric; the selection below treats a higher value as better
            simulated_metric = -np.random.rand()
            candidate_info_crc_sequences.append(simulated_k_crc_sequence)
            candidate_metrics.append(simulated_metric)

        if not candidate_info_crc_sequences:
             # If the SCL placeholder didn't produce candidates (shouldn't happen with the simulation above,
             # but keep for robustness when you implement real SCL).
             logging.warning("SCL placeholder produced no candidate sequences.")
             return np.zeros(self.K, dtype=int)


        # --- Perform CRC check and select the best valid path ---
        best_decoded_k_crc = None

        valid_candidates_with_metrics = []
        for i, candidate_k_crc in enumerate(candidate_info_crc_sequences):
            metric = candidate_metrics[i] # Get the metric for this candidate
            if self.crc_poly_info[0] is not None and check_crc(candidate_k_crc, self.crc_poly_info[0], self.crc_poly_info[1]):
                 valid_candidates_with_metrics.append({'sequence': candidate_k_crc, 'metric': metric})

        # Select the best candidate that passed CRC
        if valid_candidates_with_metrics:
             # Sort by metric (e.g., descending for higher metric is better)
             valid_candidates_with_metrics.sort(key=lambda x: x['metric'], reverse=True) # Adjust sorting order based on your metric
             best_decoded_k_crc = valid_candidates_with_metrics[0]['sequence']
             logging.debug(f"SCL found {len(valid_candidates_with_metrics)} valid path(s) with CRC.")
        else:
             # If no path passed CRC, select the best path based on metric alone (optional, common practice)
             logging.warning("No SCL path passed CRC. Selecting best path based on metric alone.")
             # Sort ALL candidates by metric
             all_candidates_with_metrics = [{'sequence': seq, 'metric': met} for seq, met in zip(candidate_info_crc_sequences, candidate_metrics)]
             all_candidates_with_metrics.sort(key=lambda x: x['metric'], reverse=True) # Adjust sorting order
             if all_candidates_with_metrics:
                 best_decoded_k_crc = all_candidates_with_metrics[0]['sequence']
             else:
                 # Should not happen if candidate_info_crc_sequences was not empty
                 logging.error("No SCL candidates available for selection.")
                 return np.zeros(self.K, dtype=int)


        # --- Extract the K information bits from the K_crc sequence ---
        # The first K bits of the K_crc sequence are the information bits.
        if best_decoded_k_crc is not None:
             decoded_info_bits = best_decoded_k_crc[:self.K] # Extract the first K bits (size 64)
        else:
             # Should ideally not be reached if logic is correct
             logging.error("Best decoded K_crc sequence is None.")
             decoded_info_bits = np.zeros(self.K, dtype=int)


        # WARNING: This still indicates the core SCL algorithm is a placeholder.
        logging.warning("Using a simplified placeholder for core SCL decoding logic within decode method. A full implementation is required.")

        # Ensure the output is of size K
        if len(decoded_info_bits) != self.K:
             # This check should now pass if the logic for extracting[:self.K] is correct
             raise ValueError(f"SCL decoder output size mismatch. Expected {self.K}, got {len(decoded_info_bits)}")

        return decoded_info_bits # This should be the decoded info bits (size K = 64)
##################################################

    # ... (rest of the class methods) ...
################################################################################



    def _compute_crc_for_info_bits(self, info_bits):
        """
        Computes the CRC for a given set of information bits and checks if it's valid.
        Assumes the info_bits are the *original* K information bits before encoding.
        In SCL, this is used to check the decoded information bits.
        """
        if self.crc_polynomial is None:
            return True # No CRC, always valid

        # Ensure the input info_bits are the K original bits
        if len(info_bits) != self.K:
            raise ValueError(f"Expected {self.K} information bits, got {len(info_bits)}.")

        # Compute the CRC remainder of the candidate bits by mod-2 polynomial
        # division, using the same procedure as PolarCodeGenerator.crc.
        # NOTE: the input here is only the K information bits, so the value
        # computed below is the CRC *of* those bits; it is zero only when the
        # info bits happen to have an all-zero CRC. To validate a decoded
        # candidate, the division would normally run over the full
        # K + crc_length info+CRC sequence (without appended zeros) and test
        # for a zero remainder. This is hard to do without the full SCL
        # decoding output structure.
        data_to_check = np.array(info_bits, dtype=int)  # integer copy so the XOR below is defined
        # Append zeros for remainder calculation
        data_to_check = np.concatenate((data_to_check, np.zeros(self.crc_length, dtype=int)))
        polynomial = np.array(self.crc_polynomial)

        if len(polynomial) != self.crc_length + 1:
           raise ValueError("CRC polynomial length mismatch in check.")

        for i in range(len(data_to_check) - self.crc_length):
            if data_to_check[i] == 1:
                data_to_check[i:i+self.crc_length+1] ^= polynomial

        remainder = data_to_check[-self.crc_length:]

        # A valid CRC has a remainder of zero
        return np.all(remainder == 0)
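
# ----------------------------------------------------------------------
# Illustrative sketch, not part of the original decoder. A common way to
# validate a decoded candidate is to run the mod-2 division over the whole
# info+CRC sequence and accept it when the remainder is zero. The helper
# names (crc_remainder, crc_append, crc_passes) are hypothetical. The
# polynomial is assumed to be given MSB first with a leading 1, as in the
# CRC-7 entry used elsewhere in this notebook.
import numpy as np

def crc_remainder(bits, polynomial):
    """Remainder of bits(x) divided by polynomial(x) over GF(2)."""
    poly = np.asarray(polynomial, dtype=int)
    r = len(poly) - 1                      # CRC length in bits
    work = np.array(bits, dtype=int)       # integer copy; XOR needs ints
    for i in range(len(work) - r):
        if work[i]:
            work[i:i + r + 1] ^= poly      # cancel the leading 1 at position i
    return work[-r:]

def crc_append(info_bits, polynomial):
    """Return info_bits followed by their CRC."""
    info = np.asarray(info_bits, dtype=int)
    r = len(polynomial) - 1
    padded = np.concatenate((info, np.zeros(r, dtype=int)))
    return np.concatenate((info, crc_remainder(padded, polynomial)))

def crc_passes(info_plus_crc, polynomial):
    """True when the info+CRC sequence leaves a zero remainder."""
    return not crc_remainder(info_plus_crc, polynomial).any()

# Usage: a freshly encoded word passes, a single flipped bit does not.
_demo_poly = [1, 0, 0, 1, 1, 0, 0, 1]
_demo_word = crc_append([1, 0, 1, 1, 0, 0, 1, 0, 1, 1], _demo_poly)
_demo_bad = _demo_word.copy()
_demo_bad[3] ^= 1
print(crc_passes(_demo_word, _demo_poly), crc_passes(_demo_bad, _demo_poly))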


###############################################
# Function to run SCL decoder performance evaluation
def run_scl_decoder(polar_code_gen, snr_range, list_size, channel_type, num_trials):
    results = []
    for snr_db in snr_range:
        # The data preparation will now use the polar-encoded signals
        X, y = prepare_polar_dataset(polar_code_gen, num_samples=num_trials, snr_db=snr_db, channel_type=channel_type)

        # Instantiate the SCL decoder with crc_type; crc_polynomial is not passed here
        decoder = SCLDecoder(N=polar_code_gen.N, K=polar_code_gen.K, list_size=list_size, crc_type='CRC-7')

        decoded_bits = []
        for x_sample, y_sample in zip(X, y):
            # Pass the received signal to the decoder
            # The decode method expects the received signal and SNR
            decoded_info = decoder.decode(x_sample, snr_db)
            decoded_bits.append(decoded_info)

        decoded_bits = np.array(decoded_bits)

        # Ensure decoded_bits and y have compatible shapes for comparison
        # y now contains the original INFO_BITS (64), and decoded_bits should too.
        if decoded_bits.shape != y.shape:
             logging.error(f"Shape mismatch for error calculation: decoded_bits {decoded_bits.shape}, y {y.shape}")
             # Handle shape mismatch - this might indicate an issue in the decode method's output
             # For now, we'll skip error calculation for this list size/SNR to avoid crashing
             # but you MUST fix the SCL decoder implementation to return the correct shape.
             print(f"Skipping error calculation for List Size {list_size}, SNR {snr_db} due to shape mismatch.")
             continue # Skip to the next SNR/list size

        # Error rate calculations (assuming decoded_bits and y are now (num_trials, INFO_BITS))
        bit_errors = np.sum(np.abs(decoded_bits - y))
        block_errors = np.sum(np.any(decoded_bits != y, axis=1))

        ber = bit_errors / (num_trials * polar_code_gen.K) # Use K for info bits
        bler = block_errors / num_trials

        results.append({'SNR': snr_db, 'BER': ber, 'BLER': bler})
        print(f"SCL Decoder - SNR: {snr_db:.1f} dB, List Size: {list_size}, BER: {ber:.6f}, BLER: {bler:.6f}") # Increased precision for printing
    return results
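
# ----------------------------------------------------------------------
# Illustrative sketch (hypothetical helper, not called by the notebook):
# the BER and BLER bookkeeping from run_scl_decoder, isolated as a pure
# function so it can be checked on a tiny hand-made example. It assumes
# both inputs are 2-D arrays of shape (num_trials, K) holding 0/1 values.
import numpy as np

def ber_bler(decoded_bits, true_bits):
    decoded = np.asarray(decoded_bits)
    truth = np.asarray(true_bits)
    if decoded.ndim != 2 or decoded.shape != truth.shape:
        raise ValueError(f"Shape mismatch: decoded {decoded.shape}, truth {truth.shape}")
    mismatches = decoded != truth
    ber = float(mismatches.mean())               # wrong bits / all bits
    bler = float(mismatches.any(axis=1).mean())  # blocks with >= 1 wrong bit / all blocks
    return ber, bler

# Usage: two blocks of four bits, the second block has two wrong bits.
print(ber_bler([[0, 1, 1, 0], [1, 0, 0, 1]], [[0, 1, 1, 0], [1, 1, 0, 0]]))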

##############################################


#############################
#Performance comparison
def performance_comparison(rnn_trainer, polar_code_gen, snr_range, channel_type, list_sizes, num_trials):
    results = {'SNR': snr_range, 'BER_RNN': [], 'BLER_RNN': []}

    # Initialize dictionaries for SCL results for each list size
    for size in list_sizes:
        results[f'BER_SCL_{size}'] = []
        results[f'BLER_SCL_{size}'] = []

    # The SCL decoder needs the frozen set and CRC polynomial from the generator
    frozen_set = polar_code_gen.frozen_set
    crc_polynomial = polar_code_gen.crc_polynomials.get(polar_code_gen.crc_type, (None, 0))[0]


    for snr in snr_range:
        rnn_bit_errors = 0
        rnn_block_errors = 0
        scl_bit_errors = {size: 0 for size in list_sizes}
        scl_block_errors = {size: 0 for size in list_sizes}

        # Generate a batch of data for the current SNR
        X_perf, y_perf = prepare_polar_dataset(polar_code_gen, num_samples=num_trials, snr_db=snr, channel_type=channel_type)

        # RNN decoding for the batch (eval mode disables dropout; no gradients are needed)
        rnn_input_batch = torch.FloatTensor(X_perf)
        # Reshape the input to match the model's expected input size (codeword length N)
        rnn_input_batch = rnn_input_batch.view(-1, polar_code_gen.N)

        rnn_trainer.model.eval()
        with torch.no_grad():
            rnn_output_batch = rnn_trainer.model(rnn_input_batch).round().numpy()

        rnn_bit_errors = np.sum(np.abs(rnn_output_batch - y_perf))
        rnn_block_errors = np.sum(np.any(rnn_output_batch != y_perf, axis=1))

        # The full-SNR SCL sweep via run_scl_decoder is done once in main();
        # repeating it here for every SNR point only recomputed results that were discarded.

        # SCL decoding for each trial
        for i in range(num_trials):
            info_bits = y_perf[i] # True info bits
            simulated_signal = X_perf[i]

            for size in list_sizes:
                # Instantiate the SCL decoder for each trial (or batch if implemented)
                # Pass frozen_set and crc_polynomial to the SCLDecoder
                scl_decoder_instance = SCLDecoder(N=polar_code_gen.N, K=polar_code_gen.K, list_size=size,
                                                 crc_polynomial=crc_polynomial, frozen_set=frozen_set)
                scl_output = scl_decoder_instance.decode(simulated_signal, snr) # This is the placeholder output

                # In a real scenario, scl_output would be the decoded info bits (K length).
                # Since it's a placeholder returning zeros, the comparison below
                # will show high error rates.

                # Ensure the SCL output has the correct shape before comparison
                if scl_output.shape != info_bits.shape:
                    logging.warning(f"SCL decoder output shape mismatch for list size {size}. Expected {info_bits.shape}, got {scl_output.shape}")
                    # Skip error calculation for this trial if shapes are inconsistent
                    continue


                scl_bit_errors[size] += np.sum(info_bits != scl_output)
                scl_block_errors[size] += np.any(info_bits != scl_output)

        info_len = polar_code_gen.K  # number of information bits per block
        results['BER_RNN'].append(rnn_bit_errors / (num_trials * info_len))
        results['BLER_RNN'].append(rnn_block_errors / num_trials)

        for size in list_sizes:
            if num_trials > 0 and info_len > 0:
                results[f'BER_SCL_{size}'].append(scl_bit_errors[size] / (num_trials * info_len))
                results[f'BLER_SCL_{size}'].append(scl_block_errors[size] / num_trials)
            else:
                results[f'BER_SCL_{size}'].append(float('nan'))
                results[f'BLER_SCL_{size}'].append(float('nan'))


    return results

##############################
#Plotting functions

def plot_training_validation(train_losses, val_losses):
    plt.figure(figsize=(8, 4))
    plt.plot(train_losses, label='Training Loss')
    if val_losses:
        plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.show()

def plot_ber_bler(snr_range, results):
    # List sizes are read from the result keys (e.g. 'BER_SCL_8') instead of being hard-coded
    list_sizes = sorted(int(key.rsplit('_', 1)[1]) for key in results if key.startswith('BER_SCL_'))
    panels = [('BER', 'Bit Error Rate'), ('BLER', 'Block Error Rate')]

    plt.figure(figsize=(12, 6))
    for position, (metric, label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(snr_range, results[f'{metric}_RNN'], label='RNN')
        for size in list_sizes:
            plt.plot(snr_range, results[f'{metric}_SCL_{size}'], label=f'SCL, List Size {size}')
        plt.xlabel('SNR (dB)')
        plt.ylabel(label)
        plt.title(f'{label} ({metric})')
        plt.legend()

    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(y_true, y_pred, title='Confusion Matrix'):
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.title(title)
    plt.show()
#############################

# More functions
def plot_ber_bler_comparison(snr_range, rnn_results, scl_results, list_size):
    plt.figure(figsize=(12, 6))

    # BER Plot
    plt.subplot(1, 2, 1)
    plt.plot(snr_range, rnn_results['BER_RNN'], label='RNN')
    plt.plot(snr_range, [result['BER'] for result in scl_results], label=f'SCL, List Size {list_size}')
    plt.xlabel('SNR (dB)')
    plt.ylabel('BER')
    plt.title('Bit Error Rate (BER)')
    plt.legend()

    # BLER Plot
    plt.subplot(1, 2, 2)
    plt.plot(snr_range, rnn_results['BLER_RNN'], label='RNN')
    plt.plot(snr_range, [result['BLER'] for result in scl_results], label=f'SCL, List Size {list_size}')
    plt.xlabel('SNR (dB)')
    plt.ylabel('BLER')
    plt.title('Block Error Rate (BLER)')
    plt.legend()

    plt.tight_layout()
    plt.show()

##################################
def plot_ber_bler_scl(snr_range, scl_results, list_size):
    plt.figure(figsize=(12, 6))

    # BER Plot for SCL
    plt.subplot(1, 2, 1)
    plt.plot(snr_range, [result['BER'] for result in scl_results], label=f'SCL, List Size {list_size}')
    plt.xlabel('SNR (dB)')
    plt.ylabel('BER')
    plt.title('SCL Bit Error Rate (BER)')
    plt.legend()

    # BLER Plot for SCL
    plt.subplot(1, 2, 2)
    plt.plot(snr_range, [result['BLER'] for result in scl_results], label=f'SCL, List Size {list_size}')
    plt.xlabel('SNR (dB)')
    plt.ylabel('BLER')
    plt.title('SCL Block Error Rate (BLER)')
    plt.legend()

    plt.tight_layout()
    plt.show()

def plot_ber_bler_rnn(snr_range, rnn_results):
    plt.figure(figsize=(12, 6))

    # BER Plot for RNN
    plt.subplot(1, 2, 1)
    plt.plot(snr_range, rnn_results['BER_RNN'], label='RNN')
    plt.xlabel('SNR (dB)')
    plt.ylabel('BER')
    plt.title('RNN Bit Error Rate (BER)')
    plt.legend()

    # BLER Plot for RNN
    plt.subplot(1, 2, 2)
    plt.plot(snr_range, rnn_results['BLER_RNN'], label='RNN')
    plt.xlabel('SNR (dB)')
    plt.ylabel('BLER')
    plt.title('RNN Block Error Rate (BLER)')
    plt.legend()

    plt.tight_layout()
    plt.show()

def plot_combined_ber_bler(snr_range, rnn_results, scl_results):
    plt.figure(figsize=(12, 6))

    # Bit Error Rate (BER)
    plt.subplot(1, 2, 1)
    plt.plot(snr_range, rnn_results['BER_RNN'], label='RNN')
    for list_size, results in scl_results.items():
        plt.plot(snr_range, [result['BER'] for result in results], label=f'SCL, List Size {list_size}')
    plt.xlabel('SNR (dB)')
    plt.ylabel('Bit Error Rate')
    plt.title('Bit Error Rate (BER)')
    plt.legend()
    plt.grid(True)
    plt.yscale('log') # Use a log scale for BER


    # Block Error Rate (BLER)
    plt.subplot(1, 2, 2)
    plt.plot(snr_range, rnn_results['BLER_RNN'], label='RNN')
    for list_size, results in scl_results.items():
        plt.plot(snr_range, [result['BLER'] for result in results], label=f'SCL, List Size {list_size}')
    plt.xlabel('SNR (dB)')
    plt.ylabel('Block Error Rate')
    plt.title('Block Error Rate (BLER)')
    plt.legend()
    plt.grid(True)
    plt.yscale('log') # Use a log scale for BLER


    plt.tight_layout()
    plt.show()
######################################################################
# Main function
def main():
    try:
      # Configuration parameters
        BLOCK_LENGTH = 128
        INFO_BITS = 64
        LEARNING_RATE = 1e-3
        EPOCHS = 25
        BATCH_SIZE = 32
        NUM_SAMPLES_TRAIN = 1000
        NUM_TRIALS_PERF = 500
        SNR_RANGE_AWGN = np.linspace(0, 5, 11)
        LIST_SIZES = [1, 8, 16]

        logging.basicConfig(level=logging.INFO)

        # Build the polar code generator (frozen set computed using GA, placeholder)
        polar_code_gen = PolarCodeGenerator(BLOCK_LENGTH, INFO_BITS)

        logging.info(f"Code Rate: {polar_code_gen.R}")

        # Data preparation
        X_raw, y_raw = prepare_polar_dataset(polar_code_gen, num_samples=NUM_SAMPLES_TRAIN, snr_db=5.0, channel_type='AWGN')
        print("Shape of X_raw after preparation:", X_raw.shape)
        print("Shape of y_raw after preparation:", y_raw.shape)
        save_dataset_to_csv(X_raw, y_raw, 'awgn_dataset.csv')

        # Determine the number of full blocks
        num_samples = X_raw.shape[0]
        encoded_signal_length = X_raw.shape[1]  # Codeword length N (128 here), not K plus the CRC length
        num_blocks = num_samples * encoded_signal_length // BLOCK_LENGTH

        # Ensure data lengths are valid and aligned
        X_aligned = X_raw.flatten()[:num_blocks * BLOCK_LENGTH]
        y_aligned = y_raw[:num_blocks]

        # Convert to tensors
        X_tensor = torch.FloatTensor(X_aligned).view(-1, BLOCK_LENGTH)
        y_tensor = torch.FloatTensor(y_aligned).view(-1, INFO_BITS)

         # Debugging shapes (you already have these)
        print("X_tensor shape:", X_tensor.shape)
        print("y_tensor shape:", y_tensor.shape)

# Debugging shapes
        print("X_tensor shape:", X_tensor.shape)
        print("y_tensor shape:", y_tensor.shape)

# Split data
        train_size = int(0.8 * X_tensor.shape[0]) # Use the actual number of samples in X_tensor
        val_size = X_tensor.shape[0] - train_size

# Ensure that indices do not exceed the tensor size
        if train_size + val_size != X_tensor.shape[0]:
            raise ValueError("Training and validation sizes do not sum to total dataset size.")

        train_X = X_tensor[:train_size]
        train_y = y_tensor[:train_size]
        val_X = X_tensor[train_size:]
        val_y = y_tensor[train_size:]

# Confirm correct splits
        print("Training samples:", len(train_X))
        print("Validation samples:", len(val_X))

        # RNN Model Training
        rnn_model_1 = EnhancedRNNDecoder(input_size=BLOCK_LENGTH, output_size=INFO_BITS)
        rnn_trainer_1 = DecoderTrainer(rnn_model_1, learning_rate=LEARNING_RATE)

        # val_X and val_y are not passed to train() here
        train_losses, val_losses = rnn_trainer_1.train(
            train_X, train_y, epochs=EPOCHS, batch_size=BATCH_SIZE
        )

        # RNN Performance Comparison
        rnn_perf_results = performance_comparison(
            rnn_trainer_1, polar_code_gen, SNR_RANGE_AWGN, 'AWGN', LIST_SIZES, NUM_TRIALS_PERF
        )

         # SCL Performance for all list sizes
        scl_perf_results = {}
        for list_size in LIST_SIZES:
            # The SCLDecoder instantiation happens INSIDE run_scl_decoder,
            # so we just need to call run_scl_decoder correctly here.
            logging.info(f"Running SCL Decoder performance for List Size: {list_size}")
            scl_results_for_list_size = run_scl_decoder(
                polar_code_gen,
                SNR_RANGE_AWGN,
                list_size,
                'AWGN',
                NUM_TRIALS_PERF
            )
            scl_perf_results[list_size] = scl_results_for_list_size


        # Plot Training and Validation Loss
        plot_training_validation(train_losses, val_losses)

        # Plot Combined BER/BLER
        plot_combined_ber_bler(SNR_RANGE_AWGN, rnn_perf_results, scl_perf_results)

        # Confusion Matrix Example (for a specific SNR)
        # snr_index = 5  # Adjust as needed
        # y_true_example = []  # Replace with true labels from your specific validation set for this SNR
        # y_pred_example = []  # Replace with predicted labels for this SNR
        # plot_confusion_matrix(y_true_example, y_pred_example, title=f'Confusion Matrix at SNR={SNR_RANGE_AWGN[snr_index]} dB')

        logging.info("🎉 AWGN Channel Simulation Complete!")

    except Exception as e:
        logging.error(f"Simulation Error: {e}")
        traceback.print_exc()

if __name__ == "__main__":
    main()
######################################################################



Shape of X_raw after preparation: (1000, 128)
Shape of y_raw after preparation: (1000, 64)
X_tensor shape: torch.Size([1000, 128])
y_tensor shape: torch.Size([1000, 64])
X_tensor shape: torch.Size([1000, 128])
y_tensor shape: torch.Size([1000, 64])
Training samples: 800
Validation samples: 200




SCL Decoder - SNR: 0.0 dB, List Size: 1, BER: 0.498844, BLER: 1.000000
SCL Decoder - SNR: 0.5 dB, List Size: 1, BER: 0.502125, BLER: 1.000000
SCL Decoder - SNR: 1.0 dB, List Size: 1, BER: 0.497031, BLER: 1.000000
SCL Decoder - SNR: 1.5 dB, List Size: 1, BER: 0.497906, BLER: 1.000000
SCL Decoder - SNR: 2.0 dB, List Size: 1, BER: 0.497906, BLER: 1.000000
SCL Decoder - SNR: 2.5 dB, List Size: 1, BER: 0.500219, BLER: 1.000000
SCL Decoder - SNR: 3.0 dB, List Size: 1, BER: 0.500844, BLER: 1.000000
SCL Decoder - SNR: 3.5 dB, List Size: 1, BER: 0.499156, BLER: 1.000000
SCL Decoder - SNR: 4.0 dB, List Size: 1, BER: 0.499094, BLER: 1.000000
SCL Decoder - SNR: 4.5 dB, List Size: 1, BER: 0.502281, BLER: 1.000000
SCL Decoder - SNR: 5.0 dB, List Size: 1, BER: 0.499937, BLER: 1.000000
SCL Decoder - SNR: 0.0 dB, List Size: 8, BER: 0.499906, BLER: 1.000000
SCL Decoder - SNR: 0.5 dB, List Size: 8, BER: 0.492000, BLER: 1.000000




In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import logging
import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)

# Configuration parameters
BLOCK_LENGTH = 128
INFO_BITS = 64
LEARNING_RATE = 1e-3
EPOCHS = 50
BATCH_SIZE = 32
NUM_SAMPLES_TRAIN = 10000
NUM_TRIALS_PERF = 1000
SNR_RANGE_AWGN = np.linspace(0, 5, 11)
LIST_SIZES = [1, 8, 16]

#part two
class PolarCodeGenerator:
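    def __init__(self, N, K, crc_type='CRC-7'):
        self.N = N
        self.K = K
        self.R = K / N
        self.crc_type = crc_type
        # Generator polynomial coefficients (MSB first) and CRC length in bits
        self.crc_polynomials = {'CRC-7': (np.array([1, 0, 0, 1, 1, 0, 0, 1]), 7)}
        crc_length = self.crc_polynomials[crc_type][1] if crc_type in self.crc_polynomials else 0
        self.K_crc = self.K + crc_length
        # Placeholder (assumption): polar_encode reads self.info_indices, which was never set.
        # The first K_crc positions are used here; this is NOT a reliability-based construction.
        self.info_indices = np.arange(self.K_crc)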

    def generate_info_bits(self):
        return np.random.randint(2, size=self.K)

    def polar_encode(self, info_bits):
        # Step 1: Compute the CRC and append it to the info bits
        info_bits_with_crc = self.compute_crc(info_bits)

        # The info+CRC sequence must have length K_crc
        if len(info_bits_with_crc) != self.K_crc:
            raise ValueError(f"Length of info_bits_with_crc ({len(info_bits_with_crc)}) does not match K_crc ({self.K_crc})")

        # Step 2: Start from an all-zero input vector of length N
        u = np.zeros(self.N, dtype=int)

        # Step 3: Place the info+CRC bits at the information indices.
        # This is still a simplified placement. In a real polar code,
        # the bits at the "good" channels are set to the info+CRC bits.
        if len(info_bits_with_crc) > len(self.info_indices):
            raise ValueError("Number of info+CRC bits exceeds the allocated info indices.")
        u[self.info_indices] = info_bits_with_crc

        # ... rest of the function ...


    def compute_crc(self, info_bits):
        if self.crc_type not in self.crc_polynomials:
            return info_bits
        polynomial, length = self.crc_polynomials[self.crc_type]
        data_for_crc = np.copy(info_bits)
        crc_bits = self.crc(data_for_crc, polynomial, length)
        return np.concatenate((info_bits, crc_bits))

    def crc(self, data, polynomial, length):
        # Method of PolarCodeGenerator (compute_crc calls self.crc), so it must be indented into the class.
        # Append zeros to data for CRC calculation
        data = np.concatenate((data, np.zeros(length, dtype=int)))
        polynomial = np.array(polynomial)

        # Perform bitwise mod-2 division
        for i in range(len(data) - length):
            if data[i] == 1:
                data[i:i+length+1] ^= polynomial

        return data[-length:]
#####################################################
def polar_transform(codeword):
    """
    Applies the polar transform (a butterfly structure similar to the Fast
    Fourier Transform) to a bit vector. The length must be a power of 2.
    """
    n = len(codeword)
    if not (n and not (n & (n - 1))):
        raise ValueError("Codeword length must be a power of 2")

    if n == 1:
        return codeword  # Base case: a single bit is its own transform

    half_n = n // 2
    c1 = codeword[:half_n]
    c2 = codeword[half_n:]

    # Transform each half recursively
    transformed_c1 = polar_transform(np.array(c1))
    transformed_c2 = polar_transform(np.array(c2))

    # Combine: the first half is the XOR of both halves, the second half passes through
    transformed_codeword = np.zeros(n, dtype=int)
    transformed_codeword[:half_n] = (transformed_c1 + transformed_c2) % 2
    transformed_codeword[half_n:] = transformed_c2

    return transformed_codeword # Returns the resulting integer array
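
# ----------------------------------------------------------------------
# Illustrative sketch, not part of the original notebook. The recursion in
# polar_transform can also be written as an in-place butterfly loop. The
# name polar_transform_iterative is hypothetical. No bit-reversal
# permutation is applied, matching the recursive version above. Over GF(2)
# this transform is its own inverse, which gives a cheap self-check.
import numpy as np

def polar_transform_iterative(bits):
    x = np.array(bits, dtype=int) % 2
    n = x.size
    if n == 0 or n & (n - 1):
        raise ValueError("Length must be a power of 2")
    step = 1
    while step < n:
        # Within each block of 2*step bits, XOR the upper half into the lower half
        for start in range(0, n, 2 * step):
            x[start:start + step] ^= x[start + step:start + 2 * step]
        step *= 2
    return x

# Usage: applying the transform twice should return the original vector.
_u_demo = np.array([1, 0, 1, 1, 0, 0, 1, 0])
_x_demo = polar_transform_iterative(_u_demo)
print(_x_demo, np.array_equal(polar_transform_iterative(_x_demo), _u_demo))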


##########################################################

###################################################


def bit_reverse_permutation(n_bits):
    indices = np.arange(1 << n_bits)
    reversed_indices = np.zeros_like(indices)
    for i in range(n_bits):
        mask = 1 << i
        reversed_indices = (reversed_indices << 1) | ((indices & mask) >> i)
    return reversed_indices

#part three
def bpsk_modulate(bits):
    """Map bits to BPSK symbols: 0 -> -1, 1 -> +1."""
    # The per-call debug prints were removed; they ran once per sample and flooded the output.
    bits = np.asarray(bits, dtype=int)
    return 2 * bits - 1



class EnhancedChannelSimulator:
    def __init__(self, channel_type='AWGN'):
        self.channel_type = channel_type

    def simulate(self, signal, snr_db):
        snr_linear = 10 ** (snr_db / 10)
        noise_std = np.sqrt(1 / (2 * snr_linear))
        noise = noise_std * np.random.randn(*signal.shape)
        return signal + noise
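
# ----------------------------------------------------------------------
# Illustrative sketch (hypothetical helpers, not called by the notebook).
# It assumes unit-energy BPSK with the mapping 0 -> -1, 1 -> +1 from
# bpsk_modulate, and the noise model in simulate above, where
# sigma^2 = 1 / (2 * snr_linear). Under those assumptions the channel
# log-likelihood ratio ln P(y|bit=0) / P(y|bit=1) works out to -2*y/sigma^2.
import numpy as np

def awgn_noise_std(snr_db):
    """Noise standard deviation per real dimension for the given SNR in dB."""
    snr_linear = 10.0 ** (snr_db / 10.0)
    return float(np.sqrt(1.0 / (2.0 * snr_linear)))

def bpsk_llr(received, snr_db):
    """Channel LLRs; positive values favour bit 0 (symbol -1)."""
    sigma2 = awgn_noise_std(snr_db) ** 2
    return -2.0 * np.asarray(received, dtype=float) / sigma2

# Usage: at 0 dB the noise variance is 0.5, so noiseless symbols give LLRs of magnitude 4.
print(awgn_noise_std(0.0), bpsk_llr([-1.0, 1.0], 0.0))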

def prepare_polar_dataset(polar_code_gen, num_samples, snr_db=5, channel_type='AWGN'):
    channel_simulator = EnhancedChannelSimulator(channel_type=channel_type)
    X, y = [], []

    for _ in range(num_samples):
        info_bits = polar_code_gen.generate_info_bits()
        encoded_signal = polar_code_gen.polar_encode(info_bits)
        modulated_signal = bpsk_modulate(encoded_signal)
        received_signal = channel_simulator.simulate(modulated_signal, snr_db)
        X.append(received_signal)
        y.append(info_bits)

    return np.array(X), np.array(y)
#part four
class EnhancedRNNDecoder(nn.Module):
    """Decoder network. Despite the name, the layers are fully connected (no recurrent cells)."""
    def __init__(self, input_size, output_size):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_size, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, output_size),
            nn.Sigmoid()
        )

    def forward(self, x):
        if x.dim() == 1:
            x = x.unsqueeze(0)
        if x.dim() > 2:
            x = x.view(x.size(0), -1)
        return self.model(x)

class DecoderTrainer:
    def __init__(self, model, learning_rate):
        self.model = model
        self.criterion = nn.BCELoss()
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    def train(self, X_train, y_train, X_val=None, y_val=None, epochs=100, batch_size=32):
        dataset = torch.utils.data.TensorDataset(X_train, y_train)
        loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

        train_losses = []
        val_losses = []

        for epoch in range(epochs):
            epoch_loss = 0
            self.model.train()

            for X_batch, y_batch in loader:
                X_batch = X_batch.view(-1, BLOCK_LENGTH)
                self.optimizer.zero_grad()
                outputs = self.model(X_batch)
                loss = self.criterion(outputs, y_batch)
                loss.backward()
                self.optimizer.step()
                epoch_loss += loss.item()

            train_loss = epoch_loss / len(loader)
            train_losses.append(train_loss)
            logging.info(f"Epoch {epoch+1}/{epochs}, Training Loss: {train_loss:.4f}")

            if X_val is not None and y_val is not None:
                self.model.eval()
                with torch.no_grad():
                    val_output = self.model(X_val.view(-1, BLOCK_LENGTH))
                    val_loss = self.criterion(val_output, y_val).item()
                    val_losses.append(val_loss)
                    logging.info(f"Epoch {epoch+1}/{epochs}, Validation Loss: {val_loss:.4f}")

        # Parentheses make the precedence explicit: only the second element is conditional
        return train_losses, (val_losses if X_val is not None else None)

#part 5

class SCLDecoder:
    def __init__(self, N, K, list_size, crc_poly=None):
        self.N = N
        self.K = K
        self.list_size = list_size
        self.crc_poly = crc_poly
        self.frozen_set = self._get_frozen_set()
        self.info_set = sorted(list(set(range(N)) - set(self.frozen_set)))

    def _get_frozen_set(self):
        return set(range(self.K, self.N))

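    def decode(self, received_signal, snr_db):
        # Channel LLRs, ln P(y|0) / P(y|1), for the mapping 0 -> -1, 1 -> +1 used by
        # bpsk_modulate and noise variance 1 / (2 * snr_linear): LLR = -2 * y / sigma^2.
        # NOTE: these channel LLRs are used directly as per-bit LLRs below; the
        # successive-cancellation f/g recursions of a full SCL decoder are not implemented.
        snr_linear = 10 ** (snr_db / 10)
        llrs = -4 * received_signal * snr_linear

        # Start from a single path; the list grows up to list_size as information bits split.
        # Starting with list_size identical copies would fill the list with duplicates.
        paths = [np.zeros(self.N, dtype=int)]
        path_metrics = [0.0]
        current_llrs = [llrs.copy()]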

        for bit_index in range(self.N):
            new_paths = []
            new_path_metrics = []
            new_llrs = []

            for path_idx in range(len(paths)):
                current_path = paths[path_idx]
                current_metric = path_metrics[path_idx]
                path_llrs = current_llrs[path_idx]

                if bit_index in self.frozen_set:
                    decision = 0
                    new_path = current_path.copy()
                    new_path[bit_index] = decision
                    new_metric = current_metric - 0.5 * path_llrs[bit_index] if decision == 0 else current_metric + 0.5 * path_llrs[bit_index]
                    new_paths.append(new_path)
                    new_path_metrics.append(new_metric)
                    new_llrs.append(path_llrs.copy())
                else:
                    for decision in [0, 1]:
                        new_path = current_path.copy()
                        new_path[bit_index] = decision
                        new_metric = current_metric - 0.5 * path_llrs[bit_index] if decision == 0 else current_metric + 0.5 * path_llrs[bit_index]
                        new_paths.append(new_path)
                        new_path_metrics.append(new_metric)
                        new_llrs.append(path_llrs.copy())

            sorted_indices = np.argsort(new_path_metrics)[:self.list_size]
            paths = [new_paths[i] for i in sorted_indices]
            path_metrics = [new_path_metrics[i] for i in sorted_indices]
            current_llrs = [new_llrs[i] for i in sorted_indices]

        best_path_index = np.argmin(path_metrics)
        decoded_info_bits = paths[best_path_index][self.info_set]
        return decoded_info_bits
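

# Illustrative sketch, not part of the original pipeline and not called by it.
# The decode() method above ranks paths with the raw channel LLRs. A full
# successive-cancellation (SC) decoder would instead propagate LLRs through
# the polar transform using the two kernel updates below. The names
# sc_f_minsum, sc_g and sc_decode_n2 are hypothetical, and the sketch assumes
# the convention "LLR > 0 means bit 0" and the 2x2 kernel x = (u0 XOR u1, u1).
import numpy as np


def sc_f_minsum(llr_a, llr_b):
    """Min-sum approximation of the upper-branch (check-node) LLR update."""
    return np.sign(llr_a) * np.sign(llr_b) * np.minimum(np.abs(llr_a), np.abs(llr_b))


def sc_g(llr_a, llr_b, u_hat):
    """Lower-branch (variable-node) LLR update, given the earlier decision u_hat."""
    return llr_b + (1 - 2 * u_hat) * llr_a


def sc_decode_n2(llr_x0, llr_x1, frozen=(False, False)):
    """SC-decode one 2x2 polar kernel and return (u0_hat, u1_hat)."""
    l_u0 = sc_f_minsum(llr_x0, llr_x1)
    u0_hat = 0 if frozen[0] or l_u0 >= 0 else 1
    l_u1 = sc_g(llr_x0, llr_x1, u0_hat)
    u1_hat = 0 if frozen[1] or l_u1 >= 0 else 1
    return u0_hat, u1_hat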


# Part 6: plotting helpers
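def plot_training_validation(train_losses, val_losses):
    """Plot per-epoch training loss and, if provided, validation loss."""
    plt.figure(figsize=(8, 4))
    plt.plot(train_losses, label='Training Loss')
    if val_losses is not None and len(val_losses) > 0: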
        plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.show()

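def plot_ber_bler_comparison(snr_range, rnn_results, scl_results_all, list_sizes):
    """Plot BER and BLER versus SNR for the RNN decoder and each SCL list size.

    rnn_results holds the per-SNR lists 'BER_RNN' and 'BLER_RNN'; scl_results_all
    holds, for each list size, one dict per SNR point with 'BER' and 'BLER' keys.
    """
    plt.figure(figsize=(12, 6))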

    # BER Plot
    plt.subplot(1, 2, 1)
    plt.yscale('log')
    plt.ylim(1e-4, 1)
    plt.plot(snr_range, rnn_results['BER_RNN'], label='RNN')
    for size, scl_results in zip(list_sizes, scl_results_all):
        plt.plot(snr_range, [result['BER'] for result in scl_results], label=f'SCL, List Size {size}')
    plt.xlabel('SNR (dB)')
    plt.ylabel('BER')
    plt.title('Bit Error Rate (BER)')
    plt.legend()

    # BLER Plot
    plt.subplot(1, 2, 2)
    plt.yscale('log')
    plt.ylim(1e-4, 1)
    plt.plot(snr_range, rnn_results['BLER_RNN'], label='RNN')
    for size, scl_results in zip(list_sizes, scl_results_all):
        plt.plot(snr_range, [result['BLER'] for result in scl_results], label=f'SCL, List Size {size}')
    plt.xlabel('SNR (dB)')
    plt.ylabel('BLER')
    plt.title('Block Error Rate (BLER)')
    plt.legend()

    plt.tight_layout()
    plt.show()
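

# Illustrative sketch, not part of the original pipeline. The plotting function
# above reads precomputed 'BER' and 'BLER' values. One common way to obtain such
# values from decoded blocks is shown here. The helper name ber_bler_from_bits
# is hypothetical, and both inputs are assumed to be 0/1 arrays of shape
# (num_blocks, K).
import numpy as np


def ber_bler_from_bits(true_bits, decoded_bits):
    """Return (BER, BLER) for two equally shaped 2-D bit arrays."""
    true_bits = np.asarray(true_bits)
    decoded_bits = np.asarray(decoded_bits)
    errors = true_bits != decoded_bits
    ber = float(errors.mean())               # fraction of wrong bits
    bler = float(errors.any(axis=1).mean())  # fraction of blocks with at least one wrong bit
    return ber, bler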

def plot_confusion_matrix(y_true, y_pred, title='Confusion Matrix'):
    """Plot the confusion matrix of flattened true bits versus predicted bits."""
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.title(title)
    plt.show()
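

# Illustrative sketch, not part of the original pipeline. For 0/1 bit labels,
# the matrix displayed above reduces to four counts. The helper name
# binary_confusion_counts is hypothetical; it uses the layout with rows as
# true labels and columns as predicted labels.
import numpy as np


def binary_confusion_counts(y_true, y_pred):
    """Return [[TN, FP], [FN, TP]] for 0/1 labels."""
    y_true = np.asarray(y_true).astype(int).ravel()
    y_pred = np.asarray(y_pred).astype(int).ravel()
    tn = int(np.sum((y_true == 0) & (y_pred == 0)))
    fp = int(np.sum((y_true == 0) & (y_pred == 1)))
    fn = int(np.sum((y_true == 1) & (y_pred == 0)))
    tp = int(np.sum((y_true == 1) & (y_pred == 1)))
    return np.array([[tn, fp], [fn, tp]])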


# Part 7: end-to-end AWGN simulation
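
# Illustrative sketch, not called by main() below. main() takes the first 80%
# of the samples for training and the rest for validation. If the dataset were
# ordered in some way, a shuffled split could be used instead. The helper name
# shuffled_split_indices and its seed argument are hypothetical.
import numpy as np


def shuffled_split_indices(num_samples, train_fraction=0.8, seed=0):
    """Return (train_idx, val_idx) as disjoint, randomly ordered index arrays."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(num_samples)
    train_size = int(train_fraction * num_samples)
    return order[:train_size], order[train_size:]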

def main():
    try:
        # Configuration parameters
        BLOCK_LENGTH = 128
        INFO_BITS = 64
        LEARNING_RATE = 1e-3
        EPOCHS = 25
        BATCH_SIZE = 32
        NUM_SAMPLES_TRAIN = 1000
        NUM_TRIALS_PERF = 500
        SNR_RANGE_AWGN = np.linspace(0, 5, 11)
        LIST_SIZES = [1, 8, 16]

        logging.basicConfig(level=logging.INFO)

        # Polar code generator initialization
        polar_code_gen = PolarCodeGenerator(N=BLOCK_LENGTH, K=INFO_BITS)

        logging.info(f"Code Rate: {polar_code_gen.R}")

        # Data preparation
        X_raw, y_raw = prepare_polar_dataset(
            polar_code_gen, num_samples=NUM_SAMPLES_TRAIN, snr_db=5.0, channel_type='AWGN'
        )
        save_dataset_to_csv(X_raw, y_raw, 'awgn_dataset.csv')

        # Reshape and split data
        X_tensor = torch.FloatTensor(X_raw)
        y_tensor = torch.FloatTensor(y_raw)

        train_size = int(0.8 * X_tensor.shape[0])
        train_X = X_tensor[:train_size]
        train_y = y_tensor[:train_size]
        val_X = X_tensor[train_size:]
        val_y = y_tensor[train_size:]

        # RNN Model Training
        rnn_model_1 = EnhancedRNNDecoder(input_size=BLOCK_LENGTH, output_size=INFO_BITS)
        rnn_trainer_1 = DecoderTrainer(rnn_model_1, learning_rate=LEARNING_RATE)

        train_losses, val_losses = rnn_trainer_1.train(
            train_X, train_y, X_val=val_X, y_val=val_y, epochs=EPOCHS, batch_size=BATCH_SIZE
        )

        # Performance comparison for all list sizes
        rnn_perf_results = performance_comparison(
            rnn_trainer_1, polar_code_gen, SNR_RANGE_AWGN, 'AWGN', LIST_SIZES, NUM_TRIALS_PERF
        )

        # Collect results for all list sizes
        scl_perf_results_all = [
            run_scl_decoder(polar_code_gen, SNR_RANGE_AWGN, size, 'AWGN', NUM_TRIALS_PERF)
            for size in LIST_SIZES
        ]

        # Plot results
        plot_training_validation(train_losses, val_losses)
        plot_ber_bler_comparison(SNR_RANGE_AWGN, rnn_perf_results, scl_perf_results_all, LIST_SIZES)

        # Confusion matrix at the mid-range SNR point (index 5 of SNR_RANGE_AWGN, i.e. 2.5 dB)
        snr_index = 5
        X_test, y_test = prepare_polar_dataset(polar_code_gen, num_samples=100, snr_db=SNR_RANGE_AWGN[snr_index], channel_type='AWGN')
        X_test_tensor = torch.FloatTensor(X_test)

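        rnn_trainer_1.model.eval()  # inference mode (disables dropout, if the model uses any)
        with torch.no_grad():
            # Assumes the model outputs probabilities in [0, 1]; round them to hard bits.
            rnn_predictions = rnn_trainer_1.model(X_test_tensor).round().cpu().numpy().astype(int)

        # Cast both sides to int so the confusion-matrix labels are the same 0/1 values.
        y_true_example = np.asarray(y_test).astype(int).flatten()
        y_pred_example = rnn_predictions.flatten()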

        if len(y_true_example) and len(y_pred_example):
            plot_confusion_matrix(y_true_example, y_pred_example, title=f'Confusion Matrix at SNR={SNR_RANGE_AWGN[snr_index]:.1f} dB')
        else:
            logging.warning(f"Confusion Matrix data is empty for SNR={SNR_RANGE_AWGN[snr_index]:.1f} dB. Skipping plot.")

        logging.info("🎉 AWGN Channel Simulation Complete!")

    except Exception as e:
        # logging.exception records the message together with the full traceback.
        logging.exception(f"Simulation Error: {e}")

if __name__ == "__main__":
    main()