<h1>init spark</h1>
<li>The driver_memory step is important since streaming apps take up a lot of memory</li>
<li>A few new jars:</li>
<ul>
    <li>twitter streaming api jar</li>
    <li>jfreechart jars for drawing charts</li>
</ul>

In [None]:
%%init_spark
launcher.num_executors = 4
launcher.executor_cores = 2
launcher.driver_memory = '10g'
launcher.packages = ["databricks:spark-corenlp:0.4.0-spark2.4-scala2.11", "org.apache.bahir:spark-streaming-twitter_2.11:2.4.0"]


<h2>CoreNLP imports</h2>

In [None]:
import java.util.Properties 
import scala.collection.JavaConverters._
import edu.stanford.nlp.ling.CoreAnnotations 
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations 
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame



<h2>Parameters</h2>
<li><span style="color:blue">BATCH_SIZE</span>: The size of a micro batch, in seconds</li>
<li><span style="color:blue">NUM_BATCHES</span>: The minimum number of window averages to collect before the stream is stopped. Ideally, of course, this should run forever!</li>
<li><span style="color:blue">WINDOW_LENGTH</span>: The size of a window, in seconds. We're collecting data in small batches, so a window here works like a moving average. Roughly, we're computing a 5-minute moving average of the data, updated every minute</li>
<li><span style="color:blue">SLIDE_DURATION</span>: The slide duration, in seconds. Since we're reproducing a moving average, we'll keep this at the batch size</li>
<li><span style="color:blue">TWEET_WORDS</span>: The keywords we will look for. Each tweet in our moving average should contain at least one word from this list</li>
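
Spark Streaming requires the window length and the slide duration to be multiples of the batch interval. The sketch below checks that relationship and computes how many micro batches fall inside one window. The helper name `batchesPerWindow` is hypothetical and not part of the notebook; it only illustrates the arithmetic behind the parameters above.

```scala
// Hypothetical helper (not part of the notebook): how many micro batches fit in one window.
// Both arguments are in seconds, like BATCH_SIZE and WINDOW_LENGTH.
def batchesPerWindow(windowSeconds: Int, batchSeconds: Int): Int = {
  // Spark Streaming rejects windows that are not a multiple of the batch interval
  require(windowSeconds % batchSeconds == 0, "window must be a multiple of the batch size")
  windowSeconds / batchSeconds
}

// With the values used in this notebook: a 300 second window over 60 second batches
val perWindow = batchesPerWindow(300, 60)
```

With the notebook's values, each window covers five micro batches, which is why the result behaves like a 5-minute moving average that advances one minute at a time.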


In [None]:
val BATCH_SIZE = 60
val NUM_BATCHES = 20
val WINDOW_LENGTH = 300
val SLIDE_DURATION = BATCH_SIZE
//val TWEET_WORDS = Array("aapl","apple","mac","ipad","iphone")
val TWEET_WORDS = Array("covid","corona","virus")

<h2>getSentiment</h2>
<li>A function that returns the sentiment given a piece of text</li>
<li>Get the sentiment for each sentence in the text, add the scores up, and divide by the number of sentences</li>
<li>(This is the Stanford CoreNLP part!)</li>
<li>Returns a Double</li>
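
The averaging step can be sketched on its own, without CoreNLP. The helper `averageSentiment` is a hypothetical name, and the input is assumed to be the per-sentence sentiment classes that CoreNLP predicts (integers from 0 to 4). Returning `NaN` for an empty input is an assumption made here for illustration.

```scala
// Hypothetical helper (not part of the notebook): average the per-sentence sentiment classes.
// CoreNLP sentiment classes run from 0 (very negative) to 4 (very positive).
def averageSentiment(sentenceScores: Seq[Int]): Double =
  if (sentenceScores.isEmpty) Double.NaN // assumption: no sentences means no defined score
  else sentenceScores.sum.toDouble / sentenceScores.size

// Three sentences scored 1, 3 and 2 average to a neutral score
val exampleScore = averageSentiment(Seq(1, 3, 2))
```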


<h2>Twitter keys</h2>
<li>Get Twitter API keys: see <a href="https://developer.twitter.com/en/docs/basics/getting-started">Getting started with twitter API</a> (use the standard API)</li>
<li>Enter them below</li>
<li>Then assign them to various twitter4j objects</li>
<li>Finally, create the stream receiver</li>


<h3>Create a DStream with <span style="color:blue">(text,sentiment)</span> pairs</h3>
<li>the function, getText, returns the text of a tweet</li>
<li>Note: You will need to include the getSentiment function in this cell</li>


<h3>Filter the text_sentiment_pairs to include only tweets with words in our TWEET_WORDS array</h3>
<li>Also, throw away the text (i.e., return a DStream of Double)</li>
<li>Save this in a DStream of Double called <span style="color:blue">sentiment_array</span></li>
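
The same filter-then-project logic can be tried on an ordinary Scala collection before applying it to a DStream. This is a minimal sketch: `containsKeyword` is a hypothetical helper, and the sample tweets and their sentiment values are made up for illustration.

```scala
// Hypothetical helper (not part of the notebook): true if the text contains any keyword
def containsKeyword(text: String, keywords: Array[String]): Boolean =
  keywords.exists(k => text.contains(k))

// Made-up (text, sentiment) pairs standing in for one batch of tweets
val samplePairs = Seq(
  ("covid cases are falling", 3.0),
  ("nice weather today", 2.0),
  ("the virus is scary", 1.0)
)
val sampleKeywords = Array("covid", "corona", "virus")

// Keep only matching tweets, then drop the text and keep the sentiment
val sampleSentiments = samplePairs
  .filter { case (text, _) => containsKeyword(text, sampleKeywords) }
  .map { case (_, sentiment) => sentiment }
```

A DStream offers `filter` and `map` with the same shape, so the two calls carry over unchanged.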

<h3>Create and update an array that holds the moving average</h3>
<li>For each window, we will report the average sentiment in that window (sum of tweet sentiments / number of tweets). Note that this is not exactly a moving average in time terms</li>
<li>The main reason for doing this is that we may not get tweets in every window and would then have to deal with NaNs. Too complicated!</li>
<li>The method below is:
    <ul>
        <li>sentiment_window contains the sentiment of each tweet in the window</li>
        <li>Using foreachRDD and a function getAverage, calculate the average sentiment for that window</li>
        <li>getAverage should calculate a (timestamp, sentiment) pair for each window</li>
    </ul>
</li>
<li>We also need to clean the timestamp: convert it into a string, drop the "ms" from the end, and then keep only the last 7 digits</li>
<li><b>Note</b>: Bear in mind that while DStream objects do not persist, Scala objects, RDDs, etc. do persist. Once the stream stops, these non-DStream objects are still accessible in your program</li>
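
The timestamp cleaning and the per-window average can be sketched with plain Scala. Both helper names are hypothetical. The sample timestamp assumes the format produced by calling `toString` on a Spark Streaming `Time` (milliseconds followed by " ms"). Returning an `Option` for an empty window is a design choice made here for illustration, not what the notebook does.

```scala
// Hypothetical helper: drop the trailing " ms" and keep the last 7 digits
def cleanTimestamp(t: String): String =
  t.stripSuffix(" ms").takeRight(7)

// Hypothetical helper: average of a window, or None when the window is empty
def windowAverage(sentiments: Array[Double]): Option[Double] =
  if (sentiments.isEmpty) None
  else Some(sentiments.sum / sentiments.length)

// Example values (made up for illustration)
val cleaned = cleanTimestamp("1588888860000 ms")
val average = windowAverage(Array(1.0, 2.0, 3.0))
```

Returning `None` for an empty window lets the caller skip that window instead of storing a NaN.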

<h1>Create the chart</h1>
<li>We will create an xy graph</li>
<li><a href="http://www.jfree.org/jfreechart/api/javadoc/org/jfree/data/xy/XYSeries.html">http://www.jfree.org/jfreechart/api/javadoc/org/jfree/data/xy/XYSeries.html</a></li>
<li>x-axis contains the time stamp (last 7 digits) of the window. We need to convert this into an Int (x-axis is scaled)</li>
<li>y-axis contains the average sentiment of the window</li>

In [None]:
//All imports
import java.util.Properties 
import scala.collection.JavaConverters._
import edu.stanford.nlp.ling.CoreAnnotations 
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations 
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame

//JFreeChart imports
import java.awt.Color 
import org.apache.log4j.{Level, Logger} 
import org.apache.spark.sql.SparkSession 
import org.jfree.chart.plot.{PlotOrientation, XYPlot} 
import org.jfree.chart.{ChartFactory, ChartFrame, JFreeChart, ChartUtilities} 
import org.jfree.data.xy.{XYSeries, XYSeriesCollection} 
import scala.util.Random 



//All parameters
val BATCH_SIZE = 60
val NUM_BATCHES = 20
val WINDOW_LENGTH = 300
val SLIDE_DURATION = 60

//getSentiment function

def getSentiment(text: String): Double = {
    //The pipeline is built inside the function so that it is created where
    //the function runs (on the executors) rather than captured in the closure
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, pos, parse, sentiment")
    val pipeline: StanfordCoreNLP = new StanfordCoreNLP(props)

    val annotation: Annotation = pipeline.process(text)
    val sentences = annotation.get(classOf[CoreAnnotations.SentencesAnnotation])
    //Predicted sentiment class of each sentence (0 = very negative, 4 = very positive)
    val scores = sentences.map { sent => 
        val tree = sent.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])
        RNNCoreAnnotations.getPredictedClass(tree)
    }
    //Average over all sentences in the text
    scores.toArray.sum.toDouble/scores.size
}

//val words = Array("covid","corona","virus")
val words = Array("aapl","apple","mac","ipad","iphone")

//filterData function
def filterData(text: String,words: Array[String]): Boolean =
    words.exists(text.contains)

//Twitter API keys

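val CONSUMER_KEY = ""         //paste your consumer key here
val CONSUMER_SECRET = ""      //paste your consumer secret here
val ACCESS_TOKEN = ""         //paste your access token here
val ACCESS_TOKEN_SECRET = ""  //paste your access token secret here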

//Twitter API keys attached to twitter4j
System.setProperty("twitter4j.oauth.consumerKey",CONSUMER_KEY)
System.setProperty("twitter4j.oauth.consumerSecret",CONSUMER_SECRET)
System.setProperty("twitter4j.oauth.accessToken",ACCESS_TOKEN)
System.setProperty("twitter4j.oauth.accessTokenSecret",ACCESS_TOKEN_SECRET)


//Streaming context and twitter stream set up
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
val ssc = new StreamingContext(sc,Seconds(BATCH_SIZE.toLong))

val stream = TwitterUtils.createStream(ssc, None)
val tweets = stream.filter(_.getLang == "en")


//text_sentiment_pairs
val text_sentiment_pairs = tweets.map { status =>
        val text = status.getText.toLowerCase
        val sentiment = getSentiment(text)
        (text,sentiment)
    }


//sentiment_array
val sentiment_array = text_sentiment_pairs.filter(l => filterData(l._1,words)).map(l=>l._2)

//Window definition
val sentiment_window = sentiment_array.window(Seconds(WINDOW_LENGTH),Seconds(SLIDE_DURATION))

//FUNCTIONALITY FOR WINDOW AVERAGES
//An ArrayBuffer to hold each (timestamp, average) pair
val all_averages = ArrayBuffer[(String,Double)]()

//getAverage function
def getAverage(sentiments: Array[Double], aa: ArrayBuffer[(String,Double)],t: String) = {
    val new_avg = sentiments.sum/sentiments.length
    val clean_timestamp = t.split(" ")(0).takeRight(7)
    aa+=((clean_timestamp,new_avg))
}

//compute average for a window and add it, timestamped to all_averages
sentiment_window.foreachRDD((r,t) => {
    //println(r.count,r.sum)
    getAverage(r.collect(),all_averages,t.toString)
    println(all_averages)
})


//Create a new XYSeries object that holds the data for the graph
//And a dataset that contains this XYSeries object
//The goal is to update xy whenever there is a new average in all_averages

val xy = new XYSeries("") 
val dataset = new XYSeriesCollection(xy)

//Creates the chart object (done for you)
val chart = ChartFactory.createXYLineChart( 
  "Sentiment Chart",  // chart title 
  "Time",               // x axis label 
  "Sentiment",                   // y axis label 
  dataset,                   // data 
  PlotOrientation.VERTICAL, 
  false,                    // include legend 
  true,                     // tooltips 
  false                     // urls 
)

//From the chart, grab the plot so that we can configure formatting info
val plot = chart.getXYPlot() 

def configurePlot(plot: XYPlot): Unit = { 
  plot.setBackgroundPaint(Color.WHITE) 
  plot.setDomainGridlinePaint(Color.BLACK) 
  plot.setRangeGridlinePaint(Color.BLACK) 
  plot.setOutlineVisible(false) 
} 

//A function that shows the chart. When called, it pops up the chart in a separate window

def show(chart: JFreeChart): Unit = { 
  val frame = new ChartFrame("plot", chart) 
  frame.pack() 
  frame.setVisible(true) 
}

//Call the plot configuration function
//Call the show chart function (now it will actually pop up)
configurePlot(plot) 
show(chart)


//Start the stream
//Inside a while loop, sleep for a bit
//then check if there are new elements in all_averages
//if there are new elements, add them to xy using addOrUpdate (see documentation linked above)
//you can do this in many ways but an easy way is to keep a record of the current length
//check if the new length of the array is greater than the recorded length
//if it is, add the last (all_averages.length - previous_length) elements to xy

//Use addOrUpdate (not add) so that the graph updates
//Use Thread.sleep(n) to sleep for n milliseconds (10000, i.e. 10 seconds, should work well)

//The while should run as long as the length of all_averages is less than NUM_BATCHES
//Call ssc.stop(false) after the while loop

//Enjoy!
var index = 0
ssc.start()
while (all_averages.length < NUM_BATCHES) {
    Thread.sleep(10000)
    val len = all_averages.length
    if (len > index) {
        //add only the averages that arrived since the last check
        all_averages.takeRight(len-index).toArray.foreach{ case (x: String, y: Double) => xy.addOrUpdate(x.toInt,y) } 
        index = len
    }
}
//stop the streaming context but keep the SparkContext alive
ssc.stop(false)