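Classification of IMDB movie reviews with a PySpark ML pipeline: `CountVectorizer` (term frequencies) → `IDF` (TF-IDF weighting) → `LogisticRegression`.

Notebook written by [Zhedong Zheng](https://github.com/zhedongzheng)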

<img src="img/tfidf_flow.png" width="300">

In [1]:
import tensorflow as tf
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
def get_idx2word(_index_from=3):
    """Return a dict mapping Keras IMDB token ids back to their words.

    The raw Keras word index (``tf.keras.datasets.imdb.get_word_index``) is
    re-keyed so that every word's id is offset by ``_index_from``. Ids 0, 1
    and 2 are then claimed by the special tokens ``<pad>``, ``<start>`` and
    ``<unk>``.

    Args:
        _index_from: offset added to every raw word id. It should equal the
            ``index_from`` used when the encoded sequences were loaded.

    Returns:
        dict[int, str]: id -> word lookup table.
    """
    raw_index = tf.keras.datasets.imdb.get_word_index()
    offset_index = {word: rank + _index_from for word, rank in raw_index.items()}
    # Special tokens are inserted after the shifted words, same order as before.
    offset_index.update({"<pad>": 0, "<start>": 1, "<unk>": 2})
    return {token_id: word for word, token_id in offset_index.items()}


def make_df(x, y, spark=None, vocab=None):
    """Build a Spark DataFrame of ``(label, words)`` rows from encoded reviews.

    Args:
        x: iterable of id sequences. Every id is looked up in ``vocab``, so an
            unknown id raises ``KeyError``.
        y: iterable of labels aligned with ``x``. Each label is cast with
            ``int`` (e.g. numpy integers become plain Python ints for Spark).
        spark: session used to create the frame. Defaults to the
            notebook-level ``sess`` when omitted.
        vocab: id -> word mapping. Defaults to the notebook-level
            ``idx2word`` when omitted.

    Returns:
        The result of ``spark.createDataFrame`` with columns ``label`` and
        ``words`` (the list of decoded words for each review).
    """
    # Fall back to the notebook globals lazily, so existing calls behave as
    # before while explicit callers (and tests) need no hidden state.
    if spark is None:
        spark = sess
    if vocab is None:
        vocab = idx2word
    rows = [(int(label), [vocab[idx] for idx in seq]) for seq, label in zip(x, y)]
    return spark.createDataFrame(rows, ['label', 'words'])

In [3]:
# --- Data ---------------------------------------------------------------
# Reverse vocabulary (id -> word) and the encoded IMDB train/test splits.
# num_words=20000 limits the vocabulary of the loaded sequences (see the Keras
# `imdb.load_data` docs for how out-of-range ids are encoded).
idx2word = get_idx2word()
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.imdb.load_data(num_words=20000)

# --- Model --------------------------------------------------------------
sess = SparkSession.builder.appName('nlp').getOrCreate()

# Word lists -> term-frequency vectors -> TF-IDF vectors -> logistic regression.
# labelCol matches the 'label' column produced by make_df.
pipeline = Pipeline(stages=[
    CountVectorizer(inputCol='words', outputCol='tf'),
    IDF(inputCol='tf', outputCol='tfidf'),
    LogisticRegression(featuresCol='tfidf', labelCol='label'),
])

# --- Train & evaluate ---------------------------------------------------
df_train = make_df(X_train, y_train)
df_test = make_df(X_test, y_test)

prediction = pipeline.fit(df_train).transform(df_test)
# MulticlassClassificationEvaluator defaults to metricName='f1' (weighted F1);
# request accuracy explicitly so the printed figure matches its label.
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
print("Testing Accuracy: %.3f" % evaluator.evaluate(prediction))

Testing Accuracy: 0.841
