In [1]:
# Importing the required libraries (install in your environment first)
import numpy as np
import pandas as pd
from math import log2, sqrt
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import plotly.express as px
import plotly.graph_objects as go

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

import torch.optim as optim
import logging
import os
# Setting up the logger: configure the root logger so INFO-level messages
# (used for the training progress output below) are emitted.
logging.basicConfig(level=logging.INFO)

## Import Data to Dataframe

In [2]:
# Space-separated text file without a header row: one bit string and one
# integer label per line.
data_filePath = 'QRNGvsPRNG_TrainingData.txt'

# The column names must be supplied at read time: with header=None the parser
# otherwise labels the columns 0 and 1, so a dtype mapping keyed by
# "data"/"label" would match nothing and the bit strings would be left to type
# inference (which can strip leading zeros by parsing them as integers).
df = pd.read_csv(
    data_filePath,
    sep=' ',
    header=None,
    names=["data", "label"],
    dtype={"data": str, "label": np.int64},
)

In [3]:
def merge_rows_with_same_labels(df, nrows):
    """Concatenate consecutive same-label bit strings into longer samples.

    Labels are shifted down by one, so an input label of 1 becomes 0 (the
    "qrng" group) and an input label of 2 becomes 1 (the "prng" group). Rows
    with any other label are ignored. Within each group, every ``nrows``
    consecutive ``data`` strings are joined into a single string.

    Args:
        df: DataFrame with a string column ``data`` and an integer column
            ``label``. It is NOT modified, so the call can be repeated safely.
        nrows: Number of consecutive rows to join into one sample (>= 1).

    Returns:
        A new DataFrame with columns ``data`` (joined string), ``label``
        (0 or 1) and ``length`` (character count of ``data``); all label-0
        rows come first, followed by all label-1 rows.

    Raises:
        ValueError: If ``nrows`` is smaller than 1.
    """
    if nrows < 1:
        raise ValueError("nrows must be a positive integer")

    # Shift to 0/1 on a copy of the column instead of writing back into the
    # caller's frame; mutating it would shift the labels again on every re-run.
    labels = df['label'] - 1

    def _join_groups(rows):
        # Only complete groups are merged. A trailing remainder shorter than
        # `nrows` is dropped so that every sample has the same number of
        # source rows (indexing past the end would otherwise raise).
        usable = len(rows) - len(rows) % nrows
        return [''.join(rows[i:i + nrows]) for i in range(0, usable, nrows)]

    qrng_df = pd.DataFrame(_join_groups(df.loc[labels == 0, 'data'].values), columns=['data'])
    prng_df = pd.DataFrame(_join_groups(df.loc[labels == 1, 'data'].values), columns=['data'])

    qrng_df['label'] = 0
    prng_df['label'] = 1

    combined_df = pd.concat([qrng_df, prng_df], ignore_index=True)
    combined_df["length"] = combined_df["data"].apply(len)
    return combined_df

### Process Labels and Train_Test_split

In [4]:
df = merge_rows_with_same_labels(df, 5)
print(df.head())

# Turn every merged bit string into one row of 0/1 integers.
X = np.array([[int(bit) for bit in bits] for bits in df['data'].values])
y = df['label'].values

# Hold out 20% for testing, then carve 10% of the remainder off for validation.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=True, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.1, shuffle=True, random_state=42
)

# Labels become column vectors so they line up with the (batch, 1) model output.
y_train, y_val, y_test = (labels.reshape(-1, 1) for labels in (y_train, y_val, y_test))

(X_train.shape, y_train.shape), (X_val.shape, y_val.shape), (X_test.shape, y_test.shape)

                                                data  label  length
0  0000101000100111111110011011110111101101010111...      0     500
1  1110110111100100011101001000011110000111111101...      0     500
2  1101000010011000100101110010111101010011011000...      0     500
3  1001011011100100100101110111101111000011101001...      0     500
4  1011000011010000101000100010110000011101010101...      0     500


(((3456, 500), (3456, 1)), ((384, 500), (384, 1)), ((960, 500), (960, 1)))

