In [1]:
import os 
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Word2Vec, Word2VecModel, BucketedRandomProjectionLSH, BucketedRandomProjectionLSHModel
import jieba
import re 
import pandas as pd

In [2]:
jieba.initialize()
spark = (SparkSession
         .builder
         .appName("test-dockerlinuxcontainer")
         .enableHiveSupport()
         .config("spark.executor.instances", "50")
         .config("spark.executor.memory","48g")
         .config("spark.executor.cores","24")
         .config("spark.driver.memory","48g")
         .config("spark.sql.shuffle.partitions","500")
         .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class","DockerLinuxContainer")
         .config("spark.executorEnv.yarn.nodemanager.container-executor.class","DockerLinuxContainer")
         .config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest")
         .config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name","bdp-docker.jd.com:5000/wise_mart_bag:latest")
         .getOrCreate())

Building prefix dict from /media/cfs/zhangyuwei37/.pylib/lib/python3.6/site-packages/jieba/dict.txt ...
Loading model from cache /tmp/jieba.cache
Loading model cost 1.161816120147705 seconds.
Prefix dict has been built succesfully.


In [5]:
"""
说明：word2vec训练词向量，进而得到评论向量，然后LSH快速求评论向量近邻。
参考：
https://blog.csdn.net/weixin_43250857/article/details/107468470
https://blog.csdn.net/u013090676/article/details/82716911
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=word2vec#pyspark.ml.feature.Word2VecModel
"""

df = pd.read_csv('./DMSC.csv')


In [6]:
df.dtypes

ID                int64
Movie_Name_EN    object
Movie_Name_CN    object
Crawl_Date       object
Number            int64
Username         object
Date             object
Star              int64
Comment          object
Like              int64
dtype: object

In [7]:
df_schema = StructType([StructField("ID", StringType(), True)\
                       ,StructField("Movie_Name_EN", StringType(), True)
                       ,StructField("Movie_Name_CN", StringType(), True)
                       ,StructField("Crawl_Date", StringType(), True)
                       ,StructField("Number", IntegerType(), True)
                       ,StructField("Username", StringType(), True)
                       ,StructField("Date", StringType(), True)
                       ,StructField("Star", IntegerType(), True)
                       ,StructField("Comment", StringType(), True)
                       ,StructField("Like", IntegerType(), True)])
 
df = spark.createDataFrame(df, schema=df_schema)

In [8]:
df.dtypes

[('ID', 'string'),
 ('Movie_Name_EN', 'string'),
 ('Movie_Name_CN', 'string'),
 ('Crawl_Date', 'string'),
 ('Number', 'int'),
 ('Username', 'string'),
 ('Date', 'string'),
 ('Star', 'int'),
 ('Comment', 'string'),
 ('Like', 'int')]

In [9]:
def jieba_f(line):
    """
    分词（将字符串拆分），变成字符串数组
    """
    remove_chars_pattern = re.compile('[·’!"#$%&\'()＃！（）*+,-./:;<=>?@，：?★、…．＞【】［］《》？“”‘’[\\]^_`{|}~]+')
    try:
        words = [remove_chars_pattern.sub('', word) for word in jieba.lcut(line, cut_all=False)]
        return words
    except:
        return []
jieba_udf = udf(jieba_f, ArrayType(StringType()))
df = df.withColumn('Words', jieba_udf(col('Comment')))

In [10]:
df.show()

