## Implementing the QPDN with IMDB data

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import imdb
from sklearn.metrics import classification_report

class ComplexEmbedding(layers.Layer):
    def __init__(self, input_dim, output_dim, **kwargs):
        super(ComplexEmbedding, self).__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.real_embedding = layers.Embedding(input_dim, output_dim)
        self.imag_embedding = layers.Embedding(input_dim, output_dim)

    def call(self, inputs):
        real_part = self.real_embedding(inputs)
        imag_part = self.imag_embedding(inputs)
        return tf.concat([real_part, imag_part], axis=-1)

    def compute_output_shape(self, input_shape):
        return input_shape + (self.output_dim * 2,)

class DensityMatrix(layers.Layer):
    def __init__(self, **kwargs):
        super(DensityMatrix, self).__init__(**kwargs)

    def call(self, inputs):
        real_part, imag_part = tf.split(inputs, 2, axis=-1)
        density_matrix_real = tf.einsum('bij,bik->bjk', real_part, real_part) + tf.einsum('bij,bik->bjk', imag_part, imag_part)
        density_matrix_imag = tf.einsum('bij,bik->bjk', imag_part, real_part) - tf.einsum('bij,bik->bjk', real_part, imag_part)
        return tf.concat([density_matrix_real, density_matrix_imag], axis=-1)

    def compute_output_shape(self, input_shape):
        return input_shape[:-1] + (input_shape[-1], input_shape[-1])

class Measurement(layers.Layer):
    def __init__(self, num_measurements, output_dim, **kwargs):
        super(Measurement, self).__init__(**kwargs)
        self.num_measurements = num_measurements
        self.output_dim = output_dim
        self.measurement_vectors_real = self.add_weight(
            shape=(num_measurements, output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors_real',
            dtype=tf.float32
        )
        self.measurement_vectors_imag = self.add_weight(
            shape=(num_measurements, output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors_imag',
            dtype=tf.float32
        )

    def call(self, inputs):
        density_matrix_real, density_matrix_imag = tf.split(inputs, 2, axis=-1)
        measured_probs_real = tf.einsum('bij,ki->bk', density_matrix_real, self.measurement_vectors_real) - tf.einsum('bij,ki->bk', density_matrix_imag, self.measurement_vectors_imag)
        measured_probs_imag = tf.einsum('bij,ki->bk', density_matrix_real, self.measurement_vectors_imag) + tf.einsum('bij,ki->bk', density_matrix_imag, self.measurement_vectors_real)
        return tf.sqrt(tf.square(measured_probs_real) + tf.square(measured_probs_imag))

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.num_measurements)

def create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length):
    inputs = layers.Input(shape=(max_length,))
    embedding_layer = ComplexEmbedding(vocab_size, embedding_dim)
    complex_embeddings = embedding_layer(inputs)
    density_matrix_layer = DensityMatrix()
    density_matrix = density_matrix_layer(complex_embeddings)
    measurement_layer = Measurement(num_measurements, embedding_dim)
    measured_probs = measurement_layer(density_matrix)

    lstm = layers.Bidirectional(layers.LSTM(64, return_sequences=True))(layers.Reshape((num_measurements, 1))(measured_probs))
    attention = layers.Attention()([lstm, lstm])

    conv1 = layers.Conv1D(filters=256, kernel_size=5, activation='relu', padding='same')(attention)
    conv1 = layers.BatchNormalization()(conv1)
    conv2 = layers.Conv1D(filters=128, kernel_size=3, activation='relu', padding='same')(conv1)
    conv2 = layers.BatchNormalization()(conv2)
    pool = layers.GlobalMaxPooling1D()(conv2)
    dropout = layers.Dropout(0.05)(pool)
    dense1 = layers.Dense(64, activation='relu')(dropout)
    dense1 = layers.Dropout(0.05)(dense1)
    dense2 = layers.Dense(32, activation='relu')(dense1)
    outputs = layers.Dense(num_classes, activation='softmax')(dense2)

    model = Model(inputs=inputs, outputs=outputs)
    return model

def load_imdb_data(num_words, maxlen):
    (X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=num_words)
    X_train = pad_sequences(X_train, maxlen=maxlen)
    X_test = pad_sequences(X_test, maxlen=maxlen)
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)
    return (X_train, y_train), (X_test, y_test), num_words

def main():
    max_words = 20000
    max_length = 200
    embedding_dim = 200
    num_measurements = 100
    num_classes = 2
    batch_size = 32
    epochs = 20

    (X_train, y_train), (X_test, y_test), vocab_size = load_imdb_data(max_words, max_length)

    model = create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length)
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

    model.summary()

    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=batch_size,
        epochs=epochs,
        verbose=1
    )

    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test accuracy: {accuracy:.4f}")

    model.save("qpdn_model.h5")

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)

    print(classification_report(y_test_classes, y_pred_classes))

    loaded_model = keras.models.load_model("qpdn_model.h5", custom_objects={
        'ComplexEmbedding': ComplexEmbedding,
        'DensityMatrix': DensityMatrix,
        'Measurement': Measurement
    })

    sample_text = "This movie was fantastic! The acting was great and the plot was engaging."
    sample_sequence = imdb.get_word_index()
    sample_sequence = [sample_sequence.get(word, 2) for word in sample_text.lower().split()]
    sample_sequence = pad_sequences([sample_sequence], maxlen=max_length)
    prediction = loaded_model.predict(sample_sequence)
    predicted_class = np.argmax(prediction, axis=1)
    print(f"Predicted class for the sample text: {predicted_class[0]}")

if __name__ == "__main__":
    main()

