<a href="https://colab.research.google.com/github/philipjpark/Product-Recommendation/blob/main/ProductRecommendation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install kafka-python
!apt-get install -y librdkafka-dev
!pip install confluent-kafka

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from kafka import KafkaConsumer, KafkaProducer

# Load the purchase log and drop exact duplicate rows.
# (Passing on_bad_lines='skip' to read_csv would also skip malformed lines; it is not used here.)
kz = pd.read_csv("/kz.csv").drop_duplicates()

# Keep only the calendar date of each event
kz['event_time'] = pd.to_datetime(kz['event_time']).dt.date

kz.shape



(323985, 8)

Notes: This cell imports pandas, seaborn, and matplotlib.pyplot and installs the kafka-python, librdkafka-dev, and confluent-kafka packages for the real-time product recommendation analysis. It then loads kz.csv, drops duplicate rows, and reduces event_time to a calendar date. The resulting DataFrame has 323,985 rows and 8 columns.

In [4]:
kz.columns.values.tolist()

['event_time',
 'order_id',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id']

In [5]:
data_types= kz.dtypes

print(data_types)


event_time        object
order_id           int64
product_id         int64
category_id      float64
category_code     object
brand             object
price            float64
user_id          float64
dtype: object


In [6]:
kz.groupby('user_id', as_index=False).size().sort_values(by="size", ascending=False)

            user_id  size
21717  1.515916e+18   199
21662  1.515916e+18   192
21661  1.515916e+18   186
21722  1.515916e+18   184
21706  1.515916e+18   181
...             ...   ...
9334   1.515916e+18     1
9335   1.515916e+18     1
9337   1.515916e+18     1
9338   1.515916e+18     1


Notes: The cells above list the column names and print the data type of each feature. The last cell counts the rows (purchases) recorded for each user and sorts the users from the most frequent purchasers to the least.

In [7]:
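# event_time holds datetime.date objects; convert back to datetime64 so that the
# comparison with a Timestamp is well defined (date-vs-Timestamp comparison is deprecated).
cutoff = pd.Timestamp('2020-09-01')
event_ts = pd.to_datetime(kz['event_time'])

train = kz.loc[event_ts < cutoff, :]

valid = kz.loc[event_ts >= cutoff, :]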



Notes: The data is split by date into two parts: a training set (train) containing all events before September 1st, 2020, and a validation set (valid) containing all events on or after September 1st, 2020. Holding out the later period makes it possible to evaluate recommendations built from earlier purchases against what users bought afterwards.

In [8]:
import numpy as np

def avg_prec(actual, predicted, k=15):
    """Average precision at k for one user's recommendation list."""
    if not actual:
        return 0.0

    # Only the top-k predicted items are scored.
    if len(predicted) > k:
        predicted = predicted[:k]

    score = 0.0
    num_hits = 0.0

    for i, p in enumerate(predicted):
        # A hit is a predicted item that is in the actual list and has not
        # already appeared earlier in the predictions; add the precision at this rank.
        if p in actual and p not in predicted[:i]:
            num_hits += 1.0
            score += num_hits / (i + 1.0)

    return score / min(len(actual), k)


def mean_avg_prec(actual, predicted, k=10):
    """Mean of avg_prec over users; actual and predicted are lists of lists."""
    return np.mean([avg_prec(a, p, k) for a, p in zip(actual, predicted)])
    
actual_items = [3, 7, 4, 2, 5]
actual_items = [str(i) for i in actual_items]
recommended_items = [12, 7, 53, 90, 3, 23, 14, 37, 18, 67]
recommended_items = [str(i) for i in recommended_items]

# Average precision of the recommended items ('recommended_items')
# against the items the user actually interacted with ('actual_items').
avg_prec(actual_items, recommended_items)


0.18

Notes: avg_prec() calculates the average precision of a recommendation list for one user, given the items the user actually interacted with (actual_items) and the items that were recommended to the user (recommended_items).

The metric rewards recommending relevant items and placing them early in the list. Only the first k recommendations are considered. Each time a recommended item appears in the actual items list (and has not already appeared earlier in the recommendations), the function adds the precision at that position, that is, the number of hits so far divided by the position. The sum is then divided by min(len(actual), k). In the example above, the hits are at positions 2 and 5, so the score is (1/2 + 2/5) / 5 = 0.18.

mean_avg_prec() takes a list of actual item lists and a list of recommended item lists, one pair per user, and returns the mean of the per-user avg_prec() scores. Note that the two functions have different default values of k (15 and 10).
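
To make the multi-user case concrete, here is a minimal sketch that restates both functions in plain Python (no NumPy) and scores two users. The second user's item IDs and both recommendation lists are invented for illustration and do not come from the dataset.

```python
def avg_prec(actual, predicted, k=15):
    """AP@k, restated from the notebook so this example runs on its own."""
    if not actual:
        return 0.0
    predicted = predicted[:k]
    score, num_hits = 0.0, 0
    for i, p in enumerate(predicted):
        # Count each relevant item once, at the first rank where it appears.
        if p in actual and p not in predicted[:i]:
            num_hits += 1
            score += num_hits / (i + 1)
    return score / min(len(actual), k)


def mean_avg_prec(actual, predicted, k=10):
    """Mean of the per-user AP@k scores."""
    scores = [avg_prec(a, p, k) for a, p in zip(actual, predicted)]
    return sum(scores) / len(scores)


# Hypothetical data: one list of actual items and one list of
# recommendations per user.
actual = [["3", "7", "4", "2", "5"], ["a", "b"]]
predicted = [["12", "7", "53", "90", "3"], ["a", "x", "b"]]

per_user = [avg_prec(a, p, k=10) for a, p in zip(actual, predicted)]
overall = mean_avg_prec(actual, predicted, k=10)
print(per_user, overall)
```

The first user has hits at ranks 2 and 5, giving (1/2 + 2/5) / 5 = 0.18. The second has hits at ranks 1 and 3, giving (1 + 2/3) / 2, roughly 0.83. The mean over both users is therefore roughly 0.51.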

**Approach Based on Rating**

In [9]:
product_ratings = train.groupby('product_id', as_index=False).size()['size'] 
#number of purchase rows per product (the dataset has no rating column, so purchases stand in for ratings)

num_ratings = train.drop_duplicates(['product_id', 'user_id']).groupby('product_id', as_index=False).size()['size'] 
#number of distinct users per product, after removing repeat purchases by the same user

last_rating = train.groupby('product_id', as_index=False).agg({'event_time': max})
#date of the most recent purchase of each product

In [10]:
rating_count_df = pd.DataFrame({'number of ratings': product_ratings, 'unique user reviews': num_ratings})
rating_count_df = rating_count_df.join(last_rating).sort_values(by="unique user reviews", ascending=False)

rating_count_df

       number of ratings  unique user reviews           product_id  event_time
60                   328                  290  1515966223509088620  2020-07-08
126                  715                  252  1515966223509089045  2020-07-08
18                   864                  218  1515966223509088521  2020-07-08
287                  762                  212  1515966223509089486  2020-07-08
12059                218                  201  2273948308235878951  2020-07-08
...                  ...                  ...                  ...         ...
7300                   1                    1  1515966223510394144  2020-01-16
7296                   3                    1  1515966223510394072  2020-01-29
7292                   1                    1  1515966223510394014  2020-05-18
7287                   1                    1  1515966223510393966  2020-01-24


In [11]:
def create_ranked_df(train):
    # Purchase rows per product. The column is named 'avg_rating' below, but it holds a count, not an average.
    product_ratings = train.groupby('product_id', as_index=False).size()['size']
    # Distinct users per product
    num_ratings = train.drop_duplicates(['product_id', 'user_id']).groupby('product_id', as_index=False).size()['size']
    # Date of the most recent purchase of each product
    last_rating = train.groupby('product_id', as_index=False).agg({'event_time': max}).rename(columns={'event_time': 'last_rating'})

    rating_count_df = pd.DataFrame({'avg_rating': product_ratings, 'num_ratings': num_ratings})
    rating_count_df = rating_count_df.join(last_rating)

    recs = train.loc[:, ['product_id', 'category_id', 'category_code', 'brand', 'price']].drop_duplicates().merge(rating_count_df, on=['product_id'])

    ranked = recs.sort_values(['avg_rating', 'num_ratings', 'last_rating'], ascending=False)

    # Keep only products bought by at least 5 distinct users; the rest are not popular enough to be considered
    ranked = ranked[ranked['num_ratings'] > 4]

    return ranked


def popular_recommendations(user_id, n_top, ranked):
    """Return the product IDs of the first n_top rows of ranked.

    user_id is accepted but not used: every user receives the same list.
    """
    top = list(ranked['product_id'][:n_top])

    return top

In [12]:
ranked = create_ranked_df(train)

popular_recommendations('1', 20, ranked)

[1515966223509117074,
 2273948186349404174,
 1515966223523303307,
 2273948316473492113,
 1515966223509106786,
 2273948218662322995,
 2273948216221238252,
 2273948218662322996,
 1515966223509089067,
 1515966223509089598,
 2273948222722409184,
 1515966223509088521,
 2273948227654910595,
 1515966223509089646,
 1515966223509089283,
 1515966223509131886,
 1515966223509090011,
 2273948227654910594,
 1515966223523303304,
 2273948316532212462]

Notes: create_ranked_df() takes the train DataFrame as input and builds a new DataFrame called ranked. It combines each product's attributes (category, brand, price) with three statistics: the number of purchase rows (stored in the column named avg_rating, which is a count rather than an average), the number of distinct users who bought the product (num_ratings), and the date of its most recent purchase (last_rating). The rows are sorted by these three columns in descending order, and products with fewer than 5 distinct users are filtered out because they are not popular enough.

popular_recommendations() takes a user ID, the number of recommendations to return (n_top), and the ranked DataFrame as input. It returns the product IDs of the first n_top rows of ranked. The user ID is not used, so every user receives the same list.

The last cell calls create_ranked_df() on the train DataFrame to generate ranked, and then calls popular_recommendations() with user ID '1', n_top set to 20, and the ranked DataFrame as input. The result is a list of the 20 most popular products to recommend to a user.
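
The notebook does not show how the popularity list is scored against the validation set. The sketch below is one way that step could look, assuming a simplified ranking (distinct buyers only) and tiny invented DataFrames named toy_train and toy_valid. It is an illustration, not the evaluation the author ran.

```python
import pandas as pd


def avg_prec(actual, predicted, k):
    """AP@k, restated from the notebook so the example is self-contained."""
    if not actual:
        return 0.0
    predicted = predicted[:k]
    score, num_hits = 0.0, 0
    for i, p in enumerate(predicted):
        if p in actual and p not in predicted[:i]:
            num_hits += 1
            score += num_hits / (i + 1)
    return score / min(len(actual), k)


# Hypothetical toy purchase logs (IDs invented for illustration).
toy_train = pd.DataFrame({
    "user_id":    [1, 2, 3, 1, 2, 3],
    "product_id": [10, 10, 10, 20, 20, 30],
})
toy_valid = pd.DataFrame({
    "user_id":    [1, 1, 2],
    "product_id": [10, 30, 20],
})

# Popularity ranking: number of distinct buyers per product, most popular first.
buyers = (
    toy_train.drop_duplicates(["product_id", "user_id"])
    .groupby("product_id")
    .size()
    .sort_values(ascending=False)
)
top_products = [int(p) for p in buyers.index[:2]]  # same list for every user

# What each validation user actually bought.
actual_by_user = toy_valid.groupby("user_id")["product_id"].apply(list)

scores = [avg_prec(items, top_products, k=2) for items in actual_by_user]
map_at_2 = sum(scores) / len(scores)
print(top_products, scores, map_at_2)
```

On this toy data the two most popular products are 10 and 20, each validation user scores 0.5, and the mean average precision at 2 is 0.5.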

***Implementing Real-Time Analysis***

Modified version of project leader Leon's code, which simulates a Kafka broker, producer, and consumer in memory.
  

In [22]:
import threading
import time
from queue import Queue, Empty
from collections import defaultdict
import heapq

# Note: the classes below are in-memory stand-ins. KafkaProducer and KafkaConsumer
# shadow the classes of the same name imported from kafka-python in the first cell.


class Message:
    def __init__(self, user_id, product_id, event_time):
        self.user_id = user_id
        self.product_id = product_id
        self.event_time = event_time

    def __str__(self):
        return f"Message:\n\tUserID: {self.user_id}\n\tProductID: {self.product_id}\n\tEvent_Time: {self.event_time}"


class KafkaBroker:
    def __init__(self, topic):
        self.topic = topic
        self.data = {}  # topic -> list of pending messages, oldest first

    def produce_message(self, message):
        # Append to the topic's list, creating the list on first use.
        self.data.setdefault(self.topic, []).append(message)

    def consume_messages(self):
        # Remove and return the oldest pending message, or None if there is none.
        if self.data.get(self.topic):
            return self.data[self.topic].pop(0)
        return None


class KafkaProducer:
    def __init__(self, broker, topic, df):
        self.broker = broker
        self.topic = topic
        self.df = df

    def send(self):
        # Publish one Message per DataFrame row.
        for index, row in self.df.iterrows():
            message = Message(row['user_id'], row['product_id'], row['event_time'])
            self.broker.produce_message(message)


class KafkaConsumer:
    def __init__(self, broker, topic, user_id, n_top):
        self.broker = broker
        self.topic = topic
        self.queue = Queue()
        self.counter = 0
        self.user_id = user_id
        self.n_top = n_top
        self.user_profiles = defaultdict(lambda: defaultdict(float))
        self.events = defaultdict(list)
        # Start the background thread only after every attribute is set.
        # daemon=True lets the interpreter exit while the loop is still running.
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def run(self):
        while True:
            message = self.broker.consume_messages()
            if message is not None:
                self.queue.put(message)
            else:
                time.sleep(0.01)  # avoid busy-waiting on an empty broker

    def poll(self, timeout=1):
        try:
            return self.queue.get(timeout=timeout)
        except Empty:
            return None

    def calculate_and_output_recommendations(self, user_id):
        # self.events[user_id] must be filled by the caller with polled messages.
        # The profile is rebuilt on every call so repeated calls do not double-count events.
        profile = defaultdict(float)
        now = time.time()
        for event in self.events[user_id]:
            # event_time is a datetime.date in this notebook, so convert it to
            # epoch seconds (using pd from the first cell) before subtracting.
            age = now - pd.Timestamp(event.event_time).timestamp()
            profile[event.product_id] += 1 / max(age, 1.0)
        self.user_profiles[user_id] = profile

        # Keep the n_top highest-scoring products, best first.
        best = heapq.nlargest(self.n_top, ((score, product_id) for product_id, score in profile.items()))
        top_n = [product_id for score, product_id in best]

        print(f"Top-{self.n_top} recommendations for user {user_id}: {top_n}")



Explanation of the Real-Time Analysis Code:

The Message class represents a single event with a user_id, product_id, and event_time.

The KafkaBroker class is an in-memory stand-in for a message broker. It keeps every produced message in the 'data' dictionary, keyed by topic. produce_message() takes a Message object and appends it to the list for the broker's topic.

consume_messages() removes and returns the oldest message for the current topic. If there are no messages, it returns None.

The KafkaProducer class is responsible for producing messages. It takes a KafkaBroker object, a topic string, and a pandas DataFrame as input. send() iterates through the rows of the DataFrame, creates a Message object for each row, and passes it to the broker's produce_message().

The KafkaConsumer class consumes messages. It takes a KafkaBroker object, a topic string, a user_id, and an integer n_top as input. When a KafkaConsumer object is created, it creates a queue and starts a thread that runs run() continuously. run() calls the broker's consume_messages() and puts each message it receives on the queue.

poll() retrieves the next message from the queue. If no message arrives within the timeout (1 second by default), it returns None.
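
The broker, background thread, and poll() pattern described above can be sketched with the standard library alone. TinyBroker and TinyConsumer are hypothetical names used only for this illustration. Unlike the notebook's classes, this sketch adds a lock and a stop flag so the thread can be shut down cleanly.

```python
import threading
import time
from queue import Queue, Empty


class TinyBroker:
    """Hypothetical in-memory broker: a FIFO list guarded by a lock."""

    def __init__(self):
        self._messages = []
        self._lock = threading.Lock()

    def produce(self, message):
        with self._lock:
            self._messages.append(message)

    def consume(self):
        with self._lock:
            return self._messages.pop(0) if self._messages else None


class TinyConsumer:
    """Hypothetical consumer: a thread moves broker messages onto a Queue."""

    def __init__(self, broker):
        self.broker = broker
        self.queue = Queue()
        self._stop_flag = threading.Event()
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def run(self):
        while not self._stop_flag.is_set():
            message = self.broker.consume()
            if message is None:
                time.sleep(0.01)  # nothing pending; wait briefly
            else:
                self.queue.put(message)

    def poll(self, timeout=1):
        try:
            return self.queue.get(timeout=timeout)
        except Empty:
            return None  # no message arrived within the timeout

    def close(self):
        self._stop_flag.set()
        self.thread.join()


broker = TinyBroker()
for i in range(3):
    broker.produce(f"event-{i}")

consumer = TinyConsumer(broker)
received = [consumer.poll(timeout=1) for _ in range(3)]
after_drain = consumer.poll(timeout=0.1)
consumer.close()
print(received, after_drain)
```

The three messages come back in the order they were produced, and the final poll returns None because the queue is empty when its timeout expires.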

calculate_and_output_recommendations() calculates and prints the top-n recommendations for the user with the given user_id. It goes through the events list for that user and adds 1 / (seconds elapsed since the event) to the score of each product, so recent events weigh more than old ones. It then uses the heapq module to keep the n_top highest-scoring products and prints them in descending order of score. No precision metric is involved at this step. Note that nothing in the class appends to the events list, so the caller has to add polled messages to events[user_id] before calling this method.
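
The scoring step can be illustrated on its own. The sketch below assumes events are (product_id, date) pairs and that "now" is passed in explicitly so the result is reproducible. The helper names to_epoch_seconds and top_n_products, and the sample events, are hypothetical.

```python
import heapq
from collections import defaultdict
from datetime import date, datetime, timezone


def to_epoch_seconds(d):
    """Convert a calendar date to epoch seconds (midnight UTC)."""
    return datetime(d.year, d.month, d.day, tzinfo=timezone.utc).timestamp()


def top_n_products(events, n_top, now):
    """Score each product by summed inverse age and return the n_top best."""
    scores = defaultdict(float)
    for product_id, event_date in events:
        # Clamp the age to at least one second to avoid division by zero.
        age = max(now - to_epoch_seconds(event_date), 1.0)
        scores[product_id] += 1.0 / age
    best = heapq.nlargest(n_top, scores.items(), key=lambda item: item[1])
    return [product_id for product_id, _ in best]


# Hypothetical events for one user.
events = [
    ("A", date(2020, 1, 1)),
    ("B", date(2020, 6, 1)),
    ("A", date(2020, 1, 2)),
    ("C", date(2020, 6, 30)),
]
now = to_epoch_seconds(date(2020, 7, 1))
recommendations = top_n_products(events, 2, now)
print(recommendations)
```

Product C was bought one day before the reference time and B thirty days before, so both outrank A, whose two purchases are about six months old. The result is C followed by B.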