<a href="https://colab.research.google.com/github/jianfeiZhao/BI_projs/blob/master/pyspark_tfidf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
!pip install pyspark



In [11]:
# Compute the TF-IDF of a small set of documents with PySpark.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Since Spark 2.0 a single SparkSession is the only entry point needed.
spark = (
    SparkSession
    .builder
    .appName('tfidf_app')
    .getOrCreate()
)

# Load the sample data: (id, space-separated document text, extra column).
documents = spark.createDataFrame(
    [
        (0, "我 非常 喜欢 看 电视剧", "data1"),
        (1, "我 喜欢 看 电视剧", "data2"),
        (2, "我 喜欢 吃 苹果", "data3"),
        (3, "我 喜欢 吃 苹果 看 电视剧", "data4"),
    ],
    ["id", "doc_text", "other"],
)

# Expose the DataFrame to Spark SQL as a temporary view.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; re-running this cell simply replaces the view.
documents.createOrReplaceTempView("doc_table")

# Keep only the columns needed for TF-IDF (drops "other").
df = spark.sql("SELECT id, doc_text FROM doc_table")
print('df=')
df.show()  # prints the first 20 rows
print(df.collect())  # collect the rows into a Python list of Row objects
# To inspect only the id column:
#df.select('id').show()

df=
+---+-------------------------+
| id|                 doc_text|
+---+-------------------------+
|  0|   我 非常 喜欢 看 电视剧|
|  1|        我 喜欢 看 电视剧|
|  2|          我 喜欢 吃 苹果|
|  3|我 喜欢 吃 苹果 看 电视剧|
+---+-------------------------+

[Row(id=0, doc_text='我 非常 喜欢 看 电视剧'), Row(id=1, doc_text='我 喜欢 看 电视剧'), Row(id=2, doc_text='我 喜欢 吃 苹果'), Row(id=3, doc_text='我 喜欢 吃 苹果 看 电视剧')]


In [12]:
# Tokenize the doc_text column: each document string becomes a list of
# words stored in a new doc_words column.
tokenizer = Tokenizer().setInputCol("doc_text").setOutputCol("doc_words")
df = tokenizer.transform(df)
df.show()

+---+-------------------------+----------------------------+
| id|                 doc_text|                   doc_words|
+---+-------------------------+----------------------------+
|  0|   我 非常 喜欢 看 电视剧|[我, 非常, 喜欢, 看, 电视剧]|
|  1|        我 喜欢 看 电视剧|      [我, 喜欢, 看, 电视剧]|
|  2|          我 喜欢 吃 苹果|        [我, 喜欢, 吃, 苹果]|
|  3|我 喜欢 吃 苹果 看 电视剧| [我, 喜欢, 吃, 苹果, 看,...|
+---+-------------------------+----------------------------+



In [13]:
# Compute the TF-IDF vector of every document.

# Step 1: hash each word list into a term-frequency vector.
hashingTF = HashingTF(outputCol="doc_words_tf", inputCol='doc_words')
tf = hashingTF.transform(df)
tf.cache()  # tf is used twice below: to fit the IDF model and to transform

# Step 2: fit the IDF model on the term frequencies (idf is the fitted model).
idf = IDF(outputCol="doc_words_tfidf", inputCol='doc_words_tf').fit(tf)

# Step 3: rescale the term frequencies by the fitted IDF weights.
tfidf = idf.transform(tf)
tfidf.cache()

print('\n 每个文档的TFIDF')
tfidf.select('doc_words_tfidf').show(truncate=False)


 每个文档的TFIDF
+----------------------------------------------------------------------------------------------------------------------------------------+
|doc_words_tfidf                                                                                                                         |
+----------------------------------------------------------------------------------------------------------------------------------------+
|(262144,[62541,75305,96624,163839,227575],[0.22314355131420976,0.0,0.0,0.9162907318741551,0.22314355131420976])                         |
|(262144,[62541,75305,96624,227575],[0.22314355131420976,0.0,0.0,0.22314355131420976])                                                   |
|(262144,[75305,95584,96624,230496],[0.0,0.5108256237659907,0.0,0.5108256237659907])                                                     |
|(262144,[62541,75305,95584,96624,227575,230496],[0.22314355131420976,0.0,0.5108256237659907,0.0,0.22314355131420976,0.5108256237659907])|
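+----------------------------------------------------------------------------------------------------------------------------------------+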

In [14]:
# Normalize the TF-IDF vectors (Normalizer uses the L2 norm by default),
# then compute the dot product of every pair of documents.
import pyspark.sql.functions as psf
from pyspark.ml.feature import Normalizer
from pyspark.sql.types import DoubleType

# The default p is 2.0, i.e. the same as calling .setP(2.0).
normalizer = Normalizer(outputCol="norm", inputCol="doc_words_tfidf")
tfidf = normalizer.transform(tfidf)
#tfidf.select('norm').show(truncate=False)


@psf.udf(returnType=DoubleType())
def dot_udf(x, y):
    """Return the dot product of two vectors as a Python float."""
    return float(x.dot(y))


# Self-join on id1 < id2 so each unordered pair of documents appears once.
(
    tfidf.alias("a1")
    .join(tfidf.alias("a2"), psf.col("a1.id") < psf.col("a2.id"))
    .select(
        psf.col("a1.id").alias("id1"),
        psf.col("a2.id").alias("id2"),
        dot_udf("a1.norm", "a2.norm").alias("dot"),
    )
    .sort("id1", "id2")
    .show()
)

+---+---+-------------------+
|id1|id2|                dot|
+---+---+-------------------+
|  0|  1|0.32563128587164397|
|  0|  2|                0.0|
|  0|  3| 0.1303511225242502|
|  1|  2|                0.0|
|  1|  3| 0.4003028215649755|
|  2|  3| 0.9163829172606391|
+---+---+-------------------+



上表显示的是各文档（句子）两两之间的余弦相似度：由于 TF-IDF 向量已做 L2 归一化，两个向量的点积即为余弦相似度；表中只保留 id1 < id2 的组合，以避免重复。