Epoch 1/20
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 33ms/step - accuracy: 0.5952 - loss: 0.7103 - val_accuracy: 0.8604 - val_loss: 0.3358
Epoch 2/20
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m35s[0m 33ms/step - accuracy: 0.9010 - loss: 0.2532 - val_accuracy: 0.8560 - val_loss: 0.3446
Epoch 3/20
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 31ms/step - accuracy: 0.9445 - loss: 0.1584 - val_accuracy: 0.8231 - val_loss: 0.4831
Epoch 4/20
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 33ms/step - accuracy: 0.9589 - loss: 0.1165 - val_accuracy: 0.8036 - val_loss: 0.5606
Epoch 5/20
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 31ms/step - accuracy: 0.9611 - loss: 0.1124 - val_accuracy: 0.6890 - val_loss: 0.5937
Epoch 6/20
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 31ms/step - accuracy: 0.8805 - loss: 0.2539 - val_accuracy: 0.6016 - val_loss: 1.4818
Epoch 7/20
[1m7



Test accuracy: 0.6342
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 6ms/step
              precision    recall  f1-score   support

           0       0.59      0.92      0.72     12500
           1       0.81      0.35      0.49     12500

    accuracy                           0.63     25000
   macro avg       0.70      0.63      0.60     25000
weighted avg       0.70      0.63      0.60     25000





Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb_word_index.json
[1m1641221/1641221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 639ms/step
Predicted class for the sample text: 0


## Trying some different config for the newsgroup data

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchtext.vocab import GloVe
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from collections import Counter
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
nltk.download('stopwords')
nltk.download('wordnet')

# Data preprocessing
def preprocess_text(text):
    lemmatizer = WordNetLemmatizer()
    text = re.sub(r'\W+', ' ', text.lower())
    tokens = text.split()
    stop_words = set(stopwords.words('english'))
    tokens = [lemmatizer.lemmatize(token) for token in tokens if token not in stop_words]
    return ' '.join(tokens)

# Load and preprocess the 20newsgroups dataset
newsgroups = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))
texts = [preprocess_text(text) for text in newsgroups.data]
labels = newsgroups.target

# Print some rows of the dataset
print("Sample rows of the dataset:")
for i in range(5):
    print(f"Text: {texts[i][:500]}...")  # Print first 500 characters
    print(f"Label: {labels[i]}")
    print()

# Create vocabulary
word_counts = Counter()
for text in texts:
    word_counts.update(text.split())

vocab = ['<PAD>', '<UNK>'] + [word for word, count in word_counts.items() if count >= 5]
word2idx = {word: idx for idx, word in enumerate(vocab)}

# Load GloVe embeddings
glove = GloVe(name='6B', dim=100)
embedding_matrix = np.zeros((len(vocab), 100))
for word, idx in word2idx.items():
    if word in glove.stoi:
        embedding_matrix[idx] = glove[word].numpy()
    else:
        embedding_matrix[idx] = np.random.normal(scale=0.6, size=(100, ))



[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


Sample rows of the dataset:
Text: sure bashers pen fan pretty confused lack kind post recent pen massacre devil actually bit puzzled bit relieved however going put end non pittsburghers relief bit praise pen man killing devil worse thought jagr showed much better regular season stats also lot fo fun watch playoff bowman let jagr lot fun next couple game since pen going beat pulp jersey anyway disappointed see islander lose final regular season game pen rule...
Label: 10

Text: brother market high performance video card support vesa local bus 1 2mb ram anyone suggestion idea diamond stealth pro local bus orchid farenheit 1280 ati graphic ultra pro high performance vlb card please post email thank matt...
Label: 3

Text: finally said dream mediterranean new area greater year like holocaust number july usa sweden april still cold changed calendar nothing mentioned true let say true shall azeri woman child going pay price raped killed tortured armenian hearded something called geneva conve

In [None]:
# Encode texts
max_len = 200
encoded_texts = []
for text in texts:
    encoded = [word2idx.get(word, 1) for word in text.split()[:max_len]]
    encoded += [0] * (max_len - len(encoded))
    encoded_texts.append(encoded)

# Convert to PyTorch tensors
X = torch.tensor(encoded_texts, dtype=torch.long)
y = torch.tensor(labels, dtype=torch.long)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create dataset and dataloader
class NewsGroupsDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = NewsGroupsDataset(X_train, y_train)
test_dataset = NewsGroupsDataset(X_test, y_test)

batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Enhanced Model with Bidirectional LSTM and Attention
class EnhancedModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes, embedding_matrix):
        super(EnhancedModel, self).__init__()
        self.embedding = nn.Embedding.from_pretrained(torch.tensor(embedding_matrix, dtype=torch.float32), freeze=False)
        self.lstm = nn.LSTM(embed_dim, 128, batch_first=True, bidirectional=True)
        self.attention = nn.Linear(256, 1)
        self.fc = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        embed = self.embedding(x)
        lstm_out, _ = self.lstm(embed)
        attn_weights = torch.softmax(self.attention(lstm_out).squeeze(-1), dim=-1)
        attn_out = torch.sum(lstm_out * attn_weights.unsqueeze(-1), dim=1)
        out = self.dropout(attn_out)
        out = self.fc(out)
        return out

# Hyperparameters
vocab_size = len(vocab)
embed_dim = 100
num_classes = 20
learning_rate = 0.005
num_epochs = 10

# Initialize model, loss function, and optimizer
model = EnhancedModel(vocab_size, embed_dim, num_classes, embedding_matrix)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Evaluation
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            outputs = model(batch_x)
            _, predicted = torch.max(outputs.data, 1)
            total += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()

    accuracy = correct / total
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss/len(train_loader):.4f}, Accuracy: {accuracy:.4f}")

# Final evaluation
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for batch_x, batch_y in test_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        outputs = model(batch_x)
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

