# Rank Text Similarity
### Techniques: TF-IDF + SVD
### Technologies: PySpark

### Prerequisites
* Spark 2.4.4 is installed

### Set up environment variables

* Estimated effort: about 1 hour to configure (first time only)

```python
# Set up the Spark environment variables (adjust the paths to your machine)
import os
import sys

os.environ['SPARK_HOME'] = '/usr/local/Cellar/apache-spark/2.4.4/libexec'
os.environ['PYSPARK_PYTHON'] = '/Applications/anaconda3/envs/nlp_text_similarity/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/Applications/anaconda3/envs/nlp_text_similarity/bin/python'
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home'
os.environ['PYTHONPATH'] = os.environ['SPARK_HOME'] + '/python/lib/'
# Make the PySpark and Py4J libraries bundled with Spark importable
sys.path.insert(0, os.environ['SPARK_HOME'] + '/python/lib/py4j-0.10.7-src.zip')
sys.path.insert(0, os.environ['SPARK_HOME'] + '/python/lib/pyspark.zip')
```
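Before creating the session, it can help to confirm that every configured path exists, because a wrong `SPARK_HOME` or `JAVA_HOME` often only shows up later as an obscure error when the Java gateway starts. A minimal sketch; the helper name `missing_paths` is hypothetical:

```python
import os

def missing_paths(env_vars=('SPARK_HOME', 'JAVA_HOME', 'PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON')):
    """Return the variables that are unset or point to a path that does not exist."""
    missing = []
    for name in env_vars:
        value = os.environ.get(name)
        if not value or not os.path.exists(value):
            missing.append(name)
    return missing

problems = missing_paths()
if problems:
    print('Check these variables:', problems)
else:
    print('All Spark-related paths exist')
```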

```python
# Import dependencies
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
import pandas as pd
```

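```python
# Dataset path: move from the notebook folder up to the project root
# (note: re-running this cell moves up one more directory each time)
os.chdir('..')
project_path = os.getcwd()
dataset_path = os.path.join(project_path, 'data/02_preprocessed/train.csv')
```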
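A minimal sketch of a path lookup that never calls `os.chdir`, so it returns the same result however many times the cell is re-run. It assumes (hypothetically) that the notebook lives one folder below the project root; `resolve_dataset_path` is an illustrative name:

```python
from pathlib import Path

def resolve_dataset_path(notebook_dir, relative_path='data/02_preprocessed/train.csv'):
    """Build the dataset path from the notebook folder without changing directory.

    Assumes the project root is the parent of `notebook_dir` (hypothetical layout).
    """
    project_root = Path(notebook_dir).resolve().parent
    return project_root / relative_path

# The working directory is left untouched, so repeated calls agree
path_a = resolve_dataset_path(Path.cwd())
path_b = resolve_dataset_path(Path.cwd())
print(path_a, path_a == path_b)
```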

```python
# Create (or reuse) a local SparkSession; local[*] uses one worker thread per logical core
sparkSession = SparkSession.builder.appName("my_app").master("local[*]").getOrCreate()
```

```python
# Display the Spark session summary (version, master, app name)
sparkSession
```

```python
# Load the IMDb dataset as a pandas DataFrame (<1 sec)
dataset_df = pd.read_csv(dataset_path)
dataset_df.head()
```


| Unnamed: 0 | sentiment | review |
|---:|---:|---|
| 0 | 0 | Working with one of the best Shakespeare sourc... |
| 1 | 0 | Well...tremors I, the original started off in ... |
| 2 | 0 | Ouch! This one was a bit painful to sit throug... |
| 3 | 0 | I've seen some crappy movies in my life, but t... |
| 4 | 0 | "Carriers" follows the exploits of two guys an... |
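The `Unnamed: 0` column is what pandas produces when a CSV was written together with its index (`DataFrame.to_csv` writes the index by default). A minimal sketch on a tiny in-memory CSV that mimics this layout (the rows are made up for illustration) shows that `index_col=0` turns that column back into the index instead of a data column:

```python
import io
import pandas as pd

# Toy CSV mimicking a file saved with DataFrame.to_csv(), which writes the index
csv_text = ",sentiment,review\n0,0,First review\n1,1,Second review\n"

without_index_col = pd.read_csv(io.StringIO(csv_text))
with_index_col = pd.read_csv(io.StringIO(csv_text), index_col=0)

print(list(without_index_col.columns))  # ['Unnamed: 0', 'sentiment', 'review']
print(list(with_index_col.columns))     # ['sentiment', 'review']
```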


```python
# Keep only the review text (this returns a pandas Series)
reviews_dataset_df = dataset_df['review']
reviews_dataset_df.head()
```

```
0    Working with one of the best Shakespeare sourc...
1    Well...tremors I, the original started off in ...
2    Ouch! This one was a bit painful to sit throug...
3    I've seen some crappy movies in my life, but t...
4    "Carriers" follows the exploits of two guys an...
Name: review, dtype: object
```
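Single brackets return a 1-D Series, while double brackets return a one-column DataFrame. This matters for the next step: a pandas DataFrame passed to `createDataFrame` keeps its column names (here `review`), whereas a Series combined with an atomic `StringType` schema ends up in a column named `value`. A small pandas-only sketch on made-up rows:

```python
import pandas as pd

# Made-up rows with the same column names as the dataset
df = pd.DataFrame({'sentiment': [0, 1], 'review': ['bad movie', 'great movie']})

reviews_series = df['review']    # 1-D Series named 'review'
reviews_frame = df[['review']]   # 2-D DataFrame with a single 'review' column

print(type(reviews_series).__name__, reviews_series.name)         # Series review
print(type(reviews_frame).__name__, list(reviews_frame.columns))  # DataFrame ['review']
```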

```python
# Create a Spark DataFrame from the reviews (2 sec); with an atomic StringType
# schema, Spark stores each review in a single column named "value"
dataset_spark = sparkSession.createDataFrame(data=reviews_dataset_df, schema=StringType())
dataset_spark.show()
```

```
+--------------------+
|               value|
+--------------------+
|Working with one ...|
|Well...tremors I,...|
|Ouch! This one wa...|
|I've seen some cr...|
|"Carriers" follow...|
|I had been lookin...|
|Effect(s) without...|
|This picture star...|
|I chose to see th...|
|This film has to ...|
|I felt brain dead...|
|A young scientist...|
|Inept, boring, an...|
|From the first ti...|
|I find it hard to...|
|I actually saw Ch...|
|I went to school ...|
|I haven't seen th...|
|I haven't seen an...|
|One would think t...|
+--------------------+
only showing top 20 rows
```



## Feature engineering: TF-IDF

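```python
# RDD-based API (spark.mllib), aliased so it does not shadow the
# DataFrame-based HashingTF/IDF imported from pyspark.ml above
from pyspark.mllib.feature import HashingTF as MLlibHashingTF, IDF as MLlibIDF

# spark.mllib works on RDDs of documents (lists of terms), not on DataFrames:
# split each review on whitespace to get its terms
documents = dataset_spark.rdd.map(lambda row: row.value.split())

hashingTF = MLlibHashingTF(numFeatures=10000)
tf = hashingTF.transform(documents)

# While applying HashingTF only needs a single pass over the data, applying IDF needs two passes:
# first to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = MLlibIDF().fit(tf)
tfidf = idf.transform(tf)

# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in fewer than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = MLlibIDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
```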
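To make the two passes concrete, here is a minimal pure-Python sketch of what hashed TF-IDF computes. It is an illustration, not Spark's implementation: the bucket function is MD5-based only so that indices are deterministic (an assumption; Spark uses its own hash function, so indices will not match). The IDF follows the smoothed formula documented for Spark, idf = log((m + 1) / (df + 1)) with m the number of documents, and is set to 0 when df < minDocFreq. All function names are hypothetical:

```python
import hashlib
import math
from collections import Counter

def bucket(term, num_features):
    """Map a term to a bucket index (MD5-based here; Spark's hash function differs)."""
    digest = hashlib.md5(term.encode('utf-8')).hexdigest()
    return int(digest, 16) % num_features

def hashing_tf(tokens, num_features):
    """First pass: sparse term-frequency vector as {bucket index: count}."""
    return dict(Counter(bucket(t, num_features) for t in tokens))

def fit_idf(tf_vectors, min_doc_freq=0):
    """Smoothed IDF per bucket: log((m + 1) / (df + 1)), or 0 if df < min_doc_freq."""
    m = len(tf_vectors)
    doc_freq = Counter(index for vec in tf_vectors for index in vec)
    return {
        index: math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for index, df in doc_freq.items()
    }

def apply_idf(tf_vector, idf_weights):
    """Second pass: scale each term frequency by its IDF weight."""
    return {index: count * idf_weights.get(index, 0.0) for index, count in tf_vector.items()}

# Toy corpus: lowercase and split on whitespace
toy_docs = [text.lower().split() for text in [
    "spark is fast",
    "spark is distributed",
    "pandas is local",
]]
toy_tf = [hashing_tf(doc, num_features=10000) for doc in toy_docs]
toy_idf = fit_idf(toy_tf)
toy_tfidf = [apply_idf(vec, toy_idf) for vec in toy_tf]

# Buckets seen in fewer than 2 documents get weight 0
toy_idf_ignore = fit_idf(toy_tf, min_doc_freq=2)
```

A term present in every document (here `is`) gets log(4 / 4) = 0, so it contributes nothing to the TF-IDF vectors. Note that document frequencies are counted per bucket, so hash collisions merge the statistics of the colliding terms.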

```python
# DataFrame-based API (spark.ml), illustrated on a small toy DataFrame
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = sparkSession.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

# Lowercase each sentence and split it on whitespace
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

# Hash each word into one of 20 buckets and count occurrences per sentence
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

# Fit the IDF weights on the corpus, then rescale the raw term frequencies
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
```
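The title names SVD as the second technique for ranking text similarity. As a hedged illustration only (NumPy, not Spark code), the sketch below projects TF-IDF rows onto the top `k` singular vectors, in the style of latent semantic analysis, and ranks documents by cosine similarity to a query document. The toy matrix and the name `rank_by_similarity` are hypothetical; in PySpark the decomposition step could be done with `RowMatrix.computeSVD` from `pyspark.mllib.linalg.distributed`.

```python
import numpy as np

def rank_by_similarity(tfidf, query_index, k=2):
    """Rank documents by cosine similarity to one query document in a k-dim SVD space.

    tfidf: (num_docs, num_features) array of TF-IDF weights, one row per document.
    Returns (index, score) pairs for the other documents, most similar first.
    """
    # Truncated SVD: tfidf ~= U_k S_k V_k^T; rows of U_k S_k are the document embeddings
    u, s, _ = np.linalg.svd(tfidf, full_matrices=False)
    docs_k = u[:, :k] * s[:k]

    # Cosine similarity = dot product of L2-normalised embeddings (guard zero rows)
    norms = np.linalg.norm(docs_k, axis=1, keepdims=True)
    normalised = docs_k / np.where(norms == 0, 1.0, norms)
    scores = normalised @ normalised[query_index]

    order = [i for i in np.argsort(-scores) if i != query_index]
    return [(int(i), float(scores[i])) for i in order]

# Toy TF-IDF matrix: docs 0 and 1 share features 0-1, doc 2 mostly uses features 2-3
toy_tfidf = np.array([
    [0.9, 0.4, 0.0, 0.0],
    [0.8, 0.5, 0.0, 0.1],
    [0.0, 0.0, 0.7, 0.6],
])
print(rank_by_similarity(toy_tfidf, query_index=0, k=2))
```

Keeping only `k` components discards the weakest directions of variation, so documents that use related terms can land close together even when their raw TF-IDF vectors overlap little.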

## References

https://towardsdatascience.com/multi-class-text-classification-with-pyspark-7d78d022ed35  