Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tag: heroku-devcent…
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 198 lines (170 sloc) 7.215 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
package com.typesafe.webwords.indexer

import java.io.File
import java.net.URI
import java.net.URL

import com.typesafe.webwords.common._

import akka.actor.{ Index => _, _ }
import akka.actor.Actor.actorOf
import akka.dispatch._

sealed trait SpiderRequest
case class Spider(url: URL) extends SpiderRequest

sealed trait SpiderReply
case class Spidered(url: URL, index: Index)

/**
* This actor coordinates URLFetcher and IndexerActor actors in order
* to do a shallow spider of a site and compute an "index" for the
* site. An index is a list of word counts and a list of links found
* on the site.
*
* There's some list processing in functional style in this file
* that may be of interest if you're learning about Scala and
* functional programming.
*/
class SpiderActor
    extends Actor {
    private val indexer = actorOf[IndexerActor]
    private val fetcher = actorOf[URLFetcher]

    override def preStart() = {
        indexer.start
        fetcher.start
    }

    override def postStop() = {
        indexer.stop
        fetcher.stop
    }

    override def receive = {
        case request: SpiderRequest => request match {
            case Spider(url) =>
                self.channel.replyWith(SpiderActor.spider(indexer, fetcher, url))
        }
    }
}

object SpiderActor {
    private def fetchBody(fetcher: ActorRef, url: URL): Future[String] = {
        val fetched = fetcher ? FetchURL(url)
        fetched map {
            case URLFetched(status, headers, body) if status == 200 =>
                // FIXME should probably filter out non-HTML content types
                body
            case URLFetched(status, headers, body) =>
                throw new Exception("Failed to fetch, status: " + status)
            case whatever =>
                throw new IllegalStateException("Unexpected reply to url fetch: " + whatever)
        }
    }

    private def fetchIndex(indexer: ActorRef, fetcher: ActorRef, url: URL): Future[Index] = {
        fetchBody(fetcher, url) flatMap { body =>
            val indexed = indexer ? IndexHtml(url, body)
            indexed map {
                case IndexedHtml(index) =>
                    index

            }
        }
    }

    // pick a few links on the page to follow, preferring to "descend"
    private def childLinksToFollow(url: URL, index: Index): Seq[URL] = {
        val uri = removeFragment((url.toURI))
        val siteRoot = copyURI(uri, path = Some(null))
        val parentPath = new File(uri.getPath).getParent
        val parent = if (parentPath != null) copyURI(uri, path = Some(parentPath)) else siteRoot

        val sameSiteOnly = index.links map {
            kv => kv._2
        } map {
            new URI(_)
        } map {
            removeFragment(_)
        } filter {
            _ != uri
        } filter {
            isBelow(siteRoot, _)
        } sortBy {
            pathDepth(_)
        }
        val siblingsOrChildren = sameSiteOnly filter { isBelow(parent, _) }
        val children = siblingsOrChildren filter { isBelow(uri, _) }

        // prefer children, if not enough then siblings, if not enough then same site
        val toFollow = (children ++ siblingsOrChildren ++ sameSiteOnly).distinct take 10 map { _.toURL }
        toFollow
    }

    private[indexer] def combineSortedCounts(sortedA: List[(String, Int)], sortedB: List[(String, Int)]): List[(String, Int)] = {
        if (sortedA == Nil) {
            sortedB
        } else if (sortedB == Nil) {
            sortedA
        } else {
            val aHead = sortedA.head
            val bHead = sortedB.head
            val aWord = aHead._1
            val bWord = bHead._1

            if (aWord == bWord) {
                (aWord -> (aHead._2 + bHead._2)) :: combineSortedCounts(sortedA.tail, sortedB.tail)
            } else if (aWord < bWord) {
                aHead :: combineSortedCounts(sortedA.tail, sortedB)
            } else {
                bHead :: combineSortedCounts(sortedA, sortedB.tail)
            }
        }
    }

    private[indexer] def combineCounts(a: Seq[(String, Int)], b: Seq[(String, Int)]) = {
        combineSortedCounts(a.toList.sortBy(_._1), b.toList.sortBy(_._1)).sortBy(0 - _._2)
    }

    private[indexer] def mergeIndexes(a: Index, b: Index): Index = {
        val links = (a.links ++ b.links).sortBy(_._1).distinct

        // ideally we might combine and count length at the same time, but we'll live
        val counts = combineCounts(a.wordCounts, b.wordCounts).take(math.max(a.wordCounts.length, b.wordCounts.length))
        Index(links, counts)
    }

    private def combineIndexes(indexes: TraversableOnce[Index]): Index = {
        indexes.reduce(mergeIndexes(_, _))
    }

    /*
* This is a relatively complex example of using Akka's Future class.
* Note that this code never blocks (no future.await()
* or future.get()), instead it uses map, flatMap, and friends
* on the futures to chain them together. It then ultimately
* returns a future that will be completed when all the dependent
* futures are also completed.
*/
    private[indexer] def spider(indexer: ActorRef, fetcher: ActorRef, url: URL): Future[Spidered] = {
        implicit val dispatcher = indexer.dispatcher
        fetchIndex(indexer, fetcher, url) flatMap { rootIndex =>
            val childIndexes = childLinksToFollow(url, rootIndex) map { fetchIndex(indexer, fetcher, _) }
            val rootIndexFuture = new DefaultCompletableFuture[Index] // some 1.2 bug: AlreadyCompletedFuture breaks
            rootIndexFuture.completeWithResult(rootIndex)
            val allIndexFutures = childIndexes ++ Iterator(rootIndexFuture)
            val allIndexes = Future.sequence(allIndexFutures)
            val combinedIndex = allIndexes map { indexes =>
                combineIndexes(indexes)
            }
            val spidered = combinedIndex map { index =>
                Spidered(url, index)
            }

            spidered
        }
    }

    // this lets us copy URIs changing one part of the URI via keyword.
    private def copyURI(uri: URI, scheme: Option[String] = None, userInfo: Option[String] = None,
        host: Option[String] = None, port: Option[Int] = None, path: Option[String] = None,
        query: Option[String] = None, fragment: Option[String] = None): URI = {
        new URI(if (scheme.isEmpty) uri.getScheme else scheme.get,
            if (userInfo.isEmpty) uri.getUserInfo else userInfo.get,
            if (host.isEmpty) uri.getHost else host.get,
            if (port.isEmpty) uri.getPort else port.get,
            if (path.isEmpty) uri.getPath else path.get,
            if (query.isEmpty) uri.getQuery else query.get,
            if (fragment.isEmpty) uri.getFragment else fragment.get)
    }

    private[indexer] def removeFragment(uri: URI) = {
        if (uri.getFragment != null)
            copyURI(uri, fragment = Some(null))
        else
            uri
    }

    private[indexer] def isBelow(uri: URI, possibleChild: URI) = {
        val r = uri.relativize(possibleChild)
        !r.isAbsolute && uri != possibleChild
    }

    private[indexer] def pathDepth(uri: URI) = {
        uri.getPath.count(_ == '/')
    }
}
Something went wrong with that request. Please try again.