<a href="https://colab.research.google.com/github/ishan080604/Neuromorphic-Integrated-Sensing-and-Communications-N-ISAC-/blob/main/IOT_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score
from scipy import signal

In [None]:
# Simulation configuration shared by the data generator and the models.
L = 80                        # slots per example
L_b = 1                       # bandwidth expansion factor
N_c = 5                       # clutter paths per channel realisation
SNR_dB = 10                   # signal-to-noise ratio, in dB
num_train_examples = 60_000   # examples generated for training
num_test_examples = 10_000    # examples generated for testing

def generate_data(L, L_b, N_c, SNR_dB, num_examples):
    """Simulate noisy received signals for pulse-position-modulated bit frames.

    For every example, ``L`` random bits are mapped onto a pulse train of
    ``2 * L_b * L`` samples: slot ``l`` owns samples
    ``[2*l*L_b, 2*(l+1)*L_b)`` and carries its pulse at offset ``0`` for a
    ``0`` bit or offset ``L_b`` for a ``1`` bit. The pulse train is passed
    through a random complex channel with ``L_h = 4`` taps (an optional
    target tap at delay 0 plus ``N_c`` clutter contributions at random
    delays) and complex Gaussian noise is added at the requested SNR.

    Args:
        L: Number of slots (bits) per example.
        L_b: Bandwidth expansion factor.
        N_c: Number of clutter paths added to the channel.
        SNR_dB: Signal-to-noise ratio in dB, relative to the mean power of
            the noiseless received signal.
        num_examples: Number of examples to generate.

    Returns:
        A list of ``(y_input, x, v)`` tuples where ``y_input`` is a real
        array of length ``4 * L_b * L`` (all real parts followed by all
        imaginary parts of the received signal), ``x`` is the array of ``L``
        transmitted bits and ``v`` is ``1`` if the target tap is present,
        else ``0``.
    """
    # System parameters
    T_c = 1  # Chip duration (normalized)
    T_h = 4 * T_c  # Maximum delay spread
    L_h = int(T_h / T_c)  # Number of channel taps
    SNR_linear = 10 ** (SNR_dB / 10)
    num_samples = 2 * L_b * L

    data = []
    for _ in range(num_examples):
        # Random bits and random target presence
        x = np.random.randint(0, 2, L)
        v = np.random.randint(0, 2)

        # Pulse-position modulation: bit 0 -> first half of the slot,
        # bit 1 -> second half of the slot.
        s = np.zeros(num_samples)
        s[(2 * np.arange(L) + x) * L_b] = 1

        # Channel: optional target tap at delay 0 plus N_c clutter paths
        h = np.zeros(L_h, dtype=complex)
        if v == 1:
            h[0] = np.random.normal(0, 1) + 1j * np.random.normal(0, 1)
        for _path in range(N_c):
            delay = np.random.randint(0, L_h)
            h[delay] += np.random.normal(0, 1) + 1j * np.random.normal(0, 1)

        # Apply the channel causally: tap k delays the pulse by exactly k
        # samples. mode='same' would return the *centred* part of the full
        # convolution, shifting every tap one sample early for L_h = 4.
        y = signal.convolve(s, h, mode='full')[:num_samples]

        # Add complex white Gaussian noise at the requested SNR
        signal_power = np.mean(np.abs(y) ** 2)
        noise_power = signal_power / SNR_linear
        noise = np.sqrt(noise_power / 2) * (
            np.random.normal(0, 1, len(y)) + 1j * np.random.normal(0, 1, len(y))
        )
        y_noisy = y + noise

        # Real-valued network input: [real parts | imaginary parts]
        y_input = np.concatenate([np.real(y_noisy), np.imag(y_noisy)])

        data.append((y_input, x, v))

    return data

# Generate the training set first, then the test set, with identical
# simulation settings and only the number of examples differing.
train_data, test_data = (
    generate_data(L, L_b, N_c, SNR_dB, num_examples=count)
    for count in (num_train_examples, num_test_examples)
)

In [None]:
# Chunk boundaries 0, 4, 8, ..., 320 (81 entries, i.e. 80 chunks of 4 samples).
train_y = list(range(0, 321, 4))

