In [0]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [0]:
from pyspark import SparkConf

In [0]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

In [0]:
MAX_MEMORY = "10g"

spark = SparkSession \
    .builder \
    .appName("Foo") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

In [0]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(spark)

In [0]:
df = spark.read.csv("/content/drive/My Drive/a_r_headline_token.csv", header = "true")

In [0]:
df.count()

3105184

In [0]:
df.printSchema()

root
 |-- star_rating: string (nullable = true)
 |-- tokenized_headlines: string (nullable = true)



In [0]:
from pyspark.sql.functions import col, regexp_replace, split
df_mod= df.withColumn(
    "features",
    split(regexp_replace(col("tokenized_headlines"), r"(^\[)|(\]$)|(')", ""), ",")
)

In [0]:
df_mod.printSchema()

root
 |-- star_rating: string (nullable = true)
 |-- tokenized_headlines: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [0]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="features", outputCol="tok_features")

In [0]:
model = cv.fit(df_mod)
#model = cv.fit(tokenize)
result = model.transform(df_mod)
#result = model.transform(tokenize)
result.select("tok_features").show(truncate=False)

+--------------------------------------------------------------------------------+
|tok_features                                                                    |
+--------------------------------------------------------------------------------+
|(262144,[1,5,19,30,53,59,337],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])                    |
|(262144,[201,1625],[1.0,1.0])                                                   |
|(262144,[10,28,329],[1.0,1.0,1.0])                                              |
|(262144,[21,139,191,947],[1.0,1.0,1.0,1.0])                                     |
|(262144,[1782,8687,255750],[1.0,1.0,1.0])                                       |
|(262144,[72,113],[1.0,1.0])                                                     |
|(262144,[0,2,11,15,90,1687,9052,10269],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |
|(262144,[0,107,190,506,11363],[1.0,1.0,1.0,1.0,1.0])                            |
|(262144,[152,552],[1.0,1.0])                                                    |
|(26

In [0]:
from pyspark.sql.types import IntegerType
result = result.withColumn("star_rating", result["star_rating"].cast(IntegerType()))

In [0]:
result.show()

+-----------+--------------------+--------------------+--------------------+
|star_rating| tokenized_headlines|            features|        tok_features|
+-----------+--------------------+--------------------+--------------------+
|          4|['this', 'book', ...|[this,  book,  wa...|(262144,[1,5,19,3...|
|          3|    ['fun', 'fluff']|       [fun,  fluff]|(262144,[201,1625...|
|          4|"['this', 'is', "...|["[this,  is,  ""...|(262144,[10,28,32...|
|          5|['fine', 'author'...|[fine,  author,  ...|(262144,[21,139,1...|
|          4|['execellent', 'c...|[execellent,  cur...|(262144,[1782,868...|
|          4|['interesting', '...| [interesting,  fun]|(262144,[72,113],...|
|          4|"['i', 'viewed', ...|["[i,  viewed,  t...|(262144,[0,2,11,1...|
|          4|['no', 'frills', ...|[no,  frills,  ju...|(262144,[0,107,19...|
|          5|['simple', 'enter...|[simple,  enterta...|(262144,[152,552]...|
|          4|['a', 'good', 're...|   [a,  good,  read]|(262144,[3,9,18],...|

In [0]:
result.printSchema()

root
 |-- star_rating: integer (nullable = true)
 |-- tokenized_headlines: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tok_features: vector (nullable = true)



In [0]:
train, test = result.randomSplit([0.9, 0.1], seed=12345)

In [0]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="star_rating", featuresCol="tok_features",maxIter=10)
model=lr.fit(train)
predict_train=model.transform(train)
#predict_test=model.transform(test)
predict_train.select("star_rating","prediction").show(10)

+-----------+----------+
|star_rating|prediction|
+-----------+----------+
|          1|       5.0|
|          1|       5.0|
|          1|       5.0|
|          1|       4.0|
|          1|       1.0|
|          1|       1.0|
|          1|       1.0|
|          1|       5.0|
|          1|       1.0|
|          1|       1.0|
+-----------+----------+
only showing top 10 rows



In [0]:
predict_test=model.transform(test)
#predict_test=model.transform(test)
predict_test.select("star_rating","prediction").show(10)

+-----------+----------+
|star_rating|prediction|
+-----------+----------+
|          1|       1.0|
|          1|       5.0|
|          1|       5.0|
|          1|       1.0|
|          1|       2.0|
|          1|       5.0|
|          1|       5.0|
|          1|       5.0|
|          1|       5.0|
|          1|       5.0|
+-----------+----------+
only showing top 10 rows



In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="star_rating") 

In [0]:
print('Accuracy ', evaluator.evaluate(predict_train, 
                                      {evaluator.metricName: 'accuracy'}))

In [0]:
print('Accuracy ', evaluator.evaluate(predict_test, 
                                      {evaluator.metricName: 'accuracy'}))

Accuracy  0.6493410807819517
