-
Notifications
You must be signed in to change notification settings - Fork 152
Expand file tree
/
Copy pathTwitter.scala
More file actions
146 lines (117 loc) · 5.43 KB
/
Twitter.scala
File metadata and controls
146 lines (117 loc) · 5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package controllers
import akka.actor.{Actor, Props}
import org.joda.time.DateTime
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.{Iteratee, Concurrent}
import play.api.libs.json.Json
import play.api.mvc.{Action, Controller, WebSocket}
import reactivemongo.api._
import reactivemongo.bson._
import reactivemongo.bson.handlers.DefaultBSONHandlers.DefaultBSONReaderHandler
import reactivemongo.bson.BSONDateTime
import reactivemongo.api.QueryBuilder
import actors._
import models._
import models.TweetImplicits._
import utils._
import models.TweetState
import scala.concurrent.Future
/** Controller for serving main BirdWatch page including the WebSocket connection */
object Twitter extends Controller {
/** Serves HTML page (static content at the moment, page gets updates through WebSocket) */
def tweetList() = Action {
implicit request => {
RequestLogger.log(request)
Ok(views.html.twitter.tweets(Seq[Tweet]()))
}
}
/** Serves WebSocket connection updating the UI */
def tweetFeed() = WebSocket.using[String] {
implicit request =>
/** Iteratee for incoming messages on WebSocket connection, currently ignored */
val in = Iteratee.ignore[String]
/** Creates enumerator and channel for Strings through Concurrent factory object
* for pushing data through the WebSocket */
val (out, wsOutChannel) = Concurrent.broadcast[String]
/** "side-effecting" function to do something with the accumulator without possibly mutating it
* e.g. push some computation to a WebSocket enumerator or to log file
* @param tweetList accumulator inside the Iteratee
* @return Unit, cannot interfere with the accumulator inside the Iteratee
*/
def interceptTweetList(tweetList: List[Tweet]) {
val (charCountMean, charCountStdDev) = Calc.stdDev(tweetList.map(t => t.charCount))
val (wordCountMean, wordCountStdDev) = Calc.stdDev(tweetList.map(t => t.wordCount))
val tweetState = TweetState(tweetList.take(1), WordCount.topN(tweetList, 250), charCountMean, charCountStdDev,
wordCountMean, wordCountStdDev, tweetList.size)
wsOutChannel.push(Json.stringify(Json.toJson(tweetState)))
}
/** Creates enumerator and channel for Tweets through Concurrent factory object */
val (enumerator, tweetChannel) = Concurrent.broadcast[Tweet]
/** Iteratee processing Tweets from tweetChannel, accumulating a rolling window of tweets */
val tweetListIteratee = WordCount.tweetListIteratee(interceptTweetList, List[Tweet](), 1000)
enumerator |>>> tweetListIteratee // attach tweetListIteratee to enumerator
/** Actor for subscribing to eventStream. Pushes received tweets into TweetChannel for
* consumption through iteratee (and potentially other consumers, decoupled) */
val subscriber = ActorStage.system.actorOf(Props(new Actor {
def receive = {
case t: Tweet => tweetChannel.push(t) // push received tweet into Concurrent.Channel[Tweet]
}
}))
/** Pre-load the last 500 tweets through WebSocket connection */
latestTweetQuery.map {
tweets => tweets.take(500).reverse.foreach(t => tweetChannel.push(t)) // push last 500 tweets
ActorStage.system.eventStream.subscribe(subscriber, classOf[Tweet]) // subscribe to incoming tweets
}
(in, out) // in and out channels for WebSocket connection
}
/** Query latest tweets (lazily evaluated stream, result could be od arbitrary size) */
def latestTweetQuery: Future[List[Tweet]] = {
val query = QueryBuilder().query(BSONDocument("created_at" -> BSONDocument("$lte" -> BSONDateTime(DateTime.now.getMillis))))
.sort("created_at" -> SortOrder.Descending)
// run this query over the collection
val cursor = Mongo.tweets.find(query)
cursor.toList
}
/** Controller Action serving Tweets as JSON going backwards in time from the
* specified time in milliseconds from epoch
* @param millis time in millis
* @param results number of results to return
*/
def tweetsJson(millis: Long, results: Int) = Action {
implicit request =>
Async {
latestTweetQuery.map {
tweets => Ok(content = Json.toJson(tweets.take(results)))
}
}
}
/** Controller Action serving Tweets as JSON going backwards in time from when
* the action is called
* @param results number of results to return
*/
def tweetsJsonLatest(results: Int) = tweetsJson(DateTime.now.getMillis, results)
/** Controller Action replaying the specified number of tweets from
* the specified time in millis forward.
* @param minutesAgo time in minutes
* @param results number of results to return
*/
def tweetReplay(minutesAgo: Long, results: Int) = Action {
implicit request =>
Async {
val query = QueryBuilder().query(BSONDocument("created_at" ->
BSONDocument("$gte" -> BSONDateTime(DateTime.now.getMillis - (minutesAgo * 60 * 1000)))))
.sort("created_at" -> SortOrder.Ascending)
// run this query over the collection
val cursor = Mongo.tweets.find(query)
// got the list of documents (in a fully non-blocking way)
cursor.toList.map {
tweets =>
tweets.take(results).foreach {
t => ActorStage.system.eventStream.publish(t)
Thread.sleep(250)
}
Ok(Json.toJson(tweets.take(results)))
}
}
}
}