In [None]:
# Build one training sample per (example, slot) pair.
#
# generate_data returns each received vector as [real parts | imaginary
# parts], and slot l of the transmitted pulse train occupies samples
# [2*l*L_b, 2*(l+1)*L_b). The features of slot l are therefore the real AND
# imaginary parts of exactly those samples (4*L_b values). Cutting the
# concatenated vector into consecutive 4-sample chunks instead would pair a
# slot's label with samples that belong to other slots.
num_train = len(train_data)
y_all = np.stack([example[0] for example in train_data])   # (N, 2 * 2*L_b*L)
x_all = np.stack([example[1] for example in train_data])   # (N, L)
v_all = np.array([example[2] for example in train_data])   # (N,)

y_real, y_imag = np.split(y_all, 2, axis=1)                 # (N, 2*L_b*L) each
slot_features = np.concatenate(
    [y_real.reshape(num_train, L, -1), y_imag.reshape(num_train, L, -1)],
    axis=2,
)                                                           # (N, L, 4*L_b)

# Flatten to one row per (example, slot), ordered example-major, with the
# same layouts the later cells expect:
#   y_input_train: (N*L, 1, 4*L_b), x_train: (N*L, 1), v_train: (N*L,)
y_input_train = slot_features.reshape(num_train * L, 1, -1)
x_train = x_all.reshape(num_train * L, 1)
v_train = np.repeat(v_all, L)   # the target flag is shared by all slots of an example

In [None]:
# Convert the per-slot features, bit labels and target labels to float64
# tensors. Each collection is converted exactly once: wrapping an existing
# tensor in torch.tensor() again only makes a redundant copy and triggers a
# UserWarning.
y_input_train = torch.tensor(y_input_train, dtype=torch.float64)
x_train = torch.tensor(x_train, dtype=torch.float64)
v_train = torch.tensor(v_train, dtype=torch.float64)


In [None]:
class ISACNN(nn.Module):
  """Shared-trunk network with separate communication and sensing heads.

  A single hidden layer (Linear + ReLU) is shared by both tasks. Each task
  then has its own linear head producing one logit, so the bit decision and
  the target-presence decision can differ for the same input.

  Args:
    hidden_neurons: Width of the shared hidden layer.
    in_features: Number of input features per slot. Defaults to ``4 * L_b``
      (taken from the module-level ``L_b``) when omitted.
  """

  def __init__(self, hidden_neurons, in_features=None):
    super().__init__()
    if in_features is None:
      in_features = 4 * L_b
    # Communication head (keeps the attribute name ``linear``).
    self.linear = nn.Linear(hidden_neurons, 1)
    # Dedicated sensing head: sharing ``linear`` for both tasks would make
    # the two outputs identical for every input.
    self.linear_sense = nn.Linear(hidden_neurons, 1)
    self.relu = nn.ReLU()  # not used in forward(); retained as an attribute
    self.linear_relu_stack = nn.Sequential(
        nn.Linear(in_features, hidden_neurons),
        nn.ReLU(),
    )

  def forward(self, y):
    """Return ``(comm, sense)`` logits; each has the shape of ``y`` with the
    last dimension replaced by 1."""
    hidden = self.linear_relu_stack(y)
    comm = self.linear(hidden)
    sense = self.linear_sense(hidden)
    return comm, sense

In [None]:
# Row boundaries 0, 80, 160, ..., 80*60000: consecutive entries delimit one
# chunk of 80 rows.
train1 = list(range(0, (80 * 60000) + 1, 80))

In [None]:
# Cut the flat per-slot tensors back into per-example chunks; consecutive
# entries of train1 give the [start, stop) row range of each chunk.
_train_bounds = list(zip(train1[:-1], train1[1:]))

y_input = [y_input_train[start:stop] for start, stop in _train_bounds]
x_input = [x_train[start:stop] for start, stop in _train_bounds]
v_input = [v_train[start:stop] for start, stop in _train_bounds]

In [None]:
# Stack the per-example chunks along a new leading dimension; the feature
# tensor is additionally reshaped to (60000, 80, features).
y_input_tensor = torch.stack(y_input, dim=0).reshape((60000, 80, -1))
x_input_tensor, v_input_tensor = torch.stack(x_input, dim=0), torch.stack(v_input, dim=0)

# **Training ISAC**

