In [14]:
import findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [15]:
# Create the local Spark entry points used by the cells below.
# PySpark raises ValueError when a SparkContext is already running; in that
# case reuse the live context so `sc` and `sqlContext` are always bound after
# this cell, instead of depending on variables left over from an earlier run.
try:
    sc = ps.SparkContext("local[*]", 'test')
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")
    sc = ps.SparkContext.getOrCreate()

# Built outside the try block so it always wraps whichever context is active.
sqlContext = SQLContext(sc)

Just created a SparkContext


In [16]:
# Load the cleaned reviews CSV (first row is a header, column types are
# inferred) and drop every row that contains a null value.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load('data/cleanedReviews.csv')
    .dropna()
)

In [5]:
# Randomly split the rows with weights 0.9 / 0.1 (train / validation),
# using a fixed seed for the sampler.
train_set, val_set = df.randomSplit([0.9, 0.1], seed=300)

In [6]:
def tf_idf(textCol, labelCol, numFeatures=1000, minDocFreq=5, maxIter=100):
    """Build an unfitted hashing TF-IDF + logistic-regression pipeline.

    Stages, in order: Tokenizer -> HashingTF -> IDF -> StringIndexer ->
    LogisticRegression.

    Args:
        textCol: Name of the input column holding the raw text.
        labelCol: Name of the input column holding the class label; it is
            indexed into a "label" column.
        numFeatures: ``numFeatures`` passed to HashingTF (default 1000).
        minDocFreq: ``minDocFreq`` passed to IDF (default 5).
        maxIter: ``maxIter`` passed to LogisticRegression (default 100).

    Returns:
        A ``pyspark.ml.Pipeline`` that has not been fitted yet.
    """
    tokenizer = Tokenizer(inputCol=textCol, outputCol="words")
    hashtf = HashingTF(numFeatures=numFeatures, inputCol="words", outputCol='tf')
    # IDF writes "features" and the indexer writes "label"; the classifier is
    # constructed without column overrides, so it relies on those names.
    idf = IDF(inputCol='tf', outputCol="features", minDocFreq=minDocFreq)
    label_stringIdx = StringIndexer(inputCol=labelCol, outputCol="label")
    lr = LogisticRegression(maxIter=maxIter)
    return Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, lr])

def cv_idf(textCol, labelCol, vocabSize=1000, minDocFreq=5, maxIter=100):
    """Build an unfitted CountVectorizer TF-IDF + logistic-regression pipeline.

    Stages, in order: Tokenizer -> CountVectorizer -> IDF -> StringIndexer ->
    LogisticRegression.

    Args:
        textCol: Name of the input column holding the raw text.
        labelCol: Name of the input column holding the class label; it is
            indexed into a "label" column.
        vocabSize: ``vocabSize`` passed to CountVectorizer (default 1000).
        minDocFreq: ``minDocFreq`` passed to IDF (default 5).
        maxIter: ``maxIter`` passed to LogisticRegression (default 100).

    Returns:
        A ``pyspark.ml.Pipeline`` that has not been fitted yet.
    """
    tokenizer = Tokenizer(inputCol=textCol, outputCol="words")
    cv = CountVectorizer(vocabSize=vocabSize, inputCol="words", outputCol='cv')
    # IDF writes "features" and the indexer writes "label"; the classifier is
    # constructed without column overrides, so it relies on those names.
    idf = IDF(inputCol='cv', outputCol="features", minDocFreq=minDocFreq)
    label_stringIdx = StringIndexer(inputCol=labelCol, outputCol="label")
    lr = LogisticRegression(maxIter=maxIter)
    return Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

In [7]:
%%time
pipelineFit = cv_idf("review","sentiment").fit(train_set)
predictions = pipelineFit.transform(val_set)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

Wall time: 17 s


In [17]:
import pyspark.sql.functions as f

# Keep only the reviews whose sentiment is 'positive'.
d = df.where(f.col('sentiment') == 'positive')

# Split each review on single spaces, emit one row per token, then count the
# occurrences of each token and order them from most to least frequent.
plot = (
    d.withColumn('word', f.explode(f.split(f.col('review'), ' ')))
    .groupBy('word')
    .count()
    .orderBy('count', ascending=False)
)

# Show the top of the ranking, then the number of distinct tokens.
plot.show()
plot.count()

+-------+-----+
|   word|count|
+-------+-----+
|   film|49021|
|   movi|43129|
|    one|26958|
|   like|19885|
|   time|15480|
|    see|14687|
|   good|14573|
|  stori|13654|
|charact|13630|
|   make|13167|
|  great|12798|
|    get|12577|
|  watch|12507|
|   love|12064|
|   well|11309|
|   show|10813|
| realli|10676|
|   also|10550|
|  would|10368|
|   play| 9840|
+-------+-----+
only showing top 20 rows



108517

In [None]:
import matplotlib.pyplot as plt

# `plot` is already sorted by descending count, so take the first five rows on
# the Spark side and collect only those to the driver, rather than converting
# the whole word table to pandas and slicing afterwards.
pandas_df = plot.limit(5).toPandas().set_index('word')

# Draw on an explicit Axes so the figure can be labelled.
fig, ax = plt.subplots(figsize=(12, 6))
pandas_df.plot(kind='bar', ax=ax)
ax.set_xlabel('word')
ax.set_ylabel('count')
ax.set_title('Top 5 words by count');

In [None]:
# Average number of space-separated tokens per review: add a per-row token
# count column, then average it over all rows.
# (The abandoned RDD word-count draft that used to trail this cell as a string
# literal was removed: it was never executed as code and, being the cell's last
# expression, was echoed back as the cell output.)
df.withColumn('count', f.size(f.split(f.col('review'), ' '))).agg({"count": "avg"}).show()

In [13]:
# Shut down the SparkContext created at the top of the notebook. Run this last:
# cells that use `sc`, `sqlContext` or DataFrames derived from them are expected
# to fail afterwards until the context-creation cell is run again.
sc.stop()