accuracy = correct / total
print(f"Final Test Accuracy: {accuracy:.4f}")


## Implementing the QNLP hilbert task for newsgroup data on the same code as the IMDB one.

In [None]:

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Conv1D, GlobalMaxPooling1D, Dropout, Bidirectional, LSTM, Attention, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from collections import Counter
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Download necessary NLTK data
nltk.download('stopwords')
nltk.download('wordnet')

class ComplexEmbedding(Layer):
    def __init__(self, input_dim, output_dim, **kwargs):
        super(ComplexEmbedding, self).__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim

    def build(self, input_shape):
        self.real_embeddings = self.add_weight(
            shape=(self.input_dim, self.output_dim),
            initializer='glorot_uniform',
            trainable=True,
            name='real_embeddings'
        )
        self.imag_embeddings = self.add_weight(
            shape=(self.input_dim, self.output_dim),
            initializer='glorot_uniform',
            trainable=True,
            name='imag_embeddings'
        )
        super(ComplexEmbedding, self).build(input_shape)

    def call(self, inputs):
        inputs = tf.cast(inputs, tf.int32)
        real_part = tf.nn.embedding_lookup(self.real_embeddings, inputs)
        imag_part = tf.nn.embedding_lookup(self.imag_embeddings, inputs)
        complex_embeddings = tf.complex(real_part, imag_part)
        return complex_embeddings

class DensityMatrix(Layer):
    def __init__(self, **kwargs):
        super(DensityMatrix, self).__init__(**kwargs)

    def call(self, inputs):
        weighted_embeddings = inputs
        density_matrix = tf.einsum('bij,bik->bjk', weighted_embeddings, tf.math.conj(weighted_embeddings))
        return density_matrix

class Measurement(Layer):
    def __init__(self, num_measurements, output_dim, **kwargs):
        super(Measurement, self).__init__(**kwargs)
        self.num_measurements = num_measurements
        self.output_dim = output_dim

    def build(self, input_shape):
        self.measurement_vectors = self.add_weight(
            shape=(self.num_measurements, self.output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors'
        )
        super(Measurement, self).build(input_shape)

    def call(self, inputs):
        complex_measurement = tf.complex(self.measurement_vectors, tf.zeros_like(self.measurement_vectors))
        complex_measurement = tf.linalg.l2_normalize(complex_measurement, axis=-1)
        measured_probs = tf.abs(tf.einsum('bij,ki->bk', inputs, complex_measurement))
        return measured_probs

def create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length):
    inputs = tf.keras.Input(shape=(max_length,))
    embedding_layer = ComplexEmbedding(vocab_size, embedding_dim)
    complex_embeddings = embedding_layer(inputs)
    density_matrix_layer = DensityMatrix()
    density_matrix = density_matrix_layer(complex_embeddings)
    measurement_layer = Measurement(num_measurements, embedding_dim)
    measured_probs = measurement_layer(density_matrix)

    lstm = Bidirectional(LSTM(256, return_sequences=True))(tf.expand_dims(measured_probs, -1))
    attention = Attention()([lstm, lstm])

    conv1 = Conv1D(filters=256, kernel_size=10, activation='relu', padding='same')(attention)
    conv1 = BatchNormalization()(conv1)
    conv2 = Conv1D(filters=128, kernel_size=3, activation='relu', padding='same')(conv1)
    conv2 = BatchNormalization()(conv2)
    pool = GlobalMaxPooling1D()(conv2)
    dropout = Dropout(0.05)(pool)
    dense1 = Dense(128, activation='relu')(dropout)
    dense1 = BatchNormalization()(dense1)
    dense1 = Dropout(0.05)(dense1)
    dense2 = Dense(64, activation='relu')(dense1)
    dense2 = BatchNormalization()(dense2)
    outputs = Dense(num_classes, activation='softmax')(dense2)

    model = Model(inputs=inputs, outputs=outputs)
    return model

def preprocess_text(text):
    lemmatizer = WordNetLemmatizer()
    text = re.sub(r'\W+', ' ', text.lower())
    tokens = text.split()
    stop_words = set(stopwords.words('english'))
    tokens = [lemmatizer.lemmatize(token) for token in tokens if token not in stop_words]
    return ' '.join(tokens)

def load_20newsgroups_data(num_words, maxlen):
    newsgroups = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))
    texts = [preprocess_text(text) for text in newsgroups.data]
    labels = newsgroups.target

    word_counts = Counter()
    for text in texts:
        word_counts.update(text.split())

    vocab = ['<PAD>', '<UNK>'] + [word for word, count in word_counts.items() if count >= 5]
    word2idx = {word: idx for idx, word in enumerate(vocab)}

    encoded_texts = []
    for text in texts:
        encoded = [word2idx.get(word, 1) for word in text.split()[:maxlen]]
        encoded += [0] * (maxlen - len(encoded))
        encoded_texts.append(encoded)

    X = np.array(encoded_texts)
    y = to_categorical(labels, num_classes=len(set(labels)))

    return train_test_split(X, y, test_size=0.2, random_state=42), len(vocab), word2idx

