In [1]:
import os

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages JohnSnowLabs:spark-nlp:1.2.3 pyspark-shell'

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel

In [4]:
sc = pyspark.SparkContext()
spark = SparkSession(sc)

In [5]:
labeled_data = spark.read.option("header", "true").option("encoding", "UTF-8").csv("trumptweet-mod.csv")

# Filter out retweets?

In [6]:
labeled_data.head(25)

# retweeted='FALSE'

[Row(X.1='1', X='4', text='RT @GOPBlackChick: Illegals must be deported, said @realDonaldTrump Glad somebody has the guts to use the D-word! https://t.co/y15YuRIE59', retweet_count='26', favorited='FALSE', truncated='FALSE', id_str='6.33E+017', in_reply_to_screen_name='NA', source='"<a href=""http://www.tweetcaster.com"" rel=""nofollow"">TweetCaster for Android</a>"', retweeted='FALSE', created_at='Mon Aug 17 12:22:27 +0000 2015', in_reply_to_status_id_str='NA', in_reply_to_user_id_str='NA', lang='en', listed_count='46', verified='FALSE', location='Gotham City', user_id_str='191986903', description='Do I look like Batman to you?', geo_enabled='FALSE', user_created_at='Fri Sep 17 21:55:51 +0000 2010', statuses_count='138514', followers_count='881', favourites_count='155', protected='FALSE', user_url='NA', name=' Red Hood ', time_zone='Eastern Time (US & Canada)', user_lang='en', utc_offset='-14400', friends_count='927', screen_name='Blaze_in_3D', country_code='NA', country='NA', place_t

In [7]:
# Normalize tweets
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import trim

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

user_regex = r"@\S+"
url_regex = r"http[s]?://\S+"
hashtag_regex = r"#\S+"
space_regex = r"\s{2,}|\n"

#import emot
#def strip_emo(text):
#    for data in emot.emoji(text):
#        text = text.replace(data['value'], '')   
#    for data in emot.emoticons(text):
#        text = text.replace(data['value'], '')
#    return text
#
#strip_emo_udf = udf(strip_emo, StringType())

rt_regex = r"(?=\s?)(RT)(?=\s?)"
user_regex = r"@\S+"
url_regex = r"http[s]?:\/\/\S+"
hashtag_regex = r"#\S+"
space_regex = r"\s{2,}|\n"

# Remove RT, users (@foo), URLs, and duplicate space
uber_regex =  "|".join([rt_regex, user_regex, url_regex, hashtag_regex, space_regex])

#labeled_data = labeled_data.withColumn("norm_text", trim(strip_emo_udf(regexp_replace("text", uber_regex, ""))))
labeled_data = labeled_data.withColumn("norm_text", trim(regexp_replace("text", uber_regex, "")))
# TODO: At this point write the data to filesystem rather than above since cleaned?
# Maybe do cleaning across all of data first?

In [8]:
labeled_data.head(25)

[Row(X.1='1', X='4', text='RT @GOPBlackChick: Illegals must be deported, said @realDonaldTrump Glad somebody has the guts to use the D-word! https://t.co/y15YuRIE59', retweet_count='26', favorited='FALSE', truncated='FALSE', id_str='6.33E+017', in_reply_to_screen_name='NA', source='"<a href=""http://www.tweetcaster.com"" rel=""nofollow"">TweetCaster for Android</a>"', retweeted='FALSE', created_at='Mon Aug 17 12:22:27 +0000 2015', in_reply_to_status_id_str='NA', in_reply_to_user_id_str='NA', lang='en', listed_count='46', verified='FALSE', location='Gotham City', user_id_str='191986903', description='Do I look like Batman to you?', geo_enabled='FALSE', user_created_at='Fri Sep 17 21:55:51 +0000 2010', statuses_count='138514', followers_count='881', favourites_count='155', protected='FALSE', user_url='NA', name=' Red Hood ', time_zone='Eastern Time (US & Canada)', user_lang='en', utc_offset='-14400', friends_count='927', screen_name='Blaze_in_3D', country_code='NA', country='NA', place_t

In [9]:
# break up into separate parts
#train_data, test_data, validation_data = labeled_data.randomSplit([0.6, 0.2, 0.2], seed=71082)

#public DataFrame sample(boolean withReplacement,
#               double fraction,
#               long seed)
seed = 71082
train_data = labeled_data.sample(False, 0.6, seed)
test_data = labeled_data.sample(False, 0.2, seed)
validation_data = labeled_data.sample(False, 0.2, seed)

# Note: This wouldn't work in a cluster
def write_df(df, dirname, filename):
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

    tmp_name = filename + ".tmp"
    df.coalesce(1).write.mode('overwrite').option("encoding", "UTF-8").text(tmp_name)
    
    fs = FileSystem.get(Configuration())
    fs.mkdirs(Path(dirname))
    # Assume one file output
    file = fs.globStatus(Path(tmp_name + "/*.txt"))[0].getPath();
    fs.rename(file, Path(dirname + "/" + filename));
    fs.delete(Path(tmp_name), True);

# split training data into positive/negative
positive_data = train_data.filter(train_data.Class == "1").select("norm_text")
write_df(positive_data, "trumptweet/positive", "1.txt")
#positive_data.write.mode("overwrite").text("trumptweet/positive")

negative_data = labeled_data.filter(train_data.Class == "0").select("norm_text")
write_df(negative_data, "trumptweet/negative", "1.txt")
#negative_data.write.mode("overwrite").text("trumptweet/negative")


In [10]:
test_data.head(25)
#labeled_data.head(25)

[Row(X.1='2', X='6', text='@CNN is there any other news than Trump in the whole world going on? News, please!', retweet_count='0', favorited='FALSE', truncated='FALSE', id_str='6.33E+017', in_reply_to_screen_name='CNN', source='"<a href=""http://twitter.com/download/android"" rel=""nofollow"">Twitter for Android</a>"', retweeted='FALSE', created_at='Mon Aug 17 12:22:29 +0000 2015', in_reply_to_status_id_str='NA', in_reply_to_user_id_str='759251', lang='en', listed_count='12', verified='FALSE', location='Grand Rapids MI', user_id_str='18170798', description='Beyond Existing Enterprises LLC', geo_enabled=None, user_created_at=None, statuses_count=None, followers_count=None, favourites_count=None, protected=None, user_url=None, name=None, time_zone=None, user_lang=None, utc_offset=None, friends_count=None, screen_name=None, country_code=None, country=None, place_type=None, full_name=None, place_name=None, place_id=None, place_lat=None, place_lon=None, lat=None, lon=None, expanded_url=None

In [11]:
from sparknlp.annotator import *
from sparknlp.base import DocumentAssembler, Finisher

In [12]:
### Define the dataframe
document_assembler = DocumentAssembler() \
            .setInputCol("norm_text")
    
sentence_detector = SentenceDetectorModel() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = RegexTokenizer() \
            .setInputCols(["sentence"]) \
            .setOutputCol("token")
        
normalizer = Normalizer() \
            .setInputCols(["token"]) \
            .setOutputCol("normal")        
        
spell_checker = NorvigSweetingApproach() \
            .setInputCols(["normal"]) \
            .setOutputCol("spell")
        
sentiment_detector = ViveknSentimentApproach() \
    .setInputCols(["spell", "sentence"]) \
    .setOutputCol("sentiment") \
    .setPositiveSource("vivekn/positive") \
    .setNegativeSource("vivekn/negative") \
    .setPruneCorpus(False) # when training on small data you may want to disable this to not cut off infrequent words
    
finisher = Finisher() \
    .setInputCols(["sentiment"]) \
    .setIncludeKeys(True) \
    ##.setCleanAnnotations(False)
    
pipeline = Pipeline(stages=[
    document_assembler,
    sentence_detector,
    tokenizer,
    normalizer,
    spell_checker,
    sentiment_detector,
    finisher
])

sentiment_data = pipeline.fit(train_data).transform(train_data)    

In [13]:
#sentiment_data.write.mode("overwrite").option("header", "true").csv("sentiment_data.csv")

sentiment_data.show()

+--------------------+---+--------------------+--------------------+---------+--------------+--------------------+-----------------------+--------------------+--------------------+--------------------+-------------------------+-----------------------+-----+------------+----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+---------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+-------------+------------+-------------+----------+--------------+----------+--------+----------+------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+
|                 X.1|  X|                text|       retweet_count|favorited|     truncated|              id_str|in_reply_to_screen_name|              source|           retweeted|          cre

In [14]:
from pyspark.sql.types import IntegerType

def round_up(d):
    return round(d + .01)

def calc_mean(s):
    if s is not None:
        ls = list(map(lambda x: 1 if (x == "result->positive") else 0, s.split("@")))
        return 0 if (len(ls) == 0) else round_up(sum(ls) / len(ls))
    else:
        return 0

calc_mean_udf = udf(calc_mean, IntegerType())

sentiment_data = sentiment_data.withColumn("mean_sentiment", calc_mean_udf("finished_sentiment"))
#sentiment_data.take(20)

In [15]:
sentiment_data.head()

Py4JJavaError: An error occurred while calling o205.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$dfAssembleNoExtras$1: (string) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$6.apply(BatchEvalPythonExec.scala:124)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$6.apply(BatchEvalPythonExec.scala:122)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1128)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.NullPointerException
	at com.johnsnowlabs.nlp.DocumentAssembler.com$johnsnowlabs$nlp$DocumentAssembler$$assemble(DocumentAssembler.scala:52)
	at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:72)
	at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:71)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2803)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$dfAssembleNoExtras$1: (string) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$6.apply(BatchEvalPythonExec.scala:124)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$6.apply(BatchEvalPythonExec.scala:122)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1128)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.NullPointerException
	at com.johnsnowlabs.nlp.DocumentAssembler.com$johnsnowlabs$nlp$DocumentAssembler$$assemble(DocumentAssembler.scala:52)
	at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:72)
	at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:71)
	... 16 more
