### Two Tower Model for learning user and post embeddings based on interaction data

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import tensorflow_recommenders as tfrs

### Define the user and post towers, and the two-tower model that learns their embeddings

In [None]:
class UserModel(tf.keras.Model):
    """User tower: turns a dict of user features into one embedding per user.

    Expected keys in ``inputs``: ``'passions'``, ``'project_titles'``, ``'bio'``
    and ``'h3_location'``. Any other keys are ignored.

    NOTE(review): the three text features are fed straight into ``Dense``
    layers and then averaged over axis 1, so they are presumably pre-computed
    (e.g. BERT) float tensors of shape (batch, tokens, features) -- confirm
    against the data pipeline.

    Args:
        user_dataset: ``tf.data.Dataset`` of user records with a string
            ``'h3_location'`` field, used to learn the location vocabulary.
            May be ``None``, in which case the lookup stays un-adapted and
            maps every location to the out-of-vocabulary index.
    """

    def __init__(self, user_dataset):
        super().__init__()

        # Vocabulary size (replace with the actual size of your vocabularies)
        location_vocab_size = 10000

        embedding_dim = 32  # Set the embedding dimension

        # String lookup layer to convert H3 cell strings to integer indices.
        # max_tokens matches the embedding table's input_dim below, so every
        # index the lookup can emit is a valid embedding row.
        self.location_lookup = tf.keras.layers.StringLookup(max_tokens=location_vocab_size)
        if user_dataset is not None:
            # Learn the vocabulary from the user records; without this the
            # lookup has no vocabulary and every location shares one embedding.
            self.location_lookup.adapt(
                user_dataset.map(lambda user: user['h3_location']).batch(1024)
            )

        # Projection layers for BERT embeddings
        self.passions_projection = layers.Dense(embedding_dim, activation="relu")
        self.project_titles_projection = layers.Dense(embedding_dim, activation="relu")
        self.bio_projection = layers.Dense(embedding_dim, activation="relu")

        # Embedding location layer
        self.location_embedding = layers.Embedding(input_dim=location_vocab_size, output_dim=embedding_dim)

        # Dense layers to combine features
        self.dense_layers = tf.keras.Sequential([
            layers.Dense(128, activation="relu"),
            layers.Dense(embedding_dim)
        ])

    def build(self, input_shape):
        # No variables are created here; sub-layers build themselves on first call.
        self.built = True  # Mark the layer as built to prevent additional builds

    def call(self, inputs):
        """Return the combined user embedding for a batch of user features.

        Args:
            inputs: dict with keys ``'passions'``, ``'project_titles'``,
                ``'bio'`` and ``'h3_location'``.

        Returns:
            Output of the final dense stack (last dimension is 32).
        """
        # Inputs for textual data, each projected to 32 dimensions
        passions_embedding = self.passions_projection(inputs['passions'])
        project_titles_embedding = self.project_titles_projection(inputs['project_titles'])
        bio_embedding = self.bio_projection(inputs['bio'])

        # Convert string categorical data to integer indices. Inputs that are
        # already integer indices are passed through untouched.
        h3_location = inputs['h3_location']
        if h3_location.dtype == tf.string:
            h3_location = self.location_lookup(h3_location)

        # Embedding lookup on the integer indices
        location_embedding = self.location_embedding(h3_location)
        # With one location per user the lookup is already (batch, embedding_dim);
        # averaging over axis 1 would drop the embedding axis and break the
        # concat below. Only pool when there is an extra axis to pool over.
        if location_embedding.shape.rank != 2:
            location_embedding = tf.reduce_mean(location_embedding, axis=1)

        # Concatenate all embeddings
        user_embedding = tf.concat([
            tf.reduce_mean(passions_embedding, axis=1),  # Average the embeddings
            tf.reduce_mean(project_titles_embedding, axis=1),
            tf.reduce_mean(bio_embedding, axis=1),
            location_embedding
        ], axis=1)

        # Pass through dense layers to combine features
        return self.dense_layers(user_embedding)

In [None]:
class PostModel(tf.keras.Model):
    """Post tower: turns a dict of post features into one embedding per post.

    Reads the ``'title'`` and ``'description'`` entries of its input dict;
    any other keys are ignored.

    NOTE(review): both features go straight into ``Dense`` layers and are then
    averaged over axis 1, so they are presumably float tensors shaped
    (batch, tokens, features) -- confirm against the data pipeline.

    Args:
        post_dataset: accepted for symmetry with the caller; not used here.
    """

    def __init__(self, post_dataset):
        super().__init__()

        projection_units = 32  # Width of each projection and of the final embedding

        # One ReLU projection per text feature
        self.title_projection = layers.Dense(projection_units, activation="relu")
        self.description_projection = layers.Dense(projection_units, activation="relu")

        # Mixing network applied to the concatenated, pooled projections
        self.dense_layers = tf.keras.Sequential([
            layers.Dense(128, activation="relu"),
            layers.Dense(projection_units),
        ])

    def call(self, inputs):
        """Return the post embedding produced by the final dense stack."""
        projected_title = self.title_projection(inputs['title'])
        projected_description = self.description_projection(inputs['description'])

        # Average each projected feature over axis 1
        pooled_title = tf.reduce_mean(projected_title, axis=1)
        pooled_description = tf.reduce_mean(projected_description, axis=1)

        # Join the pooled features side by side, then mix them
        combined = tf.concat([pooled_title, pooled_description], axis=1)
        return self.dense_layers(combined)

