In [1]:
import os
import sys
# When this notebook is run directly, put the directory two levels above the
# current working directory on sys.path so the project-local import below resolves.
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))

PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
# With several Python versions installed, leaving these unset is likely to cause errors.
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

from offline import SparkSessionBase

class UpdateRecall(SparkSessionBase):
    """Holder for the Spark session used by the recall-update cells.

    The session is built by ``_create_spark_session``, which is not defined
    in this class (presumably inherited from ``SparkSessionBase`` - verify
    there).
    """

    # Class-level settings; presumably read by SparkSessionBase when it
    # builds the session - confirm against that class.
    ENABLE_HIVE_SUPPORT = True
    SPARK_APP_NAME = "updateRecall"

    def __init__(self):
        """Create the Spark session and keep it on ``self.spark``."""
        session = self._create_spark_session()
        self.spark = session

# Single shared instance; later cells reach the Spark session through ur.spark.
ur = UpdateRecall()

In [2]:
# Read the user/article behaviour table from the `profile` database and keep
# only the three columns used below (clicked is type-converted in a later cell).
ur.spark.sql("use profile")
user_article_basic = ur.spark.sql("select * from user_article_basic").select(['user_id', 'article_id', 'clicked'])

In [3]:
user_article_basic.show()

+-------------------+-------------------+-------+
|            user_id|         article_id|clicked|
+-------------------+-------------------+-------+
|1105045287866466304|              14225|  false|
|1106476833370537984|              14208|  false|
|1109980466942836736|              19233|  false|
|1109980466942836736|              44737|  false|
|1109993249109442560|              17283|  false|
|1111189494544990208|              19322|  false|
|1111524501104885760|              44161|  false|
|1112727762809913344|              18172|   true|
|1113020831425888256|1112592065390182400|  false|
|1114863735962337280|              17665|  false|
|1114863741448486912|              14208|  false|
|1114863751909081088|              13751|  false|
|1114863846486441984|              17940|  false|
|1114863941936218112|              15196|  false|
|1114863998437687296|              19233|  false|
|1114864164158832640|             141431|  false|
|1114864237131333632|              13797|  false|


In [4]:
# `clicked` has to be converted from boolean to an integer
def boolen_to_int(row):
    """Return ``(user_id, article_id, clicked)`` with ``clicked`` cast to int.

    :param row: object exposing ``user_id``, ``article_id`` and ``clicked``.
    :return: 3-tuple whose last element is ``int(row.clicked)``.
    """
    user, article = row.user_id, row.article_id
    clicked_flag = int(row.clicked)
    return (user, article, clicked_flag)
    
    
# Rebuild the DataFrame from the mapped RDD so that `clicked` is an integer column.
user_article_basic = user_article_basic.rdd.map(boolen_to_int).toDF(['user_id', 'article_id', 'clicked'])

In [5]:
user_article_basic

DataFrame[user_id: bigint, article_id: bigint, clicked: bigint]

In [7]:
# Convert user_id and article_id into index columns (als_user_id / als_article_id)
# that are then used as the ALS user and item columns.
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

userindexer = StringIndexer(inputCol='user_id', outputCol='als_user_id')
articleindexer = StringIndexer(inputCol='article_id', outputCol='als_article_id')
# Run both indexers as one pipeline: fit on the behaviour data, then add the index columns.
pip = Pipeline(stages=[userindexer, articleindexer])
pip_model = pip.fit(user_article_basic)
user_article_ = pip_model.transform(user_article_basic)

In [8]:
from pyspark.ml.recommendation import ALS
# Fit ALS on the indexed IDs with the 0/1 `clicked` column as the rating.
# NOTE(review): implicitPrefs is left at its default, so clicks are treated as
# explicit ratings - confirm this is intended for click data.
als = ALS(userCol='als_user_id', itemCol='als_article_id', ratingCol='clicked')
model = als.fit(user_article_)
# Top 100 recommended article indices for every user index.
res = model.recommendForAllUsers(100)

In [9]:
res.show()

