Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tag: heroku-devcent…
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 82 lines (67 sloc) 2.766 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
package com.typesafe.webwords.common

import java.net.URL

import akka.actor.{ Index => _, _ }
import akka.actor.Actor.actorOf
import akka.dispatch._

/** Messages that [[ClientActor]] accepts. */
sealed trait ClientActorIncoming

/**
 * Request for the index of a URL.
 *
 * @param url       the URL whose index is wanted
 * @param skipCache when true, the initial cache lookup is bypassed and the
 *                  worker is asked to spider the URL straight away
 */
case class GetIndex(
    url: String,
    skipCache: Boolean
) extends ClientActorIncoming

/** Messages that [[ClientActor]] sends back to its requesters. */
sealed trait ClientActorOutgoing

/**
 * Reply to a [[GetIndex]] request.
 *
 * @param url      the URL that was requested
 * @param index    the cached index, or None when the cache still had no entry
 *                 after the worker was asked to spider the URL
 * @param cacheHit true when the index was found by the first cache lookup,
 *                 i.e. without involving the worker
 */
case class GotIndex(
    url: String,
    index: Option[Index],
    cacheHit: Boolean
) extends ClientActorOutgoing

/**
 * This actor encapsulates:
 * - checking the cache for an index of a certain URL
 * - asking the indexer worker process to index the URL if it's not cached
 * - checking the cache again when the worker is done
 * It coordinates a WorkQueueClientActor and IndexStorageActor to accomplish
 * this.
 *
 * Each [[GetIndex]] request is answered with a future [[GotIndex]]:
 * `cacheHit` is true only when the first cache lookup succeeded; if the
 * worker ran and the cache still has no entry, the reply carries
 * `index = None`.
 *
 * @param config supplies `amqpURL` (used by the work queue client actor) and
 *               `mongoURL` (used by the index storage actor)
 */
class ClientActor(config: WebWordsConfig) extends Actor {
    import ClientActor._

    // Child actors: created here, started in preStart, stopped in postStop.
    private val client = actorOf(new WorkQueueClientActor(config.amqpURL))
    private val cache = actorOf(new IndexStorageActor(config.mongoURL))

    override def receive = {
        // GetIndex is the only ClientActorIncoming message, so match it directly.
        case GetIndex(url, skipCache) =>
            // Ask the worker to spider and cache the URL, then read the cache
            // again. A def (not a val) so the worker is only contacted when
            // this path is actually taken.
            def getWithoutCache =
                getFromWorker(client, url) flatMap { _ =>
                    getFromCacheOrElse(cache, url, cacheHit = false) {
                        // Worker finished but the cache still has nothing.
                        new AlreadyCompletedFuture[GotIndex](Right(GotIndex(url, index = None, cacheHit = false)))
                    }
                }

            // The fallback argument is by-name, so getWithoutCache only runs
            // on a cache miss.
            val futureGotIndex =
                if (skipCache) getWithoutCache
                else getFromCacheOrElse(cache, url, cacheHit = true)(getWithoutCache)

            self.channel.replyWith(futureGotIndex)
    }

    /** Starts the child actors; declared `(): Unit` to match Akka's lifecycle hook. */
    override def preStart(): Unit = {
        client.start
        cache.start
    }

    /** Stops the child actors; declared `(): Unit` to match Akka's lifecycle hook. */
    override def postStop(): Unit = {
        client.stop
        cache.stop
    }
}

/** Future-composing helpers used by [[ClientActor]] to talk to its child actors. */
object ClientActor {
    /**
     * Looks up the index for `url` in the cache actor.
     *
     * @param cache    the index storage actor, asked with FetchCachedIndex
     * @param url      the URL whose index is wanted
     * @param cacheHit value to report in GotIndex.cacheHit when an entry is found
     * @param fallback by-name; evaluated only when the cache reports no entry
     * @return a future GotIndex on a hit, otherwise the future from `fallback`
     */
    private def getFromCacheOrElse(cache: ActorRef, url: String, cacheHit: Boolean)(fallback: => Future[GotIndex]): Future[GotIndex] = {
        (cache ? FetchCachedIndex(url)) flatMap {
            case CachedIndexFetched(Some(index)) =>
                new AlreadyCompletedFuture[GotIndex](Right(GotIndex(url, Some(index), cacheHit)))
            case CachedIndexFetched(None) =>
                fallback
        }
    }

    /**
     * Asks the work queue client actor to spider and cache `url`.
     *
     * @return a future that completes with `()` once the client replies
     *         SpideredAndCached; a reply not matched below raises a MatchError
     *         inside the mapping function
     */
    private def getFromWorker(client: ActorRef, url: String): Future[Unit] = {
        (client ? SpiderAndCache(url)) map {
            // Only completion matters; the echoed URL is not needed. Return the
            // unit value `()`, not the companion object `Unit`.
            case SpideredAndCached(_) => ()
        }
    }
}
Something went wrong with that request. Please try again.