In [None]:
# Define the Two-Tower Model combining User and Post Models
class TwoTowerModel(tfrs.models.Model):
    """Retrieval model that trains a user tower and a post tower jointly.

    Args:
        user_model: callable mapping a dict of user features to embeddings.
        post_model: callable mapping a dict of post features to embeddings.
    """

    def __init__(self, user_model, post_model):
        super().__init__()

        # Save user and post towers
        self.user_model = user_model
        self.post_model = post_model

        # Retrieval task (metrics simplified)
        self.task = tfrs.tasks.Retrieval()

    def compute_loss(self, features, training=False):
        """Compute the interaction-weighted retrieval loss for one batch.

        Args:
            features: dict with the user keys (``'user_id'``, ``'passions'``,
                ``'project_titles'``, ``'bio'``, ``'h3_location'``), the post
                keys (``'post_id'``, ``'title'``, ``'description'``) and a
                per-interaction ``'weight'``.
            training: whether this is a training step; metrics are only
                computed when it is False.

        Returns:
            The loss returned by the retrieval task.
        """
        # Extract user and post features from the input
        user_inputs = {
            'user_id': features['user_id'],
            'passions': features['passions'],
            'project_titles': features['project_titles'],
            'bio': features['bio'],
            'h3_location': features['h3_location'],
        }

        post_inputs = {
            'post_id': features['post_id'],
            'title': features['title'],
            'description': features['description']
        }

        # Get user and post embeddings
        user_embeddings = self.user_model(user_inputs)
        post_embeddings = self.post_model(post_inputs)

        # Hand the interaction weights to the task as sample weights so each
        # example's loss is scaled by its own weight. Multiplying the task's
        # already-reduced loss by the weight vector afterwards would only
        # rescale the whole batch by the mean weight.
        return self.task(
            user_embeddings,
            post_embeddings,
            sample_weight=features["weight"],
            compute_metrics=not training,
        )

### Load TFRecords from GCS buckets for training

In [None]:
from google.cloud import storage

# Define GCS paths for user, post, and interaction data
# The *_TFRECORD_PATH values are glob patterns handed to load_tfrecord_data.
USER_TFRECORD_PATH = 'gs://user_bucket/*.tfrecord'
POST_TFRECORD_PATH = 'gs://post_bucket/*.tfrecord'
INTERACTION_TFRECORD_PATH = 'gs://interaction_bucket/*.tfrecord'
# Destinations for the exported embeddings; passed as `output_path` to save_embeddings.
USER_EMBEDDINGS_OUTPUT_PATH = 'gs://vector_bucket/user_embeddings.npy'
POST_EMBEDDINGS_OUTPUT_PATH = 'gs://vector_bucket/post_embeddings.npy'

# Load data from TFRecord files
def load_tfrecord_data(file_pattern, feature_description):
    """Build a dataset of parsed examples from the TFRecord files matching a glob.

    Args:
        file_pattern: glob pattern expanded with ``tf.io.gfile.glob``.
        feature_description: feature spec passed to
            ``tf.io.parse_single_example`` for every record.

    Returns:
        A ``tf.data.Dataset`` yielding one parsed feature dict per record.
    """
    def _parse(serialized_example):
        return tf.io.parse_single_example(serialized_example, feature_description)

    matching_files = tf.io.gfile.glob(file_pattern)
    return tf.data.TFRecordDataset(matching_files).map(_parse)

# Feature specs used to parse each kind of TFRecord. Every field is a scalar;
# all are strings except the interaction weight, which is a float.
# NOTE(review): the text fields ('passions', 'bio', 'title', ...) are parsed as
# scalar strings here, while UserModel/PostModel apply Dense layers to them --
# presumably they need decoding into float tensors first; confirm.
user_feature_description = {
    key: tf.io.FixedLenFeature([], tf.string)
    for key in ('user_id', 'passions', 'project_titles', 'bio', 'h3_location')
}

post_feature_description = {
    key: tf.io.FixedLenFeature([], tf.string)
    for key in ('post_id', 'title', 'description')
}

interaction_feature_description = {
    **{key: tf.io.FixedLenFeature([], tf.string) for key in ('user_id', 'post_id')},
    'weight': tf.io.FixedLenFeature([], tf.float32),
}

