In [None]:
import matplotlib.pyplot as plt
import numpy as np

#
# Plots the evolution of the training and validation loss and MAE (mean absolute error)
#
def plot(h,epochs):
    """Draw training and validation curves for the loss and the MAE metric.

    Two side-by-side panels are shown: one for 'loss' / 'val_loss' and one
    for 'mae' / 'val_mae'.

    Parameters
    ----------
    h : object exposing a ``history`` dict
        Must provide the per-epoch sequences 'loss', 'val_loss', 'mae' and
        'val_mae'.
    epochs : int
        Number of epochs that were requested. At most this many points are
        drawn per curve; when fewer values were recorded, only the recorded
        ones are plotted instead of failing on a length mismatch.

    Returns
    -------
    None
        The figure is displayed with ``plt.show()``.
    """
    metrics = ('loss', 'mae')
    fig, axs = plt.subplots(1, 2, figsize=(17,5))
    for ax, label in zip(axs, metrics):
        curves = (
            ('b-', 'Training ', h.history[label]),
            ('y-', 'Test ', h.history['val_' + label]),   # validation values
        )
        for fmt, prefix, values in curves:
            # Size the x axis from the recorded values, not from `epochs`,
            # so a shorter history does not raise.
            values = np.asarray(values, dtype=float)[:epochs]
            ax.plot(range(1, len(values) + 1), values, fmt, label=prefix + label)
        ax.set_title('Training and test ' + label)
        ax.set_xlabel('Epochs')
        ax.set_ylabel(label)
        ax.legend()
        ax.grid(True)
    plt.show()

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Column positions inside the numeric train/test arrays
USER, ITEM, RATING = 0, 1, 2

# Read the raw ratings table
data = pd.read_csv('1_generate_scenarios/Musical_Instruments.csv')

# Give every distinct user / item a 1-based integer id
user_map = {raw_id: num for num, raw_id in enumerate(data['user_id'].unique(), start=1)}
item_map = {raw_id: num for num, raw_id in enumerate(data['parent_asin'].unique(), start=1)}

# How many distinct users / items there are
NUM_USERS, NUM_ITEMS = len(user_map), len(item_map)

print('Number of users:', NUM_USERS)
print('Number of items:', NUM_ITEMS)

# Attach the integer ids and keep only the three columns the model consumes
data['user_id_num'] = data['user_id'].map(user_map)
data['item_id'] = data['parent_asin'].map(item_map)
data = data[['user_id_num', 'item_id', 'rating']]

# 70/30 split with a fixed seed, each part converted to a float32 array for Keras.
# NOTE(review): float32 represents integers exactly only up to 2**24 - confirm
# the user/item counts stay below that, otherwise ids would collide.
train, test = (
    part.to_numpy(dtype=np.float32)
    for part in train_test_split(data, test_size=0.3, random_state=50)
)


In [None]:
#
# Running DEEPMF to obtain the embedding weights (both users and items)
#
from keras.models import Model, Sequential
from keras.layers import Embedding, Flatten, Input, Dropout, Dense, Concatenate, Dot
from keras.optimizers import Adam

# Size of every user / item embedding vector (also read later by the cells that
# build the embedding dataset and the GAN).
latent_dim = 5  # 5 neurons in the embedding are assumed to adequately code both users and items

# Item branch: one integer item id -> latent_dim-sized embedding -> flat vector.
# Ids are 1-based (see the id maps built when the data is loaded), hence the
# NUM_ITEMS + 1 rows; row 0 does not correspond to any real item.
item_input = Input(shape=[1],name='item-input')
item_embedding = Embedding(NUM_ITEMS + 1, latent_dim, name='item-embedding')(item_input)
item_vec = Flatten(name='item-flatten')(item_embedding)

# User branch: same structure as the item branch, with NUM_USERS + 1 rows.
user_input = Input(shape=[1],name='user-input')
user_embedding = Embedding(NUM_USERS + 1, latent_dim, name='user-embedding')(user_input)
user_vec = Flatten(name='user-flatten')(user_embedding)

# Predicted rating = dot product of the two flat vectors.
# Despite the layer name 'item-user-concat', this is a Dot layer, not a concatenation.
dot = Dot(axes=1,name='item-user-concat')([item_vec, user_vec])

