In [1]:
import os
import sys
# Make the project root importable: it is taken to be two directory levels
# above the current working directory, and is put first on sys.path so the
# project-local import that follows resolves.
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, BASE_DIR)
print(BASE_DIR)

# Pin the interpreter used by PySpark for both the workers and the driver;
# when several Python versions are installed, leaving this unset is likely
# to cause errors.
PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
os.environ.update({
    "PYSPARK_PYTHON": PYSPARK_PYTHON,
    "PYSPARK_DRIVER_PYTHON": PYSPARK_PYTHON,
})
from offline import SparkSessionBase

/root/toutiao_project/reco_sys


In [2]:
class OriginArticleData(SparkSessionBase):
    """Holder of the Spark session used to merge the raw article tables.

    The class attributes are configuration values; they are presumably read
    by ``SparkSessionBase._create_spark_session`` (defined in the ``offline``
    package, not visible here) - TODO confirm.
    """

    ENABLE_HIVE_SUPPORT = True
    SPARK_URL = "yarn"
    SPARK_APP_NAME = "mergeArticle"

    def __init__(self):
        # Create the session once and expose it on the instance as ``spark``.
        self.spark = self._create_spark_session()

In [3]:
# Instantiate the helper; its constructor creates the Spark session that the
# following cells use through ``oa.spark``.
oa = OriginArticleData()

In [4]:
# Merge the first two article tables: news_article_basic and
# news_article_content are joined on article_id (restricted here to the
# single article 116636).
oa.spark.sql("use toutiao")
titlce_content = oa.spark.sql(
    "select a.article_id, a.channel_id, a.title, b.content "
    "from news_article_basic a "
    "inner join news_article_content b on a.article_id=b.article_id "
    "where a.article_id=116636"
)


In [5]:
# Preview the joined title/content rows.
titlce_content.show()

+----------+----------+---------------+--------------------+
|article_id|channel_id|          title|             content|
+----------+----------+---------------+--------------------+
|    116636|        18|动态再平衡投资策略历史数据回测|<p>赚钱是个俗气的话题，但又是人...|
+----------+----------+---------------+--------------------+



In [6]:
# Attach the channel name to the merged title/content rows.
# createOrReplaceTempView is used instead of registerTempTable, which has
# been deprecated since Spark 2.0; it also lets the cell be re-run safely
# because an existing view of the same name is replaced.
titlce_content.createOrReplaceTempView('temptable')

# Left join so that articles whose channel_id has no match in news_channel
# are kept (their channel_name is then null).
channel_title_content = oa.spark.sql("select t.*, n.channel_name from temptable t left join news_channel n on t.channel_id=n.channel_id")




In [7]:
# Preview the rows after the channel name has been joined in.
channel_title_content.show()

+----------+----------+---------------+--------------------+------------+
|article_id|channel_id|          title|             content|channel_name|
+----------+----------+---------------+--------------------+------------+
|    116636|        18|动态再平衡投资策略历史数据回测|<p>赚钱是个俗气的话题，但又是人...|      python|
+----------+----------+---------------+--------------------+------------+



In [8]:
# Combine channel name, title and content into one comma-separated string
# column named "sentence", kept next to the original columns.
import pyspark.sql.functions as F

sentence_df = channel_title_content.select(
    "article_id",
    "channel_id",
    "channel_name",
    "title",
    "content",
    F.concat_ws(
        ',',
        channel_title_content["channel_name"],
        channel_title_content["title"],
        channel_title_content["content"],
    ).alias('sentence'),
)


In [9]:
# Preview the merged rows together with the new "sentence" column.
sentence_df.show()
# Left disabled: un-commenting the next line would write sentence_df into the
# article_data table.
# sentence_df.write.insertInto("article_data")