def main():
    max_words = 50000
    max_length = 300
    embedding_dim = 300
    num_measurements = 150
    num_classes = 20
    batch_size = 64
    epochs = 50

    (X_train, X_test, y_train, y_test), vocab_size, word2idx = load_20newsgroups_data(max_words, max_length)

    model = create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

    model.summary()

    # Learning rate scheduler
    def lr_schedule(epoch):
        if epoch < 10:
            return 0.001
        elif epoch < 20:
            return 0.0005
        else:
            return 0.0001

    lr_scheduler = tf.keras.callbacks.LearningRateScheduler(lr_schedule)

    # Early stopping
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=5, restore_best_weights=True)

    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=batch_size,
        epochs=epochs,
        verbose=1,
        callbacks=[lr_scheduler, early_stopping]
    )

    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test accuracy: {accuracy:.4f}")

    model.save("qpdn_model_improved.h5")

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)

    print(classification_report(y_test_classes, y_pred_classes))

    loaded_model = tf.keras.models.load_model("qpdn_model_improved.h5", custom_objects={
        'ComplexEmbedding': ComplexEmbedding,
        'DensityMatrix': DensityMatrix,
        'Measurement': Measurement
    })

    sample_text = "This is a sample document from the 20 newsgroups dataset."
    sample_sequence = [word2idx.get(word, 1) for word in preprocess_text(sample_text).split()]
    sample_sequence = pad_sequences([sample_sequence], maxlen=max_length)
    prediction = loaded_model.predict(sample_sequence)
    predicted_class = np.argmax(prediction, axis=1)
    print(f"Predicted class for the sample text: {predicted_class[0]}")

if __name__ == "__main__":
    main()





[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_4 (InputLayer)        [(None, 300)]                0         []                            
                                                                                                  
 complex_embedding_3 (Compl  (None, 300, 300)             1598040   ['input_4[0][0]']             
 exEmbedding)                                             0                                       
                                                                                                  
 density_matrix_3 (DensityM  (None, 300, 300)             0         ['complex_embedding_3[0][0]'] 
 atrix)                                                                                           
                                                                                            

  saving_api.save_model(


              precision    recall  f1-score   support

           0       0.25      0.56      0.34       151
           1       0.66      0.64      0.65       202
           2       0.56      0.56      0.56       195
           3       0.53      0.60      0.56       183
           4       0.81      0.63      0.71       205
           5       0.83      0.71      0.76       215
           6       0.68      0.59      0.63       193
           7       0.52      0.64      0.57       196
           8       0.66      0.64      0.65       168
           9       0.88      0.72      0.79       211
          10       0.85      0.82      0.83       198
          11       0.82      0.67      0.74       201
          12       0.60      0.51      0.55       202
          13       0.61      0.76      0.68       194
          14       0.71      0.67      0.69       189
          15       0.74      0.65      0.69       202
          16       0.56      0.58      0.57       188
          17       0.85    

## Implementing the same QPDN approach with reuters dataset


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Conv1D, GlobalMaxPooling1D, Dropout, Bidirectional, LSTM, Attention, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import reuters
from sklearn.metrics import classification_report

# Define a custom layer for complex embeddings
class ComplexEmbedding(Layer):
    def __init__(self, input_dim, output_dim, **kwargs):
        super(ComplexEmbedding, self).__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim

    def build(self, input_shape):
        self.real_embeddings = self.add_weight(
            shape=(self.input_dim, self.output_dim),
            initializer='glorot_uniform',
            trainable=True,
            name='real_embeddings'
        )
        self.imag_embeddings = self.add_weight(
            shape=(self.input_dim, self.output_dim),
            initializer='glorot_uniform',
            trainable=True,
            name='imag_embeddings'
        )
        super(ComplexEmbedding, self).build(input_shape)

    def call(self, inputs):
        inputs = tf.cast(inputs, tf.int32)
        real_part = tf.nn.embedding_lookup(self.real_embeddings, inputs)
        imag_part = tf.nn.embedding_lookup(self.imag_embeddings, inputs)
        complex_embeddings = tf.complex(real_part, imag_part)
        return complex_embeddings

# Define a custom layer for creating density matrices from embeddings
class DensityMatrix(Layer):
    def __init__(self, **kwargs):
        super(DensityMatrix, self).__init__(**kwargs)

    def call(self, inputs):
        weighted_embeddings = inputs
        density_matrix = tf.einsum('bij,bik->bjk', weighted_embeddings, tf.math.conj(weighted_embeddings))
        return density_matrix

# Define a custom layer for performing measurement on the density matrix
class Measurement(Layer):
    def __init__(self, num_measurements, output_dim, **kwargs):
        super(Measurement, self).__init__(**kwargs)
        self.num_measurements = num_measurements
        self.output_dim = output_dim

    def build(self, input_shape):
        self.measurement_vectors = self.add_weight(
            shape=(self.num_measurements, self.output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors'
        )
        super(Measurement, self).build(input_shape)

    def call(self, inputs):
        complex_measurement = tf.complex(self.measurement_vectors, tf.zeros_like(self.measurement_vectors))
        complex_measurement = tf.linalg.l2_normalize(complex_measurement, axis=-1)
        measured_probs = tf.abs(tf.einsum('bij,ki->bk', inputs, complex_measurement))
        return measured_probs

# Define the QPDN model
def create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length):
    inputs = tf.keras.Input(shape=(max_length,))
    embedding_layer = ComplexEmbedding(vocab_size, embedding_dim)
    complex_embeddings = embedding_layer(inputs)
    density_matrix_layer = DensityMatrix()
    density_matrix = density_matrix_layer(complex_embeddings)
    measurement_layer = Measurement(num_measurements, embedding_dim)
    measured_probs = measurement_layer(density_matrix)

    lstm = Bidirectional(LSTM(64, return_sequences=True))(tf.expand_dims(measured_probs, -1))
    attention = Attention()([lstm, lstm])

    conv1 = Conv1D(filters=256, kernel_size=10, activation='relu', padding='same')(attention)
    conv1 = BatchNormalization()(conv1)
    conv2 = Conv1D(filters=128, kernel_size=3, activation='relu', padding='same')(conv1)
    conv2 = BatchNormalization()(conv2)
    pool = GlobalMaxPooling1D()(conv2)
    dropout = Dropout(0.05)(pool)
    dense1 = Dense(64, activation='relu')(dropout)
    dense1 = Dropout(0.05)(dense1)
    dense2 = Dense(32, activation='relu')(dense1)
    outputs = Dense(num_classes, activation='softmax')(dense2)

    model = Model(inputs=inputs, outputs=outputs)
    return model

# Load and preprocess Reuters data
def load_reuters_data(num_words, maxlen):
    (X_train, y_train), (X_test, y_test) = reuters.load_data(num_words=num_words, test_split=0.25)
    X_train = pad_sequences(X_train, maxlen=maxlen)
    X_test = pad_sequences(X_test, maxlen=maxlen)
    num_classes = len(set(y_train))  # Number of classes in the dataset
    y_train = to_categorical(y_train, num_classes=num_classes)
    y_test = to_categorical(y_test, num_classes=num_classes)
    return (X_train, y_train), (X_test, y_test), num_words, num_classes

def main():
    max_words = 80000
    max_length = 200
    embedding_dim = 200
    num_measurements = 100
    batch_size = 32
    epochs = 20

    # Load the Reuters dataset
    (X_train, y_train), (X_test, y_test), vocab_size, num_classes = load_reuters_data(max_words, max_length)

    # Create the QPDN model
    model = create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005), loss='categorical_crossentropy', metrics=['accuracy'])

    # Display the model summary
    model.summary()

    # Train the model
    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=batch_size,
        epochs=epochs,
        verbose=1
    )

    # Evaluate the model on the test set
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test accuracy: {accuracy:.4f}")

    # Save the trained model
    model.save("qpdn_model_reuters.h5")

    # Make predictions on the test set
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)

    # Print the classification report
    print(classification_report(y_test_classes, y_pred_classes))

    # Load the saved model for testing with a sample
    loaded_model = tf.keras.models.load_model("qpdn_model_reuters.h5", custom_objects={
        'ComplexEmbedding': ComplexEmbedding,
        'DensityMatrix': DensityMatrix,
        'Measurement': Measurement
    })

    # Test with a sample text
    sample_text = "This is a sample Reuters news article text."
    sample_sequence = reuters.get_word_index()
    sample_sequence = [sample_sequence.get(word, 2) for word in sample_text.lower().split()]
    sample_sequence = pad_sequences([sample_sequence], maxlen=max_length)
    prediction = loaded_model.predict(sample_sequence)
    predicted_class = np.argmax(prediction, axis=1)
    print(f"Predicted class for the sample text: {predicted_class[0]}")