In [None]:
# Hyper-parameters
epochs = 10
beta = 0.5             # weight of the communication loss; sensing gets (1 - beta)
learning_rate = 1e-3
batch_size = 32
h = 10                 # hidden-layer width

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move the model to its device before the optimizer is built over its
# parameters.
model = ISACNN(h)
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCEWithLogitsLoss()

dataset = TensorDataset(y_input_tensor, x_input_tensor, v_input_tensor)
dataloader = DataLoader(dataset, batch_size=batch_size)

for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    total_loss = 0
    for batch_idx, (y_batch, x_batch, v_batch) in enumerate(dataloader):
        y_batch, x_batch, v_batch = y_batch.to(device), x_batch.to(device), v_batch.to(device)

        # The network is applied independently to every slot, so all slots of
        # the batch go through one forward pass. BCEWithLogitsLoss averages
        # over every (example, slot) element, which equals the mean of the
        # per-slot batch losses without hard-coding the slot count.
        comm, sense = model(y_batch.float())
        # Reshaping the targets to the logits' shape avoids squeeze(), which
        # would collapse the batch dimension for a batch of one.
        loss_comm = criterion(comm, x_batch.float().reshape(comm.shape))
        loss_sense = criterion(sense, v_batch.float().reshape(sense.shape))
        loss = (beta * loss_comm) + (1 - beta) * loss_sense

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if batch_idx % 100 == 0:
            print(f'Batch {batch_idx}/{len(dataloader)}: Loss: {loss.item():.4f}')

    avg_loss = total_loss / len(dataloader)
    print(f'Epoch {epoch+1} Average Loss: {avg_loss:.4f}')

Epoch 1
-------------------------------
Batch 0/1875: Loss: 0.7196
Batch 100/1875: Loss: 0.6906
Batch 200/1875: Loss: 0.6970
Batch 300/1875: Loss: 0.6950
Batch 400/1875: Loss: 0.6951
Batch 500/1875: Loss: 0.6953
Batch 600/1875: Loss: 0.6930
Batch 700/1875: Loss: 0.6934
Batch 800/1875: Loss: 0.6940
Batch 900/1875: Loss: 0.6944
Batch 1000/1875: Loss: 0.6928
Batch 1100/1875: Loss: 0.6941
Batch 1200/1875: Loss: 0.6933
Batch 1300/1875: Loss: 0.6894
Batch 1400/1875: Loss: 0.6928
Batch 1500/1875: Loss: 0.6923
Batch 1600/1875: Loss: 0.6920
Batch 1700/1875: Loss: 0.6905
Batch 1800/1875: Loss: 0.6909
Epoch 1 Average Loss: 0.6934
Epoch 2
-------------------------------
Batch 0/1875: Loss: 0.6917
Batch 100/1875: Loss: 0.6933
Batch 200/1875: Loss: 0.6892
Batch 300/1875: Loss: 0.6931
Batch 400/1875: Loss: 0.6938
Batch 500/1875: Loss: 0.6937
Batch 600/1875: Loss: 0.6935
Batch 700/1875: Loss: 0.6926
Batch 800/1875: Loss: 0.6949
Batch 900/1875: Loss: 0.6941
Batch 1000/1875: Loss: 0.6933
Batch 1100/1875

# **Evaluating ISAC**

In [None]:
# Chunk boundaries 0, 4, 8, ..., 320 (81 entries, i.e. 80 chunks of 4 samples).
test_y = list(range(0, 321, 4))

In [None]:
# Build one test sample per (example, slot) pair.
#
# generate_data returns each received vector as [real parts | imaginary
# parts], and slot l of the transmitted pulse train occupies samples
# [2*l*L_b, 2*(l+1)*L_b). The features of slot l are therefore the real AND
# imaginary parts of exactly those samples (4*L_b values). Cutting the
# concatenated vector into consecutive 4-sample chunks instead would pair a
# slot's label with samples that belong to other slots.
num_test = len(test_data)
y_all_t = np.stack([example[0] for example in test_data])   # (N, 2 * 2*L_b*L)
x_all_t = np.stack([example[1] for example in test_data])   # (N, L)
v_all_t = np.array([example[2] for example in test_data])   # (N,)

y_real_t, y_imag_t = np.split(y_all_t, 2, axis=1)            # (N, 2*L_b*L) each
slot_features_t = np.concatenate(
    [y_real_t.reshape(num_test, L, -1), y_imag_t.reshape(num_test, L, -1)],
    axis=2,
)                                                            # (N, L, 4*L_b)

