In [1]:
!pip install spark
!pip install pyspark
!pip install warnings

Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
[K     |████████████████████████████████| 41 kB 30 kB/s  eta 0:00:011
[?25hBuilding wheels for collected packages: spark
  Building wheel for spark (setup.py) ... [?25ldone
[?25h  Created wheel for spark: filename=spark-0.2.1-py3-none-any.whl size=58738 sha256=41b06bd55e34be11ded9d11555c8c17d8065eeca90c33c511d54181732e7f623
  Stored in directory: /root/.cache/pip/wheels/4e/0e/f1/164619f9920fb447d294afaae11a7715bd442ded7225953d72
Successfully built spark
Installing collected packages: spark
Successfully installed spark-0.2.1
Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 27 kB/s s eta 0:00:01  |▌                               | 3.1 MB 403 kB/s eta 0:08:18
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 42.5 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: p

In [54]:
import pyspark as ps
import warnings
from pyspark.sql import SQLContext,SparkSession
from textblob import TextBlob
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col
import re

In [55]:
try:
    sc = ps.SparkContext('local[10]')
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

  


In [59]:
spark = SparkSession.builder.master("local[10]").appName("Twitter-Sentiment-Analysis").getOrCreate()
df=spark.read.csv(path="../input/sentiment140/training.1600000.processed.noemoticon.csv",header=False).cache()

df = df.select('_c5','_c0')

df.show()

+--------------------+---+
|                 _c5|_c0|
+--------------------+---+
|@switchfoot http:...|  0|
|is upset that he ...|  0|
|@Kenichan I dived...|  0|
|my whole body fee...|  0|
|@nationwideclass ...|  0|
|@Kwesidei not the...|  0|
|         Need a hug |  0|
|@LOLTrish hey  lo...|  0|
|@Tatiana_K nope t...|  0|
|@twittera que me ...|  0|
|spring break in p...|  0|
|I just re-pierced...|  0|
|@caregiving I cou...|  0|
|@octolinz16 It it...|  0|
|@smarrison i woul...|  0|
|@iamjazzyfizzle I...|  0|
|Hollis' death sce...|  0|
|about to file taxes |  0|
|@LettyA ahh ive a...|  0|
|@FakerPattyPattz ...|  0|
+--------------------+---+
only showing top 20 rows



In [60]:
@udf(returnType=StringType())
def clean_tweet(tweet): 
    return ' '.join(re.sub("(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", " ", tweet).split())

df = df.dropna()    
df=df.withColumn("cleantweet", clean_tweet(col("_c5")))
df=df.select('cleantweet','_c0')
df.show()

+--------------------+---+
|          cleantweet|_c0|
+--------------------+---+
|Awww that s a bum...|  0|
|is upset that he ...|  0|
|I dived many time...|  0|
|my whole body fee...|  0|
|no it s not behav...|  0|
|  not the whole crew|  0|
|          Need a hug|  0|
|hey long time no ...|  0|
|K nope they didn ...|  0|
|        que me muera|  0|
|spring break in p...|  0|
|I just re pierced...|  0|
|I couldn t bear t...|  0|
|It it counts idk ...|  0|
|i would ve been t...|  0|
|I wish I got to w...|  0|
|Hollis death scen...|  0|
| about to file taxes|  0|
|ahh ive always wa...|  0|
|Oh dear Were you ...|  0|
+--------------------+---+
only showing top 20 rows



In [57]:
df1=spark.read.csv(path="../input/sentiment140/training.1600000.processed.noemoticon.csv",header=False).cache().limit(1500000)
df2=spark.read.csv(path="../input/sentiment140/training.1600000.processed.noemoticon.csv",header=False).cache().limit(1550000)
df3=df.subtract(df1)
df4=df.subtract(df2)
df5=df3.subtract(df4)

In [58]:
df1
df1 = df1.select('_c5','_c0')
df1=df1.withColumn("cleantweet", clean_tweet(col("_c5")))
df1=df1.select('cleantweet','_c0')

df4
df4 = df4.select('_c5','_c0')
df4=df4.withColumn("cleantweet", clean_tweet(col("_c5")))
df4=df4.select('cleantweet','_c0')

df5
df5 = df5.select('_c5','_c0')
df5=df5.withColumn("cleantweet", clean_tweet(col("_c5")))
df5=df5.select('cleantweet','_c0')

train_set=df1
val_set=df4
test_set=df5

In [52]:
import time

start=time.time()

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="cleantweet", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol = "_c0", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

end=time.time()

print("Total Time =",end-start)

KeyboardInterrupt: 

In [None]:
import time

start=time.time()

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

end=time.time()

print("Total Time =",end-start)

In [None]:
import time

start=time.time()

accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
print(accuracy)

end=time.time()

print("Total Time =",end-start)

In [None]:
lrModel.save("./output")