In [None]:
import numpy as np
import pandas as pd
from transformers import BertTokenizer, TFBertModel
from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity
from tensorflow.keras.layers import Dense, Input, Concatenate
from tensorflow.keras.models import Model
from IPython.display import clear_output
from transformers import logging
import tensorflow as tf

In [2]:
# Load the first 20,000 rows of the catalogue and build one text field per book.
data = pd.read_csv("books_data.csv", nrows=20000)

# Fill gaps so the string concatenation below never meets NaN.
data['Title'] = data['Title'].fillna('Unknown')
data['categories'] = data['categories'].fillna('Unknown')
data['description'] = data['description'].fillna('').str.lower()


def _authors_to_text(value):
    """Return an `authors` cell as plain text, or '' when it is missing.

    Cells read by `pd.read_csv` are strings (or NaN), never Python lists, so
    the string form is used as-is; brackets, quotes and commas it may contain
    are removed by the punctuation regex applied to `book_content` below.
    Real lists/tuples are still joined with spaces.
    """
    if isinstance(value, (list, tuple)):
        return ' '.join(str(name) for name in value)
    if isinstance(value, str):
        return value
    return ''


# The title is repeated twice so it appears more often than the other fields
# in the text handed to the tokenizer.
data['book_content'] = (
    (data['Title'] + ' ') * 2 + ' '
    + data['description'] + ' '
    + data['authors'].apply(_authors_to_text)
)
# Drop punctuation and lowercase the combined text.
data['book_content'] = data['book_content'].str.replace(r'[^\w\s]', '', regex=True).str.lower()

In [None]:
# Load the BERT tokenizer and model
# `logging` here is transformers' logging module; only errors are reported from now on.
logging.set_verbosity_error()
# Tokenizer and model are loaded from the same 'bert-base-uncased' checkpoint name.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = TFBertModel.from_pretrained('bert-base-uncased')

In [4]:
# Tokenization helper built on the module-level BERT `tokenizer`.
def tokenize_texts(texts, max_len=128):
    """Tokenize a collection of texts into fixed-length TensorFlow tensors.

    Args:
        texts: Object exposing `.tolist()` (e.g. a pandas Series of strings).
        max_len: Length every sequence is padded or truncated to.

    Returns:
        Whatever the module-level `tokenizer` returns for these settings.
    """
    encoding_options = dict(
        max_length=max_len,
        padding='max_length',
        truncation=True,
        return_tensors='tf',
    )
    text_list = texts.tolist()
    return tokenizer(text_list, **encoding_options)

In [None]:
# Tokenize the whole `book_content` column in a single call; tokenize_texts pads/truncates every row to 128 tokens.
tokenized_data = tokenize_texts(data['book_content'], max_len=128)
# Number of tokenized rows (length of the 'input_ids' entry).
tokenized_data_length = len(tokenized_data['input_ids'])

def batch_bert_embeddings(data, batch_size=32, max_len=128):
    """Embed `data['book_content']` with BERT, one batch at a time.

    Uses the module-level `tokenizer` and `bert_model`. Progress is reported
    against the number of rows in `data` itself, so the function does not
    depend on any other notebook variable having been computed first.

    Args:
        data: DataFrame with a 'book_content' column of strings.
        batch_size: Number of texts sent through the model per step.
        max_len: Length each text is padded or truncated to.

    Returns:
        A single tensor: the per-batch outputs concatenated along axis 0,
        in the same row order as `data`.

    Raises:
        ValueError: If `data` has no rows (there would be nothing to concatenate).
    """
    texts = data['book_content']
    total = len(texts)
    if total == 0:
        raise ValueError("data contains no rows to embed")

    all_embeddings = []

    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        # .iloc makes the slice positional regardless of the DataFrame's index.
        batch_texts = texts.iloc[start:end].tolist()

        encoded = tokenizer(
            batch_texts,
            max_length=max_len,
            padding='max_length',
            truncation=True,
            return_tensors='tf'
        )

        # Element 1 of the model output is kept as the embedding of each text.
        batch_embeddings = bert_model(
            encoded['input_ids'],
            attention_mask=encoded['attention_mask']
        )[1]
        all_embeddings.append(batch_embeddings)

        # Replace the previous progress line instead of stacking new ones.
        clear_output(wait=True)
        print(f"Processed {end} / {total}  | {end / total * 100:.2f}%")

    # Concatenate all batches into a single tensor
    return tf.concat(all_embeddings, axis=0)

# Use the function to get embeddings in batches
# (max_len is not passed, so the function's default applies).
bert_embeddings = batch_bert_embeddings(data, batch_size=32)