In [5]:
# Display one training example: its row of bits and the matching label.
X_train[1], y_train[1]

(array([0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1,
        0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1,
        0, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1,
        1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1,
        0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1,
        0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1,
        0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1,
        0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1,
        1, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1, 1, 0, 0,
        0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 0, 1,
        1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1,
        1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 1,
        1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1,
        1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 

## Train Classifier

In [13]:
class RNClassifier100(nn.Module):
    """Fully connected binary classifier for 100-feature inputs.

    Architecture used by ``forward``:
    Linear(100, 64) -> BatchNorm -> ReLU -> Dropout(0.5)
    -> Linear(64, 64) -> BatchNorm -> ReLU -> Dropout(0.5)
    -> Linear(64, 1) -> sigmoid.

    The output has one column per sample with values in (0, 1), i.e. it is
    already a probability and pairs with ``nn.BCELoss``.
    """

    # Hyperparameters that the training/evaluation helpers read from the model
    # (``model.lr`` / ``model.batch_size``). The values mirror RNClassifier400
    # and RNClassifier500; they are a starting point, not a tuned choice.
    lr = 0.0001
    batch_size = 16

    def __init__(self):
        super().__init__()
        self.dropout = nn.Dropout(0.5)

        self.fc1 = nn.Linear(100, 64)
        self.bn1 = nn.BatchNorm1d(64)

        self.fc2 = nn.Linear(64, 64)
        self.bn2 = nn.BatchNorm1d(64)

        # fc3/bn3 are not used by forward(). They stay registered because
        # removing them would change the keys of the model's state_dict.
        self.fc3 = nn.Linear(64, 64)
        self.bn3 = nn.BatchNorm1d(64)

        self.fc5 = nn.Linear(64, 1)

    def forward(self, x):
        """Map a float tensor of shape (batch, 100) to probabilities of shape (batch, 1)."""
        x = self.dropout(F.relu(self.bn1(self.fc1(x))))
        x = self.dropout(F.relu(self.bn2(self.fc2(x))))
        return torch.sigmoid(self.fc5(x))

class RNClassifier200(nn.Module):
    """Fully connected binary classifier for 200-feature inputs.

    Architecture used by ``forward``:
    Linear(200, 64) -> BatchNorm -> ReLU -> Dropout(0.5)
    -> Linear(64, 64) -> BatchNorm -> ReLU -> Dropout(0.5)
    -> Linear(64, 1) -> sigmoid.

    The output has one column per sample with values in (0, 1), i.e. it is
    already a probability and pairs with ``nn.BCELoss``.
    """

    # Hyperparameters that the training/evaluation helpers read from the model
    # (``model.lr`` / ``model.batch_size``). The values mirror RNClassifier400
    # and RNClassifier500; they are a starting point, not a tuned choice.
    lr = 0.0001
    batch_size = 16

    def __init__(self):
        super().__init__()
        self.dropout = nn.Dropout(0.5)

        self.fc1 = nn.Linear(200, 64)
        self.bn1 = nn.BatchNorm1d(64)

        self.fc2 = nn.Linear(64, 64)
        self.bn2 = nn.BatchNorm1d(64)

        # fc3/bn3 are not used by forward(). They stay registered because
        # removing them would change the keys of the model's state_dict.
        self.fc3 = nn.Linear(64, 64)
        self.bn3 = nn.BatchNorm1d(64)

        self.fc5 = nn.Linear(64, 1)

    def forward(self, x):
        """Map a float tensor of shape (batch, 200) to probabilities of shape (batch, 1)."""
        x = self.dropout(F.relu(self.bn1(self.fc1(x))))
        x = self.dropout(F.relu(self.bn2(self.fc2(x))))
        return torch.sigmoid(self.fc5(x))
    
class RNClassifier400(nn.Module):
    """Binary classifier over 400-feature inputs.

    Two hidden stages (400 -> 256 -> 64), each followed by batch
    normalisation, ReLU and dropout, then a single sigmoid output unit.
    """

    # Hyperparameters carried on the class so training code can read them
    # straight from the model.
    lr = 0.0001
    batch_size = 16

    def __init__(self):
        super().__init__()
        self.dropout = nn.Dropout(0.5)

        # First hidden stage.
        self.fc1 = nn.Linear(400, 256)
        self.bn1 = nn.BatchNorm1d(256)

        # Second hidden stage.
        self.fc2 = nn.Linear(256, 64)
        self.bn2 = nn.BatchNorm1d(64)

        # Third stage is defined but currently bypassed in forward().
        self.fc3 = nn.Linear(64, 64)
        self.bn3 = nn.BatchNorm1d(64)

        # Output unit.
        self.fc5 = nn.Linear(64, 1)

    def forward(self, x):
        """Return sigmoid probabilities of shape (batch, 1) for input of shape (batch, 400)."""
        hidden = self.dropout(F.relu(self.bn1(self.fc1(x))))
        hidden = self.dropout(F.relu(self.bn2(self.fc2(hidden))))
        return torch.sigmoid(self.fc5(hidden))

class RNClassifier800(nn.Module):
    """Fully connected binary classifier for 800-feature inputs.

    Architecture used by ``forward``:
    Linear(800, 512) -> BatchNorm -> ReLU -> Dropout(0.5)
    -> Linear(512, 256) -> BatchNorm -> ReLU -> Dropout(0.5)
    -> Linear(256, 1) -> sigmoid.

    The output has one column per sample with values in (0, 1), i.e. it is
    already a probability and pairs with ``nn.BCELoss``.
    """

    # Hyperparameters that the training/evaluation helpers read from the model
    # (``model.lr`` / ``model.batch_size``). The values mirror RNClassifier400
    # and RNClassifier500; they are a starting point, not a tuned choice.
    lr = 0.0001
    batch_size = 16

    def __init__(self):
        super().__init__()
        self.dropout = nn.Dropout(0.5)

        self.fc1 = nn.Linear(800, 512)
        self.bn1 = nn.BatchNorm1d(512)

        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)

        # fc3/bn3 are not used by forward(). They stay registered because
        # removing them would change the keys of the model's state_dict.
        # NOTE: fc3 outputs 64 features while fc5 expects 256, so fc3 cannot be
        # inserted before fc5 without also changing fc5's input width.
        self.fc3 = nn.Linear(256, 64)
        self.bn3 = nn.BatchNorm1d(64)

        self.fc5 = nn.Linear(256, 1)

    def forward(self, x):
        """Map a float tensor of shape (batch, 800) to probabilities of shape (batch, 1)."""
        x = self.dropout(F.relu(self.bn1(self.fc1(x))))
        x = self.dropout(F.relu(self.bn2(self.fc2(x))))
        return torch.sigmoid(self.fc5(x))

class RNClassifier500(nn.Module):
    """Binary classifier over 500-feature inputs.

    Two hidden stages (500 -> 64 -> 64), each followed by batch
    normalisation, ReLU and dropout, then a single sigmoid output unit.
    """

    # Hyperparameters carried on the class so training code can read them
    # straight from the model.
    lr = 0.0001
    batch_size = 16

    def __init__(self):
        super().__init__()
        self.dropout = nn.Dropout(0.5)

        # First hidden stage.
        self.fc1 = nn.Linear(500, 64)
        self.bn1 = nn.BatchNorm1d(64)

        # Second hidden stage.
        self.fc2 = nn.Linear(64, 64)
        self.bn2 = nn.BatchNorm1d(64)

        # Third stage is defined but currently bypassed in forward().
        self.fc3 = nn.Linear(64, 64)
        self.bn3 = nn.BatchNorm1d(64)

        # Output unit.
        self.fc5 = nn.Linear(64, 1)

    def forward(self, x):
        """Return sigmoid probabilities of shape (batch, 1) for input of shape (batch, 500)."""
        hidden = self.dropout(F.relu(self.bn1(self.fc1(x))))
        hidden = self.dropout(F.relu(self.bn2(self.fc2(hidden))))
        return torch.sigmoid(self.fc5(hidden))

# model = RNClassifier500()
# Split the data into training and validation sets

In [14]:
def _validation_pass(model, loader, criterion):
    """Evaluate ``model`` over ``loader`` without tracking gradients.

    Returns ``(mean_batch_loss, n_correct)``: the loss averaged over batches
    and the number of samples whose rounded output equals the label.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    with torch.no_grad():
        for inputs, labels in loader:
            outputs = model(inputs)
            n_correct += torch.sum(torch.round(outputs) == labels).item()
            loss_sum += criterion(outputs, labels).item()
    return loss_sum / len(loader), n_correct


def train(model, n_bits,  X_train, y_train, X_val, y_val,
          epochs=1000, patience=50, val_every=5, checkpoint_dir='model'):
    """Train ``model`` with Adam + BCE loss and checkpoint on best validation accuracy.

    Progress is logged through the root logger and additionally appended to
    ``{n_bits}bit_model/<ModelClassName>.log``. Every ``val_every`` epochs the
    model is evaluated on the validation data; whenever the validation
    accuracy strictly improves, the weights are saved to
    ``{checkpoint_dir}/best_model_epoch{epoch}.pth``.

    Args:
        model: ``nn.Module`` whose output is a probability of shape
            (batch, 1). ``model.lr`` and ``model.batch_size`` are used when
            present; otherwise 0.0001 and 16 are used.
        n_bits: Used only to name the per-run log directory.
        X_train, y_train: Training features and labels (array-likes that
            ``torch.tensor`` accepts). Labels must have the same shape as the
            model output.
        X_val, y_val: Validation features and labels, same conventions.
        epochs: Maximum number of training epochs.
        patience: Number of consecutive validation checks (not epochs)
            without an accuracy improvement after which training stops.
        val_every: Validate every this many epochs (>= 1).
        checkpoint_dir: Directory that receives the checkpoint files.

    Raises:
        ValueError: If the training or validation set is empty, or
            ``val_every`` is smaller than 1.
    """
    if len(X_train) == 0 or len(X_val) == 0:
        raise ValueError("X_train and X_val must both be non-empty")
    if val_every < 1:
        raise ValueError("val_every must be a positive integer")

    run_dir = f'{n_bits}bit_model'
    os.makedirs(run_dir, exist_ok=True)
    # torch.save does not create missing directories.
    os.makedirs(checkpoint_dir, exist_ok=True)

    # logging.basicConfig() silently does nothing once the root logger already
    # has a handler, so the per-run log file is attached explicitly instead.
    logger = logging.getLogger()
    if not logger.isEnabledFor(logging.INFO):
        logger.setLevel(logging.INFO)
    file_handler = logging.FileHandler(f'{run_dir}/{model.__class__.__name__}.log')
    file_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
    logger.addHandler(file_handler)

    try:
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=getattr(model, 'lr', 0.0001))
        batch_size = getattr(model, 'batch_size', 16)

        train_data = TensorDataset(torch.tensor(X_train).float(), torch.tensor(y_train).float())
        train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

        # Evaluation does not benefit from shuffling; a fixed order keeps the
        # reported validation loss deterministic for given weights.
        val_data = TensorDataset(torch.tensor(X_val).float(), torch.tensor(y_val).float())
        val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
        n_val = len(y_val)

        patience_counter = 0
        best_val_accuracy = 0
        best_accu_epoch = 0

        logger.info(model.__class__.__name__)
        for epoch in range(epochs):
            model.train()
            running_loss = 0.0

            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()

            avg_train_loss = running_loss / len(train_loader)
            logger.info(f'Epoch: {epoch}, Loss: {avg_train_loss}')

            if epoch % val_every != 0:
                continue

            avg_val_loss, accurate = _validation_pass(model, val_loader, criterion)
            accuracy = accurate / n_val
            logger.info(f'Epoch: {epoch}, Validation Loss:{avg_val_loss}, Accuracy: {accuracy}, accurate: {accurate}, total: {n_val}')

            # Model selection is driven by validation accuracy, not loss.
            if accuracy > best_val_accuracy:
                best_val_accuracy = accuracy
                patience_counter = 0
                best_accu_epoch = epoch
                torch.save(model.state_dict(), os.path.join(checkpoint_dir, f'best_model_epoch{epoch}.pth'))
            else:
                # Counts validation checks, i.e. one step per `val_every` epochs.
                patience_counter += 1
                logger.info(f"Validation accuracy did not improve for {patience_counter} epochs.")

            # Early stopping
            if patience_counter >= patience:
                logger.info(f"Early stopping triggered after {patience} epochs with no improvement. Best accuracy: {best_val_accuracy} at epoch {best_accu_epoch}")
                break
    finally:
        # Detach the file handler so repeated calls do not duplicate log lines
        # or keep the file open (also runs on KeyboardInterrupt).
        logger.removeHandler(file_handler)
        file_handler.close()

# Bind the network to a name before training: creating it inline inside the
# call would leave no reference to the trained instance afterwards. After
# train() returns, `model` holds the weights from the last epoch that ran.
model = RNClassifier500()
train(model, 500, X_train, y_train, X_val, y_val)

INFO:root:RNClassifier500
INFO:root:Epoch: 0, Loss: 0.7256984307810113
INFO:root:Epoch: 0, Validation Loss:0.7017377391457558, Accuracy: 0.4973958333333333, accurate: 191, total: 384
INFO:root:Epoch: 1, Loss: 0.7125358324911859
INFO:root:Epoch: 2, Loss: 0.7088457707453657
INFO:root:Epoch: 3, Loss: 0.7082038783916721
INFO:root:Epoch: 4, Loss: 0.6999357549680604
INFO:root:Epoch: 5, Loss: 0.6857246797945764
INFO:root:Epoch: 5, Validation Loss:0.7022880986332893, Accuracy: 0.5104166666666666, accurate: 196, total: 384
INFO:root:Epoch: 6, Loss: 0.6872103252896556
INFO:root:Epoch: 7, Loss: 0.6821459918110458
INFO:root:Epoch: 8, Loss: 0.676004535622067
INFO:root:Epoch: 9, Loss: 0.6778175303781474
INFO:root:Epoch: 10, Loss: 0.6719838560179427
INFO:root:Epoch: 10, Validation Loss:0.7074536979198456, Accuracy: 0.4739583333333333, accurate: 182, total: 384
INFO:root:Validation accuracy did not improve for 1 epochs.
INFO:root:Epoch: 11, Loss: 0.657430165067867
INFO:root:Epoch: 12, Loss: 0.65213876

KeyboardInterrupt: 

In [315]:
#### BEST EPOCH 125 for 500 bits input
# Calculate the accuracy of the model
# Everything the evaluation needs (network, loss, batch size) is created in
# this cell so it does not depend on variables left behind by other cells.
best_epoch = 125
model = RNClassifier500()
# weights_only=True limits unpickling to tensors and plain containers, which
# is all a state_dict holds, and avoids executing arbitrary pickled code.
model.load_state_dict(torch.load(f'model/best_model_epoch{best_epoch}.pth', weights_only=True))
model.eval()
criterion = nn.BCELoss()
test_data = TensorDataset(torch.tensor(X_test).float(), torch.tensor(y_test).float())
# No shuffling: sample order is irrelevant for evaluation.
test_loader = DataLoader(test_data, batch_size=model.batch_size, shuffle=False)
print(model.__class__.__name__)
print(X_test.shape, y_test.shape)
total_loss = 0.0
accurate = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        # Threshold the sigmoid output at 0.5 to get the predicted class.
        output = torch.round(outputs)
        accurate += torch.sum(output == labels).item()

        test_loss = criterion(outputs, labels)
        total_loss += test_loss.item()

avg_test_loss = total_loss / len(test_loader)
accuracy = accurate / len(y_test)
print(f'Test Loss: {avg_test_loss}, Accuracy: {accuracy}')

RNClassifier500
(960, 500) (960, 1)
Test Loss: 0.7012461584061385, Accuracy: 0.503125


  model.load_state_dict(torch.load(f'model/best_model_epoch{best_epoch}.pth'))


In [296]:
#### BEST EPOCH 105 for 800 bits input
# Calculate the accuracy of the model
# Everything the evaluation needs (network, loss, batch size) is created in
# this cell so it does not depend on variables left behind by other cells.
best_epoch = 105
model = RNClassifier800()
# This network consumes 800 features per sample; fail early with a readable
# message instead of a matrix-shape error deep inside the forward pass.
if X_test.shape[1] != 800:
    raise ValueError(
        f'X_test has {X_test.shape[1]} features per sample but RNClassifier800 expects 800; '
        'rebuild the data splits with 800-bit samples before running this cell.'
    )
# weights_only=True limits unpickling to tensors and plain containers, which
# is all a state_dict holds, and avoids executing arbitrary pickled code.
model.load_state_dict(torch.load(f'model/best_model_epoch{best_epoch}.pth', weights_only=True))
model.eval()
criterion = nn.BCELoss()
test_data = TensorDataset(torch.tensor(X_test).float(), torch.tensor(y_test).float())
# No shuffling: sample order is irrelevant for evaluation. The batch size
# falls back to 16 when the model class does not define one.
test_loader = DataLoader(test_data, batch_size=getattr(model, 'batch_size', 16), shuffle=False)
print(model.__class__.__name__)
print(X_test.shape, y_test.shape)
total_loss = 0.0
accurate = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        # Threshold the sigmoid output at 0.5 to get the predicted class.
        output = torch.round(outputs)
        accurate += torch.sum(output == labels).item()

        test_loss = criterion(outputs, labels)
        total_loss += test_loss.item()

avg_test_loss = total_loss / len(test_loader)
accuracy = accurate / len(y_test)
print(f'Test Loss: {avg_test_loss}, Accuracy: {accuracy}')

RNClassifier800
(600, 800) (600, 1)
Test Loss: 0.6935586561759313, Accuracy: 0.5116666666666667


  model.load_state_dict(torch.load(f'model/best_model_epoch{best_epoch}.pth'))


In [250]:
#### BEST EPOCH 145 for 400 bits input
# Calculate the accuracy of the model
# Everything the evaluation needs (network, loss, batch size) is created in
# this cell so it does not depend on variables left behind by other cells.
best_epoch = 145
model = RNClassifier400()
# This network consumes 400 features per sample; fail early with a readable
# message instead of a matrix-shape error deep inside the forward pass.
if X_test.shape[1] != 400:
    raise ValueError(
        f'X_test has {X_test.shape[1]} features per sample but RNClassifier400 expects 400; '
        'rebuild the data splits with 400-bit samples before running this cell.'
    )
# weights_only=True limits unpickling to tensors and plain containers, which
# is all a state_dict holds, and avoids executing arbitrary pickled code.
model.load_state_dict(torch.load(f'model/best_model_epoch{best_epoch}.pth', weights_only=True))
model.eval()
criterion = nn.BCELoss()
test_data = TensorDataset(torch.tensor(X_test).float(), torch.tensor(y_test).float())
# No shuffling: sample order is irrelevant for evaluation.
test_loader = DataLoader(test_data, batch_size=model.batch_size, shuffle=False)
print(model.__class__.__name__)
print(X_test.shape, y_test.shape)
total_loss = 0.0
accurate = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        # Threshold the sigmoid output at 0.5 to get the predicted class.
        output = torch.round(outputs)
        accurate += torch.sum(output == labels).item()

        test_loss = criterion(outputs, labels)
        total_loss += test_loss.item()

avg_test_loss = total_loss / len(test_loader)
accuracy = accurate / len(y_test)
print(f'Test Loss: {avg_test_loss}, Accuracy: {accuracy}')

RNClassifier400
(1200, 400) (1200, 1)
Test Loss: 0.691421952744325, Accuracy: 0.5316666666666666


  model.load_state_dict(torch.load(f'model/best_model_epoch{best_epoch}.pth'))


In [197]:
# Inline training loop with validation-accuracy checkpointing and early
# stopping; it has the same structure as the body of train().
# NOTE(review): this cell reads `model`, `epochs`, `train_loader`,
# `val_loader`, `optimizer`, `criterion`, `y_val` and `patience` as globals.
# None of them is assigned at top level in the visible part of the notebook,
# so the cell only runs against leftover kernel state. Its recorded run ended
# in a shape-mismatch RuntimeError (8x200 input vs. a 100x64 weight).
# Presumably superseded by train() -- TODO confirm and remove.
best_val_loss = np.inf  # assigned but never read in this cell
patience_counter = 0
best_val_accuracy = 0
best_accu_epoch = 0
print(model.__class__.__name__)

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    
    # One optimisation step per mini-batch.
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
    # Mean of the per-batch losses for this epoch.
    avg_train_loss = running_loss / len(train_loader)
    
    print(f'Epoch: {epoch}, Loss: {avg_train_loss}')
    
    # Validate only on every fifth epoch.
    if epoch % 5 == 0:
        model.eval()
        total_loss = 0.0
        accurate = 0
        
        with torch.no_grad():
            for i, data in enumerate(val_loader, 0):
                inputs, labels = data
                outputs = model(inputs)
                # print(outputs)
                # output = torch.argmax(outputs, 1)
                # Round the model output to 0/1 to obtain the predicted class.
                output = torch.round(outputs)
                accurate += torch.sum(output == labels).item()
                
                val_loss = criterion(outputs, labels)
                total_loss += val_loss.item()
        
        avg_val_loss = total_loss / len(val_loader)
        accuracy = accurate / len(y_val)
        
        # scheduler.step(avg_val_loss)
        
        print(f'Epoch: {epoch}, Validation Loss:{avg_val_loss}, Accuracy: {accuracy}, accurate: {accurate}, total: {len(y_val)}')
        
        # Check if validation accuracy improved (the loss is only reported)
        if accuracy > best_val_accuracy:
            best_val_accuracy = accuracy
            patience_counter = 0  # Reset counter if validation accuracy improved
            best_accu_epoch = epoch
            torch.save(model.state_dict(), f'model/best_model_epoch{epoch}.pth')  # Save the best model
        else:
            # Incremented once per validation check, i.e. once every 5 epochs.
            patience_counter += 1
            print(f"Validation accuracy did not improve for {patience_counter} epochs.")
        
        # Early stopping
        if patience_counter >= patience:
            print(f"Early stopping triggered after {patience} epochs with no improvement. Best accuracy: {best_val_accuracy} at epoch {best_accu_epoch}")
            break

RNClassifier200


RuntimeError: mat1 and mat2 shapes cannot be multiplied (8x200 and 100x64)

In [170]:

#### BEST EPOCH 60 for 100 bits input
# Calculate the accuracy of the model

def test_model(model, path, X_test, y_test):
    criterion = nn.BCELoss()
    model.load_state_dict(torch.load(f'{path}'))
    # model.load_state_dict(torch.load(f'model/best_model_epoch{best_epoch}.pth'))
    model.eval()
    test_data = TensorDataset(torch.tensor(X_test).float(), torch.tensor(y_test).float())
    test_loader = DataLoader(test_data, batch_size=model.batch_size, shuffle=True)

    total_loss = 0.0
    accurate = 0

    with torch.no_grad():
        for i, data in enumerate(test_loader, 0):
            inputs, labels = data
            outputs = model(inputs)
            output = torch.round(outputs)
            accurate += torch.sum(output == labels).item()
            
            test_loss = criterion(outputs, labels)
            total_loss += test_loss.item()

    avg_test_loss = total_loss / len(test_loader)
    accuracy = accurate / len(y_test)
    print(f'Test Loss: {avg_test_loss}, Accuracy: {accuracy}')

Test Loss: 0.6979442413647969, Accuracy: 0.5225


  model.load_state_dict(torch.load(f'model/best_model_epoch{best_epoch}.pth'))