# Inputs are ordered [user, item]; the fit() call below passes the columns in that order.
model_deepMF = Model([user_input, item_input], dot)
# Optimized on mean squared error; 'mae' is tracked as the metric that plot() reads.
model_deepMF.compile(optimizer='adam', metrics=['mae'], loss='mean_squared_error')

model_deepMF.summary()

# Train on the training split and validate on the test split after each epoch.
EPOCHS = 10
history_deepMF = model_deepMF.fit([train[:,USER],train[:,ITEM]],train[:,RATING], 
                    validation_data=([test[:,USER],test[:,ITEM]], test[:,RATING]), 
                    epochs=EPOCHS, verbose=1)
# Show the loss and MAE curves for the run above.
plot(history_deepMF,EPOCHS)

In [None]:
# Save the DeepMF model to 'ModelDeepMFV2.keras' (path is relative to the current working directory)
model_deepMF.save('ModelDeepMFV2.keras')

In [None]:
from keras.models import Model

# Two sub-models built on the graph of model_deepMF: each one maps an id input
# directly to the output of its embedding layer
model_user_embeddings = Model(user_input, user_embedding)
model_item_embeddings = Model(item_input, item_embedding)

# Query every id from 0 up to the largest assigned one to read out all the
# user and item embeddings
all_user_ids = np.arange(NUM_USERS + 1)
all_item_ids = np.arange(NUM_ITEMS + 1)
user_embeddings = model_user_embeddings.predict(all_user_ids)
item_embeddings = model_item_embeddings.predict(all_item_ids)
    

In [None]:
import random

#
# creates the dataset of real samples: <user embedding, item embedding, rating>
#
def get_dataset(data):
    """Build the table of real samples: <user embedding, item embedding, rating>.

    Parameters
    ----------
    data : array-like of shape (n, >=3)
        One row per rating; columns USER and ITEM hold the integer ids used to
        index ``user_embeddings`` / ``item_embeddings`` and column RATING holds
        the rating.

    Returns
    -------
    numpy.ndarray of shape (n, latent_dim * 2 + 1), dtype float64
        For each input row: the user embedding, the item embedding and the
        rating rescaled as ``(rating - 3) / 4`` (1..5 maps to -0.5..0.5).
        Exactly one output row per input row; an empty input yields an empty
        (0, latent_dim * 2 + 1) array.
    """
    data = np.asarray(data)
    width = latent_dim * 2 + 1
    if data.size == 0:
        return np.zeros((0, width))

    # Ids may arrive as floats; truncate to integers before indexing.
    user_idx = data[:, USER].astype(int)
    item_idx = data[:, ITEM].astype(int)

    # The embedding arrays are indexed as [id][0], i.e. they carry a length-1
    # middle axis; pick every requested row in a single vectorized lookup.
    user_part = np.asarray(user_embeddings)[user_idx, 0]
    item_part = np.asarray(item_embeddings)[item_idx, 0]

    ratings = (data[:, RATING].astype(float) - 3.0) / 4.0   # normalized -0.5 to 0.5

    # One row per rating - no extra trailing all-zero row.
    return np.column_stack((user_part, item_part, ratings)).astype(float, copy=False)

# Real samples (user embedding, item embedding, normalized rating) built from the rows of `data`
embedding_dataset = get_dataset(np.array(data))

In [None]:
 #
# GAN to create the fake samples.
# Both the generator and the discriminator models are deliberately small because the source samples are
# not large, sparse vectors; they are small and dense: 5 real numbers to code the user, 5 real numbers
# to code the item and one real number to code the normalized rating.
#
from keras.models import Model, Sequential
from keras.layers import Embedding, Flatten, Input, Dropout, Dense, Concatenate, Dot, LeakyReLU, BatchNormalization
from keras.optimizers import Adam