In [None]:
# Convert TensorFlow tensor to NumPy array
bert_embeddings_np = bert_embeddings.numpy()

# Save the embeddings to a .npy file
# (relative path, so the file lands in the current working directory).
np.save('bert_embeddings.npy', bert_embeddings_np)

print("Embeddings saved to disk.")

In [6]:
# Create pairs of books and labels (1 for similar, 0 for dissimilar based on 'categories')
from tensorflow.keras.utils import Sequence

class PairGenerator(Sequence):
    """Batch generator yielding ordered pairs of book embeddings with a same-category label.

    Batch `idx` covers `batch_size` consecutive rows. Every ordered pair (i, j)
    of distinct rows in that block is emitted, labelled 1 when both rows have
    equal 'categories' values and 0 otherwise. Pairs are never formed across
    different blocks.

    Args:
        data: DataFrame with a 'categories' column; row order must match
            `bert_embeddings`.
        bert_embeddings: Array-like (NumPy array or tensor convertible with
            `np.asarray`) with one embedding per row of `data`.
        batch_size: Number of rows per block; each full block yields
            `batch_size * (batch_size - 1)` pairs.
        **kwargs: Forwarded to the base class constructor.
    """

    def __init__(self, data, bert_embeddings, batch_size=32, **kwargs):
        # Initialise the base class so it can set up its own state.
        super().__init__(**kwargs)
        self.data = data
        self.bert_embeddings = bert_embeddings
        self.batch_size = batch_size
        self.indices = list(range(len(data)))
        # Snapshot both inputs as NumPy arrays once, so __getitem__ can use
        # vectorized positional indexing instead of per-pair lookups.
        self._categories = data['categories'].to_numpy()
        self._embeddings = np.asarray(bert_embeddings)

    def __len__(self):
        """Return the number of full blocks per epoch (a trailing partial block is dropped)."""
        return len(self.data) // self.batch_size

    def __getitem__(self, idx):
        """Build the pairs and labels for block `idx`.

        Returns:
            ((left, right), labels): `left[k]` and `right[k]` are the embeddings
            of the k-th pair, `labels[k]` is 1 for equal categories, else 0.
            Pairs are ordered by the first row, then by the second row.
        """
        block = np.asarray(
            self.indices[idx * self.batch_size:(idx + 1) * self.batch_size],
            dtype=int,
        )
        size = block.size

        # Enumerate the full i-major / j-minor grid, then drop the diagonal (i == j).
        first = np.repeat(block, size)
        second = np.tile(block, size)
        distinct = first != second
        first, second = first[distinct], second[distinct]

        labels = (self._categories[first] == self._categories[second]).astype(int)
        X_train_1 = self._embeddings[first]
        X_train_2 = self._embeddings[second]
        return (X_train_1, X_train_2), labels

# Create the pair generator for training
# With batch_size=64, pairs are formed only among each block of 64 consecutive rows.
pair_gen = PairGenerator(data, bert_embeddings, batch_size=64)

In [7]:
# Branch model applied to each BERT embedding (despite the name, it is a single Dense layer).
def create_cnn_model():
    """Build the branch model: a 768-d input followed by one 128-unit ReLU Dense layer.

    Returns:
        A Keras `Model` mapping a (768,) input to the 128-d Dense output.
    """
    embedding_input = Input(shape=(768,))
    projection = Dense(128, activation='relu')
    return Model(inputs=embedding_input, outputs=projection(embedding_input))

# One shared instance: both Siamese inputs are passed through this same model object.
cnn_branch = create_cnn_model()

In [None]:
# Create the Siamese network: two 768-d inputs routed through the one shared branch model (`cnn_branch`)
input_1 = Input(shape=(768,))  # BERT embeddings for the first book
input_2 = Input(shape=(768,))  # BERT embeddings for the second book

# Pass both inputs through the shared branch, so both books are projected with the same weights
output_1 = cnn_branch(input_1)
output_2 = cnn_branch(input_2)

# Concatenate outputs and add Dense layers for classification
# (the final single sigmoid unit is fitted against the 0/1 same-category labels from pair_gen)
concatenated = Concatenate()([output_1, output_2])
dense_layer1 = Dense(128, activation='relu')(concatenated)
dense_layer2 = Dense(64, activation='relu')(dense_layer1)
output_layer = Dense(1, activation='sigmoid')(dense_layer2)

# Build and compile the Siamese model
siamese_model = Model(inputs=[input_1, input_2], outputs=output_layer)
siamese_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the Siamese network

siamese_model.fit(pair_gen, epochs=10)

# Project the BERT embeddings of all books through the branch model (not through BERT again);
# these projected vectors are what the similarity computations below operate on
book_embeddings = cnn_branch.predict(bert_embeddings)