+-----------+--------------------+
|als_user_id|     recommendations|
+-----------+--------------------+
|         31|[[255,0.1802026],...|
|         65|[[0,0.0], [10,0.0...|
|         53|[[0,0.0], [10,0.0...|
|         34|[[255,0.06407572]...|
|         28|[[255,0.010229035...|
|         26|[[255,0.093366005...|
|         27|[[255,0.18162085]...|
|         44|[[255,0.19217382]...|
|         12|[[255,0.4196051],...|
|         22|[[255,0.14635552]...|
|         47|[[255,0.18675834]...|
|          1|[[255,0.22854869]...|
|         52|[[255,0.24745138]...|
|         13|[[207,0.14864287]...|
|          6|[[255,0.26363456]...|
|         16|[[255,0.240065], ...|
|          3|[[207,0.9093479],...|
|         20|[[255,0.21114598]...|
|         40|[[336,0.060219713...|
|         57|[[255,0.18626273]...|
+-----------+--------------------+
only showing top 20 rows



In [10]:
# Build lookup tables that map the ALS user index back to the original user_id
# and the ALS article index back to the original article_id.
# Grouping by the original ID and taking max() of the index yields one row per ID.
user_alsuser = user_article_.groupBy(['user_id']).max('als_user_id').withColumnRenamed('max(als_user_id)', 'als_user_id')
article_als_article = user_article_.groupBy(['article_id']).max('als_article_id').withColumnRenamed('max(als_article_id)', 'als_article_id')






In [11]:
user_alsuser.show()

+-------------------+-----------+
|            user_id|als_user_id|
+-------------------+-----------+
|1106473203766657024|       26.0|
|1113049054452908032|       44.0|
|1114863751909081088|       37.0|
|1115534909935452160|       42.0|
|1113100263847100416|       54.0|
|1103195673450250240|        5.0|
|1105045287866466304|       28.0|
|1114864237131333632|        4.0|
|1111524501104885760|       49.0|
|1109995264376045568|       19.0|
|1105105185656537088|       46.0|
|1110071654421102592|       64.0|
|1114863965080387584|       65.0|
|1114864128259784704|       17.0|
|1114864233264185344|       40.0|
|1115436666438287360|       29.0|
|1114863846486441984|        2.0|
|1115089292662669312|       13.0|
|1113316420155867136|       72.0|
|1114863902073552896|       16.0|
+-------------------+-----------+
only showing top 20 rows



In [12]:
# Attach the original user_id to each user's recommendation list.
res = res.join(user_alsuser, on=['als_user_id'], how='left')

In [13]:
res.show()

+-----------+--------------------+-------------------+
|als_user_id|     recommendations|            user_id|
+-----------+--------------------+-------------------+
|          8|[[263,0.35747305]...|1109976363453906944|
|         67|[[207,0.48211247]...|1114096769035141120|
|         70|[[255,0.38544005]...|1115534898262704128|
|          0|[[255,0.5872822],...|1106396183141548032|
|         69|[[0,0.0], [10,0.0...|1114094806092480512|
|          7|[[255,0.17604727]...|1111189494544990208|
|         49|[[0,0.0], [10,0.0...|1111524501104885760|
|         29|[[255,0.10245587]...|1115436666438287360|
|         64|[[255,0.09147509]...|1110071654421102592|
|         47|[[255,0.18675834]...|1112995431274512384|
|         42|[[255,0.24499115]...|1115534909935452160|
|         44|[[255,0.19217382]...|1113049054452908032|
|         35|[[255,0.6367457],...|                  4|
|         62|[[207,0.09850311]...|1114863741448486912|
|         18|[[255,0.19392662]...|1114864164158832640|
|         

In [14]:
import pyspark.sql.functions as F

# Explode the recommendation array into one row per (user, recommendation) and
# drop the array column and the ALS user index, which are no longer needed.
res = res.withColumn('recommendation', F.explode('recommendations')).drop('recommendations').drop('als_user_id')

In [15]:
res.show()

+-------------------+-----------------+
|            user_id|   recommendation|
+-------------------+-----------------+
|1109976363453906944| [263,0.35747305]|
|1109976363453906944| [181,0.21230958]|
|1109976363453906944| [207,0.14445771]|
|1109976363453906944|  [224,0.1438985]|
|1109976363453906944|  [336,0.1406238]|
|1109976363453906944| [185,0.08199209]|
|1109976363453906944| [210,0.07963402]|
|1109976363453906944| [23,0.077265725]|
|1109976363453906944| [204,0.07539557]|
|1109976363453906944| [184,0.07539557]|
|1109976363453906944|[287,0.074107625]|
|1109976363453906944|[275,0.074107625]|
|1109976363453906944|[305,0.074107625]|
|1109976363453906944|[335,0.074107625]|
|1109976363453906944|[246,0.074107625]|
|1109976363453906944|[280,0.074107625]|
|1109976363453906944|[269,0.074107625]|
|1109976363453906944|[267,0.074107625]|
|1109976363453906944|[342,0.074107625]|
|1109976363453906944|[341,0.074107625]|
+-------------------+-----------------+
only showing top 20 rows



In [16]:
# Pull out the ALS article index so it can be resolved back to the original article ID
def get_als_article_id(row):
    """Return ``(user_id, als_article_id)`` for one exploded recommendation row.

    :param row: object exposing ``user_id`` and an indexable ``recommendation``
        whose first element is the ALS article index.
    :return: 2-tuple of the user ID and ``row.recommendation[0]``.
    """
    user_id = row.user_id
    als_article_id = row.recommendation[0]
    return (user_id, als_article_id)
# Keep only the user ID and the recommended ALS article index.
res = res.rdd.map(get_als_article_id).toDF(['user_id', 'als_article_id'])

In [17]:
res.show()

+-------------------+--------------+
|            user_id|als_article_id|
+-------------------+--------------+
|1109976363453906944|           263|
|1109976363453906944|           181|
|1109976363453906944|           207|
|1109976363453906944|           224|
|1109976363453906944|           336|
|1109976363453906944|           185|
|1109976363453906944|           210|
|1109976363453906944|            23|
|1109976363453906944|           204|
|1109976363453906944|           184|
|1109976363453906944|           287|
|1109976363453906944|           275|
|1109976363453906944|           305|
|1109976363453906944|           335|
|1109976363453906944|           246|
|1109976363453906944|           280|
|1109976363453906944|           269|
|1109976363453906944|           267|
|1109976363453906944|           342|
|1109976363453906944|           341|
+-------------------+--------------+
only showing top 20 rows



In [18]:
# Resolve each ALS article index back to the original article_id.
res = res.join(article_als_article, on=['als_article_id'], how='left')

In [19]:
res.show()

+--------------+-------------------+----------+
|als_article_id|            user_id|article_id|
+--------------+-------------------+----------+
|           299|1109976363453906944|     13890|
|           299|1114096769035141120|     13890|
|           299|1115534898262704128|     13890|
|           299|1106396183141548032|     13890|
|           299|1115534909935452160|     13890|
|           299|                  4|     13890|
|           299|1114863741448486912|     13890|
|           299|1114864874141253632|     13890|
|           299|1115534631668547584|     13890|
|           299|1113053603926376448|     13890|
|           299|1115504747848138752|     13890|
|           299|                 23|     13890|
|           299|1105093883106164736|     13890|
|           299|1114863991156375552|     13890|
|           299|1114864434305564672|     13890|
|           299|1109980466942836736|     13890|
|           299|                 38|     13890|
|           299|1114863846486441984|    

In [20]:
# Load every article's channel from the `toutiao` database.
ur.spark.sql("use toutiao")
news_article_basic = ur.spark.sql("select article_id, channel_id from news_article_basic")

In [21]:
# Attach the channel_id to every recommended article.
res = res.join(news_article_basic, on=['article_id'], how='left')

In [22]:
res.show()

+----------+--------------+-------------------+----------+
|article_id|als_article_id|            user_id|channel_id|
+----------+--------------+-------------------+----------+
|     13890|           299|1106396183141548032|        18|
|     13890|           299|1111189494544990208|        18|
|     13890|           299|1115436666438287360|        18|
|     13890|           299|1110071654421102592|        18|
|     13890|           299|1112995431274512384|        18|
|     13890|           299|1113049054452908032|        18|
|     13890|           299|1114864164158832640|        18|
|     13890|           299|1108264901190615040|        18|
|     13890|           299|1114863751909081088|        18|
|     13890|           299|1114865014205841408|        18|
|     13890|           299|1114867217272406016|        18|
|     13890|           299|1114864237131333632|        18|
|     13890|           299|1112715153402494976|        18|
|     13890|           299|1112727762809913344|        1

In [23]:
# Every article now carries its channel_id;
# group by user and channel_id, collecting the recommended article IDs into a list.
res = res.groupBy(['user_id', 'channel_id']).agg(F.collect_list('article_id')).withColumnRenamed('collect_list(article_id)', 'article_list')
res.show()



+-------------------+----------+--------------------+
|            user_id|channel_id|        article_list|
+-------------------+----------+--------------------+
|1110071654421102592|        18|[13890, 14915, 18...|
|1111524501104885760|         5|            [141440]|
|1114863941936218112|         7|            [141437]|
|1113244157343694848|         7|            [141437]|
|1112995431274512384|         5|            [141440]|
|1114864874141253632|        18|[13890, 14915, 18...|
|                 33|        13|            [141431]|
|1114864434305564672|        13|            [141431]|
|1108264901190615040|         7|            [141437]|
|1106473203766657024|      null|[1112592065390182...|
|1106396183141548032|         7|            [141437]|
|1105093883106164736|         5|            [141440]|
|1115629498121846784|        18|[13890, 14915, 18...|
|1103195673450250240|         5|            [141440]|
|1114863735962337280|        13|            [141431]|
|1114864237131333632|       

In [24]:
# Drop rows containing nulls (e.g. groups whose channel_id is null after the left join).
res = res.dropna()

In [32]:
def save_offline_recall_hbase(partition):
    """Save the ALS (user-model) recall results of one partition to HBase.

    For every row, articles already recorded in the user's recall history for
    that channel are removed from ``article_list``. If anything remains it is
    written to ``cb_recall`` (row ``recall:user:<user_id>``, column
    ``als:<channel_id>``) and to ``history_recall`` (row
    ``reco:his:<user_id>``, column ``channel:<channel_id>``).

    :param partition: iterable of rows exposing ``user_id``, ``channel_id``
        and ``article_list``.
    :return: None; the function only writes to HBase.
    """
    import ast
    import happybase

    pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
    # Borrow a single connection for the whole partition. The pool's context
    # manager hands it back on exit, so it must not be closed by hand.
    with pool.connection() as conn:
        history_recall = conn.table('history_recall')
        cb_recall = conn.table('cb_recall')

        for row in partition:
            history_key = 'reco:his:{}'.format(row.user_id).encode()
            channel_column = 'channel:{}'.format(row.channel_id).encode()

            # Read all stored versions of the user's recall history for this channel.
            data = history_recall.cells(history_key, channel_column)

            # NOTE(review): history is only used when at least two versions
            # exist, and the last returned cell is skipped - kept from the
            # original logic; confirm this is intended.
            history_list = []
            if len(data) >= 2:
                for cell in data[:-1]:
                    # Cells hold the bytes of str(list_of_ids); parse them back
                    # into a list instead of iterating over raw byte values.
                    history_list.extend(ast.literal_eval(cell.decode()))

            reco_set = list(set(row.article_list) - set(history_list))

            # Store the new recall result, and a copy in the history table.
            if reco_set:
                payload = str(reco_set).encode()
                cb_recall.put('recall:user:{}'.format(row.user_id).encode(),
                              {'als:{}'.format(row.channel_id).encode(): payload})
                history_recall.put(history_key, {channel_column: payload})
    
# Write the ALS recall results to HBase, one call per partition.
res.foreachPartition(save_offline_recall_hbase)

In [25]:
# Content-based recall recommendation:
# keep only the behaviour rows where the user clicked the article.
ur.spark.sql("use profile")
user_article_basic = ur.spark.sql("select * from user_article_basic").filter('clicked=True')

In [27]:
user_article_basic.show()

+-------------------+-------------------+-------------------+----------+------+-------+---------+--------+---------+
|            user_id|        action_time|         article_id|channel_id|shared|clicked|collected|exposure|read_time|
+-------------------+-------------------+-------------------+----------+------+-------+---------+--------+---------+
|1112727762809913344|2019-04-03 12:51:57|              18172|        18| false|   true|     true|    true|    19413|
|                  1|2019-03-07 16:57:34|              44386|        18| false|   true|    false|    true|    17850|
|1109976363453906944|2019-03-25 11:52:31|              13728|        18| false|   true|    false|    true|    14218|
|1114864354622177280|2019-04-09 16:39:22|              17304|        18| false|   true|    false|    true|         |
|                 23|2019-04-03 08:10:23|              44739|        18| false|   true|    false|    true|     7013|
|                  1|2019-03-17 10:32:01|              17632|   

In [35]:
def save_clicked_similar_article_recall(partition):
    """Save articles similar to the ones a user clicked into the recall table.

    For every row, the ``similar`` column family of the clicked article is read
    from ``article_similar``. The ten most similar article IDs are kept, then
    IDs already in the user's recall history for the channel are removed. If
    anything remains it is written to ``cb_recall`` (row
    ``recall:user:<user_id>``, column ``content:<channel_id>``) and to
    ``history_recall`` (row ``reco:his:<user_id>``, column
    ``channel:<channel_id>``).

    :param partition: iterable of rows exposing ``user_id``, ``article_id``
        and ``channel_id``.
    :return: None; the function only writes to HBase.
    """
    import ast
    import happybase

    pool = happybase.ConnectionPool(size=10, host='hadoop-master')

    # One pooled connection serves the whole partition. The pool's context
    # manager hands it back on exit, so it must not be closed by hand.
    with pool.connection() as conn:
        # key: article_id, column: similar:<article_id>
        similar_table = conn.table('article_similar')
        history_table = conn.table('history_recall')
        # content_table = conn.table('cb_content_recall')
        content_table = conn.table('cb_recall')

        for row in partition:
            # Look up the similar-article cells of the clicked article.
            similar_article = similar_table.row(str(row.article_id).encode(),
                                                columns=[b'similar'])
            if not similar_article:
                continue

            # Rank by numeric similarity. The cell values are bytes, and
            # comparing them directly would sort lexicographically
            # (b'9e-05' > b'0.5').
            # Assumes every value parses as a float - TODO confirm against the
            # job that fills article_similar.
            ranked = sorted(similar_article.items(),
                            key=lambda item: float(item[1]), reverse=True)
            # Recall does not need much data: 10 articles per clicked article.
            reco_article = [int(column.split(b':')[1]) for column, _ in ranked[:10]]

            history_key = 'reco:his:{}'.format(row.user_id).encode()
            channel_column = 'channel:{}'.format(row.channel_id).encode()

            # All stored versions of the user's recall history for this channel.
            data = history_table.cells(history_key, channel_column)

            # NOTE(review): history is only used when at least two versions
            # exist, and the last returned cell is skipped - kept from the
            # original logic; confirm this is intended.
            history = []
            if len(data) >= 2:
                for cell in data[:-1]:
                    # Cells hold the bytes of str(list_of_ids). literal_eval
                    # parses them without executing stored text as code.
                    history.extend(ast.literal_eval(cell.decode()))

            # Remove everything that was already recalled for this user/channel.
            reco_res = list(set(reco_article) - set(history))

            # Store the result in the recall table and a copy in the history table.
            if reco_res:
                payload = str(reco_res).encode()
                content_table.put("recall:user:{}".format(row.user_id).encode(),
                                  {'content:{}'.format(row.channel_id).encode(): payload})
                history_table.put(history_key, {channel_column: payload})

# Write the content-based recall results to HBase, one call per partition.
user_article_basic.foreachPartition(save_clicked_similar_article_recall)