### Libraries Dependency

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext

### Creating Spark context

In [2]:
SparkContext.setSystemProperty("spark.executor.memory", "4g")
sc = SparkContext('local[1]')
hc = HiveContext(sc)

In [3]:
sc._conf.getAll()

[('spark.driver.port', '41895'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.app.id', 'local-1558708349624'),
 ('spark.rdd.compress', 'True'),
 ('spark.executor.memory', '4g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '10.0.2.15'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[1]'),
 ('spark.app.name', 'pyspark-shell')]

### Read a table from Hive

In [4]:
hc.sql('use project')
df = hc.sql('select * from tweet_orc where line_number is not null')
df.show(10)

+-----------+--------------------+-----+
|line_number|                text|label|
+-----------+--------------------+-----+
|          0|awww that bummer ...|    0|
|          1|is upset that he ...|    0|
|          2|dived many times ...|    0|
|          3|my whole body fee...|    0|
|          4|no it not behavin...|    0|
|          5|  not the whole crew|    0|
|          6|            need hug|    0|
|          7|hey long time no ...|    0|
|          8|nope they did not...|    0|
|          9|        que me muera|    0|
+-----------+--------------------+-----+
only showing top 10 rows



In [5]:
df.printSchema()

root
 |-- line_number: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [6]:
df.select("line_number").show(10)

+-----------+
|line_number|
+-----------+
|          0|
|          1|
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+
only showing top 10 rows



In [7]:
type(df)

pyspark.sql.dataframe.DataFrame

### drop nan

In [8]:
#df = df.dropna()
df.count()

3200000

### split dataset

In [9]:
(train_set, val_set, test_set) = df.randomSplit([0.98, 0.01, 0.01], seed = 2000)

### Logistic Regression with TFIDF

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [13]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
#minDocFreq: remove sparse terms
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) 
label_stringIdx = StringIndexer(inputCol = "label", outputCol = "class")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
test_df = pipelineFit.transform(test_set)
train_df.show(5)

+-----------+--------------------+-----+--------------------+--------------------+--------------------+-----+
|line_number|                text|label|               words|                  tf|            features|class|
+-----------+--------------------+-----+--------------------+--------------------+--------------------+-----+
|          0|awww that bummer ...|    0|[awww, that, bumm...|(65536,[8436,8847...|(65536,[8436,8847...|  0.0|
|          0|awww that bummer ...|    0|[awww, that, bumm...|(65536,[8436,8847...|(65536,[8436,8847...|  0.0|
|          1|is upset that he ...|    0|[is, upset, that,...|(65536,[1444,2071...|(65536,[1444,2071...|  0.0|
|          1|is upset that he ...|    0|[is, upset, that,...|(65536,[1444,2071...|(65536,[1444,2071...|  0.0|
|          2|dived many times ...|    0|[dived, many, tim...|(65536,[2548,2888...|(65536,[2548,2888...|  0.0|
+-----------+--------------------+-----+--------------------+--------------------+--------------------+-----+
only showi

In [15]:
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)

predictions = lrModel.transform(val_df)
# evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# evaluator.evaluate(predictions)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
print("valication accuracy: ", accuracy)
predictions_test = lrModel.transform(test_df)
accuracy_test = predictions_test.filter(predictions_test.label == predictions_test.prediction).count() / float(test_set.count())
print("test accuracy: ", accuracy_test)

valication accuracy:  0.7975956284153005
test accuracy:  0.80287222676136


In [17]:
#evaluator.getMetricName()

### Logistic Regression with CountVectorizer and IDF

In [11]:
from pyspark.ml.feature import CountVectorizer

In [12]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
#minDocFreq: remove sparse terms
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) 
label_stringIdx = StringIndexer(inputCol = "label", outputCol = "class")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])
pipelineFit = pipeline.fit(train_set)

#train_df = pipelineFit.transform(train_set)
#val_df = pipelineFit.transform(val_set)
#test_df = pipelineFit.transform(test_set)
#train_df.show(5)

+-----------+--------------------+-----+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|line_number|                text|label|               words|                  cv|            features|class|       rawPrediction|         probability|prediction|
+-----------+--------------------+-----+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|          0|awww that bummer ...|    0|[awww, that, bumm...|(16384,[0,3,5,10,...|(16384,[0,3,5,10,...|  0.0|[3.68135290712708...|[0.97543001663847...|       0.0|
|          0|awww that bummer ...|    0|[awww, that, bumm...|(16384,[0,3,5,10,...|(16384,[0,3,5,10,...|  0.0|[3.68135290712708...|[0.97543001663847...|       0.0|
|          1|is upset that he ...|    0|[is, upset, that,...|(16384,[3,4,6,7,1...|(16384,[3,4,6,7,1...|  0.0|[4.06736991567062...|[0.98316587735562...|       0.0|
|          1|is upset 

In [16]:
predictions_val = pipelineFit.transform(val_set)
accuracy = predictions_val.filter(predictions_val.label == predictions_val.prediction).count() / float(val_set.count())
print("Validation Accuracy Score: {0:.4f}".format(accuracy))
#roc_auc = evaluator.evaluate(predictions)
#print "ROC-AUC: {0:.4f}".format(roc_auc)

predictions_t = pipelineFit.transform(test_set)
accuracy_test = predictions_t.filter(predictions_t.label == predictions_t.prediction).count() / float(test_set.count())
print("Test Accuracy Score: {0:.4f}".format(accuracy_test))

Validation Accuracy Score: 0.7990
Test Accuracy Score: 0.8027


### Logisitic Regression with N-gram

In [18]:
from pyspark.ml.feature import NGram

In [31]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
ngram = NGram(n=1, inputCol="words", outputCol="n_gram")
hashtf = HashingTF(numFeatures=2**16,inputCol="n_gram", outputCol="tf")
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) 
label_stringIdx = StringIndexer(inputCol = "label", outputCol = "class")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, ngram, hashtf, idf, label_stringIdx, lr])
pipelineFit = pipeline.fit(train_set)

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/home/stephen/.local/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'HashingTF' object has no attribute '_java_obj'


In [34]:
predictions_val = pipelineFit.transform(val_set)
accuracy = predictions_val.filter(predictions_val.label == predictions_val.prediction).count() / float(val_set.count())
print("Validation Accuracy Score: {0:.4f}".format(accuracy))

predictions_t = pipelineFit.transform(test_set)
accuracy_test = predictions_t.filter(predictions_t.label == predictions_t.prediction).count() / float(test_set.count())
print("Test Accuracy Score: {0:.4f}".format(accuracy_test))

Validation Accuracy Score: 0.7976
Test Accuracy Score: 0.8029
