-
Notifications
You must be signed in to change notification settings - Fork 153
/
Copy pathTweet.scala
82 lines (68 loc) · 2.93 KB
/
Tweet.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package models
import org.joda.time.DateTime
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Iteratee
import play.api.libs.json.{ Json, JsValue, JsSuccess, JsError}
import play.api.libs.oauth.{ ConsumerKey, RequestToken, OAuthCalculator }
import play.api.libs.ws.WS
import play.modules.reactivemongo.PlayBsonImplicits.JsValueWriter
import reactivemongo.bson.BSONObjectID
import actors._
import models.TweetImplicits._
import utils._
/** Simple, immutable representation of a single Tweet.
  *
  * Several field names are snake_case (`tweet_id`, `screen_name`, ...) rather than the usual
  * lowerCamelCase — presumably to mirror the keys of Twitter's JSON payload; confirm against
  * `TweetReads` before renaming anything.
  *
  * @param tweet_id          numeric identifier of the tweet
  * @param screen_name       screen name of the author
  * @param text              text of the tweet
  * @param wordCount         number of words (presumably in `text` — TODO confirm where it is computed)
  * @param charCount         number of characters (presumably in `text` — TODO confirm where it is computed)
  * @param location          location string
  * @param profile_image_url URL of the author's profile image
  * @param geo               optional geo information, kept as a plain string
  * @param created_at        creation timestamp
  * @param id                optional BSON object id (presumably the MongoDB document id — confirm)
  */
case class Tweet(
  tweet_id: Long,
  screen_name: String,
  text: String,
  wordCount: Int,
  charCount: Int,
  location: String,
  profile_image_url: String,
  geo: Option[String],
  created_at: DateTime,
  id: Option[BSONObjectID]
)
/** Snapshot of the state needed for GUI updates: a list of recent tweets, a word frequency map
  * and summary statistics. Used for Json serialization.
  *
  * @param tweetList       list of recent tweets
  * @param wordMap         word frequency map (word -> count)
  * @param charCountMean   mean of the character counts
  * @param charCountStdDev standard deviation of the character counts
  * @param wordCountMean   mean of the word counts
  * @param wordCountStdDev standard deviation of the word counts
  * @param n               presumably the number of tweets the statistics are based on — TODO confirm
  */
case class TweetState(
  tweetList: List[Tweet],
  wordMap: Map[String, Int],
  charCountMean: Double,
  charCountStdDev: Double,
  wordCountMean: Double,
  wordCountStdDev: Double,
  n: Int
)
/** Companion object for case class Tweet. Takes care of retrieving Tweets from Twitter using the
  * Streaming API, storing the raw JSON in MongoDB and handing each parsed Tweet to
  * `ActorStage.imgSupervisor`. */
object Tweet {
  import scala.util.control.NonFatal

  /** Returns a copy of `t` whose `profile_image_url` has every occurrence of "http://" and
    * "_normal" removed.
    *
    * @param t the Tweet to transform
    * @return  a copy of `t` with the rewritten `profile_image_url`; all other fields unchanged
    */
  def stripImageUrl(t: Tweet): Tweet =
    t.copy(profile_image_url = t.profile_image_url.replaceAll("http://", "").replaceAll("_normal", ""))

  /** Parses one chunk of the stream as Json.
    *
    * The Streaming API sends blank lines as keep-alive signals, and a chunk is not guaranteed to
    * hold exactly one complete Json document. Both cases make `Json.parse` throw; letting that
    * exception escape would fail the Iteratee and end tweet processing, so such chunks are
    * skipped instead.
    *
    * @param chunkString the decoded chunk
    * @return `Some(json)` when the chunk parses, `None` for blank or unparseable chunks
    */
  private def parseChunk(chunkString: String): Option[JsValue] =
    if (chunkString.trim.isEmpty) None // keep-alive newline, nothing to process
    else
      try Some(Json.parse(chunkString))
      catch {
        case NonFatal(e) =>
          println("Skipping unparseable chunk from Twitter stream: " + e)
          None
      }

  /** Iteratee for processing each chunk from Twitter stream of Tweets. Parses Json chunks
    * as Tweet instances and sends them to `ActorStage.imgSupervisor`. Blank and unparseable
    * chunks are skipped so that a single bad chunk does not terminate the stream.
    */
  val tweetIteratee: Iteratee[Array[Byte], Unit] = Iteratee.foreach[Array[Byte]] { chunk =>
    parseChunk(new String(chunk, "UTF-8")).foreach { json =>
      // inserting raw Tweet
      // TODO: make this the only representation within MongoDB, rehydrate Tweets from this representation
      Mongo.rawTweets.insert[JsValue](json)

      TweetReads.reads(json) match {
        case JsSuccess(t: Tweet, _) =>
          ActorStage.imgSupervisor ! WordCount.wordsChars(stripImageUrl(t))
          //ActorStage.system.eventStream.publish(WordCount.wordsChars(stripImageUrl(t)))
        case JsError(msg) => println(msg)
      }
    }
  }

  /** OAuth consumer key and secret for Twitter Streaming API */
  val consumerKey: ConsumerKey =
    ConsumerKey(Conf.get("twitter.consumer.key"), Conf.get("twitter.consumer.secret"))

  /** OAuth request key and secret for Twitter Streaming API */
  val accessToken: RequestToken =
    RequestToken(Conf.get("twitter.accessToken.key"), Conf.get("twitter.accessToken.secret"))

  /** Connect to Twitter Streaming API and retrieve a stream of Tweets for the specified search word or words.
    * Individual words can be delimited by '%2C', see https://dev.twitter.com/docs/streaming-apis for reference.
    *
    * `track` is appended to the request URL verbatim, so it must already be URL-encoded.
    * The request is fired asynchronously; a failure of the request is logged rather than
    * silently dropped.
    *
    * @param track String with search word(s)
    */
  def listen(track: String): Unit = {
    WS.url("https://stream.twitter.com/1.1/statuses/filter.json?track=" + track).withTimeout(-1)
      .sign(OAuthCalculator(consumerKey, accessToken))
      .get(_ => tweetIteratee)
      .failed.foreach(e => println("Twitter stream request failed: " + e))
  }
}