# Parse the three record collections with their matching feature specs
user_dataset = load_tfrecord_data(USER_TFRECORD_PATH, user_feature_description)
post_dataset = load_tfrecord_data(POST_TFRECORD_PATH, post_feature_description)
interaction_dataset = load_tfrecord_data(INTERACTION_TFRECORD_PATH, interaction_feature_description)


def _select_training_fields(interaction):
    """Keep only the id pair and the interaction weight of one record."""
    return {
        "user_id": interaction["user_id"],
        "post_id": interaction["post_id"],
        "weight": interaction["weight"],
    }


# Training examples are the interaction records themselves.
# NOTE(review): TwoTowerModel.compute_loss also reads 'passions',
# 'project_titles', 'bio', 'h3_location', 'title' and 'description', none of
# which are present here -- the user/post features presumably need to be
# joined onto each interaction before training; confirm.
train_data = interaction_dataset.map(_select_training_fields)

### Train the model and save embeddings to vector bucket in GCS for Vertex AI Vector Search

In [None]:
# Training hyperparameters
BATCH_SIZE = 256
EPOCHS = 5

# Build both towers and wire them into the two-tower retrieval model
user_model = UserModel(user_dataset)
post_model = PostModel(post_dataset)
two_tower_model = TwoTowerModel(user_model=user_model, post_model=post_model)

# Compile with Adam at a fixed learning rate
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
two_tower_model.compile(optimizer=optimizer)

# Fit on the batched interaction data
two_tower_model.fit(train_data.batch(BATCH_SIZE), epochs=EPOCHS)

# Function to save embeddings to Google Cloud Storage
def save_embeddings(embeddings, output_path, name):
    """Upload embeddings to Google Cloud Storage as a NumPy ``.npy`` file.

    Args:
        embeddings: array-like of embeddings; anything
            ``tf.convert_to_tensor`` accepts.
        output_path: destination. A ``gs://bucket/path`` URI is split into
            its bucket and object name; if the object name does not already
            end in ``.npy`` it is treated as a prefix and
            ``{name}_embeddings.npy`` is appended. Any other value keeps the
            legacy layout: object ``{output_path}/{name}_embeddings.npy`` in
            the ``code_bucket`` bucket.
        name: label used to build the file name when ``output_path`` is a
            prefix (e.g. ``"user"`` or ``"post"``).
    """
    import io

    import numpy as np

    # Convert embeddings to a single dense array
    embeddings_array = tf.convert_to_tensor(embeddings).numpy()

    scheme = "gs://"
    if output_path.startswith(scheme):
        # Honour the bucket named in the URI instead of a hard-coded one, and
        # avoid embedding the whole URI inside the object name.
        bucket_name, _, blob_name = output_path[len(scheme):].partition("/")
        if not blob_name.endswith(".npy"):
            blob_name = f"{blob_name.rstrip('/')}/{name}_embeddings.npy".lstrip("/")
    else:
        bucket_name = "code_bucket"
        blob_name = f"{output_path}/{name}_embeddings.npy"

    # Serialize in real .npy format so the file matches its extension and can
    # be read back with numpy.load.
    buffer = io.BytesIO()
    np.save(buffer, embeddings_array, allow_pickle=False)

    # Save to GCS using Google Cloud Storage client
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_name)
    blob.upload_from_string(buffer.getvalue(), content_type="application/octet-stream")

def _embed_in_batches(tower, dataset, feature_keys, batch_size):
    """Run `tower` over `dataset` in batches and return all embeddings stacked.

    The towers pool and concatenate along axis 1, so they need a leading batch
    axis; iterating the raw dataset would feed them single, unbatched examples.
    Rows follow the iteration order of `dataset`. Returns an empty list when
    the dataset yields nothing.
    """
    batch_outputs = [
        tower({key: batch[key] for key in feature_keys})
        for batch in dataset.batch(batch_size)
    ]
    if not batch_outputs:
        return []
    # The final batch may be smaller, so concatenate rather than stack.
    return tf.concat(batch_outputs, axis=0).numpy()


# Generate and save user embeddings
user_embeddings = _embed_in_batches(
    two_tower_model.user_model,
    user_dataset,
    ('passions', 'project_titles', 'bio', 'h3_location'),
    BATCH_SIZE,
)
save_embeddings(user_embeddings, USER_EMBEDDINGS_OUTPUT_PATH, "user")

# Generate and save post embeddings
post_embeddings = _embed_in_batches(
    two_tower_model.post_model,
    post_dataset,
    ('title', 'description'),
    BATCH_SIZE,
)
save_embeddings(post_embeddings, POST_EMBEDDINGS_OUTPUT_PATH, "post")

# Save the trained model for inference or further usage
two_tower_model.save("gs://model_bucket/models/ttm_model")