#Twitter + Watson Tone Analyzer sample Notebook Part 1: Loading the data
In this notebook, we show how to load the custom library generated as part of the Twitter + Watson Tone Analyzer streaming application. The code can be found here: https://github.com/ibm-cds-labs/spark.samples/tree/master/streaming-twitter.
The following code uses a pre-built jar that has been posted on the GitHub project, but you can replace it with your own URL if needed.

In [1]:
%AddJar https://github.com/DTAIEB/demos/raw/master/streaming-twitter-assembly-1.6.jar -f

Starting download from https://github.com/DTAIEB/demos/raw/master/streaming-twitter-assembly-1.6.jar
Finished download of streaming-twitter-assembly-1.6.jar


##Set up the Twitter and Watson credentials
Please refer to the tutorial for details on how to find the Twitter and Watson credentials, then replace the XXXX placeholders in the code below with your own values.

In [2]:
val demo = com.ibm.cds.spark.samples.StreamingTwitter
demo.setConfig("twitter4j.oauth.consumerKey","XXXX")
demo.setConfig("twitter4j.oauth.consumerSecret","XXXX")
demo.setConfig("twitter4j.oauth.accessToken","XXXX")
demo.setConfig("twitter4j.oauth.accessTokenSecret","XXXX")
demo.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer-beta/api")
demo.setConfig("watson.tone.password","XXXX")
demo.setConfig("watson.tone.username","XXXX")
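
Before starting the stream, it can help to check that no placeholder was left unfilled. The following is a minimal sketch in plain Scala, not part of the StreamingTwitter library: `requiredKeys` and `missingKeys` are hypothetical names introduced here for illustration, and the sketch assumes the credentials are first collected in an ordinary `Map` before being handed to `demo.setConfig`.

```scala
// Hypothetical helper, not part of the StreamingTwitter library.
// The keys listed here are the ones set in the cell above.
val requiredKeys = Seq(
  "twitter4j.oauth.consumerKey",
  "twitter4j.oauth.consumerSecret",
  "twitter4j.oauth.accessToken",
  "twitter4j.oauth.accessTokenSecret",
  "watson.tone.url",
  "watson.tone.username",
  "watson.tone.password"
)

// Returns the keys that are absent, blank, or still set to the "XXXX" placeholder.
def missingKeys(config: Map[String, String]): Seq[String] =
  requiredKeys.filter { key =>
    config.get(key).forall(value => value.trim.isEmpty || value == "XXXX")
  }

// Example: only the Watson URL has been filled in so far.
val config = Map(
  "watson.tone.url" -> "https://gateway.watsonplatform.net/tone-analyzer-beta/api",
  "twitter4j.oauth.consumerKey" -> "XXXX"
)
val missing = missingKeys(config)
println(s"Still missing: ${missing.mkString(", ")}")
```

Once `missingKeys` returns an empty sequence, each entry of the map could be passed on with `config.foreach { case (k, v) => demo.setConfig(k, v) }`.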

##Start the Spark Stream to collect live tweets
Start a new Twitter stream that collects live tweets and enriches them with sentiment analysis scores. The stream runs for the duration specified in the second argument of the **startTwitterStreaming** method.
Note: if no duration is specified, the stream runs until the **stopTwitterStreaming** method is called.

In [3]:
import org.apache.spark.streaming._
demo.startTwitterStreaming(sc, Seconds(40))

Twitter stream started
Tweets are collected real-time and analyzed
To stop the streaming and start interacting with the data use: StreamingTwitter.stopTwitterStreaming
Receiver Started: TwitterReceiver-0
Batch started with 139 records
Batch completed with 139 records
Batch started with 270 records
Stopping Twitter stream. Please wait this may take a while
Receiver Stopped: TwitterReceiver-0
Reason:  : Stopped by driver
Batch completed with 270 records
Twitter stream stopped
You can now create a sqlContext and DataFrame with 38 Tweets created. Sample usage: 
val (sqlContext, df) = com.ibm.cds.spark.samples.StreamingTwitter.createTwitterDataFrames(sc)
df.printSchema
sqlContext.sql("select author, text from tweets").show


##Create a SQLContext and a dataframe with all the tweets
Note: this method will register a SparkSQL table called tweets

In [4]:
val (sqlContext, df) = demo.createTwitterDataFrames(sc)

A new table named tweets with 38 records has been correctly created and can be accessed through the SQLContext variable
Here's the schema for tweets
root
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- text: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- Anger: double (nullable = true)
 |-- Disgust: double (nullable = true)
 |-- Fear: double (nullable = true)
 |-- Joy: double (nullable = true)
 |-- Sadness: double (nullable = true)
 |-- Analytical: double (nullable = true)
 |-- Confident: double (nullable = true)
 |-- Tentative: double (nullable = true)
 |-- Openness: double (nullable = true)
 |-- Conscientiousness: double (nullable = true)
 |-- Extraversion: double (nullable = true)
 |-- Agreeableness: double (nullable = true)
 |-- EmotionalRange: double (nullable = true)



##Execute a SparkSQL query that returns all the data

In [5]:
val fullSet = sqlContext.sql("select * from tweets")  //Select all columns
fullSet.show

+--------------------+--------------------+-----+--------------------+---+----+------------------+------------------+------------------+-----------------+------------------+----------+---------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+
|              author|                date| lang|                text|lat|long|             Anger|           Disgust|              Fear|              Joy|           Sadness|Analytical|Confident|        Tentative|         Openness| Conscientiousness|     Extraversion|    Agreeableness|   EmotionalRange|
+--------------------+--------------------+-----+--------------------+---+----+------------------+------------------+------------------+-----------------+------------------+----------+---------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+
|Three Words o Wisdom|Sun Mar 06 13:00:...|en-gb|wildebeest rebuff...|0.0| 0.0|         

##SparkSQL query example on the data.
Select all the tweets that have an Anger score greater than 60%

In [6]:
val set = sqlContext.sql("select text from tweets where Anger > 60")
println(set.count)
set.show

0
+----+
|text|
+----+
+----+
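
The same kind of query can be run against any of the tone columns listed in the schema. The sketch below, in plain Scala, builds the SQL string from a column name and a threshold; `toneColumns` and `toneQuery` are hypothetical names introduced here and are not part of the sample library. It assumes, as the query above suggests, that scores are expressed on a 0 to 100 scale.

```scala
// Hypothetical helper, not part of the StreamingTwitter library.
// Column names are copied from the tweets schema printed earlier.
val toneColumns = Set(
  "Anger", "Disgust", "Fear", "Joy", "Sadness",
  "Analytical", "Confident", "Tentative",
  "Openness", "Conscientiousness", "Extraversion",
  "Agreeableness", "EmotionalRange"
)

// Builds a query selecting the text of tweets whose score for `tone`
// exceeds `threshold`. Rejects names that are not known tone columns,
// so arbitrary strings never end up inside the SQL text.
def toneQuery(tone: String, threshold: Int): String = {
  require(toneColumns.contains(tone), s"Unknown tone column: $tone")
  s"select text from tweets where $tone > $threshold"
}

val angerQuery = toneQuery("Anger", 60)
println(angerQuery)
```

The resulting string can then be passed to `sqlContext.sql(...)` in the same way as the hand-written query. A lower threshold may be worth trying when a short collection window returns no rows.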



##Persist the dataset into a Parquet file on the Object Storage service
The Parquet file will be reloaded in the Part 2 IPython notebook.
Note: you can disregard the warning messages related to SLF4J.

In [7]:
fullSet.repartition(1).saveAsParquetFile("swift://notebooks.spark/tweetsFull.parquet")

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