+----------+----------+------------+---------------+--------------------+--------------------+
|article_id|channel_id|channel_name|          title|             content|            sentence|
+----------+----------+------------+---------------+--------------------+--------------------+
|    116636|        18|      python|动态再平衡投资策略历史数据回测|<p>赚钱是个俗气的话题，但又是人...|python,动态再平衡投资策略历...|
+----------+----------+------------+---------------+--------------------+--------------------+



In [10]:
# Read a small sample of merged articles; each one is word-segmented below.
oa.spark.sql("use article")
# NOTE(review): "limit 10" without an "order by" does not guarantee which
# rows are returned - confirm that any 10 articles are acceptable here.
article_data = oa.spark.sql("select * from article_data limit 10")
article_data.show()


+----------+----------+------------+--------------------+--------------------+--------------------+
|article_id|channel_id|channel_name|               title|             content|            sentence|
+----------+----------+------------+--------------------+--------------------+--------------------+
|         1|        17|          前端|     Vue props用法小结原荐|<p><strong>Vue pr...|前端,Vue props用法小结原...|
|         2|        17|          前端|vue.js响应式原理解析与实现—...|<p>上次我们已经分析了vue.j...|前端,vue.js响应式原理解析与...|
|         3|        17|          前端|JavaScript中浅拷贝和深拷...|<p>要理解 JavaScript...|前端,JavaScript中浅拷贝...|
|         4|        17|          前端|基于vue2.0 +vuex+ e...|<p>效果演示地址,</p><p>...|前端,基于vue2.0 +vuex...|
|         5|        17|          前端|immutability因Reac...|<p><img src="http...|前端,immutability因R...|
|         6|        17|          前端|简单了解 node npm cnp...|<span id="OSC_h1_...|前端,简单了解 node npm ...|
|         7|        17|          前端|       Web工程师以太坊入门原荐|<p>我经常构建使用以太坊的Web...|前端,Web工程师以太坊入门原荐,...|


In [11]:
# Word segmentation of the article text; meant to be passed to
# RDD.mapPartitions so the dictionaries are loaded once per partition.
def segmentation(partition):
    """Segment the ``sentence`` of every row in a partition into keywords.

    Args:
        partition: iterable of rows exposing ``article_id``, ``channel_id``
            and ``sentence`` attributes.

    Yields:
        ``(article_id, channel_id, words)`` tuples, where ``words`` is the
        list of kept words in their order of appearance.

    A word is kept when it is longer than one character, is not a stop word,
    and is either an English token longer than two characters, a noun (POS
    flag starting with ``n``) or flagged ``x``.

    Raises:
        IOError/OSError: if the user dictionary or the stop-word file under
            ``/root/words`` cannot be read on the executing machine.
    """
    import os
    import re

    import jieba
    import jieba.posseg as pseg
    import codecs

    abspath = "/root/words"

    # Load the user dictionary into jieba.
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)

    # Stop-word file, one word per line.
    stopwords_path = os.path.join(abspath, "stopwords.txt")

    def get_stopwords_list():
        """Return the stop words as a frozenset for O(1) membership tests."""
        # Explicit encoding so the result does not depend on the executor's
        # locale, and a context manager so the handle is always closed.
        # NOTE(review): assumes the file is UTF-8 encoded - confirm.
        with codecs.open(stopwords_path, encoding="utf-8") as stopwords_file:
            return frozenset(line.strip() for line in stopwords_file)

    stopwords = get_stopwords_list()

    # Non-greedy match of anything between angle brackets (HTML tags).
    tag_pattern = re.compile("<.*?>")

    def cut_sentence(sentence):
        """Tokenise ``sentence`` and return the words that pass the filters."""
        # pseg.lcut returns pairs such as
        # [pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]
        kept_words = []
        for seg in pseg.lcut(sentence):
            word, flag = seg.word, seg.flag
            if len(word) <= 1:
                continue
            # Compare the WORD (not the POS flag) with the stop words;
            # lower-cased to match the check done in textrank().
            if word.lower() in stopwords:
                continue
            if flag == "eng":
                # English tokens must be longer than two characters.
                if len(word) > 2:
                    kept_words.append(word)
            elif flag.startswith("n") or flag == "x":
                # Nouns, or words flagged "x" (noted in the original code as
                # custom-dictionary words).
                kept_words.append(word)
        return kept_words

    for row in partition:
        # Strip HTML tags first; a missing sentence is treated as empty text.
        sentence = tag_pattern.sub("", row.sentence or "")
        yield row.article_id, row.channel_id, cut_sentence(sentence)


