<a href="https://colab.research.google.com/github/pollyjuice74/ECCT/blob/main/ECCT_on_5G.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ECCT
!git clone https://github.com/pollyjuice74/ECCT
import os
os.chdir('ECCT')

from args import pass_args_ecct
from Model import *
from Codes import *

# Enc/Dec 5G
!pip install sionna
from sionna.fec.ldpc.encoding import LDPC5GEncoder
from sionna.utils import BitErrorRate, BinarySource
from sionna.mapping import Mapper, Demapper
from sionna.channel import AWGN

!wget https://raw.githubusercontent.com/pollyjuice74/REU-LDPC-Project/main/5g_enc_dec/decoder.py
from decoder import LDPC5GDecoder

# Other
from torch.nn import functional as F
import torch.nn as nn
import torch
import copy
import time


Cloning into 'ECCT'...
remote: Enumerating objects: 125, done.[K
remote: Counting objects: 100% (125/125), done.[K
remote: Compressing objects: 100% (115/115), done.[K
remote: Total 125 (delta 68), reused 31 (delta 9), pack-reused 0[K
Receiving objects: 100% (125/125), 92.09 KiB | 1.30 MiB/s, done.
Resolving deltas: 100% (68/68), done.
Collecting sionna
  Downloading sionna-0.18.0-py3-none-any.whl (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
Collecting mitsuba>=3.2.0 (from sionna)
  Downloading mitsuba-3.5.2-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (40.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.4/40.4 MB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pythreejs>=2.4.2 (from sionna)
  Downloading pythreejs-2.4.2-py3-none-any.whl (3.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.4/3.4 MB[0m [31m46.3 MB/s[0m eta [3

In [2]:
class ECC_Transformer(nn.Module):
    def __init__(self, args, encoder, dropout=0):
        super(ECC_Transformer, self).__init__()

        # code = args.code
        c = copy.deepcopy
        attn = MultiHeadedAttention(args.h, args.d_model)
        ff = PositionwiseFeedForward(args.d_model, args.d_model*4, dropout)

        # 5G Compliant encoder/decoder
        self.encoder5G = encoder

        self.src_embed = torch.nn.Parameter(torch.empty(self.encoder5G._n_ldpc + self.encoder5G.pcm.shape[0], args.d_model)) ### #(code.n + code.pc_matrix.size(0), args.d_model)))

        self.decoder = Encoder(EncoderLayer(
            args.d_model, c(attn), c(ff), dropout), args.N_dec)

        self.oned_final_embed = torch.nn.Sequential(
            *[nn.Linear(args.d_model, 1)])

        self.out_fc = nn.Linear(self.encoder5G._n_ldpc + self.encoder5G.pcm.shape[0], self.encoder5G._n_ldpc) ###

        self.get_mask()
        print(f'Mask:\n {self.src_mask}')

        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)


    def forward(self, magnitude, syndrome):
        emb = torch.cat([magnitude, syndrome], -1).unsqueeze(-1)
        emb = self.src_embed.unsqueeze(0) * emb
        emb = self.decoder(emb, self.src_mask)
        return self.out_fc(self.oned_final_embed(emb).squeeze(-1))


    def loss(self, z_pred, z2, y):
        loss = F.binary_cross_entropy_with_logits(
            z_pred, sign_to_bin(torch.sign(z2)))

        x_pred = sign_to_bin(torch.sign(-z_pred * torch.sign(y)))
        return loss, x_pred


    def get_mask(self, no_mask=False):
        if no_mask:
            self.src_mask = None
            return

        mask_size = self.encoder5G._n_ldpc +self.encoder5G.pcm.shape[0] # n + m
        src_mask = self.build_mask(mask_size)
        # print(src_mask, mask_size)

        a = mask_size ** 2
        # print(a, )

        print(
            f'Self-Attention Sparsity Ratio={100 * torch.sum((src_mask).int()) / a:0.2f}%, Self-Attention Complexity Ratio={100 * torch.sum((~src_mask).int())//2 / a:0.2f}%')
        self.register_buffer('src_mask', src_mask)


    def build_mask(self, mask_size):
            mask = torch.eye(mask_size, mask_size)

            for ii in range(self.encoder5G.pcm.shape[0]): # m
                # edge idxs for cn ii in pcm
                idx = self.encoder5G.pcm[ii].indices #torch.where(self.encoder5G.pcm[ii] > 0)[0] #code.pc_matrix[ii]

                # print(ii, idx) #self.encoder5G.pcm[ii].indices,
                # print(self.encoder5G._n_ldpc, self.encoder5G._k_ldpc)
                # print()

                for jj in idx:
                    for kk in idx:

                        # print(mask.shape)
                        if jj != kk:
                            mask[jj, kk] += 1
                            mask[kk, jj] += 1
                            mask[self.encoder5G._n_ldpc + ii, jj] += 1
                            mask[jj, self.encoder5G._n_ldpc + ii] += 1

            src_mask = ~ (mask > 0).unsqueeze(0).unsqueeze(0)
            return src_mask



In [5]:
from sionna.mapping import Mapper, Demapper

# Code data
k, n = (90, 100)
bps = 4 # bits per symbol
args = pass_args_ecct()

# 5G compliant encoder
enc = LDPC5GEncoder(k,n)
# Decoder models
dec = LDPC5GDecoder(enc)
ecct = ECC_Transformer(args, enc)

# Message generation
binary_source = BinarySource()
# Channel objects
mapper = Mapper("qam", bps)
channel = AWGN()
demapper = Demapper("app", "qam", bps)


def train(model, batch_size=10, iters=100):
    model.train()
    cum_loss = cum_ber = cum_fer = cum_samples = 0
    t = time.time()

    for i in range(iters):
        b = binary_source([batch_size, enc._k]) # (k,1)
        c = enc(b) # (n,1)

        # NOISELESS Channel to get (n_ldpc,1) original llrs
        x = mapper(c)
        llr = demapper([x, no]) # no noise # (n,1)

        llr_noiseless, _, _ = dec(llr) # decoder turns (n,1) to (n_ldpc,1)

        # AWGN Channel
        x = self._mapper(c_pad)
        y = self._channel([x, no])
        llr_r = self._demapper([y, no])

        llr_nldpc, _, _ = dec(llr_r) # decoder turns (n,1) to (n_ldpc,1)

        # Model pred from noisy llrs
        llr_noiseless_hat = model(llr_nldpc) # (n_ldpc,1)

        loss = F.binary_cross_entropy_with_logits(llr_noiseless_hat, llr_noiseless)

        # llr to bin, first n values of llr_hat (n_ldpc,1) correspond to c (n,1)
        c_hat = (llr_hat[:enc._n] > 0).float()

        cum_loss += loss.item() * x.shape[0]
        cum_ber += BER(c_hat, c) * x.shape[0]
        cum_fer += FER(c_hat, c) * x.shape[0]
        cum_samples += x.shape[0]

        if i%10 == 0:
            print(f'Batch {i + 1}/{iters}: Loss={cum_loss / cum_samples:.2e} BER={cum_ber / cum_samples:.2e} FER={cum_fer / cum_samples:.2e}')

    print(f'Train time: {time.time() - t:.2f}s\n')
    return cum_loss / cum_samples, cum_ber / cum_samples, cum_fer / cum_samples


def test(model):
  pass



train(ecct)

Path to model/logs: Results_ECCT/POLAR__Code_n_64_k_32__27_06_2024_16_10_59
Namespace(epochs=1000, workers=4, lr=0.0001, gpus='-1', batch_size=128, test_batch_size=2048, seed=42, code_type='POLAR', code_k=32, code_n=64, standardize=False, N_dec=6, d_model=32, h=8, code=<args.pass_args_ecct.<locals>.Code object at 0x7d4b55343fa0>, path='Results_ECCT/POLAR__Code_n_64_k_32__27_06_2024_16_10_59')
Self-Attention Sparsity Ratio=99.00%, Self-Attention Complexity Ratio=0.50%
Mask:
 tensor([[[[False,  True,  True,  ...,  True,  True,  True],
          [ True, False,  True,  ...,  True,  True,  True],
          [ True,  True, False,  ...,  True,  True,  True],
          ...,
          [ True,  True,  True,  ..., False,  True,  True],
          [ True,  True,  True,  ...,  True, False,  True],
          [ True,  True,  True,  ...,  True,  True, False]]]])


NameError: name 'no' is not defined