In [1]:
%config ZMQInteractiveShell.ast_node_interactivity='all'
import os
import sys

BASE_DIR = "/opt/codes/lesson1"
sys.path.insert(0, os.path.join(BASE_DIR))
print(BASE_DIR)

PYSPARK_PYTHON = "/usr/bin/python3.6"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-openjdk-amd64'
from offline import SparkSessionBase  # import init文件

/opt/codes/lesson1


In [2]:
class OriginArticleData(SparkSessionBase):
    
    SPARK_APP_NAME = "Lesson1"
    # SPARK_URL = "yarn"

    ENABLE_HIVE_SUPPORT = True
    
    def __init__(self):
        self.spark = self._create_spark_session()

In [3]:
oa = OriginArticleData()

True (('spark.app.name', 'Lesson1'), ('spark.executor.memory', '8g'), ('spark.executor.cores', 8), ('spark.executor.instances', 8), ('hive.metastore.uris', 'thrift://172.18.0.2:9083'))


In [4]:
# 读取文章，进行每篇张分词
oa.spark.sql("use article")
article_data = oa.spark.sql("select * from article_data")
# article_data.count()
# article_data.show()


DataFrame[]

In [5]:
# 文章数据进行分词处理,得到分词结果
# 分词
def segmentation(partition):
    import os
    import re

    import jieba
    import jieba.analyse
    import jieba.posseg as pseg
    import codecs

    abspath = "/opt/words/"

    # 结巴加载用户词典
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)

    # 停用词文本
    stopwords_path = os.path.join(abspath, "stopwords.txt")

    def get_stopwords_list():
        """返回stopwords列表"""
        stopwords_list = [i.strip()
                          for i in codecs.open(stopwords_path, encoding='utf-8').readlines()]
        return stopwords_list

    # 所有的停用词列表
    stopwords_list = get_stopwords_list()

    # 分词
    def cut_sentence(sentence):
        """对切割之后的词语进行过滤，去除停用词，保留名词，英文和自定义词库中的词，长度大于2的词"""
        # print(sentence,"*"*100)
        # eg:[pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]
        seg_list = pseg.lcut(sentence)
        seg_list = [i for i in seg_list if i.flag not in stopwords_list]
        filtered_words_list = []
        for seg in seg_list:
            # print(seg)
            if len(seg.word) <= 1:
                continue
            elif seg.flag == "eng":
                if len(seg.word) <= 2:
                    continue
                else:
                    filtered_words_list.append(seg.word)
            elif seg.flag.startswith("n"):
                filtered_words_list.append(seg.word)
            elif seg.flag in ["x", "eng"]:  # 是自定一个词语或者是英文单词
                filtered_words_list.append(seg.word)
        return filtered_words_list

    for row in partition:
        sentence = re.sub("<.*?>", "", row.sentence)    # 替换掉标签数据
        words = cut_sentence(sentence)
        yield row.article_id, row.channel_id, words


In [6]:
words_df = article_data.rdd.mapPartitions(segmentation).toDF(['article_id', 'channel_id', 'words'])
# words_df.count()
# words_df.show()

In [7]:
words_df.head()

