In [35]:
import math
from pyspark.sql import functions as F

In [8]:
corpus = [('i love ice cream', 1),
         ('i love dribbling', 2),
         ('cold is cold and hot is hot', 3)]

Convert the corpus into an RDD.

In [9]:
rdd = sc.parallelize(corpus)

In [10]:
rdd.collect()

[('i love ice cream', 1), ('i love dribbling', 2), ('cold is cold and hot is hot', 3)]

In [15]:
document_id_map = rdd.flatMap(lambda document: [((word, document[1]), 1) for word in document[0].split(' ')])

In [16]:
document_id_map.collect()

[(('i', 1), 1), (('love', 1), 1), (('ice', 1), 1), (('cream', 1), 1), (('i', 2), 1), (('love', 2), 1), (('dribbling', 2), 1), (('cold', 3), 1), (('is', 3), 1), (('cold', 3), 1), (('and', 3), 1), (('hot', 3), 1), (('is', 3), 1), (('hot', 3), 1)]

In [17]:
tf = document_id_map.reduceByKey(lambda x, y: x + y)

In [18]:
tf.collect()

[(('is', 3), 2), (('i', 2), 1), (('dribbling', 2), 1), (('cream', 1), 1), (('i', 1), 1), (('love', 2), 1), (('and', 3), 1), (('hot', 3), 2), (('love', 1), 1), (('ice', 1), 1), (('cold', 3), 2)]

In [26]:
word_to_doc_tf_map = tf.map(lambda x: (x[0][0], (x[0][1], x[1])))

In [21]:
word_to_one = tf.map(lambda x: (x[0][0], 1))

In [22]:
df = word_to_one.reduceByKey(lambda x, y: x + y)

In [23]:
df.collect()

[('dribbling', 1), ('hot', 1), ('cream', 1), ('love', 2), ('cold', 1), ('and', 1), ('i', 2), ('ice', 1), ('is', 1)]

In [27]:
tf_df = word_to_doc_tf_map.join(df)

In [28]:
tf_df.collect()

[('dribbling', ((2, 1), 1)), ('cream', ((1, 1), 1)), ('hot', ((3, 2), 1)), ('love', ((1, 1), 2)), ('love', ((2, 1), 2)), ('and', ((3, 1), 1)), ('cold', ((3, 2), 1)), ('i', ((2, 1), 2)), ('i', ((1, 1), 2)), ('ice', ((1, 1), 1)), ('is', ((3, 2), 1))]

In [29]:
term_document_id_tf_df = tf_df.map(lambda x: (x[0], x[1][0][0], x[1][0][1], x[1][1]))

In [30]:
term_document_id_tf_df.collect()

[('dribbling', 2, 1, 1), ('cream', 1, 1, 1), ('hot', 3, 2, 1), ('love', 2, 1, 2), ('love', 1, 1, 2), ('and', 3, 1, 1), ('cold', 3, 2, 1), ('i', 2, 1, 2), ('i', 1, 1, 2), ('is', 3, 2, 1), ('ice', 1, 1, 1)]

In [32]:
term_document_id_tf_df_idf = term_document_id_tf_df.map(lambda x: (x[0], x[1], x[2], x[3], math.log10(len(corpus)) / x[3]))

In [33]:
final_df = term_document_id_tf_df_idf.toDF(['term', 'document_id', 'tf', 'df', 'idf'])

In [34]:
final_df.show()

+---------+-----------+---+---+-------------------+
|     term|document_id| tf| df|                idf|
+---------+-----------+---+---+-------------------+
|dribbling|          2|  1|  1|0.47712125471966244|
|    cream|          1|  1|  1|0.47712125471966244|
|      hot|          3|  2|  1|0.47712125471966244|
|     love|          1|  1|  2|0.23856062735983122|
|     love|          2|  1|  2|0.23856062735983122|
|      and|          3|  1|  1|0.47712125471966244|
|     cold|          3|  2|  1|0.47712125471966244|
|        i|          2|  1|  2|0.23856062735983122|
|        i|          1|  1|  2|0.23856062735983122|
|      ice|          1|  1|  1|0.47712125471966244|
|       is|          3|  2|  1|0.47712125471966244|
+---------+-----------+---+---+-------------------+

In [36]:
final_df = final_df.withColumn('tfidf', F.col('tf') * F.col('idf'))

In [37]:
final_df.show()

+---------+-----------+---+---+-------------------+-------------------+
|     term|document_id| tf| df|                idf|              tfidf|
+---------+-----------+---+---+-------------------+-------------------+
|dribbling|          2|  1|  1|0.47712125471966244|0.47712125471966244|
|    cream|          1|  1|  1|0.47712125471966244|0.47712125471966244|
|      hot|          3|  2|  1|0.47712125471966244| 0.9542425094393249|
|     love|          2|  1|  2|0.23856062735983122|0.23856062735983122|
|     love|          1|  1|  2|0.23856062735983122|0.23856062735983122|
|      and|          3|  1|  1|0.47712125471966244|0.47712125471966244|
|     cold|          3|  2|  1|0.47712125471966244| 0.9542425094393249|
|        i|          2|  1|  2|0.23856062735983122|0.23856062735983122|
|        i|          1|  1|  2|0.23856062735983122|0.23856062735983122|
|       is|          3|  2|  1|0.47712125471966244| 0.9542425094393249|
|      ice|          1|  1|  1|0.47712125471966244|0.47712125471