if __name__ == "__main__":
    main()


Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_4 (InputLayer)        [(None, 200)]                0         []                            
                                                                                                  
 complex_embedding_3 (Compl  (None, 200, 200)             3200000   ['input_4[0][0]']             
 exEmbedding)                                             0                                       
                                                                                                  
 density_matrix_3 (DensityM  (None, 200, 200)             0         ['complex_embedding_3[0][0]'] 
 atrix)                                                                                           
                                                                                            

  saving_api.save_model(


              precision    recall  f1-score   support

           0       0.73      0.57      0.64        14
           1       0.76      0.74      0.75       134
           2       0.44      0.70      0.54        27
           3       0.92      0.90      0.91       994
           4       0.79      0.84      0.81       600
           5       0.22      0.29      0.25         7
           6       0.88      0.82      0.85        17
           7       0.17      0.25      0.20         4
           8       0.50      0.71      0.59        48
           9       0.71      0.81      0.76        31
          10       0.75      0.79      0.77        38
          11       0.65      0.61      0.63       109
          12       0.36      0.22      0.28        18
          13       0.49      0.50      0.49        46
          14       0.67      0.40      0.50         5
          15       0.15      0.22      0.18         9
          16       0.70      0.61      0.65       128
          17       0.57    

Improvement for 20 Newsgroup Dataset - QPDN Approach

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Conv1D, GlobalMaxPooling1D, Dropout, Bidirectional, LSTM, Attention, BatchNormalization, Embedding, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.regularizers import l2
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from collections import Counter
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Download necessary NLTK data
nltk.download('stopwords')
nltk.download('wordnet')

class ComplexEmbedding(Layer):
    def __init__(self, input_dim, output_dim, **kwargs):
        super(ComplexEmbedding, self).__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim

    def build(self, input_shape):
        self.real_embeddings = self.add_weight(
            shape=(self.input_dim, self.output_dim),
            initializer='glorot_uniform',
            trainable=True,
            name='real_embeddings'
        )
        self.imag_embeddings = self.add_weight(
            shape=(self.input_dim, self.output_dim),
            initializer='glorot_uniform',
            trainable=True,
            name='imag_embeddings'
        )
        super(ComplexEmbedding, self).build(input_shape)

    def call(self, inputs):
        inputs = tf.cast(inputs, tf.int32)
        real_part = tf.nn.embedding_lookup(self.real_embeddings, inputs)
        imag_part = tf.nn.embedding_lookup(self.imag_embeddings, inputs)
        complex_embeddings = tf.complex(real_part, imag_part)
        return complex_embeddings

    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[1], self.output_dim)

class DensityMatrix(Layer):
    def __init__(self, **kwargs):
        super(DensityMatrix, self).__init__(**kwargs)

    def call(self, inputs):
        weighted_embeddings = inputs
        density_matrix = tf.einsum('bij,bik->bjk', weighted_embeddings, tf.math.conj(weighted_embeddings))
        print("Density Matrix dtype:", density_matrix.dtype)  # Print dtype of density matrix
        return density_matrix