In [None]:
# Write the Siamese model to 'siamese_model.h5' (relative to the current working directory).
siamese_model.save('siamese_model.h5')

In [16]:
# Normalize the projected embeddings with sklearn's `normalize` (default settings).
normalized_book_embeddings = normalize(book_embeddings)
# Pairwise cosine similarity between all books; row i is later read as "similarity of book i to every book".
# NOTE(review): presumably an N x N dense matrix, so memory grows quadratically with the number of books — confirm it fits for the full dataset.
cosine_sim_matrix = cosine_similarity(normalized_book_embeddings)

In [17]:
# Define the book recommendation function based on cosine similarity
def recommend_books(book_title, threshold, cosine_sim_matrix):
    """Return books whose cosine similarity to `book_title` is at least `threshold`.

    The title is matched exactly against the module-level `data['Title']`;
    when several rows share the title, the first one is used.

    Args:
        book_title: Title to look up.
        threshold: Minimum similarity score a book needs to be returned.
        cosine_sim_matrix: Square matrix whose row i holds the similarity of
            book i (by row position in `data`) to every book.

    Returns:
        List of `(title, score)` tuples sorted by descending similarity, with
        the score formatted as a string with five decimals. The queried book
        itself is part of the ranking. An empty list is returned when no row
        has the requested title.
    """
    # Look the title up by row position, so the result lines up with the matrix
    # rows even if the DataFrame index is not 0..N-1.
    matches = np.flatnonzero((data['Title'] == book_title).to_numpy())
    if matches.size == 0:
        return []
    row = int(matches[0])

    # Rank every book by its similarity to the queried row (stable sort).
    ranked = sorted(enumerate(cosine_sim_matrix[row]), key=lambda pair: pair[1], reverse=True)

    titles = data['Title']
    return [
        (titles.iloc[position], "{:.5f}".format(score))
        for position, score in ranked
        if score >= threshold
    ]


In [18]:
from sklearn.metrics.pairwise import euclidean_distances

# Compute Euclidean distance matrix
# Distances are taken between the projected `book_embeddings` (the cnn_branch.predict output), not the raw BERT vectors.
euclidean_dist_matrix = euclidean_distances(book_embeddings)

# Define a recommendation function based on Euclidean distance
def recommend_books_by_euclidean(book_title, threshold, euclidean_dist_matrix):
    """Return books whose Euclidean distance to `book_title` is at most `threshold`.

    The title is matched exactly against the module-level `data['Title']`;
    when several rows share the title, the first one is used.

    Args:
        book_title: Title to look up.
        threshold: Maximum distance a book may have to be returned (a smaller
            distance means a more similar book).
        euclidean_dist_matrix: Square matrix whose row i holds the distance of
            book i (by row position in `data`) to every book.

    Returns:
        List of `(title, distance)` tuples sorted from closest to farthest,
        with the distance formatted as a string with five decimals. The queried
        book itself is part of the ranking. An empty list is returned when no
        row has the requested title.
    """
    # Look the title up by row position so it lines up with the matrix rows.
    matches = np.flatnonzero((data['Title'] == book_title).to_numpy())
    if matches.size == 0:
        return []
    row = int(matches[0])

    # Lower distance is more similar, so rank in ascending order.
    ranked = sorted(enumerate(euclidean_dist_matrix[row]), key=lambda pair: pair[1])

    # Filter on the numeric distance first and format only what is kept.
    titles = data['Title']
    return [
        (titles.iloc[position], "{:.5f}".format(distance))
        for position, distance in ranked
        if distance <= threshold
    ]

In [None]:

# Interactive prompt: keep answering title queries until the user enters 'q'.
# Each query overwrites 'output.txt' with the matches for that title.
book_title = ''
while True:
    clear_output(wait=True)
    book_title = input("Enter the title of a book: ")
    if book_title == 'q':
        # Quit before attempting a lookup for the sentinel value.
        break

    try:
        recommended_books = recommend_books(book_title, 0.5, cosine_sim_matrix)
    except IndexError:
        # Raised when the lookup cannot find a row with this exact title.
        print('No book found with title: ' + book_title)
        continue

    # The context manager closes (and flushes) the report even if a write fails;
    # UTF-8 keeps non-ASCII titles writable regardless of the platform default.
    with open('output.txt', 'w', encoding='utf-8') as f:
        f.write('Counts: ' + str(len(recommended_books)) + '\n\n')
        for book in recommended_books:
            f.write(book[1] + ' | ' + str(book[0]) + '\n')

    print('Found: ' + str(len(recommended_books)))