In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark

from IPython import display
import math
import pandas as pd
import numpy as np

from pyspark.sql import SQLContext
from pyspark import SparkContext

from pyspark.sql.types import *

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 54.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=e51447172ba06fd6fe9fcec3a9e237f1b874800bd7493cda3a7b45366c9dc5e1
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
sc =SparkContext()
sqlContext = SQLContext(sc)



In [None]:
customSchema = StructType([
    StructField("clean_text", StringType()), 
    StructField("category", StringType())])

In [None]:
filename1 = '/content/drive/MyDrive/BDMA/tweets.csv'
filename2 = '/content/drive/MyDrive/BDMA/redt_dataset.csv'

In [None]:
df1 = sqlContext.read.format("csv").option("header", "true").schema(customSchema).load(filename1)
df1.count()

660

In [None]:
df2 = sqlContext.read.format("csv").option("header", "true").schema(customSchema).load(filename2)
df2.count()

1003

In [None]:
df = df1.union(df2)
df.count() 

1663

In [None]:
data = df.na.drop(how='any')
data.show(5)

+----------+--------------------+
|clean_text|            category|
+----------+--------------------+
|         0|stages zzzzz deni...|
|         1|              denial|
|   memes "|                  -1|
|         2|depression lasagn...|
|         3|november 2017 ist...|
+----------+--------------------+
only showing top 5 rows



In [None]:
data.printSchema()

root
 |-- clean_text: string (nullable = true)
 |-- category: string (nullable = true)



In [None]:
from pyspark.sql.functions import col

data.groupBy("category").count().orderBy(col("count").desc()).show()

+--------------------+-----+
|            category|count|
+--------------------+-----+
|                  -1|  485|
|                   1|  463|
|                   0|   66|
|              denial|    3|
|                   "|    2|
| wife decided dox...|    1|
|who knew all need...|    1|
|“fawning coverage...|    1|
|are psychedelics ...|    1|
|fatigue not only ...|    1|
|              yup … |    1|
|               yooo |    1|
|have you tried st...|    1|
|childhood emotion...|    1|
|the last time thi...|    1|
|who cooked depres...|    1|
|tired this crippl...|    1|
|went into chapter...|    1|
|may depression ma...|    1|
|bestie will take ...|    1|
+--------------------+-----+
only showing top 20 rows



*Model Pipeline*

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="clean_text", outputCol="words", pattern="\\W")

# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=20000, minDF=5)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+----------+--------------------+-------+--------+------------+-----+
|clean_text|            category|  words|filtered|    features|label|
+----------+--------------------+-------+--------+------------+-----+
|         0|stages zzzzz deni...|    [0]|     [0]|(1891,[],[])|371.0|
|         1|              denial|    [1]|     [1]|(1891,[],[])|  3.0|
|   memes "|                  -1|[memes]| [memes]|(1891,[],[])|  0.0|
|         2|depression lasagn...|    [2]|     [2]|(1891,[],[])|108.0|
|         3|november 2017 ist...|    [3]|     [3]|(1891,[],[])|281.0|
+----------+--------------------+-------+--------+------------+-----+
only showing top 5 rows



*Partition Training & Test sets & Model Training and Evaluation*

In [None]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 1129
Test Dataset Count: 468


In [None]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0).select("clean_text","category","probability","label","prediction")\
.orderBy("probability", ascending=False).show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|but the one thing you won t...|      -1|[0.9785939511902572,0.01983...|  0.0|       0.0|
| guys \ \ this whole paragr...|      -1|[0.9694005213887851,0.03059...|  0.0|       0.0|
| think dont wanna through t...|       1|[0.9582365451126503,0.04171...|  1.0|       0.0|
|first sorry for bad english...|      -1|[0.9551908926051615,0.04117...|  0.0|       0.0|
|hey guys guess the reasonin...|      -1|[0.9327499140656144,0.05774...|  0.0|       0.0|
| been out anxiety depressio...|      -1|[0.9257049025780745,0.07429...|  0.0|       0.0|
| dated this girl from feel ...|       1|[0.9215141026940283,0.07585...|  1.0|       0.0|
|’ tired but sad sleep ’ hun...|      -1|[0.8933334627480363,0.04098...|  0.0|       0.0|
| brother 

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.31388818182793443

In [None]:

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.2-bin-hadoop2.7"

In [None]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=30000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("clean_text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

ConnectionRefusedError: ignored