In [18]:
import os
import sys
# 如果当前代码文件运行测试需要加入修改路径，避免出现后导包问题
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))

PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
# 当存在多个版本时，不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

from offline import SparkSessionBase
from setting.default import channelInfo
from pyspark.ml.feature import Word2Vec



class TrainWord2VecModel(SparkSessionBase):

    SPARK_APP_NAME = "Word2Vec"
    SPARK_URL = "yarn"
    
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        self.spark = self._create_spark_session()


w2v = TrainWord2VecModel()

In [19]:
# 训练一个频道的模型
w2v.spark.sql("use article")

article_data = w2v.spark.sql("select * from article_data where article_id=18 limit 5")

In [20]:
# 文章数据进行分词处理,得到分词结果
# 分词
def segmentation(partition):
    import os
    import re

    import jieba
    import jieba.analyse
    import jieba.posseg as pseg
    import codecs

    abspath = "/root/words"

    # 结巴加载用户词典
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)

    # 停用词文本
    stopwords_path = os.path.join(abspath, "stopwords.txt")

    def get_stopwords_list():
        """返回stopwords列表"""
        stopwords_list = [i.strip()
                          for i in codecs.open(stopwords_path).readlines()]
        return stopwords_list

    # 所有的停用词列表
    stopwords_list = get_stopwords_list()

    # 分词
    def cut_sentence(sentence):
        """对切割之后的词语进行过滤，去除停用词，保留名词，英文和自定义词库中的词，长度大于2的词"""
        # print(sentence,"*"*100)
        # eg:[pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]
        seg_list = pseg.lcut(sentence)
        seg_list = [i for i in seg_list if i.flag not in stopwords_list]
        filtered_words_list = []
        for seg in seg_list:
            # print(seg)
            if len(seg.word) <= 1:
                continue
            elif seg.flag == "eng":
                if len(seg.word) <= 2:
                    continue
                else:
                    filtered_words_list.append(seg.word)
            elif seg.flag.startswith("n"):
                filtered_words_list.append(seg.word)
            elif seg.flag in ["x", "eng"]:  # 是自定一个词语或者是英文单词
                filtered_words_list.append(seg.word)
        return filtered_words_list

    for row in partition:
        sentence = re.sub("<.*?>", "", row.sentence)    # 替换掉标签数据
        words = cut_sentence(sentence)
        yield row.article_id, row.channel_id, words

In [21]:
words_df = article_data.rdd.mapPartitions(segmentation).toDF(['article_id', 'channel_id', 'words'])

In [22]:
words_df.show()