class Measurement(Layer):
    def __init__(self, num_measurements, output_dim, **kwargs):
        super(Measurement, self).__init__(**kwargs)
        self.num_measurements = num_measurements
        self.output_dim = output_dim

    def build(self, input_shape):
        self.measurement_vectors = self.add_weight(
            shape=(self.num_measurements, self.output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors'
        )
        super(Measurement, self).build(input_shape)

    def call(self, inputs):
        print("Measurement Input dtype:", inputs.dtype)  # Print dtype of input to Measurement
        complex_measurement = tf.complex(self.measurement_vectors, tf.zeros_like(self.measurement_vectors))
        complex_measurement = tf.linalg.l2_normalize(complex_measurement, axis=-1)
        measured_probs = tf.abs(tf.einsum('bij,ki->bk', tf.cast(inputs, tf.complex64), complex_measurement))
        return measured_probs

def create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length):
    inputs = tf.keras.Input(shape=(max_length,))
    embedding_layer = ComplexEmbedding(vocab_size, embedding_dim)
    complex_embeddings = embedding_layer(inputs)
    density_matrix_layer = DensityMatrix()
    density_matrix = density_matrix_layer(complex_embeddings)
    measurement_layer = Measurement(num_measurements, embedding_dim)
    measured_probs = measurement_layer(density_matrix)

    # Wrap tf.expand_dims in a Lambda layer
    lstm_input = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, -1))(measured_probs)
    lstm = Bidirectional(LSTM(256, return_sequences=True))(lstm_input)
    attention = Attention()([lstm, lstm])

    conv1 = Conv1D(filters=256, kernel_size=10, activation='relu', padding='same', kernel_regularizer=l2(0.01))(attention)
    conv1 = BatchNormalization()(conv1)
    conv2 = Conv1D(filters=128, kernel_size=3, activation='relu', padding='same', kernel_regularizer=l2(0.01))(conv1)
    conv2 = BatchNormalization()(conv2)
    pool = GlobalMaxPooling1D()(conv2)
    dropout = Dropout(0.1)(pool)
    dense1 = Dense(128, activation='relu', kernel_regularizer=l2(0.01))(dropout)
    dense1 = BatchNormalization()(dense1)
    dense1 = Dropout(0.1)(dense1)
    dense2 = Dense(64, activation='relu', kernel_regularizer=l2(0.01))(dense1)
    dense2 = BatchNormalization()(dense2)
    outputs = Dense(num_classes, activation='softmax')(dense2)

    model = Model(inputs=inputs, outputs=outputs)
    return model

def preprocess_text(text):
    lemmatizer = WordNetLemmatizer()
    text = re.sub(r'\W+', ' ', text.lower())
    tokens = text.split()
    stop_words = set(stopwords.words('english'))
    tokens = [lemmatizer.lemmatize(token) for token in tokens if token not in stop_words]
    return ' '.join(tokens)

def load_20newsgroups_data(num_words, maxlen):
    newsgroups = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))
    texts = [preprocess_text(text) for text in newsgroups.data]
    labels = newsgroups.target

    word_counts = Counter()
    for text in texts:
        word_counts.update(text.split())

    vocab = ['<PAD>', '<UNK>'] + [word for word, count in word_counts.items() if count >= 5]
    word2idx = {word: idx for idx, word in enumerate(vocab)}

    encoded_texts = []
    for text in texts:
        encoded = [word2idx.get(word, 1) for word in text.split()[:maxlen]]
        encoded += [0] * (maxlen - len(encoded))
        encoded_texts.append(encoded)

    X = np.array(encoded_texts)
    y = to_categorical(labels, num_classes=len(set(labels)))

    return train_test_split(X, y, test_size=0.2, random_state=42), len(vocab), word2idx

def main():
    max_words = 50000
    max_length = 300
    embedding_dim = 300
    num_measurements = 150
    num_classes = 20
    batch_size = 64
    epochs = 50

    (X_train, X_test, y_train, y_test), vocab_size, word2idx = load_20newsgroups_data(max_words, max_length)

    model = create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

    model.summary()

    # Learning rate scheduler
    def lr_schedule(epoch):
        if epoch < 10:
            return 0.001
        elif epoch < 20:
            return 0.0005
        else:
            return 0.0001

    lr_scheduler = tf.keras.callbacks.LearningRateScheduler(lr_schedule)

    # Early stopping
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=5, restore_best_weights=True)

    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=batch_size,
        epochs=epochs,
        verbose=1,
        callbacks=[lr_scheduler, early_stopping]
    )

    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test accuracy: {accuracy:.4f}")

    model.save("qpdn_model_improved.h5")

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)

    print(classification_report(y_test_classes, y_pred_classes))

    loaded_model = tf.keras.models.load_model("qpdn_model_improved.h5", custom_objects={
        'ComplexEmbedding': ComplexEmbedding,
        'DensityMatrix': DensityMatrix,
        'Measurement': Measurement
    })

    sample_text = "This is a sample document from the 20 newsgroups dataset."
    sample_sequence = [word2idx.get(word, 1) for word in preprocess_text(sample_text).split()]
    sample_sequence = pad_sequences([sample_sequence], maxlen=max_length)
    prediction = loaded_model.predict(sample_sequence)
    predicted_class = np.argmax(prediction, axis=1)
    print(f"Predicted class for the sample text: {predicted_class[0]}")

if __name__ == "__main__":
    main()