# Flatten to one row per (example, slot), ordered example-major:
#   y_test: (N*L, 4*L_b), x_test: (N*L,), v_test: (N*L,)
y_test = slot_features_t.reshape(num_test * L, -1)
x_test = x_all_t.reshape(-1)
v_test = np.repeat(v_all_t, L)   # the target flag is shared by all slots of an example

In [None]:
# Convert the test features and both label collections to float64 tensors,
# in the same order as before: features, bits, target flags.
y_test, x_test, v_test = (
    torch.tensor(values, dtype=torch.float64)
    for values in (y_test, x_test, v_test)
)

In [None]:
# Row boundaries 0, 80, 160, ..., 80*10000: consecutive entries delimit one
# chunk of 80 rows.
test = list(range(0, (80 * 10000) + 1, 80))

In [None]:
# Cut the flat per-slot test tensors back into per-example chunks;
# consecutive entries of test give the [start, stop) row range of each chunk.
_test_bounds = list(zip(test[:-1], test[1:]))

y_input_t = [y_test[start:stop] for start, stop in _test_bounds]
x_input_t = [x_test[start:stop] for start, stop in _test_bounds]
v_input_t = [v_test[start:stop] for start, stop in _test_bounds]

In [None]:
# Stack the per-example chunks along a new leading dimension.
y_input_tensor_t, x_input_tensor_t, v_input_tensor_t = (
    torch.stack(chunks, dim=0)
    for chunks in (y_input_t, x_input_t, v_input_t)
)

In [None]:
# Evaluate the joint model: average the weighted communication + sensing loss
# over the test examples.
beta = 0.5
criterion = nn.BCEWithLogitsLoss()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

dataset = TensorDataset(y_input_tensor_t, x_input_tensor_t, v_input_tensor_t)
dataloader = DataLoader(dataset, batch_size = 1)

total_loss = 0
correct_comm = 0
correct_sense = 0

# No gradients are needed for evaluation; skipping autograd bookkeeping saves
# time and memory.
with torch.no_grad():
  for y, x, v in dataloader:
    y, x, v = y.to(device), x.to(device), v.to(device)

    # One forward pass over all slots of the example. The criterion's mean
    # over slots equals the average of the per-slot losses.
    comm_t, sense_t = model(y.float())
    loss_comm_t = criterion(comm_t, x.float().reshape(comm_t.shape))
    loss_sense_t = criterion(sense_t, v.float().reshape(sense_t.shape))

    loss = (beta * loss_comm_t) + (1-beta) * loss_sense_t
    total_loss += loss.item()

# Average over the number of evaluated batches instead of a hard-coded count.
test_loss = total_loss / len(dataloader)


print(f"Avg loss: {test_loss:>8f} \n")

Avg loss: 0.692422 



# **Separate Sensing & Communication**

# The Model

In [None]:
class SSACNN(nn.Module):
  """Single-task network: Linear -> ReLU -> Linear producing one logit.

  Args:
    hidden_neurons: Width of the hidden layer.
    in_features: Number of input features per slot. Defaults to ``4 * L_b``
      (taken from the module-level ``L_b``) when omitted.
  """

  def __init__(self, hidden_neurons, in_features=None):
    super().__init__()
    if in_features is None:
      in_features = 4 * L_b
    # NOTE(review): ``linear`` and ``relu`` are not used by forward(); they
    # are kept only so the registered parameters and state_dict keys stay
    # the same.
    self.linear = nn.Linear(hidden_neurons, 1)
    self.relu = nn.ReLU()
    self.linear_relu_stack = nn.Sequential(
        nn.Linear(in_features, hidden_neurons),
        nn.ReLU(),
        nn.Linear(hidden_neurons, 1),
    )

  def forward(self, y):
    """Return one logit per input row: the shape of ``y`` with the last
    dimension replaced by 1."""
    return self.linear_relu_stack(y)

# Slot split between communication and sensing for SSAC

In [None]:
# Fraction of the L slots reserved for communication.
#   - the first int(alpha * L) slots are used for communication only
#     (sensing is ignored there);
#   - the remaining L - int(alpha * L) slots are used for sensing
#     (their bits are meant to be set to 1).
alpha = 0.5

# Training the Communication Part of SSAC

