In [37]:
import random
from tqdm import tqdm
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import seaborn as sns
import torch
from torch import nn, optim, Tensor

from torch_sparse import SparseTensor, matmul

from torch_geometric.utils import structured_negative_sampling
from torch_geometric.data import download_url, extract_zip
from torch_geometric.nn.conv.gcn_conv import gcn_norm
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.typing import Adj
import boto3
import json
import os
from io import StringIO 
# Topic model
from bertopic import BERTopic

In [38]:
torch.__version__

'2.0.0+cu117'

In [39]:
JSON_CONTENT_TYPE = "application/json"
BUCKET_NAME = 'ecc-scraper-data'


In [40]:
# code from pytorch geometric lightgcn_pyg.ipynb implementation
# defines LightGCN model
class LightGCN(MessagePassing):
    """LightGCN Model as proposed in https://arxiv.org/abs/2002.02126
    """

    def __init__(self, num_users, num_items, edge_user_post, embedding_dim=64, K=3, add_self_loops=False):
        """Initializes LightGCN Model

        Args:
            num_users (int): Number of users
            num_items (int): Number of items
            embedding_dim (int, optional): Dimensionality of embeddings. Defaults to 8.
            K (int, optional): Number of message passing layers. Defaults to 3.
            add_self_loops (bool, optional): Whether to add self loops for message passing. Defaults to False.
        """
        super().__init__()
        self.num_users, self.num_items = num_users, num_items
        self.embedding_dim, self.K = embedding_dim, K
        self.add_self_loops = add_self_loops

        self.users_emb = nn.Embedding(
            num_embeddings=self.num_users, embedding_dim=self.embedding_dim) # e_u^0
        self.items_emb = nn.Embedding(
            num_embeddings=self.num_items, embedding_dim=self.embedding_dim) # e_i^0
        self.edge_user_post = edge_user_post
        
        nn.init.normal_(self.users_emb.weight, std=0.1)
        nn.init.normal_(self.items_emb.weight, std=0.1)

    def forward(self, edge_index: SparseTensor):
        """Forward propagation of LightGCN Model.

        Args:
            edge_index (SparseTensor): adjacency matrix

        Returns:
            tuple (Tensor): e_u_k, e_u_0, e_i_k, e_i_0
        """
        # compute \tilde{A}: symmetrically normalized adjacency matrix
        edge_index_norm = gcn_norm(
            edge_index, add_self_loops=self.add_self_loops)

        emb_0 = torch.cat([self.users_emb.weight, self.items_emb.weight]) # E^0
        embs = [emb_0]
        emb_k = emb_0

        # multi-scale diffusion
        for i in range(self.K):
            emb_k = self.propagate(edge_index_norm, x=emb_k)
            embs.append(emb_k)

        embs = torch.stack(embs, dim=1)
        emb_final = torch.mean(embs, dim=1) # E^K

        users_emb_final, items_emb_final = torch.split(
            emb_final, [self.num_users, self.num_items]) # splits into e_u^K and e_i^K

        # returns e_u^K, e_u^0, e_i^K, e_i^0
        return users_emb_final, self.users_emb.weight, items_emb_final, self.items_emb.weight

    def message(self, x_j: Tensor) -> Tensor:
        return x_j

    def message_and_aggregate(self, adj_t: SparseTensor, x: Tensor) -> Tensor:
        # computes \tilde{A} @ x
        return matmul(adj_t, x)


In [41]:
# helper function to get N_u
def get_user_positive_items(edge_index):
    """Generates dictionary of positive items for each user

    Args:
        edge_index (torch.Tensor): 2 by N list of edges

    Returns:
        dict: dictionary of positive items for each user
    """
    user_pos_items = {}
    for i in range(edge_index.shape[1]):
        user = edge_index[0][i].item()
        item = edge_index[1][i].item()
        if user not in user_pos_items:
            user_pos_items[user] = []
        user_pos_items[user].append(item)
    return user_pos_items

def load_node_csv(old_df, index_col):
    """Loads csv containing node information

    Args:
        path (str): path to csv file
        index_col (str): column name of index column

    Returns:
        dict: mapping of csv row to node id
    """
    df = old_df.set_index(index_col)
    mapping = {index: i for i, index in enumerate(df.index.unique())}
    return mapping

