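# PySpark NLP

This notebook walks through a basic TF-IDF feature pipeline in PySpark: sentences are tokenized, the tokens are hashed into term-frequency vectors with `HashingTF`, and those vectors are rescaled with `IDF`.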

# Initializing SparkSession

In [181]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession

# Create the SparkSession for this notebook; getOrCreate() hands back an
# already-running session when one exists instead of building a second one.
# NOTE(review): "spark.some.config.option" looks like a placeholder carried
# over from the Spark documentation example — confirm whether it is needed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Bare last expression so the notebook renders the session object.
spark

# Reading Data

In [182]:
# Load the CSV, treating the first line as column names and letting Spark
# infer each column's type from the data.
df = spark.read.csv("NLP_01.csv", header=True, inferSchema=True)

In [183]:
# Preview the loaded rows as a text table; show() truncates long cell
# values, which is why the sentences below end in "...".
df.show()

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|    0|Hi I heard about ...|
|    0|I wish Java could...|
|    0|Logistic Regressi...|
+-----+--------------------+



# Tokenizer

In [184]:
# Tokenizer reads the 'sentence' column and writes a list of tokens to a new
# 'words' column (the recorded output shows the tokens lowercased).
tokenizer = Tokenizer(
    inputCol='sentence',
    outputCol='words',
)

In [185]:
# Apply the tokenizer: the result keeps 'label' and 'sentence' and gains the
# 'words' column holding each sentence's tokens.
wordsData = tokenizer.transform(df)
# Preview the tokenized frame (long values are truncated in the table).
wordsData.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|
|    0|I wish Java could...|[i, wish, java, c...|
|    0|Logistic Regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [186]:
# take(3) returns the first three rows as Row objects, so the complete,
# untruncated token lists are visible (unlike the show() table).
wordsData.select('label','words').take(3)

[Row(label=0, words=['hi', 'i', 'heard', 'about', 'spark']),
 Row(label=0, words=['i', 'wish', 'java', 'could', 'use', 'case', 'classes']),
 Row(label=0, words=['logistic', 'regression', 'models', 'are', 'neat'])]

# HashingTF

In [187]:
# HashingTF hashes every token in 'words' to one of `numFeatures` buckets and
# stores the per-bucket counts as a sparse vector in 'rawFeatures'.
# NOTE(review): Spark's feature-extraction guide advises a power of two for
# numFeatures so tokens map evenly onto the buckets; 1000 is not one. The
# recorded output already shows a collision: bucket 286 is set for both the
# first and the third sentence although they share no token. Switching to
# e.g. 1024 would change every index in the recorded outputs — confirm
# before changing.
hashingTF = HashingTF(
    inputCol='words',
    outputCol='rawFeatures',
    numFeatures=1000,
)

In [188]:
# Apply the hashing transform: adds the 'rawFeatures' column, a sparse
# term-frequency vector of size 1000 for each sentence.
featurizedData = hashingTF.transform(wordsData)
# Preview the featurized frame (vectors are truncated in the table).
featurizedData.show()

+-----+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|(1000,[286,568,67...|
|    0|I wish Java could...|[i, wish, java, c...|(1000,[80,133,307...|
|    0|Logistic Regressi...|[logistic, regres...|(1000,[59,286,604...|
+-----+--------------------+--------------------+--------------------+



In [189]:
# Pull the first three rows to inspect the full sparse vectors; each is
# rendered as SparseVector(size, {bucket_index: term_count}).
featurizedData.select('label','rawFeatures').take(3)

[Row(label=0, rawFeatures=SparseVector(1000, {286: 1.0, 568: 1.0, 673: 1.0, 756: 1.0, 956: 1.0})),
 Row(label=0, rawFeatures=SparseVector(1000, {80: 1.0, 133: 1.0, 307: 1.0, 342: 1.0, 495: 1.0, 756: 1.0, 967: 1.0})),
 Row(label=0, rawFeatures=SparseVector(1000, {59: 1.0, 286: 1.0, 604: 1.0, 763: 1.0, 871: 1.0}))]

# IDF

In [190]:
# IDF is an estimator: configure which column to read and which to write.
# NOTE(review): 'Features' is capitalized unlike the other column names
# ('words', 'rawFeatures'); later cells select it by this exact name, so any
# rename has to be applied everywhere at once.
idf = IDF(
    inputCol='rawFeatures',
    outputCol='Features',
)

# Fitting scans the term-frequency vectors to compute the per-bucket inverse
# document frequencies and returns the fitted model.
idfModel = idf.fit(featurizedData)

In [191]:
# Rescale the raw term frequencies with the fitted IDF weights; the result
# is written to the 'Features' column.
rescaledData = idfModel.transform(featurizedData)
# Show only the label and the TF-IDF vector (truncated in the table).
rescaledData.select('label','Features').show()

+-----+--------------------+
|label|            Features|
+-----+--------------------+
|    0|(1000,[286,568,67...|
|    0|(1000,[80,133,307...|
|    0|(1000,[59,286,604...|
+-----+--------------------+



In [192]:
# Inspect the full TF-IDF vectors for the three rows. Spark computes
# idf = log((m + 1) / (df + 1)) with m = number of documents (3 here), so a
# bucket seen in one sentence gets ln(4/2) ~= 0.6931 and a bucket seen in two
# sentences gets ln(4/3) ~= 0.2877 (every term count here is 1.0).
rescaledData.select('label','Features').take(3)

[Row(label=0, Features=SparseVector(1000, {286: 0.2877, 568: 0.6931, 673: 0.6931, 756: 0.2877, 956: 0.6931})),
 Row(label=0, Features=SparseVector(1000, {80: 0.6931, 133: 0.6931, 307: 0.6931, 342: 0.6931, 495: 0.6931, 756: 0.2877, 967: 0.6931})),
 Row(label=0, Features=SparseVector(1000, {59: 0.6931, 286: 0.2877, 604: 0.6931, 763: 0.6931, 871: 0.6931}))]

# Stopping SparkSession

In [193]:
# Shut down the session now that the walkthrough is finished; the earlier
# cells need a new session (re-run the first cell) to be executed again.
spark.stop()

#####   