class GAN():
    """Small GAN that generates <user embedding, item embedding, rating> samples.

    Every sample is a vector of ``latent_dim * 2 + 1`` real numbers. The
    generator maps ``noise_dim`` Gaussian noise values to such a vector; the
    discriminator scores a vector with a sigmoid output (trained towards 1 for
    real samples and 0 for generated ones).
    """

    def __init__(self):
        """Build and compile the discriminator, the generator and the stacked model."""
        self.latent_dim = latent_dim
        self.noise_dim = 100

        # Adam(learning_rate, beta_1): the discriminator gets a much larger
        # learning rate than the generator.
        optimizer_d = Adam(0.0006, 0.5)
        optimizer_g = Adam(0.00001, 0.5)

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer_d,
            metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise as input and generates a fake sample
        z = Input(shape=(self.noise_dim,))
        fake_sample = self.generator(z)

        # For the combined model we will only train the generator.
        # NOTE(review): freezing after compile() relies on the discriminator
        # keeping its compile-time trainable state for its own
        # train_on_batch calls - confirm this holds for the installed Keras.
        self.discriminator.trainable = False

        # The discriminator takes generated samples as input and determines validity
        validity = self.discriminator(fake_sample)

        # The combined model (stacked generator and discriminator)
        # trains the generator to fool the discriminator
        self.combined = Model(z, validity)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer_g)


    def build_generator(self):
        """Return a model mapping noise (noise_dim,) to a sample (latent_dim*2 + 1,)."""

        model = Sequential()
        model.add(Dense(10, input_dim=self.noise_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        # Hidden layer: its input size comes from the previous layer, so no
        # input_dim is given here.
        model.add(Dense(20))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.2))
        # Linear output: embeddings and normalized ratings are unbounded reals
        model.add(Dense(self.latent_dim*2+1, activation='linear'))
        model.summary()

        noise = Input(shape=(self.noise_dim,))
        fake_sample = model(noise)

        return Model(noise, fake_sample)


    def build_discriminator(self):
        """Return a model mapping a sample (latent_dim*2 + 1,) to a validity score in (0, 1)."""

        model = Sequential()

        model.add(Dense(4, input_dim=self.latent_dim*2 + 1))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(1, activation='sigmoid'))
        model.summary()

        sample = Input(shape=(self.latent_dim*2 + 1,))
        validity = model(sample)

        return Model(sample, validity)


    def train(self, dataset, epochs, batch_size=128, sample_interval=50, num_training_samples=100000):
        """Alternate one discriminator step and one generator step per iteration.

        Parameters
        ----------
        dataset : numpy.ndarray
            Real samples, one per row; rows are drawn from it by integer index.
        epochs : int
            Number of training iterations (one batch each).
        batch_size : int
            Number of real and of generated samples per iteration.
        sample_interval : int
            Progress is printed every ``sample_interval`` iterations.
        num_training_samples : int
            Only the first ``num_training_samples`` rows of ``dataset`` are
            sampled. Values larger than ``len(dataset)`` are clamped to it.

        Raises
        ------
        ValueError
            If there is no row to sample from.

        Side effects: every 2000 iterations (except iteration 0) the generator
        is saved to ``gan_generator_epoch<N>.h5``.
        """
        # Never sample an index past the end of the dataset (the default of
        # 100000 would otherwise raise IndexError on smaller datasets).
        num_training_samples = min(num_training_samples, len(dataset))
        if num_training_samples <= 0:
            raise ValueError("dataset must contain at least one training sample")

        # Adversarial ground truths
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):

            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random batch of real samples
            idx = np.random.randint(0, num_training_samples, batch_size)
            real_samples = dataset[idx]

            noise = np.random.normal(0, 1, (batch_size, self.noise_dim))

            # Generate a batch of fake samples
            fake_samples = self.generator.predict(noise, verbose=0)

            # Train the discriminator on the real batch, then on the fake one,
            # and average the two [loss, accuracy] results
            d_loss_real = self.discriminator.train_on_batch(real_samples, valid)
            d_loss_fake = self.discriminator.train_on_batch(fake_samples, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            noise = np.random.normal(0, 1, (batch_size, self.noise_dim))

            # Train the generator (to have the discriminator label samples as valid)
            g_loss = self.combined.train_on_batch(noise, valid)

            if epoch % sample_interval == 0:
                print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))

            # Periodic checkpoint of the generator
            if epoch > 0 and epoch % 2000 == 0:
                filename = f"gan_generator_epoch{epoch}.h5"
                self.generator.save(filename)
                print(f"✅ Generator gespeichert: {filename}")

if __name__ == '__main__':
    # Train on every row of the real-sample dataset, then store the final generator
    train_settings = dict(
        epochs=20000,
        batch_size=32,
        sample_interval=200,
        num_training_samples=len(embedding_dataset),
    )
    gan = GAN()
    gan.train(embedding_dataset, **train_settings)
    gan.generator.save('GANRS.h5')
    