Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tag: heroku-devcent…
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 144 lines (113 sloc) 4.788 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
package com.typesafe.webwords.common

import java.net.URL
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.TimeUnit
import java.util.concurrent.Executors
import com.ning.http.client._
import akka.actor._
import akka.dispatch._
import akka.event.EventHandler

/**
 * Messages that a [[URLFetcher]] actor accepts.
 */
sealed trait URLFetcherIncoming

/**
 * Request that the fetcher retrieve the resource at a URL.
 *
 * @param u the URL to fetch
 */
case class FetchURL(u: URL)
    extends URLFetcherIncoming

/**
 * Results produced by a [[URLFetcher]].
 */
sealed trait URLFetcherOutgoing

/**
 * Outcome of a completed HTTP fetch.
 *
 * @param status  HTTP status code of the response
 * @param headers response headers keyed by header name, one value per name
 * @param body    response body as a string
 */
case class URLFetched(
    status: Int,
    headers: Map[String, String],
    body: String)
    extends URLFetcherOutgoing

/**
 * Actor that owns an AsyncHttpClient and serves fetch requests with it.
 *
 * For each [[URLFetcherIncoming]] message it starts the fetch through the
 * companion object and replies with the resulting future (via `replyWith`)
 * rather than with the response itself. The client is closed in `postStop`.
 */
class URLFetcher extends Actor {

    // One client per actor instance, built by the companion object.
    private val asyncHttpClient = URLFetcher.makeClient

    override def receive = {
        case request: URLFetcherIncoming =>
            val pending = request match {
                case FetchURL(url) => URLFetcher.fetchURL(asyncHttpClient, url)
            }
            self.channel.replyWith(pending)
    }

    // Release the client's resources when the actor stops.
    override def postStop = asyncHttpClient.close()
}

object URLFetcher {
    // This field is just used for debug/logging/testing. It counts fetches whose
    // handler has been created but which have not yet completed or failed.
    val httpInFlight = new AtomicInteger(0)

    /**
     * Builds the AsyncHttpClient used by a [[URLFetcher]] actor.
     *
     * note: an AsyncHttpClient is a heavy object with a thread
     * and connection pool associated with it, it's supposed to
     * be shared among lots of requests, not per-http-request.
     *
     * The client gets its own cached thread pool, at most 1000 connections in
     * total and 15 per host, and follows redirects.
     */
    private def makeClient: AsyncHttpClient = {
        val config = new AsyncHttpClientConfig.Builder()
            .setMaximumConnectionsTotal(1000)
            .setMaximumConnectionsPerHost(15)
            .setExecutorService(Executors.newCachedThreadPool())
            .setFollowRedirects(true)
            .build
        new AsyncHttpClient(config)
    }

    /**
     * Starts a GET of `u` on `asyncHttpClient` and returns an Akka future for it.
     *
     * The future is completed with a [[URLFetched]] when the response has been
     * read, or with the exception that ended the request. That includes an
     * exception thrown synchronously by `execute` itself, so callers always get
     * a completed future rather than being left to time out.
     *
     * @param asyncHttpClient client used to issue the request
     * @param u               URL to fetch
     * @return future of the fetched status, headers and body
     */
    private def fetchURL(asyncHttpClient: AsyncHttpClient, u: URL): Future[URLFetched] = {
        // timeout the Akka future 50ms after we'd have timed out the request anyhow,
        // gives us 50ms to parse the response
        val f = new DefaultCompletableFuture[URLFetched](asyncHttpClient.getConfig().getRequestTimeoutInMs() + 50,
            TimeUnit.MILLISECONDS)

        val httpHandler = new AsyncHandler[Unit]() {
            // Counted as in flight from handler creation until finish() runs.
            httpInFlight.incrementAndGet()

            val builder = new Response.ResponseBuilder()

            // Set once a terminal callback has run, so f is completed only once.
            // NOTE(review): a plain var; this assumes AsyncHttpClient never runs
            // onCompleted and onThrowable concurrently — confirm.
            var finished = false

            // We can have onThrowable called because onCompleted
            // throws, and other complex situations, so every terminal
            // callback goes through here. Runs `body` at most once. If it
            // throws, f is failed with that exception and the exception is
            // rethrown; the in-flight counter is decremented either way.
            private def finish(body: => Unit): Unit = {
                if (!finished) {
                    try {
                        body
                    } catch {
                        case t: Throwable => {
                            EventHandler.debug(this, t.getMessage)
                            f.completeWithException(t)
                            throw t // rethrow for benefit of AsyncHttpClient
                        }
                    } finally {
                        finished = true
                        httpInFlight.decrementAndGet()
                        assert(f.isCompleted)
                    }
                }
            }

            // this can be called if any of our other methods throws,
            // including onCompleted. A no-op once the handler has finished.
            def onThrowable(t: Throwable): Unit = finish { throw t }

            def onBodyPartReceived(bodyPart: HttpResponseBodyPart) = {
                builder.accumulate(bodyPart)
                AsyncHandler.STATE.CONTINUE
            }

            def onStatusReceived(responseStatus: HttpResponseStatus) = {
                builder.accumulate(responseStatus)
                AsyncHandler.STATE.CONTINUE
            }

            def onHeadersReceived(responseHeaders: HttpResponseHeaders) = {
                builder.accumulate(responseHeaders)
                AsyncHandler.STATE.CONTINUE
            }

            def onCompleted() = {
                import scala.collection.JavaConverters._

                finish {
                    val response = builder.build()
                    val headersJavaMap = response.getHeaders()

                    // One value per header name; sometimes getJoinedValue()
                    // would be more correct.
                    val headers = headersJavaMap.keySet.asScala.map { name =>
                        name -> headersJavaMap.getFirstValue(name)
                    }.toMap

                    f.completeWithResult(URLFetched(response.getStatusCode(), headers, response.getResponseBody()))
                }
            }
        }

        try {
            asyncHttpClient.prepareGet(u.toExternalForm()).execute(httpHandler)
        } catch {
            case e: Exception =>
                // execute() may throw synchronously (for example, on a closed client
                // or a rejected URI) without ever invoking the handler, which would
                // leave f uncompleted and httpInFlight incremented. Route the failure
                // through onThrowable so f is failed and the counter is decremented.
                // onThrowable rethrows e when it does that work; the caller sees the
                // failure through f, so the rethrow is discarded here. If the handler
                // had already finished, onThrowable does nothing.
                try {
                    httpHandler.onThrowable(e)
                } catch {
                    case _: Exception => ()
                }
        }
        f
    }
}
Something went wrong with that request. Please try again.