In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.functions import col, udf

In [2]:
kindle_store = spark.read.parquet("hdfs:///data/kindle_store")

In [3]:
reviews = kindle_store[["reviewText","overall"]]

In [4]:
reviews.show(20)

+--------------------+-------+
|          reviewText|overall|
+--------------------+-------+
|I enjoy vintage b...|    5.0|
|This book is a re...|    4.0|
|This was a fairly...|    4.0|
|I'd never read an...|    5.0|
|If you like perio...|    4.0|
|A beautiful in-de...|    4.0|
|I enjoyed this on...|    4.0|
|Never heard of Am...|    4.0|
|Darth Maul workin...|    5.0|
|This is a short s...|    4.0|
|I think I have th...|    5.0|
|Title has nothing...|    4.0|
|Well written. Int...|    3.0|
|Troy Denning's no...|    3.0|
|I am not for sure...|    5.0|
|I really enjoyed ...|    5.0|
|Great read enjoye...|    5.0|
|Another well writ...|    3.0|
|This one promises...|    5.0|
|I have a version ...|    4.0|
+--------------------+-------+
only showing top 20 rows



In [6]:
sqlContext.registerDataFrameAsTable(reviews, "table2")
reviews1 = sqlContext.sql("SELECT reviewText, overall from table2 LIMIT 50000")

In [7]:
reviews1.show(20)

+--------------------+-------+
|          reviewText|overall|
+--------------------+-------+
|I enjoy vintage b...|    5.0|
|This book is a re...|    4.0|
|This was a fairly...|    4.0|
|I'd never read an...|    5.0|
|If you like perio...|    4.0|
|A beautiful in-de...|    4.0|
|I enjoyed this on...|    4.0|
|Never heard of Am...|    4.0|
|Darth Maul workin...|    5.0|
|This is a short s...|    4.0|
|I think I have th...|    5.0|
|Title has nothing...|    4.0|
|Well written. Int...|    3.0|
|Troy Denning's no...|    3.0|
|I am not for sure...|    5.0|
|I really enjoyed ...|    5.0|
|Great read enjoye...|    5.0|
|Another well writ...|    3.0|
|This one promises...|    5.0|
|I have a version ...|    4.0|
+--------------------+-------+
only showing top 20 rows



In [8]:
#positive->1
#negative->0
def transform(star):
    if star >=3.0:
        return 1.0
    else:
        return 0.0
transformer = udf(transform)

In [9]:
df = reviews1.withColumn("label", transformer(reviews['overall']))

In [10]:
sqlContext.registerDataFrameAsTable(df, "table1")
df2 = sqlContext.sql("SELECT reviewText, label from table1 WHERE reviewText != ''")

In [11]:
(training, test) = df2.randomSplit([0.9, 0.1])

In [12]:
training.show(12)

+--------------------+-----+
|          reviewText|label|
+--------------------+-----+
|!!!!??? (sorry co...|  1.0|
|"'Looking Through...|  1.0|
|"'Reflections at ...|  1.0|
|"*69" starts with...|  1.0|
|"... Couldn't you...|  1.0|
|"...This is the s...|  1.0|
|"...the world is ...|  1.0|
|"A Dundee Christm...|  1.0|
|"A Hard Man is Go...|  1.0|
|"A Hero for Jessi...|  1.0|
|"A Work in Progre...|  1.0|
|"A professional i...|  1.0|
+--------------------+-----+
only showing top 12 rows



In [13]:
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())

In [14]:
regexTokenized = regexTokenizer.transform(training)
regexTokenized.select("reviewText", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
regexTokenized.show(20)

+--------------------+-----+--------------------+
|          reviewText|label|               words|
+--------------------+-----+--------------------+
|!!!!??? (sorry co...|  1.0|[sorry, couldn, t...|
|"'Looking Through...|  1.0|[looking, through...|
|"'Reflections at ...|  1.0|[reflections, at,...|
|"*69" starts with...|  1.0|[69, starts, with...|
|"... Couldn't you...|  1.0|[couldn, t, you, ...|
|"...This is the s...|  1.0|[this, is, the, s...|
|"...the world is ...|  1.0|[the, world, is, ...|
|"A Dundee Christm...|  1.0|[a, dundee, chris...|
|"A Hard Man is Go...|  1.0|[a, hard, man, is...|
|"A Hero for Jessi...|  1.0|[a, hero, for, je...|
|"A Work in Progre...|  1.0|[a, work, in, pro...|
|"A professional i...|  1.0|[a, professional,...|
|"Abused" is a sho...|  1.0|[abused, is, a, s...|
|"After the Coup" ...|  1.0|[after, the, coup...|
|"After the Night"...|  1.0|[after, the, nigh...|
|"America's Forgot...|  1.0|[america, s, forg...|
|"Angel Be Good" i...|  1.0|[angel, be, good,...|


In [16]:
htf = HashingTF(inputCol="words", outputCol="features")
tf = htf.transform(regexTokenized)
train = tf[["label","features"]]

In [17]:
types = [f.dataType for f in train.schema.fields]
types

[StringType, VectorUDT]

In [18]:
train2 = train.withColumn("label",train["label"].cast(DoubleType()))
types = [f.dataType for f in train2.schema.fields]
types

[DoubleType, VectorUDT]

In [19]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(train2)

In [21]:
test.show()

+--------------------+-----+
|          reviewText|label|
+--------------------+-----+
|"A Secret in the ...|  1.0|
|"Alaska be Damned...|  1.0|
|"Am I real," is t...|  1.0|
|"And when you do,...|  1.0|
|"Ashes" is collec...|  1.0|
|"Deadly Medicine"...|  1.0|
|"Guardian of the ...|  1.0|
|"I don't want to ...|  1.0|
|"I will agree to ...|  1.0|
|"Reluctant" fits ...|  1.0|
|"SPOILERS"I just ...|  1.0|
|"The Scent of Ros...|  0.0|
|"To Conquer the H...|  0.0|
|"Under the Eagle"...|  1.0|
|"Write Good or Di...|  1.0|
|#1 in the new Kni...|  1.0|
|&#34;His Holiday ...|  1.0|
|&#34;Humorous Mys...|  0.0|
|&#34;The Lost Ark...|  1.0|
|(4.5 star Top Pic...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [22]:
regexTokenized_test = regexTokenizer.transform(test)
regexTokenized.select("reviewText", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [27]:
tf_test = htf.transform(regexTokenized_test)
testSet = tf_test[["label","features"]]
testSet = testSet.withColumn("label",testSet["label"].cast(DoubleType()))

In [28]:
prediction = model.transform(testSet)

In [29]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.0984743 