In [12]:
# Run the segmentation partition by partition and turn the yielded tuples
# back into a DataFrame with named columns.
words_df = article_data.rdd.mapPartitions(segmentation).toDF(['article_id', 'channel_id', 'words'])

In [13]:
# Preview the segmented words of each article.
words_df.show()

+----------+----------+--------------------+
|article_id|channel_id|               words|
+----------+----------+--------------------+
|         1|        17|[Vue, props, 用法, ...|
|         2|        17|[vue, 响应式, 原理, mo...|
|         3|        17|[JavaScript, 浅拷贝,...|
|         4|        17|[vue2, vuex, elem...|
|         5|        17|[immutability, Re...|
|         6|        17|[node, npm, cnpm,...|
|         7|        17|[Web, 工程师, 以太坊, 入...|
|         8|        17|[Web, pa, api, we...|
|         9|        17|[vue, 中用, 数据驱动, 视...|
|        10|        17|[程序, WebSocket, 长...|
+----------+----------+--------------------+



In [14]:
# First compute the term counts of every segmented article, which gives the
# CountVectorizer (CV) model.
# The distinct words across all articles form the vocabulary list.
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol='words', outputCol='countFeatures', vocabSize=2000, minDF=1.0)
cv_model = cv.fit(words_df)

# Next, the IDF model is computed from these term counts (see the cells below).


In [15]:
# Persist the fitted CountVectorizer model to HDFS, overwriting any previous copy.
cv_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/test.model")

In [16]:
# Load the CountVectorizer model back from the HDFS path it was saved to.
from pyspark.ml.feature import CountVectorizerModel
cv_m = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/test.model")

In [17]:
# Apply the loaded model: adds the "countFeatures" term-count vector column.
cv_result = cv_m.transform(words_df)

In [18]:
# Preview the count vectors.
# NOTE(review): the saved output of this cell lists article_ids (4273, ...)
# that differ from the words_df preview (1-10), so the stored outputs appear
# to come from different runs - re-execute from a fresh kernel to confirm.
cv_result.show()

+----------+----------+--------------------+--------------------+
|article_id|channel_id|               words|       countFeatures|
+----------+----------+--------------------+--------------------+
|      4273|        15|[javascript, reac...|(986,[2,4,9,10,11...|
|      4274|        19|[java, java, 笔记, ...|(986,[0,1,8,16,17...|
|      4275|        19|[java, 传统, 方式, 类继...|(986,[1,2,8,16,18...|
|      4276|        15|[javascript, Vue,...|(986,[1,2,4,6,8,1...|
|      4278|        15|[javascript, 作用域链...|(986,[2,3,10,18,2...|
|      4279|        19|[java, springboot...|(986,[1,8,11,23,2...|
|      4280|        19|[java, Jedis, 工具类...|(986,[1,2,4,5,7,8...|
|      4281|        19|[java, java, 记录, ...|(986,[2,16,23,32,...|
|      4282|        15|[javascript, VueS...|(986,[2,4,6,10,16...|
|      4283|        15|[javascript, 体积, ...|(986,[2,3,4,10,11...|
+----------+----------+--------------------+--------------------+



In [19]:
# IDF model: fit inverse document frequencies on the count vectors and
# persist the fitted model to HDFS, overwriting any previous copy.
from pyspark.ml.feature import IDF
idf = IDF(inputCol="countFeatures", outputCol="idfFeatures")
idfModel = idf.fit(cv_result)
idfModel.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/testIDF.model")

In [20]:
# Inspect the vocabulary held by the loaded CountVectorizer model; it can be
# used to translate feature indices back into words.
cv_m.vocabulary

['&#',
 'String',
 '代码',
 '作用域',
 'pa',
 'key',
 '客户端',
 'jedis',
 'public',
 'Hooks',
 '函数',
 'ul',
 '组件',
 'scope',
 'return',
 '模块',
 '方法',
 'import',
 '时候',
 'count',
 'res',
 '.h',
 'this',
 'java',
 '问题',
 'Long',
 'Override',
 'class',
 'com',
 '声明',
 'web',
 'name',
 '线程',
 'constructor',
 'value',
 '逻辑',
 'props',
 'useEffect',
 'node',
 'start',
 '插件',
 '项目',
 'field',
 'rams',
 'vue',
 'useState',
 'arg',
 'Jedis',
 'event',
 'command',
 'jedisPool',
 '服务端',
 'action',
 '例子',
 '官方',
 'Enumeration',
 '参数',
 'state',
 'util',
 'close',
 'function',
 '情况',
 'catch',
 'title',
 'const',
 '文件',
 'new',
 'set',
 'jedisCluster',
 'void',
 'redis',
 'getResource',
 'clients',
 '标识',
 'onParseClientResp',
 'Thread',
 'enu',
 '.a',
 'from',
 'hooks',
 '页面',
 '全局',
 'get',
 '大家',
 'end',
 'fireEvent',
 'react',
 'document',
 'times',
 'clicked',
 'You',
 'listener',
 '赋值',
 'Vector',
 'server',
 '结果',
 'client',
 'var',
 'bean',
 'isOnline',
 '降级',
 'tml',
 'android',
 'toast',
 'Strin

In [21]:
# Inspect the first 20 IDF weights of the fitted model.
idfModel.idf.toArray()[:20]

array([1.29928298, 1.70474809, 0.31845373, 1.70474809, 0.        ,
       0.78845736, 1.29928298, 2.39789527, 1.70474809, 2.39789527,
       0.6061358 , 0.2006707 , 1.01160091, 1.70474809, 0.45198512,
       1.70474809, 0.45198512, 2.39789527, 0.45198512, 2.39789527])

In [22]:
# Compute TF-IDF: load the saved IDF model and apply it to the count vectors,
# which adds the "idfFeatures" column.
from pyspark.ml.feature import IDFModel
idf_model = IDFModel.load("hdfs://hadoop-master:9000/headlines/models/testIDF.model")
tfidf_res = idf_model.transform(cv_result)

In [23]:
# Preview the TF-IDF vectors next to the raw count vectors.
tfidf_res.show()

+----------+----------+--------------------+--------------------+--------------------+
|article_id|channel_id|               words|       countFeatures|         idfFeatures|
+----------+----------+--------------------+--------------------+--------------------+
|      4273|        15|[javascript, reac...|(986,[2,4,9,10,11...|(986,[2,4,9,10,11...|
|      4274|        19|[java, java, 笔记, ...|(986,[0,1,8,16,17...|(986,[0,1,8,16,17...|
|      4275|        19|[java, 传统, 方式, 类继...|(986,[1,2,8,16,18...|(986,[1,2,8,16,18...|
|      4276|        15|[javascript, Vue,...|(986,[1,2,4,6,8,1...|(986,[1,2,4,6,8,1...|
|      4278|        15|[javascript, 作用域链...|(986,[2,3,10,18,2...|(986,[2,3,10,18,2...|
|      4279|        19|[java, springboot...|(986,[1,8,11,23,2...|(986,[1,8,11,23,2...|
|      4280|        19|[java, Jedis, 工具类...|(986,[1,2,4,5,7,8...|(986,[1,2,4,5,7,8...|
|      4281|        19|[java, java, 记录, ...|(986,[2,16,23,32,...|(986,[2,16,23,32,...|
|      4282|        15|[javascript, VueS...

In [24]:
# Per article, keep the highest-weighted TF-IDF entries as (term index, weight).
def func(partition, topk=20):
    """Yield the ``topk`` highest-weighted TF-IDF terms of every row.

    Args:
        partition: iterable of rows exposing ``article_id``, ``channel_id``
            and ``idfFeatures``; the latter must provide parallel
            ``indices`` and ``values`` sequences.
        topk: maximum number of terms emitted per row (default 20, the
            previously hard-coded value); ``None`` keeps every term.

    Yields:
        ``(article_id, channel_id, word_index, weight)`` tuples ordered by
        descending weight, with ``word_index`` as ``int`` and ``weight`` as a
        ``float`` rounded to 4 decimals.
    """
    for row in partition:
        features = row.idfFeatures
        # Pair each term index with its weight and rank by weight, highest
        # first; sorted() is stable, so ties keep their original order.
        ranked = sorted(zip(features.indices, features.values),
                        key=lambda pair: pair[1], reverse=True)
        for word_index, tfidf in ranked[:topk]:
            # Cast to built-in int/float before yielding.
            yield row.article_id, row.channel_id, int(word_index), round(float(tfidf), 4)
# Flatten the per-article top terms into one DataFrame row per
# (article, term index) pair.
kewords_tfidf = tfidf_res.rdd.mapPartitions(func).toDF(['article_id', 'channel_id', 'index', 'weights'])

In [25]:
# Preview the (article, term index, weight) rows.
kewords_tfidf.show()

+----------+----------+-----+-------+
|article_id|channel_id|index|weights|
+----------+----------+-----+-------+
|         1|        17|  244|68.1899|
|         1|        17|  829|58.4677|
|         1|        17|   12|55.6381|
|         1|        17|   36|52.8472|
|         1|        17|  165|31.1828|
|         1|        17|  186| 27.276|
|         1|        17|  121|15.3427|
|         1|        17|    1| 13.638|
|         1|        17|  810| 13.638|
|         1|        17|  114|11.2996|
|         1|        17|  319|10.2285|
|         1|        17|  137| 8.1357|
|         1|        17|  192|  6.819|
|         1|        17|  287|  6.819|
|         1|        17|  296|  6.819|
|         1|        17|  779|  6.819|
|         1|        17|   27| 5.5192|
|         1|        17|  143| 5.5192|
|         1|        17|  756| 5.5192|
|         1|        17|  570| 5.1142|
+----------+----------+-----+-------+
only showing top 20 rows



In [26]:
# Use the idf_keywords_values table to find out which word each index stands
# for; its "index" column is exposed as "idx".
idf_keywords_values = oa.spark.sql("select keyword, index idx from idf_keywords_values")

In [27]:
# Replace every term index by its word: join on idx == index and keep only
# the article identifiers, the keyword text and its weight.
keyword_str_tfidf = (
    kewords_tfidf
    .join(idf_keywords_values, idf_keywords_values["idx"] == kewords_tfidf["index"])
    .select(["article_id", "channel_id", "keyword", "weights"])
)

keyword_str_tfidf.show()




+----------+----------+-------+-------+
|article_id|channel_id|keyword|weights|
+----------+----------+-------+-------+
|         4|        17|  https| 2.4245|
|         2|        17|   人工智能|34.5497|
|         5|        17|   人工智能| 6.0614|
|         6|        17|     脚本| 5.1142|
|         8|        17|     价格| 8.5237|
|         4|        17|  close| 2.5986|
|         5|        17|     功能| 4.0464|
|         6|        17|     功能| 7.0812|
|         8|        17|     功能| 3.0348|
|         1|        17|  right|  6.819|
|         3|        17|     &#|37.6792|
|         6|        17|     &#| 7.7957|
|         1|        17|     阶段|  6.819|
|         3|        17|   lang| 4.0464|
|         8|        17|     架构| 1.7047|
|         6|        17|    互联网| 5.1971|
|         2|        17|     用户|78.7977|
|        10|        17|     用户| 4.8491|
|         5|        17|     思路| 5.1142|
|         2|        17|   else| 9.4615|
+----------+----------+-------+-------+
only showing top 20 rows



In [28]:
# TextRank keyword extraction; meant to be passed to RDD.mapPartitions so the
# dictionaries and the model are set up once per partition.
def textrank(partition):
    """Extract TextRank keywords from the ``sentence`` of every row.

    Args:
        partition: iterable of rows exposing ``article_id``, ``channel_id``
            and ``sentence`` attributes.

    Yields:
        ``(article_id, channel_id, keyword, weight)`` tuples, at most 20 per
        row, as returned by jieba's ``textrank`` with ``withWeight=True``.

    Raises:
        IOError/OSError: if the user dictionary or the stop-word file under
            ``/root/words`` cannot be read on the executing machine.
    """
    import os

    import jieba
    import jieba.analyse
    import codecs

    abspath = "/root/words"

    # Load the user dictionary into jieba.
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)

    # Stop-word file, one word per line.
    stopwords_path = os.path.join(abspath, "stopwords.txt")

    def get_stopwords_list():
        """Return the stop words as a frozenset for O(1) membership tests."""
        # Explicit encoding so the result does not depend on the executor's
        # locale, and a context manager so the handle is always closed.
        # NOTE(review): assumes the file is UTF-8 encoded - confirm.
        with codecs.open(stopwords_path, encoding="utf-8") as stopwords_file:
            return frozenset(line.strip() for line in stopwords_file)

    stopwords = get_stopwords_list()

    class TextRank(jieba.analyse.TextRank):
        """jieba TextRank with a configurable window and a custom word filter."""

        def __init__(self, window=20, word_min_len=2):
            super(TextRank, self).__init__()
            self.span = window  # co-occurrence window size
            self.word_min_len = word_min_len  # minimum word length
            # POS tags to keep; see the jieba README and
            # https://github.com/baidu/lac for the tag set.
            # NOTE(review): jieba's textrank() appears to reset pos_filt from
            # its allowPOS argument, which would make this value a default
            # only - confirm against the installed jieba version.
            self.pos_filt = frozenset(
                ('n', 'x', 'eng', 'f', 's', 't', 'nr', 'ns', 'nt', "nw", "nz", "PER", "LOC", "ORG"))

        def pairfilter(self, wp):
            """Return True when the (word, flag) pair ``wp`` should be kept."""
            word = wp.word
            # English tokens must be longer than two characters.
            if wp.flag == "eng" and len(word) <= 2:
                return False
            return (wp.flag in self.pos_filt
                    and len(word.strip()) >= self.word_min_len
                    and word.lower() not in stopwords)

    # TextRank with a window of 5 and a minimum word length of 2.
    textrank_model = TextRank(window=5, word_min_len=2)
    allowPOS = ('n', "x", 'eng', 'nr', 'ns', 'nt', "nw", "nz", "c")

    for row in partition:
        tags = textrank_model.textrank(row.sentence, topK=20, withWeight=True, allowPOS=allowPOS, withFlag=False)
        for tag in tags:
            # tag is a (keyword, weight) pair because withWeight=True.
            yield row.article_id, row.channel_id, tag[0], tag[1]

In [29]:
# Run TextRank partition by partition and collect the keywords in a DataFrame.
# NOTE(review): this rebinds the name ``textrank`` from the partition function
# to the resulting DataFrame, so re-running this cell without re-running the
# function definition would pass a DataFrame to mapPartitions - consider a
# distinct name for the result.
textrank = article_data.rdd.mapPartitions(textrank).toDF(["article_id", "channel_id", "keyword", "textrank"])