+---+--------------------+-------------+----------+------+----------------+----------+----+------------------------------------+----+----------------------------+
| ID|       Movie_Name_EN|Movie_Name_CN|Crawl_Date|Number|        Username|      Date|Star|                             Comment|Like|                       Words|
+---+--------------------+-------------+----------+------+----------------+----------+----+------------------------------------+----+----------------------------+
|  0|Avengers Age of U...|  复仇者联盟2|2017-01-22|     1|            然潘|2015-05-13|   3|          连奥创都知道整容要去韩国。|2404|  [ , 连, 奥创, 都, 知道,...|
|  1|Avengers Age of U...|  复仇者联盟2|2017-01-22|     2|      更深的白色|2015-04-24|   2| 非常失望，剧本完全敷衍了事，主线...|1231|  [ , 非常, 失望, , 剧本,...|
|  2|Avengers Age of U...|  复仇者联盟2|2017-01-22|     3|    有意识的贱民|2015-04-26|   2|     2015年度最失望作品。以为面面...|1052|     [ , 2015, 年度, 最, ...|
|  3|Avengers Age of U...|  复仇者联盟2|2017-01-22|     4|  不老的李大爷耶|2015-04-23|   4|   《铁人2》中勾引钢铁侠，《妇联1...|1045

In [11]:
def train_word2vec(df, model_path='./word2vec_model'):
    """
    word2vec训练词向量(输入：Words词序列，输出：embedding词嵌入向量)
    """
    word2vec = Word2Vec(vectorSize=20, numPartitions=4, maxIter=3, seed=33, inputCol='Words', outputCol='Embedding')
    try:
        word2vec_model = Word2VecModel.load(model_path)
    except:
        word2vec_model = word2vec.fit(df)
        word2vec_model.save(model_path)
    return word2vec_model
word2vec_model = train_word2vec(df)
# 使用词embedding的平均得到评论embedding
df = word2vec_model.transform(df)

In [16]:
def train_lsh(df, model_path = './lsh_model'):
    """
    # 训练LSH实现评论embedding快速近邻向量检索
    """
    lsh = BucketedRandomProjectionLSH(inputCol='Embedding', outputCol='Buckets', numHashTables=2, bucketLength=0.1)
    try:
        lsh_model = BucketedRandomProjectionLSHModel.load(model_path)
    except:
        lsh_model = lsh.fit(df)
        lsh_model.save(model_path)
    return lsh_model
lsh_model = train_lsh(df)
# 评论向量embedding计算分桶
df = lsh_model.transform(df)

In [18]:
df.select("Words", "Embedding", "Buckets").show()

+----------------------------+--------------------+----------------+
|                       Words|           Embedding|         Buckets|
+----------------------------+--------------------+----------------+
|  [ , 连, 奥创, 都, 知道,...|[-0.1797905573621...|  [[1.0], [2.0]]|
|  [ , 非常, 失望, , 剧本,...|[-0.1429192681069...|[[-1.0], [-1.0]]|
|     [ , 2015, 年度, 最, ...|[-0.0452436775913...|[[-1.0], [-1.0]]|
|     [ , , 铁人, 2, , 中,...|[-0.0900899582276...|  [[0.0], [0.0]]|
| [ , 虽然, 从头, 打到, 尾...|[0.15214901122575...| [[-3.0], [2.0]]|
| [ , 剧情, 不如, 第一集, ...|[-0.1095046793364...| [[-1.0], [0.0]]|
| [ , 只有, 一颗, 彩蛋, 必...|[-0.1215881768105...|[[-1.0], [-1.0]]|
| [ , 看腻, 了, 这些, 打来...|[-0.1153682228177...|  [[0.0], [0.0]]|
|   [ , 漫威粉, 勿, 喷, , ...|[-0.1377823332038...| [[-1.0], [1.0]]|
| [ , 属于, 超级, 英雄, 的...|[-0.0585340855664...| [[-1.0], [0.0]]|
| [ , , 一个, 没有, 黑暗面...|[-0.0583722080308...|[[-1.0], [-1.0]]|
|[ , 请漫威, 华丽, 地滚出,...|[-0.1466444059716...| [[1.0], [-2.0]]|
| [ , 承认, 这货, 很烂, 很...|[-0.13180895320

In [21]:
# 求每个评论的embedding近邻
# 下面将计算并找出与每条评论距离在0.5之内的其他评论，即1条评论对应N行（也可能完美没有满足相似阈值的评论）
comment_distance = lsh_model.approxSimilarityJoin(df, df, 0.5, 'Distance').select(
    col('datasetA.ID').alias('ID1'), col('datasetA.Movie_Name_CN').alias('Movie_Name_CN1'), col('datasetA.Comment').alias('Comment1'),
    col('datasetB.ID').alias('ID2'), col('datasetB.Movie_Name_CN').alias('Movie_Name_CN2'), col('datasetB.Comment').alias('Comment2'),
    'Distance'
).filter('datasetA.ID!=datasetB.ID')

In [23]:
comment_distance.show()

+-------+--------------+----------------------------+-------+--------------+--------------------------------------+-------------------+
|    ID1|Movie_Name_CN1|                    Comment1|    ID2|Movie_Name_CN2|                              Comment2|           Distance|
+-------+--------------+----------------------------+-------+--------------+--------------------------------------+-------------------+
|1328138|        寻龙诀| 能匹配的上初中对文字的迷恋 | 613149|    西游降魔篇|   那么多层次包在一起，得花多大的心...| 0.4264067964674386|
|1328138|        寻龙诀| 能匹配的上初中对文字的迷恋 | 618841|    西游降魔篇|   一生所爱的旋律一想起来就让人动情...|0.40728640510643804|
|1328138|        寻龙诀| 能匹配的上初中对文字的迷恋 | 657580|    西游降魔篇|   我是先看降魔再看大话的，说实话，...|0.38695478157665064|
|1328138|        寻龙诀| 能匹配的上初中对文字的迷恋 | 660416|    西游降魔篇|   估计星爷觉得西游降魔在爱情方面很...| 0.4373297648012288|
|1328138|        寻龙诀| 能匹配的上初中对文字的迷恋 | 854203|      爱乐之城|   人生处处可歌舞，用音乐和舞蹈表达...|0.45061194795632464|
|1328138|        寻龙诀| 能匹配的上初中对文字的迷恋 | 697071|    西游伏妖篇|   求星爷不要在拍西游了，让那个经典...| 0.496721820439127

In [24]:
# 为每个评论保留top 3相似的评论
comment_distance.createOrReplaceTempView('comment_distance')
sql = '''
-- 保留每个评论的top 3相似评论
WITH 
    comment_with_rank AS 
    (
        select 
            *,
            row_number() over (partition by ID1 order by Distance asc) Ranking
        from 
            comment_distance
    ),
-- 每个评论留1条详情
    comment_info AS 
    (
        select
            * 
        from 
            comment_with_rank
        where 
            Ranking=1
    ),
-- 每条评论top3拉平为列
    comment_with_top3 AS 
    (
        select 
            ID1,
            collect_set(ID2) Similar_IDs,
            collect_set(Movie_Name_CN2) Similar_Movie_Name_CNs,
            collect_set(Comment2) Similar_Comments
        from 
            comment_with_rank
        where 
            Ranking <= 3
        group by 
            ID1
    )
-- 输出结果
select
    a.ID1 ID,
    b.Movie_Name_CN1 Movie_Name_CN,
    b.Comment1 Comment,
    a.Similar_IDs,
    a.Similar_Movie_Name_CNs,
    a.Similar_Comments
from 
    comment_with_top3 a
left join 
    comment_info b
on 
    a.ID1=b.ID1
'''
similar_comment = spark.sql(sql)
similar_comment.show(truncate=False)

+-------+-------------+---------------------------------------------------------------------------------------------------+---------------------------+-----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID     |Movie_Name_CN|Comment                                                                                            |Similar_IDs                |Similar_Movie_Name_CNs             |Similar_Comments                                                                                                                                           |
+-------+-------------+---------------------------------------------------------------------------------------------------+---------------------------+-----------------------------------+-----------------------------------------------------------------------------------------------------------------------------