An error occurred: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```



In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import reuters
from sklearn.metrics import classification_report
import pickle

class ComplexEmbedding(layers.Layer):
    def __init__(self, input_dim, output_dim, **kwargs):
        super(ComplexEmbedding, self).__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.real_embedding = layers.Embedding(input_dim, output_dim)
        self.imag_embedding = layers.Embedding(input_dim, output_dim)

    def call(self, inputs):
        real_part = self.real_embedding(inputs)
        imag_part = self.imag_embedding(inputs)
        return tf.concat([real_part, imag_part], axis=-1)

    def compute_output_shape(self, input_shape):
        return input_shape + (self.output_dim * 2,)

class DensityMatrix(layers.Layer):
    def __init__(self, **kwargs):
        super(DensityMatrix, self).__init__(**kwargs)

    def call(self, inputs):
        real_part, imag_part = tf.split(inputs, 2, axis=-1)
        density_matrix_real = tf.einsum('bij,bik->bjk', real_part, real_part) + tf.einsum('bij,bik->bjk', imag_part, imag_part)
        density_matrix_imag = tf.einsum('bij,bik->bjk', imag_part, real_part) - tf.einsum('bij,bik->bjk', real_part, imag_part)
        return tf.concat([density_matrix_real, density_matrix_imag], axis=-1)

    def compute_output_shape(self, input_shape):
        return input_shape[:-1] + (input_shape[-1], input_shape[-1])

class Measurement(layers.Layer):
    def __init__(self, num_measurements, output_dim, **kwargs):
        super(Measurement, self).__init__(**kwargs)
        self.num_measurements = num_measurements
        self.output_dim = output_dim
        self.measurement_vectors_real = self.add_weight(
            shape=(num_measurements, output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors_real',
            dtype=tf.float32
        )
        self.measurement_vectors_imag = self.add_weight(
            shape=(num_measurements, output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors_imag',
            dtype=tf.float32
        )

    def call(self, inputs):
        density_matrix_real, density_matrix_imag = tf.split(inputs, 2, axis=-1)
        measured_probs_real = tf.einsum('bij,ki->bk', density_matrix_real, self.measurement_vectors_real) - tf.einsum('bij,ki->bk', density_matrix_imag, self.measurement_vectors_imag)
        measured_probs_imag = tf.einsum('bij,ki->bk', density_matrix_real, self.measurement_vectors_imag) + tf.einsum('bij,ki->bk', density_matrix_imag, self.measurement_vectors_real)
        return tf.sqrt(tf.square(measured_probs_real) + tf.square(measured_probs_imag))

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.num_measurements)

def create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length):
    inputs = layers.Input(shape=(max_length,))
    embedding_layer = ComplexEmbedding(vocab_size, embedding_dim)
    complex_embeddings = embedding_layer(inputs)
    density_matrix_layer = DensityMatrix()
    density_matrix = density_matrix_layer(complex_embeddings)
    measurement_layer = Measurement(num_measurements, embedding_dim)
    measured_probs = measurement_layer(density_matrix)

    lstm = layers.Bidirectional(layers.LSTM(64, return_sequences=True))(layers.Reshape((num_measurements, 1))(measured_probs))
    attention = layers.Attention()([lstm, lstm])

    conv1 = layers.Conv1D(filters=256, kernel_size=5, activation='relu', padding='same')(attention)
    conv1 = layers.BatchNormalization()(conv1)
    conv2 = layers.Conv1D(filters=128, kernel_size=3, activation='relu', padding='same')(conv1)
    conv2 = layers.BatchNormalization()(conv2)
    pool = layers.GlobalMaxPooling1D()(conv2)
    dropout = layers.Dropout(0.05)(pool)
    dense1 = layers.Dense(64, activation='relu')(dropout)
    dense1 = layers.Dropout(0.05)(dense1)
    dense2 = layers.Dense(32, activation='relu')(dense1)
    outputs = layers.Dense(num_classes, activation='softmax')(dense2)

    model = Model(inputs=inputs, outputs=outputs)

    # Save the complex embeddings using a Keras model
    embedding_extraction_model = Model(inputs=inputs, outputs=complex_embeddings)
    return model, embedding_extraction_model

def load_reuters_data(num_words, maxlen):
    (X_train, y_train), (X_test, y_test) = reuters.load_data(num_words=num_words, test_split=0.25)
    X_train = pad_sequences(X_train, maxlen=maxlen)
    X_test = pad_sequences(X_test, maxlen=maxlen)
    num_classes = np.max(y_train) + 1
    y_train = to_categorical(y_train, num_classes=num_classes)
    y_test = to_categorical(y_test, num_classes=num_classes)
    return (X_train, y_train), (X_test, y_test), num_words, num_classes

def main():
    max_words = 20000
    max_length = 200
    embedding_dim = 200
    num_measurements = 100
    batch_size = 32
    epochs = 20

    (X_train, y_train), (X_test, y_test), vocab_size, num_classes = load_reuters_data(max_words, max_length)

    model, embedding_extraction_model = create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length)
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

    model.summary()

    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=batch_size,
        epochs=epochs,
        verbose=1
    )

    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test accuracy: {accuracy:.4f}")

    model.save("qpdn_model.h5")

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)

    print(classification_report(y_test_classes, y_pred_classes))

    loaded_model = keras.models.load_model("qpdn_model.h5", custom_objects={
        'ComplexEmbedding': ComplexEmbedding,
        'DensityMatrix': DensityMatrix,
        'Measurement': Measurement
    })

    # Save complex embeddings for a sample batch
    complex_embeddings = embedding_extraction_model.predict(X_test[:batch_size])
    with open("complex_embeddings.pkl", "wb") as f:
        pickle.dump(complex_embeddings, f)

    sample_text = "This is a sample Reuters news article text."
    sample_sequence = reuters.get_word_index()
    sample_sequence = [sample_sequence.get(word, 2) for word in sample_text.lower().split()]
    sample_sequence = pad_sequences([sample_sequence], maxlen=max_length)
    prediction = loaded_model.predict(sample_sequence)
    predicted_class = np.argmax(prediction, axis=1)
    print(f"Predicted class for the sample text: {predicted_class[0]}")

if __name__ == "__main__":
    main()

Epoch 1/20
[1m256/264[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m4s[0m 562ms/step - accuracy: 0.3134 - loss: 2.7066

KeyboardInterrupt: 

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import reuters
import pickle

class ComplexEmbedding(layers.Layer):
    def __init__(self, input_dim, output_dim, **kwargs):
        super(ComplexEmbedding, self).__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.real_embedding = layers.Embedding(input_dim, output_dim)
        self.imag_embedding = layers.Embedding(input_dim, output_dim)

    def call(self, inputs):
        real_part = self.real_embedding(inputs)
        imag_part = self.imag_embedding(inputs)
        return tf.concat([real_part, imag_part], axis=-1)

    def compute_output_shape(self, input_shape):
        return input_shape + (self.output_dim * 2,)

class DensityMatrix(layers.Layer):
    def __init__(self, **kwargs):
        super(DensityMatrix, self).__init__(**kwargs)

    def call(self, inputs):
        real_part, imag_part = tf.split(inputs, 2, axis=-1)
        density_matrix_real = tf.einsum('bij,bik->bjk', real_part, real_part) + tf.einsum('bij,bik->bjk', imag_part, imag_part)
        density_matrix_imag = tf.einsum('bij,bik->bjk', imag_part, real_part) - tf.einsum('bij,bik->bjk', real_part, imag_part)
        return tf.concat([density_matrix_real, density_matrix_imag], axis=-1)

    def compute_output_shape(self, input_shape):
        return input_shape[:-1] + (input_shape[-1], input_shape[-1])

class Measurement(layers.Layer):
    def __init__(self, num_measurements, output_dim, **kwargs):
        super(Measurement, self).__init__(**kwargs)
        self.num_measurements = num_measurements
        self.output_dim = output_dim
        self.measurement_vectors_real = self.add_weight(
            shape=(num_measurements, output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors_real',
            dtype=tf.float32
        )
        self.measurement_vectors_imag = self.add_weight(
            shape=(num_measurements, output_dim),
            initializer='glorot_uniform',
            name='measurement_vectors_imag',
            dtype=tf.float32
        )

    def call(self, inputs):
        density_matrix_real, density_matrix_imag = tf.split(inputs, 2, axis=-1)
        measured_probs_real = tf.einsum('bij,ki->bk', density_matrix_real, self.measurement_vectors_real) - tf.einsum('bij,ki->bk', density_matrix_imag, self.measurement_vectors_imag)
        measured_probs_imag = tf.einsum('bij,ki->bk', density_matrix_real, self.measurement_vectors_imag) + tf.einsum('bij,ki->bk', density_matrix_imag, self.measurement_vectors_real)
        return tf.sqrt(tf.square(measured_probs_real) + tf.square(measured_probs_imag))

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.num_measurements)

def create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length):
    inputs = layers.Input(shape=(max_length,))
    embedding_layer = ComplexEmbedding(vocab_size, embedding_dim)
    complex_embeddings = embedding_layer(inputs)
    density_matrix_layer = DensityMatrix()
    density_matrix = density_matrix_layer(complex_embeddings)
    measurement_layer = Measurement(num_measurements, embedding_dim)
    measured_probs = measurement_layer(density_matrix)

    lstm = layers.Bidirectional(layers.LSTM(64, return_sequences=True))(layers.Reshape((num_measurements, 1))(measured_probs))
    attention = layers.Attention()([lstm, lstm])

    conv1 = layers.Conv1D(filters=256, kernel_size=5, activation='relu', padding='same')(attention)
    conv1 = layers.BatchNormalization()(conv1)
    conv2 = layers.Conv1D(filters=128, kernel_size=3, activation='relu', padding='same')(conv1)
    conv2 = layers.BatchNormalization()(conv2)
    pool = layers.GlobalMaxPooling1D()(conv2)
    dropout = layers.Dropout(0.05)(pool)
    dense1 = layers.Dense(64, activation='relu')(dropout)
    dense1 = layers.Dropout(0.05)(dense1)
    dense2 = layers.Dense(32, activation='relu')(dense1)
    outputs = layers.Dense(num_classes, activation='softmax')(dense2)

    model = Model(inputs=inputs, outputs=outputs)

    # Save the complex embeddings using a Keras model
    embedding_extraction_model = Model(inputs=inputs, outputs=complex_embeddings)
    return model, embedding_extraction_model

def load_reuters_data(num_words, maxlen):
    (X_train, y_train), (X_test, y_test) = reuters.load_data(num_words=num_words, test_split=0.25)
    X_train = pad_sequences(X_train, maxlen=maxlen)
    X_test = pad_sequences(X_test, maxlen=maxlen)
    num_classes = np.max(y_train) + 1
    y_train = to_categorical(y_train, num_classes=num_classes)
    y_test = to_categorical(y_test, num_classes=num_classes)
    return (X_train, y_train), (X_test, y_test), num_words, num_classes

def main():
    max_words = 20000
    max_length = 200
    embedding_dim = 200
    num_measurements = 100

    (X_train, y_train), (X_test, y_test), vocab_size, num_classes = load_reuters_data(max_words, max_length)

    _, embedding_extraction_model = create_qpdn_model(vocab_size, embedding_dim, num_measurements, num_classes, max_length)

    # Save complex embeddings for a sample batch
    complex_embeddings = embedding_extraction_model.predict(X_train[:32])  # Using a batch size of 32 for example
    with open("complex_embeddings.pkl", "wb") as f:
        pickle.dump(complex_embeddings, f)

    print("Complex embeddings saved to complex_embeddings.pkl")

if __name__ == "__main__":
    main()


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 68ms/step
Complex embeddings saved to complex_embeddings.pkl
