Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: taraxe/opinionz
base: 84b5e42c34
...
head fork: taraxe/opinionz
compare: 024d858802
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 12 files changed
  • 0 commit comments
  • 1 contributor
View
28 app/actors/OpinionFinder.scala
@@ -7,8 +7,9 @@ import play.libs.Akka
import services.Sentiment
import actors.ProfileWorker.NewTweets
import org.bson.types.ObjectId
-import models.{Profile, Tweet}
import com.mongodb.casbah.commons.MongoDBObject
+import play.api.libs.concurrent.Promise
+import models.{Opinion, Profile, Tweet}
class OpinionFinder extends Actor {
@@ -17,16 +18,21 @@ class OpinionFinder extends Actor {
def receive = {
case Find(term, tweet @ _*) => {
- val tweets = tweet.foldLeft(List[Tweet]())( (r,t) => {
- val o = Sentiment.getSentiment(t.text)
- val betterTweet = t.copy(opinion = o)
- r :+ betterTweet
- })
- ProfileWorker.ref ! NewTweets(term, tweets:_*)
-
- Profile.findOne(MongoDBObject("expression" -> term)).map{ p =>
- Profile.save(p.copy(tweets = p.tweets ++ tweets))
- }
+ Logger.debug("OpinionFinder received Find(...)")
+ val promises:Seq[Promise[(Tweet,Opinion)]] = tweet.map(t => Sentiment.getSentiment(t.text).map(o => (t, o)))
+ Logger.debug("Fetched "+promises.size+ " opinion analysis")
+ val tweetsWithOpinion = Promise.sequence(promises)
+ .map(_.map(two => two._1.copy(opinion = Some(two._2))))
+
+ tweetsWithOpinion.map { tws =>
+ ProfileWorker.ref ! NewTweets(term, tws:_*)
+
+ Profile.findOne(MongoDBObject("expression" -> term)) //FIX-ME what s
+ .map(_.id)
+ .map{ profileId =>
+ tws.map(t => Tweet.insert(t.copy(profileId = Some(profileId))) ) //
+ }
+ }
}
case _ => {
Logger.info("error matching actor message")
View
9 app/actors/ProfileWorker.scala
@@ -16,7 +16,7 @@ import models.{Tweet, Opinion}
class ProfileWorker extends Actor {
import ProfileWorker._
- var listeners : List[Tuple2[String,Pushee[Tweet]]] = Nil
+ var listeners : List[(String, Pushee[Tweet])] = Nil
def receive = {
case Listen(term) => {
@@ -24,7 +24,7 @@ class ProfileWorker extends Actor {
pushee => self ! Init(pushee, term),
onComplete = self ! Quit()
)
- //Logger.info("New opinion stream on")
+ Logger.info("New opinion stream on")
sender ! channel
}
case Init(pushee,term) => {
@@ -32,13 +32,12 @@ class ProfileWorker extends Actor {
}
case Quit() => {
- //Logger.info("Opinion stream stopped ...")
+ Logger.info("Opinion stream stopped ...")
}
case NewTweets(term, tweets @ _*) => {
- //Logger.info("New opinion : " + tweets.toString)
+ Logger.debug("Will stream "+tweets.size + " tweets with opinion")
listeners.filter(_._1 == term).foreach(p => tweets.foreach( t => {
- println("Result tweet ::==================== "+ t.opinion)
p._2.push(t)}))
}
case _ => {
View
16 app/actors/StreamRecorder.scala
@@ -27,20 +27,26 @@ class StreamRecorder extends Actor {
def receive = {
case StartRecording(tokens, term) => {
+ Logger.debug("StreamRecorder received StartRecording("+tokens.toString()+","+term+")")
//Define an Iteratee send each list of tweets to Opinion finder actor
- val sendToOpinionFinder = Iteratee.foreach[List[Tweet]](list => OpinionFinder.ref ! Find(term, list: _*))
+ val sendToOpinionFinder = Iteratee.foreach[List[Tweet]](list => {
+ OpinionFinder.ref ! Find(term, list: _*)
+ })
//Define an Enumratee that transform Byte tweets into a list of Objects
val arrayToTweet: Enumeratee[Array[Byte], List[Tweet]] = Enumeratee.map[Array[Byte]](arr => {
val res = new String(arr)
+ Logger.debug(" --> New chunk")
Json.parse(res) match {
- case l: JsArray => l.asOpt[List[Tweet]].getOrElse(Nil)
- case o: JsObject => fromJson(o) :: Nil
+ case l: JsArray => {
+ l.asOpt[List[Tweet]].getOrElse(Nil)
+ }
+ case o: JsObject => {
+ List(fromJson(o))
+ }
}
})
//Iteratee to manage tweets stream
val wsIteratee = arrayToTweet.transform(sendToOpinionFinder)
- //Add the wanted term to the profile
- Profile.insert(Profile(term))
//Open twitter streaming pipe
WS.url("https://stream.twitter.com/1/statuses/filter.json?track=" + term)
.sign(OAuthCalculator(Twitter.KEY, tokens))
View
39 app/controllers/Profiles.scala
@@ -31,12 +31,10 @@ import play.api.libs.Comet
object Profiles extends Controller {
/** ====== Form definition ====== **/
val profileFrom: Form[Profile] = Form(
- mapping(
- "text" -> text) {
- expression => Profile(expression, Nil)
- } {
- profile => Some(profile.expression)
- })
+ mapping("text" -> text)
+ { term => Profile(expression = term) }
+ { profile => Some(profile.expression) }
+ )
/** ====== Actions defintion ====== **/
//Display form to create profile
@@ -45,23 +43,32 @@ object Profiles extends Controller {
}
//Create a profile and start streaming
- def create = Action { implicit request =>
+ def search = Action { implicit request =>
profileFrom.bindFromRequest.fold(
//Form with validation errors case
errors => {
Ok(views.html.profiles.form("There is some errors ", errors))
},
profile => {
- //Create a profile on mongodb
- Profile.insert(profile)
- //Retrieve Twitter Oauth tokens
- val tokens = Twitter.sessionTokenPair(request).get
- //Launch Tweets recorder
- StreamRecorder.ref ! StartRecording(tokens, profile.expression)
- Ok(views.html.profiles.stream("Now recording : " , profile))
- //Redirect(Profiles.stream(profile.expression)).withSession("token" -> t.token, "secret" -> t.secret)
+ Logger.debug("Find or create :"+profile.expression)
+ Profile //FIX-ME this should move inside StreamRecorder
+ .byTerm(profile.expression) // retrieve existing profile if any
+ .orElse{Profile.insert(profile).map(i => profile.copy(id = i))} //or create a new one in mongo
+ .toRight(InternalServerError) // if still no profile, fail
+ .right.map { p =>
+ //Retrieve Twitter Oauth tokens
+ val tokens = Twitter.sessionTokenPair(request).get
+ //Launch Tweets recorder
+ Logger.debug("")
+ StreamRecorder.ref ! StartRecording(tokens, p.expression)
+ p
+ }.fold(identity, p => Redirect(routes.Profiles.find(p.expression)))
})
-
+ }
+ def find(term:String) = Action { implicit request =>
+ Profile.byTerm(term)
+ .toRight(NotFound)
+ .fold(identity, p => Ok(views.html.profiles.stream("Now recording : " , p)))
}
/** ====== Define stream results ====== **/
View
10 app/models/Opinion.scala
@@ -11,23 +11,21 @@ import play.api.libs.json._
import play.api.Play.current
import java.util.Date
-case class Opinion(text: String, mood: String, prob: Double, date: Date = new Date()) {}
+case class Opinion(mood: String, prob: Double) {}
//case class Tweet(message:String, source:String, hashs:List[String])
-object Opinion extends SalatDAO[Opinion, ObjectId](collection = MongoPlugin.collection("opinions")) {
+object Opinion {
implicit object OpinionFormat extends Format[Opinion] {
def reads(json: JsValue): Opinion = Opinion(
- (json \ "text").as[String],
(json \ "mood").as[String],
(json \ "prob").as[Double])
def writes(o: Opinion): JsValue = JsObject(Seq(
- "text" -> JsString(o.text),
"mood" -> JsString(o.mood),
- "prob" -> JsNumber(o.prob),
- "date" -> JsNumber(o.date.getTime)))
+ "prob" -> JsNumber(o.prob)
+ ))
}
}
View
23 app/models/Profile.scala
@@ -5,7 +5,6 @@ import com.novus.salat.global._
import com.novus.salat.annotations._
import com.novus.salat.dao._
import com.mongodb.casbah.Imports._
-import com.mongodb.casbah.MongoConnection
import play.modules.mongodb._
import play.api.libs.json._
import play.api.libs.json.Json._
@@ -15,22 +14,26 @@ import play.api.Play.current
/**
* Case class for Profile document
*/
-case class Profile(expression: String, tweets: List[Tweet] = Nil) {}
+case class Profile(@Key("_id") id: ObjectId = new ObjectId, expression: String) {
+
+ def tweets = Profile.tweets.findByParentId(this.id).toList
+
+}
/**
* JSON formatter
*/
-object Profile extends SalatDAO[Profile, ObjectId](collection = MongoPlugin.collection("profiles")) {
+object Profile extends SalatDAO[Profile, ObjectId](collection = MongoPlugin.collection("profile")) {
+
+ val tweets = new ChildCollection[Tweet, ObjectId](collection = MongoPlugin.collection("tweet"), parentIdField = "profileId") {}
- implicit object ProfileFormat extends Format[Profile] {
- // Marshaling as JSON
- def reads(json: JsValue): Profile = Profile(
- (json \ "expression").as[String],
- (json \ "tweets").asOpt[List[Tweet]].getOrElse(List()))
- //Unmarshaling as JSON Object
+ implicit object ProfileFormat extends Writes[Profile] {
def writes(profile: Profile) = JsObject(Seq(
+ "id" -> JsString(profile.id.toString),
"expression" -> JsString(profile.expression),
"tweets" -> JsArray(profile.tweets.map(toJson(_)))))
}
-}
+ def byTerm(expression:String) = findOne(MongoDBObject("expression" -> expression))
+}
+case class Statistics()
View
20 app/models/Tweet.scala
@@ -1,21 +1,29 @@
package models
+import java.util.Date
+import com.novus.salat._
+import com.novus.salat.global._
+import com.novus.salat.annotations._
+import com.novus.salat.dao._
+import com.mongodb.casbah.Imports._
+import play.modules.mongodb._
import play.api.libs.json._
import play.api.libs.json.Json._
-import java.util.Date
-
+import play.api.libs.json.JsArray
+import play.api.Play.current
//TODO : add properties
-case class Tweet(text: String, opinion: Opinion = null) {}
+case class Tweet(profileId:Option[ObjectId], text: String, opinion: Option[Opinion] = None) {}
-object Tweet {
+object Tweet extends SalatDAO[Tweet, ObjectId](collection = MongoPlugin.collection("tweet")) {
implicit object TweetFormat extends Format[Tweet] {
def reads(json: JsValue): Tweet = Tweet(
+ None,
(json \ "text").as[String],
- (json \ "opinion").asOpt[Opinion].getOrElse(Opinion("", "", 0, new Date()))
+ (json \ "opinion").asOpt[Opinion]
)
-
def writes(tweet: Tweet): JsObject = JsObject(Seq(
+ "profileId" -> JsString(tweet.profileId.toString),
"text" -> JsString(tweet.text),
"opinion" -> toJson(tweet.opinion)
))
View
13 app/services/Sentiment.scala
@@ -3,16 +3,16 @@ package services
import models._
import play.api.libs.ws.WS
import play.api.libs.json.Json._
+import play.api.libs.concurrent.Promise
object Sentiment {
/**
* Get opinion
*/
- def getSentiment(text: String) = {
+ def getSentiment(text: String):Promise[Opinion] = {
WS.url(sentimentUrl).withQueryString(("api_key", key), ("text", text))
.get()
- .map { response => response.json.as[Opinion] }
- .value.get
+ .map(_.json.as[Opinion])
}
/**
* Add new sentence to repo
@@ -32,11 +32,4 @@ object Sentiment {
.map { response => (response.json \ "quota_remaining").as[Long] }
.value.get;
}
-
- def startProfile(profile : Profile) {
- Profile.insert(profile)
-
- }
-
-
}
View
2  app/views/profiles/form.scala.html
@@ -4,7 +4,7 @@
@main("Welcome to Play 2.0") {
-@helper.form(action = routes.Profiles.create) {
+@helper.form(action = routes.Profiles.search) {
<fieldset>
<legend>Opinion Finder</legend>
@inputText(
View
6 conf/routes
@@ -7,9 +7,11 @@
GET / controllers.Application.index
GET /logout controllers.Application.logout
GET /auth controllers.Twitter.authenticate
+GET /search controllers.Profiles.index
POST /search controllers.Profiles.index
-GET /stream/:term controllers.Profiles.stream(term:String)
-POST /result controllers.Profiles.create
+GET /stream/:term controllers.Profiles.stream(term)
+POST /result controllers.Profiles.search
+GET /:term controllers.Profiles.find(term)
# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.at(path="/public", file)
View
BIN  specs/components.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
View
1  specs/todo.txt
@@ -1,6 +1,7 @@
- fenetrage automatique aka mise à l'échelle (pour dataviz cumulée ou pas)
- restauration de l'historique dans la vue
- navigation dans l'historique des tweets
+- marqueur temporel à un instant t
- bar de stats client
- stats serveur
- popup de drilldown sur une barre (liste des tweets avec sentiment, avatar, date...)

No commit comments for this range

Something went wrong with that request. Please try again.