In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# INSTALLING SPARK

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

!tar xf spark-3.1.2-bin-hadoop3.2.tgz

!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 8.1 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 15.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=2db9b3a45c006e57546dcdbfb7f5a586e7c5bd5a1a4d8dfb162818e5eb006ec8
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [3]:
# SETTING SPARK ENV PATH

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [4]:
# FINDING SPARK IN SYSTEM

import findspark
findspark.init()

findspark.find()

'/content/spark-3.1.2-bin-hadoop3.2'

In [5]:
# STARTING SPARK SESSION

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [6]:
spark

In [7]:
import pyspark

# train_file = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/train.csv', header=True)
train_file = spark.read.csv('/content/drive/MyDrive/BigDataProject/Dataset/train.csv', header = True)
train_file.printSchema

<bound method DataFrame.printSchema of DataFrame[Sentiment: string, Tweet: string]>

In [8]:
num_rows = train_file.count()

train_file = train_file.dropDuplicates().dropna()

num_rows_noNull = train_file.count()

print(num_rows, num_rows_noNull)

1519999 1504818


In [43]:
# DATA CLEANING
import re
l = []
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import *

# REGEX TO REMOVE @ AND USERNAME TAGGED
reg_df = train_file.withColumn("Tweet",regexp_replace("Tweet","@[A-Za-z0-9]+","")) 
# REGEX TO REMOVE TRAILING AND LEADING MULTIPLE SPACES
reg_df1 = reg_df.withColumn("Tweet",trim(col("Tweet")))
#REGEX TO REMOVE URL - UNNECESSARY
reg_df2 = reg_df1.withColumn("Tweet",regexp_replace("Tweet","https?://[A-Za-z0-9./]+",""))
reg_df3 = reg_df2.withColumn("Tweet",regexp_replace("Tweet","[^a-zA-Z0-9 ]"," ")) 

reg_df4 = reg_df3.withColumn("Tweet",regexp_replace("Tweet","[^\w\s]",""))

train_file = reg_df4

In [44]:
# HASHING + IDF + TOKENISER -> LOGISTIC REGRESSION
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

tokenizer = Tokenizer(inputCol="Tweet", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')

idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol = "Sentiment", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

(train_set, val_set) = train_file.randomSplit([0.98, 0.02], seed = 2000)

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)

lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8562897155879056

In [45]:
# VECTOR ASSEMBLER

inputCols = [row[0] for row in train_set.select('Tweet').collect()]

from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols = inputCols, outputCol = "features").setHandleInvalid("skip")

In [46]:
# STANDARD SCALER

from pyspark.ml.feature import StandardScaler
stdScaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

In [47]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1, featuresCol="scaledFeatures", labelCol='Sentiment')

from pyspark.ml import Pipeline
pipeline_lsvc = Pipeline(stages=[vecAssembler, stdScaler, lsvc])
pipelineModel_lsvc = pipeline_lsvc.fit(train_set)

KeyboardInterrupt: ignored

In [49]:
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="Tweet", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "Tweet", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print(accuracy , roc_auc)

Py4JJavaError: ignored

In [None]:
predDF_lsvc = pipelineModel_lsvc.transform(test_set)
lr_accuracy = evaluator.evaluate(predDF_lsvc)

print(lr_accuracy)

NameError: ignored

In [None]:
train_set[train_set.Sentiment == 4].head(10)

[Row(Sentiment='4', Tweet=' - Iowa No. 2 in happy! Yea!'),
 Row(Sentiment='4', Tweet=" @kuttyedathi&gt; My 2 yr old boy is sleeping and the music plays ' if u r happy &amp; u know it clap ur hands ' and he is clapping!"),
 Row(Sentiment='4', Tweet=' Hows everybody.?'),
 Row(Sentiment='4', Tweet=' SMILING FACES.'),
 Row(Sentiment='4', Tweet=' Welcome to http://twitter.com/katuuu'),
 Row(Sentiment='4', Tweet=' had a good night, love my best buds in the world!!! adamcheeeserosie'),
 Row(Sentiment='4', Tweet=" hihooo(:  but,, i think i'm gonna be on my granda's house :3 haha  til late so... i'll be here few minutes :O"),
 Row(Sentiment='4', Tweet=' monday monday'),
 Row(Sentiment='4', Tweet=' smiling.everyone should try it.'),
 Row(Sentiment='4', Tweet='#Follow @MaryKateOlsen9 and @ashleyolsen7 love them so much u guys are my idols ')]

In [None]:
train_set[train_set.Sentiment == 0].head(10)

[Row(Sentiment='0', Tweet='      I must think about positive..'),
 Row(Sentiment='0', Tweet="  ''Love, save the empty''"),
 Row(Sentiment='0', Tweet='  hi nia im bored'),
 Row(Sentiment='0', Tweet=" #asylm J2 panel is over. Guess it's back to normal life."),
 Row(Sentiment='0', Tweet=" Alone in my room...again.. I'm bored.. "),
 Row(Sentiment='0', Tweet=" I'D RATHER BE IN THE  BAHAMAS!"),
 Row(Sentiment='0', Tweet=" I'll get home like 5pm today it will be a long day no hangover just my body is drain out n my legs hurt ;-( I'm dehrydrated"),
 Row(Sentiment='0', Tweet=' Mammoth cave here I come '),
 Row(Sentiment='0', Tweet=" My moodswings, nobody's online, meh.")]

In [None]:
from pyspark.ml.feature import NGram, VectorAssembler, CountVectorizer
from pyspark.ml.feature import ChiSqSelector

def build_trigrams(inputCol=["Tweet","target"], n=3):
    tokenizer = [Tokenizer(inputCol="Tweet", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=2**14,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="rawFeatures"
    )]
    label_stringIdx = [StringIndexer(inputCol = "Tweet", outputCol = "label")]
    selector = [ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+selector+lr)

trigram_pipelineFit = build_trigrams().fit(train_set)
predictions = trigram_pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(dev_set.count())
roc_auc = evaluator.evaluate(predictions)

# print accuracy, roc_auc
print (accuracy)
print (roc_auc)

Py4JJavaError: ignored

In [None]:
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="Tweet", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "Tweet", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)
print(accuracy)
print(roc_auc)

Py4JJavaError: ignored