In [None]:
# Train the communication-only network on the first int(alpha * L) slots.
epochs = 10
learning_rate = 1e-3
batch_size = 32
h = 10                 # hidden-layer width

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_comm = SSACNN(h)
model_comm.to(device)
# The optimizer must be built over model_comm's own parameters. Binding it to
# another model's parameters leaves this network untrained, and every epoch
# then reports exactly the same losses.
optimizer = torch.optim.Adam(model_comm.parameters(), lr=learning_rate)
criterion = nn.BCEWithLogitsLoss()


dataset = TensorDataset(y_input_tensor, x_input_tensor)
dataloader = DataLoader(dataset, batch_size=batch_size)

num_comm_slots = int(alpha*L)

for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    total_loss = 0
    for batch_idx, (y_batch, x_batch) in enumerate(dataloader):
        y_batch, x_batch= y_batch.to(device), x_batch.to(device)

        # One forward pass over all communication slots. The criterion's mean
        # over every (example, slot) element equals the mean of the per-slot
        # batch losses. Reshaping the targets to the logits' shape also works
        # for a batch of one, where squeeze() would drop the batch dimension.
        comm = model_comm(y_batch[:, :num_comm_slots, :].float())
        loss_comm = criterion(comm, x_batch[:, :num_comm_slots].float().reshape(comm.shape))
        loss = loss_comm

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if batch_idx % 100 == 0:
            print(f'Batch {batch_idx}/{len(dataloader)}: Loss: {loss.item():.4f}')

    avg_loss = total_loss / len(dataloader)
    print(f'Epoch {epoch+1} Average Loss: {avg_loss:.4f}')

Epoch 1
-------------------------------
Batch 0/1875: Loss: 0.6792
Batch 100/1875: Loss: 0.7014
Batch 200/1875: Loss: 0.7456
Batch 300/1875: Loss: 0.7484
Batch 400/1875: Loss: 0.7112
Batch 500/1875: Loss: 0.7263
Batch 600/1875: Loss: 0.6881
Batch 700/1875: Loss: 0.7544
Batch 800/1875: Loss: 0.6950
Batch 900/1875: Loss: 0.7100
Batch 1000/1875: Loss: 0.7193
Batch 1100/1875: Loss: 0.7081
Batch 1200/1875: Loss: 0.6928
Batch 1300/1875: Loss: 0.7573
Batch 1400/1875: Loss: 0.7074
Batch 1500/1875: Loss: 0.6769
Batch 1600/1875: Loss: 0.6886
Batch 1700/1875: Loss: 0.7252
Batch 1800/1875: Loss: 0.7272
Epoch 1 Average Loss: 0.7122
Epoch 2
-------------------------------
Batch 0/1875: Loss: 0.6792
Batch 100/1875: Loss: 0.7014
Batch 200/1875: Loss: 0.7456
Batch 300/1875: Loss: 0.7484
Batch 400/1875: Loss: 0.7112
Batch 500/1875: Loss: 0.7263
Batch 600/1875: Loss: 0.6881
Batch 700/1875: Loss: 0.7544
Batch 800/1875: Loss: 0.6950
Batch 900/1875: Loss: 0.7100
Batch 1000/1875: Loss: 0.7193
Batch 1100/1875

In [None]:
# Train the sensing-only network on the slots from int(alpha * L) up to L.
epochs = 10
learning_rate = 1e-3
batch_size = 32
h = 10                 # hidden-layer width

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_sense = SSACNN(h)
model_sense.to(device)
# The optimizer must be built over model_sense's own parameters. Binding it
# to another model's parameters leaves this network untrained, and every
# epoch then reports exactly the same losses.
optimizer = torch.optim.Adam(model_sense.parameters(), lr=learning_rate)
criterion = nn.BCEWithLogitsLoss()

dataset = TensorDataset(y_input_tensor, v_input_tensor)
dataloader = DataLoader(dataset, batch_size=batch_size)

first_sense_slot = int(alpha*L)