Row(article_id=1, channel_id=17, words=['Vue', 'props', '用法', '小结', 'Vue', 'props', '用法', '组件', '选项', 'props', 'Vue', '选项', '父子', '组件', '关系', '总结', 'props', 'down', 'events', '组件', 'props', '传递数据', '组件', '组件', 'events', '组件', '发送消息', '父子', '组件', '组件', 'pa', 'rent', 'child', '组件', '环境', '书写', '组件', '可维护性', '定义', '父子', '组件', 'Vue', '对象', 'var', 'childNode', 'template', 'div', 'childNode', 'div', 'var', 'pa', 'rentNode', 'template', 'div', 'child', 'child', 'child', 'child', 'div', 'components', 'child', 'childNode', '全栈', '交流', 'Ian', '人员', '技术', '瓶颈', '思维能力', 'new', 'Vue', 'example', 'components', 'pa', 'rent', 'pa', 'rentNode', 'div', 'example', 'pa', 'rent', 'pa', 'rent', 'div', 'childNode', '定义', 'template', 'div', '内容', 'childNode', '字符串', 'pa', 'rentNode', 'template', '定义', 'div', 'class', 'pa', 'rent', 'child', '组件', '静态', 'props', '组件', '实例', '作用域', '组件', '模板', '饮用', '组件', '数据', '组件', '组件', '数据', '组件', 'props', '选项', '组件', '向子', '组件', '传递数据', '方式', '动态', '静态', '静态', '方式', '组件', '

In [7]:
# 先计算分词之后的每篇文章的词频，得到CV模型
# 统计所有文章不同的词，组成一个词列表 words_list = [1,2,3,,34,4,45,56,67,78,8.......,,,,.]
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol='words', outputCol='countFeatures', vocabSize=1000, minDF=5)
cv_model = cv.fit(words_df)

# 然后根据词频计算IDF以及词，得到IDF模型


In [None]:
cv_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/test.model")

In [None]:
from pyspark.ml.feature import CountVectorizerModel
cv_m = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/test.model")

In [None]:
cv_result = cv_m.transform(words_df)

In [None]:
cv_result.show()
cv_result.head()

In [12]:
# IDF 模型
from pyspark.ml.feature import IDF
idf = IDF(inputCol="countFeatures", outputCol="idfFeatures")
idfModel = idf.fit(cv_result)
idfModel.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/testIDF.model")

In [13]:
# IDF对CV结果进行计算TFIDF
from pyspark.ml.feature import IDFModel
idf_model = IDFModel.load("hdfs://hadoop-master:9000/headlines/models/testIDF.model")
tfidf_res = idf_model.transform(cv_result)

In [14]:
tfidf_res.show()
tfidf_res.head()

+----------+----------+---------------------------+--------------------+--------------------+
|article_id|channel_id|                      words|       countFeatures|         idfFeatures|
+----------+----------+---------------------------+--------------------+--------------------+
|         1|        17|     [Vue, props, 用法, ...|(1266,[0,1,3,4,5,...|(1266,[0,1,3,4,5,...|
|         2|        17|  [vue, 响应式, 原理, mo...|(1266,[0,1,2,3,4,...|(1266,[0,1,2,3,4,...|
|         3|        17|    [JavaScript, 浅拷贝,...|(1266,[0,1,5,7,12...|(1266,[0,1,5,7,12...|
|         4|        17|       [vue2, vuex, elem...|(1266,[1,2,4,9,12...|(1266,[1,2,4,9,12...|
|         5|        17|       [immutability, Re...|(1266,[1,3,4,5,6,...|(1266,[1,3,4,5,6,...|
|         6|        17|       [node, npm, cnpm,...|(1266,[1,2,9,12,1...|(1266,[1,2,9,12,1...|
|         7|        17|[Web, 工程师, 以太坊, 入...|(1266,[1,2,3,4,6,...|(1266,[1,2,3,4,6,...|
|         8|        17|       [Web, pa, api, we...|(1266,[1,2,9,30,3...|(1266

Row(article_id=1, channel_id=17, words=['Vue', 'props', '用法', '小结', 'Vue', 'props', '用法', '组件', '选项', 'props', 'Vue', '选项', '父子', '组件', '关系', '总结', 'props', 'down', 'events', '组件', 'props', '传递数据', '组件', '组件', 'events', '组件', '发送消息', '父子', '组件', '组件', 'pa', 'rent', 'child', '组件', '环境', '书写', '组件', '可维护性', '定义', '父子', '组件', 'Vue', '对象', 'var', 'childNode', 'template', 'div', 'childNode', 'div', 'var', 'pa', 'rentNode', 'template', 'div', 'child', 'child', 'child', 'child', 'div', 'components', 'child', 'childNode', '全栈', '交流', 'Ian', '人员', '技术', '瓶颈', '思维能力', 'new', 'Vue', 'example', 'components', 'pa', 'rent', 'pa', 'rentNode', 'div', 'example', 'pa', 'rent', 'pa', 'rent', 'div', 'childNode', '定义', 'template', 'div', '内容', 'childNode', '字符串', 'pa', 'rentNode', 'template', '定义', 'div', 'class', 'pa', 'rent', 'child', '组件', '静态', 'props', '组件', '实例', '作用域', '组件', '模板', '饮用', '组件', '数据', '组件', '组件', '数据', '组件', 'props', '选项', '组件', '向子', '组件', '传递数据', '方式', '动态', '静态', '静态', '方式', '组件', '

In [None]:
row = tfidf_res.head(1)[0]
row.idfFeatures
_ = list(zip(row.idfFeatures.indices, row.idfFeatures.values))
_ = sorted(_, key=lambda x: x[1], reverse=True)
_
result = _[:20]
result

In [34]:
# 1265词的 {索引 以及 权重}
def func(partition):
    TOPK = 20
    # 找到索引与IDF值并进行排序
    for row in partition:
        # 一行有很多个词，所以要用zip展开，分别拿到"索引-权重"的tuple值
        _ = list(zip(row.idfFeatures.indices, row.idfFeatures.values))
        _ = sorted(_, key=lambda x: x[1], reverse=True)
        result = _[:TOPK]  # 一行返回TOPK个重要的词
        for word_index, tfidf in result:  # 重新拼接值返回，对于当前行row的article_id、channel_id是一样的
            yield row.article_id, row.channel_id, int(word_index), round(float(tfidf), 4)
kewords_tfidf = tfidf_res.rdd.mapPartitions(func).toDF(['article_id', 'channel_id', 'index', 'weights'])

In [19]:
kewords_tfidf.count()
kewords_tfidf.show()
kewords_tfidf.head()

200

+----------+----------+-----+-------+
|article_id|channel_id|index|weights|
+----------+----------+-----+-------+
|         1|        17|   19|68.1899|
|         1|        17|   10|58.4677|
|         1|        17|    8|55.6381|
|         1|        17|   26|52.8472|
|         1|        17|   39|39.2092|
|         1|        17|   27|31.1828|
|         1|        17|   55|30.6855|
|         1|        17|   58|30.6855|
|         1|        17|   66| 27.276|
|         1|        17|  138|15.3427|
|         1|        17|  139|15.3427|
|         1|        17|  150| 13.638|
|         1|        17|  171| 13.638|
|         1|        17|  175| 13.638|
|         1|        17|    4|11.2996|
|         1|        17|  206|10.2285|
|         1|        17|  239| 8.5237|
|         1|        17|  267| 8.5237|
|         1|        17|   33| 8.1357|
|         1|        17|  294|  6.819|
+----------+----------+-----+-------+
only showing top 20 rows



Row(article_id=1, channel_id=17, index=19, weights=68.1899)

In [44]:
# 解析出词表中所有词的idf值
keywords_list_with_idf = list(zip(cv_model.vocabulary, idf_model.idf.toArray()))
# 重新组装数据格式[keyword, idf-weight, index]，比如，['this', 0.6061358035703155, 0]
def append_index(data):
    for index in range(len(data)):
        data[index] = list(data[index]) # 将元组转为list，比如('this', 0.6061358035703155)
        data[index].append(index)       # 加入单词的索引index
        data[index][1] = float(data[index][1])  # 转换单词的idf权重值的格式
append_index(keywords_list_with_idf)
rdd = oa.spark.sparkContext.parallelize(keywords_list_with_idf)  # 创建rdd
idf_keywords = rdd.toDF(["keywords", "idf", "index"])
# 把生成的idf数据写入hive表中
# idf_keywords.write.insertInto('idf_keywords_values')
idf_keywords.createOrReplaceTempView("tmp_idf_keywords")
new_sql = """
    insert overwrite table idf_keywords_values
    select *
    from tmp_idf_keywords
    """
oa.spark.sql(new_sql)

DataFrame[]

In [45]:
# 利用keywordsIndex = ktt.spark.sql("select keyword, index idx from idf_keywords_values")中标，知道索引对应的词
idf_keywords_values = oa.spark.sql("select keyword, index idx from idf_keywords_values")
idf_keywords_values.count()

1266

In [46]:
keyword_str_tfidf = kewords_tfidf.join(idf_keywords_values, idf_keywords_values.idx==kewords_tfidf.index).select(["article_id", "channel_id", "keyword", "weights"])

keyword_str_tfidf.count()
keyword_str_tfidf.show()

200

+----------+----------+-----------+-------+
|article_id|channel_id|    keyword|weights|
+----------+----------+-----------+-------+
|         1|        17|        div|68.1899|
|         1|        17|      child|58.4677|
|         1|        17|       组件|55.6381|
|         1|        17|      props|52.8472|
|         1|        17|forChildMsg|39.2092|
|         1|        17|        msg|31.1828|
|         1|        17|ownChildMsg|30.6855|
|         1|        17|  childNode|30.6855|
|         1|        17|   template| 27.276|
|         1|        17|   rentNode|15.3427|
|         1|        17|        Vue|15.3427|
|         1|        17|       prop| 13.638|
|         1|        17|       rent| 13.638|
|         1|        17|     String| 13.638|
|         1|        17|       数据|11.2996|
|         1|        17| components|10.2285|
|         1|        17|       模板| 8.5237|
|         1|        17|       警告| 8.5237|
|         1|        17|        for| 8.1357|
|         1|        17|       type|  6.8

In [47]:
def sort_by_tfidf(partition):
    TOPK = 20
    for row in partition:
        # 找到索引与IDF值并进行排序
        _dict = list(zip(row.idfFeatures.indices, row.idfFeatures.values))
        _dict = sorted(_dict, key=lambda x: x[1], reverse=True)
        result = _dict[:TOPK]
        for word_index, tfidf in result:
            yield row.article_id, row.channel_id, int(word_index), round(float(tfidf), 4)

keywords_by_tfidf = tfidf_res.rdd.mapPartitions(sort_by_tfidf).toDF(["article_id", "channel_id", "index", "weights"])
keywords_index = oa.spark.sql("select keyword, index idx from idf_keywords_values")
keywords_result = keywords_by_tfidf.join(keywords_index, keywords_index.idx == keywords_by_tfidf.index).select(["article_id", "channel_id", "keyword", "weights"])
keywords_result.write.insertInto("tfidf_keywords_values")

In [48]:
# texrank
# 分词
def textrank(partition):
    import os

    import jieba
    import jieba.analyse
    import jieba.posseg as pseg
    import codecs

    abspath = "/opt/words"

    # 结巴加载用户词典
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)

    # 停用词文本
    stopwords_path = os.path.join(abspath, "stopwords.txt")

    def get_stopwords_list():
        """返回stopwords列表"""
        stopwords_list = [i.strip()
                          for i in codecs.open(stopwords_path, encoding='utf-8').readlines()]
        return stopwords_list

    # 所有的停用词列表
    stopwords_list = get_stopwords_list()

    class TextRank(jieba.analyse.TextRank):
        def __init__(self, window=20, word_min_len=2):
            super(TextRank, self).__init__()
            self.span = window  # 窗口大小
            self.word_min_len = word_min_len  # 单词的最小长度
            # 要保留的词性，根据jieba github ，具体参见https://github.com/baidu/lac
            self.pos_filt = frozenset(
                ('n', 'x', 'eng', 'f', 's', 't', 'nr', 'ns', 'nt', "nw", "nz", "PER", "LOC", "ORG"))

        def pairfilter(self, wp):
            """过滤条件，返回True或者False"""

            if wp.flag == "eng":
                if len(wp.word) <= 2:
                    return False

            if wp.flag in self.pos_filt and len(wp.word.strip()) >= self.word_min_len \
                    and wp.word.lower() not in stopwords_list:
                return True
    # TextRank过滤窗口大小为5，单词最小为2
    textrank_model = TextRank(window=5, word_min_len=2)
    allowPOS = ('n', "x", 'eng', 'nr', 'ns', 'nt', "nw", "nz", "c")

    for row in partition:
        tags = textrank_model.textrank(row.sentence, topK=20, withWeight=True, allowPOS=allowPOS, withFlag=False)
        for tag in tags:
            yield row.article_id, row.channel_id, tag[0], tag[1]

In [49]:
textrank_keywords_df = article_data.rdd.mapPartitions(textrank).toDF(["article_id", "channel_id", "keyword", "textrank"])

textrank_keywords_df.write.insertInto("textrank_keywords_values")

In [51]:
textrank_keywords_df.count()
textrank_keywords_df.show()
textrank_keywords_df.head()

200

+----------+----------+-----------+-------------------+
|article_id|channel_id|    keyword|           textrank|
+----------+----------+-----------+-------------------+
|         1|        17|       组件|                1.0|
|         1|        17|      props| 0.5154370285370792|
|         1|        17|        msg| 0.4702870805040915|
|         1|        17|       数据|0.45582871346014814|
|         1|        17|      child|0.31296871088663686|
|         1|        17|     strong| 0.3089686862986876|
|         1|        17|       code| 0.3032954542098871|
|         1|        17|        Vue|0.24087919593391022|
|         1|        17|         pa|0.22048638072881815|
|         1|        17|         ul| 0.2018632319447092|
|         1|        17|  childNode|0.19610401758526286|
|         1|        17|     String|0.17134793062324802|
|         1|        17|forChildMsg| 0.1668240799844303|
|         1|        17|       defa| 0.1655549274585362|
|         1|        17|        pre|0.155246572790472

Row(article_id=1, channel_id=17, keyword='组件', textrank=1.0)

In [52]:
# 文章画像 关键词与权重合并
# textrank * idf
idf_keywords_values = oa.spark.sql("select * from idf_keywords_values")

In [54]:
idf_keywords_values.count()
idf_keywords_values.show()

1266

+-------+------------------+-----+
|keyword|               idf|index|
+-------+------------------+-----+
|   this|0.6061358035703155|    0|
|     pa|               0.0|    1|
|   node|0.6061358035703155|    2|
|   data|0.6061358035703155|    3|
|   数据|0.4519851237430572|    4|
|    let|0.7884573603642703|    5|
|   keys|1.0116009116784799|    6|
|    obj|1.0116009116784799|    7|
|   组件|1.0116009116784799|    8|
|    npm|0.7884573603642703|    9|
|  child|1.2992829841302609|   10|
|   节点|1.7047480922384253|   11|
|    log|0.3184537311185346|   12|
|   属性|1.0116009116784799|   13|
|    key|0.7884573603642703|   14|
|console|0.4519851237430572|   15|
|  value|0.7884573603642703|   16|
|    var|0.7884573603642703|   17|
| return|0.4519851237430572|   18|
|    div|1.7047480922384253|   19|
+-------+------------------+-----+
only showing top 20 rows



In [55]:
keywords_res = textrank_keywords_df.join(idf_keywords_values, on=['keyword'], how='left')

In [56]:
keywords_res.count()
keywords_res.show()

200

+-----------+----------+----------+-------------------+-------------------+-----+
|    keyword|article_id|channel_id|           textrank|                idf|index|
+-----------+----------+----------+-------------------+-------------------+-----+
|       组件|         1|        17|                1.0| 1.0116009116784799|    8|
|      props|         1|        17| 0.5154370285370792| 1.7047480922384253|   26|
|        msg|         1|        17| 0.4702870805040915| 1.2992829841302609|   27|
|       数据|         1|        17|0.45582871346014814| 0.4519851237430572|    4|
|      child|         1|        17|0.31296871088663686| 1.2992829841302609|   10|
|     strong|         1|        17| 0.3089686862986876|               null| null|
|       code|         1|        17| 0.3032954542098871|               null| null|
|        Vue|         1|        17|0.24087919593391022| 1.7047480922384253|  139|
|         pa|         1|        17|0.22048638072881815|                0.0|    1|
|         ul|       

In [59]:
keywords_weights = keywords_res.withColumn('weights', keywords_res.textrank * keywords_res.idf).select(["article_id", "channel_id", "keyword", "weights"])
keywords_weights.count()
keywords_weights.show()

200

+----------+----------+-----------+-------------------+
|article_id|channel_id|    keyword|            weights|
+----------+----------+-----------+-------------------+
|         1|        17|       组件| 1.0116009116784799|
|         1|        17|      props| 0.8786902910676285|
|         1|        17|        msg| 0.6110360013552643|
|         1|        17|       数据|0.20602779745892363|
|         1|        17|      child| 0.4066349206201904|
|         1|        17|     strong|               null|
|         1|        17|       code|               null|
|         1|        17|        Vue| 0.4106383497282593|
|         1|        17|         pa|                0.0|
|         1|        17|         ul|0.04050803514258234|
|         1|        17|  childNode|0.33430794985876744|
|         1|        17|     String| 0.2921050578389841|
|         1|        17|forChildMsg|  0.284393032092888|
|         1|        17|       defa|0.28222944674561046|
|         1|        17|        pre|               nu

In [61]:
keywords_weights.registerTempTable('temp')
keywords_weights = oa.spark.sql("select article_id, min(channel_id) channel_id, collect_list(keyword) keywords, collect_list(weights) weights from temp group by article_id")

keywords_weights.count()
keywords_weights.show()
keywords_weights.head()

Row(article_id=7, channel_id=17, keywords=['code', '以太坊', 'web3', '合约', 'pre', 'var', '交易', 'eth', 'config', '智能', 'oschina', 'affid', 'ethereum', 'github', 'https', 'href', '区块链', '内容', 'truffle', 'http'], weights=[0.7166252318348719, 0.6165939326860989, 0.594581360981357, 0.22097663204437684, 0.47769652324354406, 0.439316793855322, 0.23920844096338223, 0.3644739102690279, 0.35928887983850694, 0.3203133406779015, 0.18140689390402723, 0.17533670142413396, 0.2859854782648112, 0.09034120068041016, 0.2393229613380347, 0.1086271150745705])

In [66]:
# 合并关键词和权重到字典
def _func(row):
    return row.article_id, row.channel_id, dict(zip(row.keywords, row.weights))

article_kewords = keywords_weights.rdd.map(_func).toDF(['article_id', 'channel_id', 'keywords'])

In [67]:
article_kewords.count()
article_kewords.show()
article_kewords.head()

10

+----------+----------+----------------------+
|article_id|channel_id|              keywords|
+----------+----------+----------------------+
|         7|        17|  [github -> 0.0903...|
|         6|        17|  [jpg -> 0.4829933...|
|         9|        17|  [loadStyle -> 0.0...|
|         5|        17|  [r2 -> 0.07908227...|
|         1|        17|  [msg -> 0.6110360...|
|        10|        17|  [nofollow -> 0.83...|
|         3|        17|  [jpg -> 0.4052402...|
|         8|        17|  [app -> 0.4392507...|
|         2|        17|[属性 -> 1.00270783...|
|         4|        17|  [jpg -> 0.9989083...|
+----------+----------+----------------------+



Row(article_id=7, channel_id=17, keywords={'github': 0.09034120068041016, 'pre': 0.47769652324354406, 'code': 0.7166252318348719, '交易': 0.23920844096338223, 'var': 0.439316793855322, 'oschina': 0.18140689390402723, 'web3': 0.594581360981357, '智能': 0.3203133406779015, 'ethereum': 0.2859854782648112, '合约': 0.22097663204437684, '以太坊': 0.6165939326860989, 'eth': 0.3644739102690279, 'affid': 0.17533670142413396, 'href': 0.1086271150745705, 'https': 0.2393229613380347, 'config': 0.35928887983850694})

In [68]:
# 计算tfidf与texrank共同词作为主题词
topic_sql = "select t.article_id article_id2, collect_set(t.keyword) topics from tfidf_keywords_values t inner join textrank_keywords_values r where t.keyword=r.keyword group by article_id2"
article_topics = oa.spark.sql(topic_sql)


In [69]:
article_topics.count()
article_topics.show()
article_topics.head()

10

+-----------+--------------------------+
|article_id2|                    topics|
+-----------+--------------------------+
|          7|[交易, 内容, eth, 合约,...|
|          6|      [vue, https, &#, ...|
|          9|  [item, 数据驱动, goLi...|
|          5|      [r2, obj, console...|
|          1|    [props, child, 组件...|
|         10|   [WebSocket, 源码, 域...|
|          3|   [Array, obj, 浅拷贝,...|
|          8|    [模式, mode, optimi...|
|          2|    [match, DOM, 节点, ...|
|          4| [bin, 数据库, 项目, wi...|
+-----------+--------------------------+



Row(article_id2=7, topics=['交易', '内容', 'eth', '合约', 'config', 'web3', 'var', '区块链', 'truffle', '智能', '以太坊'])

In [70]:
# 关键词与主题词结果合并，得到文章的最终完整画像
article_profile = article_kewords.join(article_topics, article_kewords.article_id==article_topics.article_id2).select(["article_id", "channel_id", "keywords", "topics"])

# article_profile.write.insertInto("article_profile")



In [71]:
article_profile.count()
article_profile.show()
article_profile.head()

10

+----------+----------+----------------------+--------------------------+
|article_id|channel_id|              keywords|                    topics|
+----------+----------+----------------------+--------------------------+
|         7|        17|  [github -> 0.0903...|[交易, 内容, eth, 合约,...|
|         6|        17|  [jpg -> 0.4829933...|      [vue, https, &#, ...|
|         9|        17|  [loadStyle -> 0.0...|  [item, 数据驱动, goLi...|
|         5|        17|  [r2 -> 0.07908227...|      [r2, obj, console...|
|         1|        17|  [msg -> 0.6110360...|    [props, child, 组件...|
|        10|        17|  [nofollow -> 0.83...|   [WebSocket, 源码, 域...|
|         3|        17|  [jpg -> 0.4052402...|   [Array, obj, 浅拷贝,...|
|         8|        17|  [app -> 0.4392507...|    [模式, mode, optimi...|
|         2|        17|[属性 -> 1.00270783...|    [match, DOM, 节点, ...|
|         4|        17|  [jpg -> 0.9989083...| [bin, 数据库, 项目, wi...|
+----------+----------+----------------------+---------------------

Row(article_id=7, channel_id=17, keywords={'github': 0.09034120068041016, 'pre': 0.47769652324354406, 'code': 0.7166252318348719, '交易': 0.23920844096338223, 'var': 0.439316793855322, 'oschina': 0.18140689390402723, 'web3': 0.594581360981357, '智能': 0.3203133406779015, 'ethereum': 0.2859854782648112, '合约': 0.22097663204437684, '以太坊': 0.6165939326860989, 'eth': 0.3644739102690279, 'affid': 0.17533670142413396, 'href': 0.1086271150745705, 'https': 0.2393229613380347, 'config': 0.35928887983850694}, topics=['交易', '内容', 'eth', '合约', 'config', 'web3', 'var', '区块链', 'truffle', '智能', '以太坊'])