In [2]:
# Make the local Spark installation importable via findspark, then start
# (or reuse) the SparkSession used by every cell below.
import findspark
findspark.init()

from pyspark.sql import SparkSession
# A descriptive application name (rather than the "YourAppName" placeholder)
# makes this job identifiable in the Spark UI. getOrCreate() returns an
# already-running session if there is one.
spark = SparkSession.builder.appName("TokenizerTfIdfDemo").getOrCreate()

# All ML feature stages used in this notebook, imported up front.
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

In [3]:
# Toy corpus: three short sentences, each tagged with an integer id.
# NOTE(review): "libries" and "alspo" look like typos; they are left unchanged
# so the recorded outputs further down still match this data.
sentences_df = spark.createDataFrame(
    data=[
        (1, "This is an introduction to Spark MLlib"),
        (2, "MLlib includes libries for classification and regression"),
        (3, "It alspo contains supporting tools for pipelines"),
    ],
    schema=["id", "sentence"],
)

In [5]:
# Preview the corpus; show() prints at most 20 rows and truncates long cells.
sentences_df.show(n=20, truncate=True)

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|This is an introd...|
|  2|MLlib includes li...|
|  3|It alspo contains...|
+---+--------------------+



In [6]:
# Tokenizer lowercases each sentence and splits it on whitespace, writing the
# resulting token arrays to a new "words" column.
sent_token = Tokenizer(outputCol="words", inputCol="sentence")

In [7]:
# Tokenizer is a Transformer rather than an Estimator: it learns nothing from
# the data, so transform() can be applied directly without a fit() step.
sent_tokenized_df = sent_token.transform(dataset=sentences_df)

In [9]:
# The added "words" column holds the lowercased tokens of each sentence.
sent_tokenized_df.show(n=20, truncate=True)

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|
|  2|MLlib includes li...|[mllib, includes,...|
|  3|It alspo contains...|[it, alspo, conta...|
+---+--------------------+--------------------+



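* TF-IDF: Term Frequency–Inverse Document Frequency. Each term's count in a document (TF) is weighted by how rare the term is across all documents (IDF), so terms that appear everywhere contribute little.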

In [12]:
from pyspark.ml.feature import HashingTF, IDF

In [10]:
# Inspect the first raw row in full (show() truncates the sentence text).
sentences_df.head(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib')]

In [11]:
# First row after tokenization, with the complete token list visible.
sent_tokenized_df.head(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'])]

In [13]:
# HashingTF hashes each token into one of `numFeatures` buckets and counts
# occurrences per bucket (term frequency). 20 buckets is tiny, presumably so
# the vectors stay readable, and distinct words can collide in one bucket.
# The library default is 2**18 buckets.
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=20,
)

In [14]:
# HashingTF is a Transformer: it needs no fit() step before transform().
sent_hfTF_df = hashingTF.transform(dataset=sent_tokenized_df)

In [16]:
# Each word is hashed to one of the 20 buckets, and the per-bucket counts form
# a SparseVector. This is not a one-to-one word -> index mapping: the sentence
# has 7 distinct words but only 6 non-zero buckets, so bucket 6 (count 2.0)
# holds two different words that collided.
sent_hfTF_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}))]

In [17]:
# IDF is an Estimator: fit() must scan the corpus to count, for each bucket,
# how many documents contain it. Spark uses idf = log((m + 1) / (df + 1)),
# where m is the number of documents, so a bucket present in every document
# gets weight 0.
idf = IDF(outputCol="idf_features", inputCol="rawFeatures")

In [19]:
# Learn per-bucket document frequencies from the term-frequency vectors.
idfModel = idf.fit(dataset=sent_hfTF_df)

In [20]:
# Scale each term-frequency vector by the learned per-bucket IDF weights.
tfidf_df = idfModel.transform(dataset=sent_hfTF_df)

In [21]:
# First row with both the raw term frequencies and their TF-IDF weights.
tfidf_df.head(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}), idf_features=SparseVector(20, {6: 0.5754, 8: 0.2877, 9: 0.6931, 10: 0.6931, 13: 0.6931, 15: 0.0}))]