In [43]:
def model_fn(model_dir, s3 = boto3.client('s3')):
    objects = s3.list_objects_v2(Bucket=BUCKET_NAME)['Contents']
    objects = list( objects)

    for obj in objects:
        if(obj['Key']=='preprocessed_data.csv'):
            contents = s3.get_object(Bucket=BUCKET_NAME, Key=obj['Key'])
            contents = contents['Body'].read().decode('utf-8')
            df = pd.read_csv(StringIO(contents), index_col=0)
            break

    df.rename( columns={0 :'id'}, inplace=True )
    
    user_mapping = load_node_csv(df, index_col='current_user')
    post_mapping = load_node_csv(df, index_col='post_id')
    author_mapping = load_node_csv(df, index_col='authorUrl')

    postid_text = pd.Series(df.text.values, index=df.post_id).to_dict()
    postid_author = pd.Series(df.author.values, index=df.post_id).to_dict()
    postid_original_user = pd.Series(df.current_user.values, index=df.post_id).to_dict()

    s3.download_file(
        'ecc-model-dump',
        'saved_model.pt', 
        'saved_model.pt'   
    )   
    s3.download_file(
        'ecc-model-dump',
        'bert_topic_model', 
        'bert_topic_model'
    )
    
    topic_model = BERTopic.load("bert_topic_model")
    model = torch.load('saved_model.pt')
    
    return (model, topic_model, user_mapping, post_mapping, author_mapping, postid_text, postid_author, postid_original_user, df)

In [44]:
def input_fn(serialized_input_data, content_type=JSON_CONTENT_TYPE):
    if content_type == JSON_CONTENT_TYPE:
        input_data = json.loads(serialized_input_data)
        return input_data

    else:
        raise Exception("Requested unsupported ContentType in Accept: " + content_type)
        return

In [45]:
def predict_fn(input_data, d):
    model, topic_model = d[0], d[1]
    user_mapping, post_mapping, author_mapping = d[2],  d[3], d[4]

    postid_text, postid_author, postid_original_user = d[5], d[6], d[7]
    
    df = d[8]

    user_id = input_data['current_user']
    num_recs = int(input_data['n_recommendations'])


    if user_id not in user_mapping:
        return {'Error':'User not found'}

    user = user_mapping[user_id]
    
    e_u = model.users_emb.weight[user]
    scores = model.items_emb.weight @ e_u

    user_pos_items = get_user_positive_items(model.edge_user_post)
    
    values, indices = torch.topk(scores, k=len(user_pos_items[user]) + num_recs)

    posts = [index.cpu().item() for index in indices if index in user_pos_items[user]][:num_recs]
    post_ids = [list(post_mapping.keys())[list(post_mapping.values()).index(post)] for post in posts]
    liked_cleaned_posts = df[df['post_id'].isin(post_ids)]
    
    if len(posts) == 0 or len(post_ids) == 0:
        return {'Error':'No recommendations yet'}
    
    texts = [postid_text[id] for id in post_ids]
    authors =  [postid_author[id] for id in post_ids]
    original_users = [postid_original_user[id] for id in post_ids]
    
    data = {}
    data['user'] = user_id

    # TODO: Implement by merging topic modelling
    data['recommended_topics'] = []

    # stores liked and recommended posts
    data['liked_posts'] = []
    data['recommended_posts'] = []
    
    for i in range(len(post_ids)):
        stat = liked_cleaned_posts.iloc[i]
        print(liked_cleaned_posts.columns)
        data['liked_posts'].append(
            {
                'author': authors[i],
                'original_user': original_users[i],
                'post_id': f"https://www.linkedin.com/feed/update/{post_ids[i]}/",
                'post_body': texts[i],
                'top_words': stat['top_words'],
                'topic': stat['topic_name']
            }
        )

    posts = [index.cpu().item() for index in indices if index not in user_pos_items[user]][:num_recs]
    post_ids = [list(post_mapping.keys())[list(post_mapping.values()).index(post)] for post in posts]
    texts = [postid_text[id] for id in post_ids]
    authors =  [postid_author[id] for id in post_ids]
    original_users = [postid_original_user[id] for id in post_ids]
    
    reco_cleaned_posts = df[df['post_id'].isin(post_ids)]
    
    for i in range(len(post_ids)):
        stat = reco_cleaned_posts.iloc[i]
        data['recommended_posts'].append(
            {
                'author': authors[i],
                'original_user': original_users[i],
                'post_id': f"https://www.linkedin.com/feed/update/{post_ids[i]}/",
                'post_body': texts[i],
                'top_words': stat['top_words'],
                'topic': stat['topic_name']
            }
        )
    
    
    return data