for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    total_loss = 0
    for batch_idx, (y_batch, v_batch) in enumerate(dataloader):
        y_batch, v_batch= y_batch.to(device), v_batch.to(device)

        # One forward pass over all sensing slots. The criterion's mean over
        # every (example, slot) element equals the mean of the per-slot batch
        # losses. Reshaping the targets to the logits' shape also works for a
        # batch of one, where squeeze() would drop the batch dimension.
        sense = model_sense(y_batch[:, first_sense_slot:L, :].float())
        loss_sense = criterion(sense, v_batch[:, first_sense_slot:L].float().reshape(sense.shape))
        loss = loss_sense

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if batch_idx % 100 == 0:
            print(f'Batch {batch_idx}/{len(dataloader)}: Loss: {loss.item():.4f}')

    avg_loss = total_loss / len(dataloader)
    print(f'Epoch {epoch+1} Average Loss: {avg_loss:.4f}')

Epoch 1
-------------------------------
Batch 0/1875: Loss: 0.7159
Batch 100/1875: Loss: 0.7225
Batch 200/1875: Loss: 0.7130
Batch 300/1875: Loss: 0.6900
Batch 400/1875: Loss: 0.7352
Batch 500/1875: Loss: 0.6980
Batch 600/1875: Loss: 0.6583
Batch 700/1875: Loss: 0.7061
Batch 800/1875: Loss: 0.7075
Batch 900/1875: Loss: 0.7301
Batch 1000/1875: Loss: 0.6644
Batch 1100/1875: Loss: 0.7099
Batch 1200/1875: Loss: 0.6792
Batch 1300/1875: Loss: 0.6760
Batch 1400/1875: Loss: 0.6611
Batch 1500/1875: Loss: 0.7373
Batch 1600/1875: Loss: 0.6735
Batch 1700/1875: Loss: 0.6823
Batch 1800/1875: Loss: 0.7204
Epoch 1 Average Loss: 0.7089
Epoch 2
-------------------------------
Batch 0/1875: Loss: 0.7159
Batch 100/1875: Loss: 0.7225
Batch 200/1875: Loss: 0.7130
Batch 300/1875: Loss: 0.6900
Batch 400/1875: Loss: 0.7352
Batch 500/1875: Loss: 0.6980
Batch 600/1875: Loss: 0.6583
Batch 700/1875: Loss: 0.7061
Batch 800/1875: Loss: 0.7075
Batch 900/1875: Loss: 0.7301
Batch 1000/1875: Loss: 0.6644
Batch 1100/1875

# Evaluating Communication part of SSAC

In [None]:
# Evaluate the communication-only network on the first int(alpha * L) slots
# of every test example.
criterion = nn.BCEWithLogitsLoss()
model_comm.eval()

dataset = TensorDataset(y_input_tensor_t, x_input_tensor_t)
dataloader = DataLoader(dataset, batch_size = 1)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_comm.to(device)

total_loss = 0
correct_comm = 0
num_comm_slots = int(alpha * L)

with torch.no_grad():
  for y, x in dataloader:
    y, x = y.to(device), x.to(device)

    # One forward pass over all communication slots. The criterion's mean
    # over slots equals the average of the per-slot losses.
    comm_t = model_comm(y[:, :num_comm_slots, :].float())
    loss = criterion(comm_t, x[:, :num_comm_slots].float().reshape(comm_t.shape))
    total_loss += loss.item()

# Average over the number of evaluated batches instead of a hard-coded count.
test_loss = total_loss / len(dataloader)
print(f"Avg loss: {test_loss:>8f} \n")

Avg loss: 0.712137 



In [None]:
# Evaluate the sensing-only network on the slots from int(alpha * L) up to L
# of every test example.
criterion = nn.BCEWithLogitsLoss()
model_sense.eval()

dataset = TensorDataset(y_input_tensor_t, v_input_tensor_t)
dataloader = DataLoader(dataset, batch_size = 1)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_sense.to(device)

total_loss = 0
correct_sense = 0
first_sense_slot = int(alpha * L)

with torch.no_grad():
  for y, v in dataloader:
    y, v = y.to(device), v.to(device)

    # One forward pass over all sensing slots. The criterion's mean over
    # slots equals the average of the per-slot losses.
    sense_t = model_sense(y[:, first_sense_slot:L, :].float())
    loss = criterion(sense_t, v[:, first_sense_slot:L].float().reshape(sense_t.shape))
    total_loss += loss.item()

# Average over the number of evaluated batches instead of a hard-coded count.
test_loss = total_loss / len(dataloader)
print(f"Avg loss: {test_loss:>8f} \n")

Avg loss: 0.709362 

