In [1]:
import random
import math
import time
from tqdm import tqdm

In [2]:
def timmer(func):
    """Decorator that prints how long each call to *func* takes.

    The wrapped function's return value is passed through unchanged. The
    elapsed time (in seconds) is printed after every successful call.

    :params: func, the callable to time
    :return: a wrapper with the same signature and metadata as *func*
    """
    # Imported locally so the decorator does not rely on a module-level import.
    from functools import wraps

    @wraps(func)  # keep __name__/__doc__ of the decorated function intact
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured interval cannot go
        # negative or jump when the system clock is adjusted.
        start_time = time.perf_counter()
        res = func(*args, **kwargs)
        stop_time = time.perf_counter()
        print('Func %s, run time: %s'%(func.__name__, stop_time-start_time))
        return res
    return wrapper

In [3]:
class Dataset():
    """Loads (user, item) pairs from a ``::``-delimited file and splits them
    into train/test dictionaries."""

    def __init__(self, fp):
        """
        :params: fp, path of the data file to read
        """
        self.fp = fp
        # Populated by loadData(); splitData() reads it.
        self.data = None

    @timmer
    def loadData(self):
        """
        Read the file at ``self.fp`` and parse the first two ``::``-separated
        fields of every non-blank line as integers.

        The parsed list is also stored on ``self.data`` so that splitData()
        can use it.

        :return: data, list of tuples built from the first two fields
        """
        data = []
        # Context manager guarantees the file handle is closed.
        with open(self.fp) as f:
            for line in f:
                line = line.strip()
                if not line:
                    # A blank (e.g. trailing) line would crash int('').
                    continue
                data.append(tuple(map(int, line.split("::")[:2])))
        self.data = data
        return data

    @timmer
    def splitData(self, M, k, seed=1):
        '''
        Split ``self.data`` into a train and a test set. Each entry goes to
        the test set when a random draw from [0, M-1] equals k, otherwise to
        the train set. If loadData() has not been called yet it is called
        here first.

        :params: M, number of folds; results are meant to be averaged over the M folds
        :params: k, index of the current fold, k in [0, M)
        :params: seed, seed for ``random``; should be the same for every k
        :return: train, test -- each a dict mapping user -> list of items
        '''
        if self.data is None:
            self.loadData()

        train, test = [], []
        # NOTE: this reseeds the module-level random generator.
        random.seed(seed)
        for user, item in self.data:
            if random.randint(0, M-1) == k:
                test.append((user, item))
            else:
                train.append((user, item))

        # Convert to dict form, user -> list of distinct items
        def convert_dict(pairs):
            grouped = {}
            for user, item in pairs:
                grouped.setdefault(user, set()).add(item)
            return {user: list(items) for user, items in grouped.items()}

        return convert_dict(train), convert_dict(test)

In [4]:
class Metric():
    """Evaluates a recommendation function against a train/test split.

    Recommendations are computed once for every user in ``test`` at
    construction time and reused by all metric methods.
    """

    def __init__(self, train, test, GetRecommendation):
        """
        :params: train, dict mapping user -> iterable of items
        :params: test, dict mapping user -> iterable of items
        :params: GetRecommendation, callable taking a user and returning an
                 iterable of (item, score) pairs
        """
        self.train = train
        self.test = test
        self.GetRecommendation = GetRecommendation
        self.recs = self.getRec()

    # Compute the recommendation list for every user in test
    def getRec(self):
        return {user: self.GetRecommendation(user) for user in self.test}

    # Count recommended items that appear in each user's test items
    def _hits(self):
        hit = 0
        for user in self.test:
            test_items = set(self.test[user])
            hit += sum(1 for item, _ in self.recs[user] if item in test_items)
        return hit

    # Precision: hits / number of recommended items, as a percentage
    def precision(self):
        total = sum(len(self.recs[user]) for user in self.test)
        if total == 0:
            # Nothing was recommended; avoid dividing by zero.
            return 0.0
        return round(self._hits() / total * 100, 2)

    # Recall: hits / number of distinct test items, as a percentage
    def recall(self):
        total = sum(len(set(self.test[user])) for user in self.test)
        if total == 0:
            return 0.0
        return round(self._hits() / total * 100, 2)

    # Coverage: distinct recommended items / distinct train items of the
    # test users, as a percentage
    def coverage(self):
        all_items, recom_item = set(), set()
        for user in self.test:
            # A test user may have no training history at all.
            all_items.update(self.train.get(user, ()))
            for item, _ in self.recs[user]:
                recom_item.add(item)
        if not all_items:
            return 0.0
        return round(len(recom_item) / len(all_items) * 100, 2)

    # Popularity (novelty): mean of log(1 + train occurrence count) over all
    # recommended items
    def popularity(self):
        # Count how many train users have each item
        item_pop = {}
        for user in self.train:
            for item in self.train[user]:
                item_pop[item] = item_pop.get(item, 0) + 1

        num, pop = 0, 0
        for user in self.test:
            for item, _ in self.recs[user]:
                # Items never seen in train contribute log(1) == 0.
                pop += math.log(1 + item_pop.get(item, 0))
                num += 1
        if num == 0:
            return 0.0
        return round(pop / num, 6)

    def eval(self):
        """Compute all four metrics, print them and return them as a dict."""
        metrics = {'Precision': self.precision(),
                   'Recall': self.recall(),
                   'Coverage': self.coverage(),
                   'Popularity': self.popularity()
                  }
        print('Metrics:', metrics)
        return metrics

In [5]:
def Random(train, K, N):
    """
    Build a recommender that returns up to N randomly chosen items the user
    does not already have in the training data.

    :params: train, training set, dict mapping user -> iterable of items
    :params: K, unused by this algorithm
    :params: N, hyperparameter, number of items in the TopN recommendation
    :return: GetRecommendation, the recommendation function; it takes a user
             and returns a list of (item, 1) pairs
    """
    # Distinct items in first-seen order; a dict is used as an ordered set.
    items = {}
    for user in train:
        for item in train[user]:
            items[item] = 1

    def GetRecommendation(user):
        # A user with no training history has nothing to exclude.
        user_items = set(train.get(user, ()))
        rec_items = [(item, score) for item, score in items.items()
                     if item not in user_items]
        random.shuffle(rec_items)
        return rec_items[:N]

    return GetRecommendation

In [6]:
if __name__ == "__main__":
    # Number of folds and the index of the fold held out as the test set.
    # NOTE(review): 8 folds / fold 0 are assumed values -- adjust as needed.
    M, k = 8, 0
    dataset = Dataset('./ml-1m/ratings.dat')
    data = dataset.loadData()
    # splitData() reads the pairs from the instance instead of taking them
    # as an argument, so make sure they are attached before splitting.
    dataset.data = data
    train, test = dataset.splitData(M, k)

Func loadData, run time: 2.5411453247070312