In [46]:
def output_fn(prediction_output, accept=JSON_CONTENT_TYPE):
    print("recommendations", prediction_output)

    if accept == JSON_CONTENT_TYPE:
        return json.dumps(prediction_output), accept

    raise Exception("Requested unsupported ContentType being accepted: " + accept)

In [47]:
s3 = boto3.client('s3', 
                          aws_access_key_id='AKIA2Z2BTOGT36MIGFYG',
                          aws_secret_access_key='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',
                          region_name='us-east-2'
                      )

In [48]:
model_map = model_fn(model_dir=None,s3=s3)

In [55]:
input_data = input_fn('{"current_user":"skini25", "n_recommendations":2}')

In [56]:
prediction = predict_fn(input_data, model_map)

Index(['post_id', 'timestamp', 'source', 'current_user', 'authorUrl',
       'authorImage', 'author', 'text', 'n_reposts', 'n_reactions',
       'n_comments', 'reaction', 'post_without_stopwords', 'post_lemmatized',
       'topic_id', 'top_words', 'topic_name'],
      dtype='object')
Index(['post_id', 'timestamp', 'source', 'current_user', 'authorUrl',
       'authorImage', 'author', 'text', 'n_reposts', 'n_reactions',
       'n_comments', 'reaction', 'post_without_stopwords', 'post_lemmatized',
       'topic_id', 'top_words', 'topic_name'],
      dtype='object')


In [57]:
prediction

{'user': 'skini25',
 'recommended_topics': [],
 'liked_posts': [{'author': 'Jenna Race',
   'original_user': 'skini25',
   'post_id': 'https://www.linkedin.com/feed/update/urn:li:activity:7046607637613858816/',
   'post_body': 'SpaceX is #hiring a Senior Accountant, Revenue for #starlink. Position is on-site based in Hawthorne, CA. Looking for someone with 4+ years of experience in accounting for revenue across a multinational sales cycle. SQL and Python experience a huge plus.Job post here for more details on requirements--> https://lnkd.in/guy8SjUHKnow someone? DM or comment below! #accountingjobs #financejobs #hiringnow',
   'top_words': 'cybersecurity - award - entrylevel - innovation - pharmacy - leap - employees - commitment - tutort - gregmat',
   'topic': '15_cybersecurity_award_entrylevel_innovation'},
  {'author': 'Matt Gray',
   'original_user': 'skini25',
   'post_id': 'https://www.linkedin.com/feed/update/urn:li:activity:7049046438068056064/',
   'post_body': 'Twitter just

In [None]:
# !pip list --format=freeze > requirements.txt

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.pytorch.model import PyTorchModel

In [None]:
sm_sess = sagemaker.Session()
sess = boto3.Session()
role = get_execution_role()
print(role, sess.region_name)
instance_type = "ml.t2.medium"

# We'll use a pytorch inference DLC image that ships with sagemaker-pytorch-inference-toolkit v2.0.6. This version includes support for Torchserve environment variables used below.
image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=sess.region_name ,
    py_version="py39",
    image_scope="inference",
    version="1.13.1",
    instance_type=instance_type,
)

In [None]:
image_uri

In [None]:
's3://ecc-model-dump/gnn.tar.gz'

pytorch_model = PyTorchModel(
     model_data='s3://ecc-model-dump/gnn.tar.gz', 
     role=role,
     entry_point='inference.py',
     image_uri=image_uri,
     framework_version="1.13.1",
     code_location='s3://ecc-model-dump/models'
)

In [None]:
predictor = pytorch_model.deploy(
    initial_instance_count=1,
    instance_type=instance_type
)

In [None]:
import multiprocessing


def invoke(endpoint_name):
    predictor = sagemaker.predictor.Predictor(
        endpoint_name,
        sm_sess,
        serializer=sagemaker.serializers.JSONSerializer(),
        deserializer=sagemaker.deserializers.BytesDeserializer(),
    )
    return predictor.predict(
        '{"current_user":"kssreesha", "n_recommendations":2}'
    )


endpoint_name = 'pytorch-inference-2023-04-21-22-52-19-808'
# predictor.endpoint_name
pool = multiprocessing.Pool(1)
results = pool.map(invoke, 1 * [endpoint_name])
pool.close()
pool.join()
print(results)

In [None]:
predictor.delete_endpoint(predictor.endpoint_name)