+----------+----------+--------------------+
|article_id|channel_id|               words|
+----------+----------+--------------------+
|        18|        17|[web, pa, react, ...|
+----------+----------+--------------------+



In [24]:
# 直接diaoyong调用word2vec训练
w2v_model = Word2Vec(vectorSize=100, inputCol='words', outputCol='model', minCount=3)

In [None]:
model = w2v_model.fit(words_df)
model.save("hdfs://hadoop-master:9000/headlines/models/test.word2vec")

In [25]:
# 1、加载某个频道模型，得到每个词的向量
from pyspark.ml.feature import Word2VecModel
wv = Word2VecModel.load("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")
vectors = wv.getVectors()


In [26]:
vectors.show()

+------------------+--------------------+
|              word|              vector|
+------------------+--------------------+
|                广义|[0.28907623887062...|
|                钟爱|[-0.0529650673270...|
|c1c3387c24028915fc|[0.08250344544649...|
|          failCnt0|[-0.0034321683924...|
|       freeman1974|[0.01132440567016...|
|                伙伴|[-0.1075697541236...|
|  testStationarity|[0.09605087339878...|
|                箭头|[0.08957882970571...|
|        fieldsfrom|[-0.0121747571974...|
|      RoundrobinLB|[-0.0941602289676...|
|              COCO|[-0.2620599269866...|
|                拜拜|[-0.0820834264159...|
|          quotient|[0.08328679203987...|
|                货币|[-0.0276580695062...|
|                人物|[-0.1581292450428...|
|               wsy|[0.09347543865442...|
|           serious|[0.01220745686441...|
|               跨进程|[-0.0330988727509...|
|        fromParams|[0.00353708816692...|
|        MongoDB数据库|[-0.1521783322095...|
+------------------+--------------

In [29]:

#2、获取频道的文章画像，得到文章画像的关键词(接着之前增量更新的文章article_profile)
# 获取这些文章20个关键词名称，对应名称找到词向量
article_profile = w2v.spark.sql("select * from article_profile where channel_id=18 limit 10")

#3、计算得到文章每个词的向量
article_profile.registerTempTable('profile')
keyword_weight = w2v.spark.sql("select article_id, channel_id, keyword, weight from profile LATERAL VIEW explode(keywords) AS keyword, weight")
keyword_weight.show()


#4、计算得到文章的平均词向量即文章的向量

+----------+----------+--------+-------------------+
|article_id|channel_id| keyword|             weight|
+----------+----------+--------+-------------------+
|     13098|        18|    repr| 0.6326590117716192|
|     13098|        18|      __| 2.5401122038114203|
|     13098|        18|      属性|0.23645924932468856|
|     13098|        18|     pre| 0.6040062287555379|
|     13098|        18|    code| 0.9531379029975557|
|     13098|        18|     def| 0.5063435861497416|
|     13098|        18|   color| 1.1337936117177925|
|     13098|        18|      定义| 0.1554380122061322|
|     13098|        18| Student| 0.5033771372284416|
|     13098|        18|getPrice| 0.7404427038950527|
|     13098|        18|      方法|0.08080845613717194|
|     13098|        18|     div| 0.3434819820586186|
|     13098|        18|     str|0.35999033790156054|
|     13098|        18|      pa| 0.6651385256756351|
|     13098|        18|   slots| 0.6992789472129189|
|     13098|        18| cnblogs|0.339265861020

In [30]:
# 合并文章关键词与词向量
_keywords_vector = keyword_weight.join(vectors, vectors.word==keyword_weight.keyword, 'inner')

In [31]:
_keywords_vector.show()

+----------+----------+-------+-------------------+------+--------------------+
|article_id|channel_id|keyword|             weight|  word|              vector|
+----------+----------+-------+-------------------+------+--------------------+
|     12936|        18| strong|  4.705249850191452|strong|[0.06607607007026...|
|     12936|        18| python| 1.5221131438694515|python|[-0.0696719884872...|
|     12936|        18|   bool|  1.698618363235006|  bool|[0.08196849375963...|
|     12936|        18|   编程语言| 1.0816836469752635|  编程语言|[-0.1638689935207...|
|     12936|        18|   True| 1.1097628840375606|  True|[0.24163006246089...|
|     12936|        18|     遗漏|   1.63385196709268|    遗漏|[-0.2000092417001...|
|     12936|        18|   elif| 2.0815467371010805|  elif|[0.23405943810939...|
|     12936|        18|    int|   0.69289212656367|   int|[-0.0279460791498...|
|     12936|        18|    str| 1.1603775123519366|   str|[-0.0372486524283...|
|     12936|        18|     pa|0.3674386

In [32]:
def compute_vector(row):
    return row.article_id, row.channel_id, row.keyword, row.weight * row.vector
articleKeywordVectors = _keywords_vector.rdd.map(compute_vector).toDF(["article_id", "channel_id", "keyword", "weightingVector"])




In [33]:
articleKeywordVectors.show()

+----------+----------+--------+--------------------+
|article_id|channel_id| keyword|     weightingVector|
+----------+----------+--------+--------------------+
|     13098|        18|    repr|[0.13308673769053...|
|     13098|        18|      __|[0.03018926765933...|
|     13098|        18|      属性|[-0.0191257052454...|
|     13098|        18|     pre|[0.34502730264365...|
|     13098|        18|    code|[0.34601303844503...|
|     13098|        18|     def|[0.07761504849378...|
|     13098|        18|   color|[0.61312345712161...|
|     13098|        18|      定义|[-0.0010762159703...|
|     13098|        18| Student|[0.09441257176805...|
|     13098|        18|getPrice|[-0.0847735848446...|
|     13098|        18|      方法|[-0.0048283701284...|
|     13098|        18|     div|[0.04037546778136...|
|     13098|        18|     str|[-0.0134091549740...|
|     13098|        18|      pa|[0.08286002598213...|
|     13098|        18|   slots|[-0.2270685226558...|
|     13098|        18| cnbl

In [34]:
# 4、计算得到文章的平均词向量即文章的向量
articleKeywordVectors.registerTempTable('temptable')
articleKeywordVectors = w2v.spark.sql("select article_id, min(channel_id) channel_id, collect_set(weightingVector) vectors from temptable group by article_id")




In [35]:
articleKeywordVectors.show()

+----------+----------+--------------------+
|article_id|channel_id|             vectors|
+----------+----------+--------------------+
|     13098|        18|[[0.6131234571216...|
|     13248|        18|[[2.5336782538801...|
|     13401|        18|[[0.1199601801400...|
|     13723|        18|[[0.0767033252676...|
|     14719|        18|[[-0.091019116457...|
|     14846|        18|[[-0.069822823712...|
|     15173|        18|[[-0.359728582063...|
|     15194|        18|[[-0.006211698526...|
|     15237|        18|[[-0.023426829636...|
|     15322|        18|[[0.0780172035455...|
+----------+----------+--------------------+



In [36]:
# 求平均值
def compute_avg_vectors(row):
    x = 0
    for i in row.vectors:
        x += i
    
    # 求平均值
    return row.article_id, row.channel_id, x / len(row.vectors)

article_vector = articleKeywordVectors.rdd.map(compute_avg_vectors).toDF(['article_id', 'channel_id', 'vector'])

In [37]:
article_vector.show()

+----------+----------+--------------------+
|article_id|channel_id|              vector|
+----------+----------+--------------------+
|     12936|        18|[0.15785672885486...|
|     13206|        18|[0.36658417091938...|
|     14029|        18|[0.08382564595017...|
|     14259|        18|[-0.1518681660977...|
|     14805|        18|[0.11028526511434...|
|     15921|        18|[0.10679691438887...|
|     17370|        18|[0.08871408187970...|
|     17595|        18|[0.21126698350251...|
|     18026|        18|[0.32436757418235...|
|     18117|        18|[-0.1335141347559...|
+----------+----------+--------------------+



In [38]:
article_vector

DataFrame[article_id: bigint, channel_id: bigint, vector: vector]

In [39]:
def toArray(row):
    return row.article_id, row.channel_id, [float(i) for i in row.vector.toArray()]

article_vector = article_vector.rdd.map(toArray).toDF(['article_id', 'channel_id', 'vector'])

In [40]:
article_vector

DataFrame[article_id: bigint, channel_id: bigint, vector: array<double>]

In [42]:
# 1、拿到Python频道的所有文章数据，10片测试
from pyspark.ml.linalg import Vectors

def toVector(row):
    return row.article_id, Vectors.dense(row.vector)
train = article_vector.rdd.map(toVector).toDF(['article_id', 'vector'])

In [43]:
train

DataFrame[article_id: bigint, vector: vector]

In [44]:
# 计算相似的文章
from pyspark.ml.feature import BucketedRandomProjectionLSH
brp = BucketedRandomProjectionLSH(inputCol='vector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)
model = brp.fit(train)
similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')

In [45]:
similar.show()

+--------------------+--------------------+------------------+
|            datasetA|            datasetB| EuclideanDistance|
+--------------------+--------------------+------------------+
|[17595,[0.2112669...|[17595,[0.2112669...|               0.0|
|[12936,[0.1578567...|[17370,[0.0887140...| 1.361573657055773|
|[15921,[0.1067969...|[14805,[0.1102852...|0.7717752375745968|
|[15921,[0.1067969...|[12936,[0.1578567...|1.5205099193938123|
|[15921,[0.1067969...|[14259,[-0.151868...|  1.86883894754822|
|[13206,[0.3665841...|[13206,[0.3665841...|               0.0|
|[14029,[0.0838256...|[17595,[0.2112669...|1.9271039464531845|
|[12936,[0.1578567...|[12936,[0.1578567...|               0.0|
|[12936,[0.1578567...|[18117,[-0.133514...|1.8511391009238651|
|[18117,[-0.133514...|[18117,[-0.133514...|               0.0|
|[18117,[-0.133514...|[14805,[0.1102852...|1.4227262480154705|
|[17370,[0.0887140...|[15921,[0.1067969...|0.8802763327501671|
|[14029,[0.0838256...|[14029,[0.0838256...|            

In [46]:
similar

DataFrame[datasetA: struct<article_id:bigint,vector:vector,hashes:array<vector>>, datasetB: struct<article_id:bigint,vector:vector,hashes:array<vector>>, EuclideanDistance: double]

In [None]:
def save_hbase(partitions):
    import happybase
    pool = happybase.ConnectionPool(size=3, host='hadoop-master')
    
    with pool.connection() as conn:
        article_similar = conn.table('article_similar')
        for row in partitions:
            if row.datasetA.article_id == row.datasetB.article_id:
                pass
            else:
                article_similar.put(str(row.datasetA.article_id).encode(),
                                   {'similar:{}'.format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})

similar.foreachPartition(save_hbase)