This repository has been archived by the owner on Dec 21, 2022. It is now read-only.

Implement early content-type check on response #17

Merged · 4 commits · Nov 16, 2015
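
The change in a nutshell: instead of downloading a full response body and only then discovering it is not something the schema factories can parse, the scraper now inspects the Content-Type header as soon as response headers arrive and aborts the exchange if no registered SchemaFactory supports it. A condensed sketch of the pattern, using the async-http-client 1.9.x types this project already depends on (earlyContentTypeCheck is an illustrative name; the real implementation is Scraper.handleSupportedTypes in the diff below):

import com.ning.http.client.AsyncHandler.STATE
import com.ning.http.client.HttpResponseHeaders

object EarlyCheckSketch {
  // Runs when headers arrive, before any body bytes are read:
  // CONTINUE keeps downloading, ABORT stops processing the response.
  def earlyContentTypeCheck(headers: HttpResponseHeaders, okTypes: Seq[String]): STATE = {
    val maybeContentType = Option(headers.getHeaders).flatMap(h => Option(h.getFirstValue("Content-Type")))
    maybeContentType match {
      case Some(respType) if okTypes.exists(okType => respType.contains(okType)) => STATE.CONTINUE
      case _ => STATE.ABORT
    }
  }
}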
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
name := "metascraper"

version := "0.2.10-SNAPSHOT"
version := "0.3.1-SNAPSHOT"

scalaVersion := "2.11.7"

6 changes: 2 additions & 4 deletions src/main/scala/com/beachape/metascraper/Messages.scala
@@ -24,14 +24,12 @@ object Messages {
acceptLanguageCode: String = "en",
userAgent: String = "Metascraper",
numberOfImages: Int = 5,
-schemaFactories: Seq[SchemaFactory] = Seq(HtmlSchemas(OpenGraph, NormalPage))
-)
+schemaFactories: Seq[SchemaFactory] = Seq(HtmlSchemas(OpenGraph, NormalPage)))

sealed case class ScrapedData(
url: Url,
title: String,
description: String,
mainImageUrl: Url,
-imageUrls: Seq[Url]
-)
+imageUrls: Seq[Url])
}
36 changes: 31 additions & 5 deletions src/main/scala/com/beachape/metascraper/Scraper.scala
@@ -2,7 +2,8 @@ package com.beachape.metascraper

import com.beachape.metascraper.Messages.{ ScrapedData, ScrapeUrl }
import com.beachape.metascraper.extractors.{ SchemaFactory, Schema }
-import com.ning.http.client.Response
+import com.ning.http.client.AsyncHandler.STATE
+import com.ning.http.client.{ HttpResponseHeaders, AsyncHandler, Response }
import dispatch._
import org.apache.commons.validator.routines.UrlValidator
import StringOps._
@@ -13,6 +14,12 @@ import scala.util._
/**
* Created by Lloyd on 2/15/15.
*/

+object Scraper {
+
+  val ContentTypeHeaderName: String = "Content-Type"
+}

class Scraper(httpClient: Http, urlSchemas: Seq[String])(implicit ec: ExecutionContext) {

private val urlValidator = new UrlValidator(urlSchemas.toArray)
@@ -37,9 +44,10 @@ class Scraper(httpClient: Http, urlSchemas: Seq[String])(implicit ec: ExecutionC
"User-Agent" -> Seq(message.userAgent),
"Accept-Language" -> Seq(message.acceptLanguageCode)
)
-val request = url(messageUrl).setHeaders(requestHeaders)
-val resp = httpClient(request)
-resp map (s => extractData(s, messageUrl, message.schemaFactories, message.numberOfImages))
+val request = url(messageUrl).setHeaders(requestHeaders).toRequest
+val handler = handleSupportedTypes(supportedContentTypes(message))
+val resp = httpClient(request, handler)
+resp.map(s => extractData(s, messageUrl, message.schemaFactories, message.numberOfImages))
}
}

@@ -51,7 +59,7 @@ class Scraper(httpClient: Http, urlSchemas: Seq[String])(implicit ec: ExecutionC
*/
def extractData(resp: Response, url: String, schemaFactories: Seq[SchemaFactory], numberOfImages: Int): ScrapedData = {
if (resp.getStatusCode / 100 == 2) {
-val schemas = schemaFactories.toStream.flatMap(f => Try(f.apply(resp)).getOrElse(Nil)) // Stream in case we have expensive factories
+val schemas = schemaFactories.toStream.flatMap(f => Try(f.apply(resp)).getOrElse(Nil)) // Stream to avoid generating schemas if possible
val maybeUrl = schemas.flatMap(s => Try(s.extractUrl).toOption).find(_.isDefined).getOrElse(None)
val maybeTitle = schemas.flatMap(s => Try(s.extractTitle).toOption).find(_.isDefined).getOrElse(None)
val maybeDescription = schemas.flatMap(s => Try(s.extractDescription).toOption).find(_.isDefined).getOrElse(None)
@@ -69,4 +77,22 @@ class Scraper(httpClient: Http, urlSchemas: Seq[String])(implicit ec: ExecutionC
}
}

+private[this] def supportedContentTypes(message: ScrapeUrl): Seq[String] = {
+  message.schemaFactories.flatMap(_.contentTypes)
+}
+
+/**
+ * Creates an OK response handler that aborts if the response's Content-Type is not in the passed OK types list
+ */
+private[metascraper] def handleSupportedTypes(okTypes: Seq[String]) = new OkFunctionHandler(identity) {
+  override def onHeadersReceived(headers: HttpResponseHeaders): STATE = {
+    val maybeHeaders = Option(headers.getHeaders)
+    val maybeContentType = maybeHeaders.flatMap(h => Option(h.getFirstValue(Scraper.ContentTypeHeaderName)))
+    maybeContentType match {
+      case Some(respType) if okTypes.exists(okType => respType.contains(okType)) => super.onHeadersReceived(headers)
+      case _ => STATE.ABORT
+    }
+  }
+}

}
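
A detail worth noting in handleSupportedTypes above: the comparison uses contains rather than string equality, so Content-Type values that carry parameters still match. A quick illustration (header values invented for the example):

val okTypes = Seq("text/html")
okTypes.exists(okType => "text/html; charset=utf-8".contains(okType)) // true: download continues
okTypes.exists(okType => "application/json".contains(okType))         // false: request is aborted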
25 changes: 13 additions & 12 deletions src/main/scala/com/beachape/metascraper/ScraperActor.scala
@@ -6,6 +6,7 @@ import dispatch._
import java.util.concurrent.Executors
import com.ning.http.client.{ AsyncHttpClientConfig, AsyncHttpClient }

+import scala.concurrent.duration._
import scala.util.{ Failure, Success }

/**
@@ -23,18 +24,19 @@ object ScraperActor {
* @return Props for instantiating a ScaperActor
*/
def apply(
-httpExecutorThreads: Int = 10,
+httpExecutorThreads: Int = 20,
maxConnectionsPerHost: Int = 30,
connectionTimeoutInMs: Int = 10000,
-requestTimeoutInMs: Int = 15000
-) =
+requestTimeoutInMs: Int = 15000) =
Props(
classOf[ScraperActor],
httpExecutorThreads,
maxConnectionsPerHost,
connectionTimeoutInMs,
requestTimeoutInMs
)

+private def coreCount = Runtime.getRuntime.availableProcessors()
}

/**
Expand All @@ -44,11 +46,10 @@ object ScraperActor {
* method
*/
class ScraperActor(
-httpExecutorThreads: Int = 10,
+threadMultiplier: Int = 3,
maxConnectionsPerHost: Int = 30,
-connectionTimeoutInMs: Int = 10000,
-requestTimeoutInMs: Int = 15000
-)
+connectionTimeout: Duration = 10.seconds,
+requestTimeout: Duration = 15.seconds)
extends Actor with ActorLogging {

import context.dispatcher
@@ -57,17 +58,17 @@ class ScraperActor(
val validSchemas = Seq("http", "https")
// Http client config
val followRedirects = true
-val connectionPooling = true
+val connectionPooling = false

-private val executorService = Executors.newFixedThreadPool(httpExecutorThreads)
+private val executorService = Executors.newFixedThreadPool(threadMultiplier * ScraperActor.coreCount)
private val config = new AsyncHttpClientConfig.Builder()
.setExecutorService(executorService)
-.setIOThreadMultiplier(1) // otherwise we might not have enough threads
+.setIOThreadMultiplier(threadMultiplier)
.setMaxConnectionsPerHost(maxConnectionsPerHost)
.setAllowPoolingConnections(connectionPooling)
.setAllowPoolingSslConnections(connectionPooling)
-.setConnectTimeout(connectionTimeoutInMs)
-.setRequestTimeout(requestTimeoutInMs)
+.setConnectTimeout(connectionTimeout.toMillis.toInt)
+.setRequestTimeout(requestTimeout.toMillis.toInt)
.setFollowRedirect(followRedirects).build
private val asyncHttpClient = new AsyncHttpClient(config)
private val httpClient = new Http(asyncHttpClient)
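Beyond the Duration-typed timeouts, the executor is now sized from the machine rather than a fixed constant: threadMultiplier * coreCount threads for the request executor, with the same multiplier handed to setIOThreadMultiplier so the client's internal I/O threads scale with core count as well. A sketch of the resulting arithmetic under the new defaults (core count is an example value):

import scala.concurrent.duration._

object PoolSizingSketch {
  val threadMultiplier = 3                             // new default in this diff
  val cores = Runtime.getRuntime.availableProcessors() // e.g. 4
  val poolSize = threadMultiplier * cores              // e.g. 12 executor threads
  // Durations are converted at the config boundary, since the builder takes millis as Int:
  val connectTimeoutMs = 10.seconds.toMillis.toInt     // 10000
}
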
src/main/scala/com/beachape/metascraper/extractors/SchemaFactory.scala
@@ -7,6 +7,11 @@ import com.ning.http.client.Response
*/
trait SchemaFactory extends (Response => Seq[Schema]) {

+/**
+ * Supported Content-Types for this [[SchemaFactory]]
+ */
+def contentTypes: Seq[String]

/**
* Based on a [[Response]], returns a list of [[Schema]]
*/
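With contentTypes added to the trait, every factory advertises what it can parse, and Scraper unions these lists to drive the early header check. A hypothetical minimal factory, purely to show the shape of the contract (PlainTextSchemas and its no-op parsing are invented for illustration):

import com.beachape.metascraper.extractors.{ Schema, SchemaFactory }
import com.ning.http.client.Response

case object PlainTextSchemas extends SchemaFactory {
  // Advertised to Scraper; a response matching none of the factories' declared
  // types is aborted before any factory's apply is invoked.
  val contentTypes: Seq[String] = Seq("text/plain")
  def apply(resp: Response): Seq[Schema] = Nil // real parsing elided in this sketch
}
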
src/main/scala/com/beachape/metascraper/extractors/html/HtmlSchemas.scala
@@ -24,24 +24,13 @@ trait HtmlSchema {

}

-object HtmlSchemas {
-
-  private val ContentType = "text/html"
-
-  private def supportedContentType(response: Response): Boolean = {
-    Option(response.getContentType).exists(_.contains(ContentType))
-  }
-}

case class HtmlSchemas(schemas: (Document => HtmlSchema)*) extends SchemaFactory {

+val contentTypes: Seq[String] = Seq("text/html")

def apply(resp: Response): Seq[HtmlSchema] = {
-if (HtmlSchemas.supportedContentType(resp)) {
-  val doc = Jsoup.parse(String(resp), resp.getUri.toString)
-  schemas.map(_.apply(doc))
-} else {
-  Nil
-}
+val doc = Jsoup.parse(String(resp), resp.getUri.toString)
+schemas.map(_.apply(doc))
}

}
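
Note the knock-on simplification: with the check moved up to the wire level in Scraper, HtmlSchemas no longer needs its own supportedContentType guard, because a response whose Content-Type matches no factory's declared types is aborted before any factory's apply is called.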
src/test/scala/com/beachape/metascraper/ScraperActorSpec.scala
@@ -85,7 +85,7 @@ class ScraperActorSpec extends TestKit(ActorSystem("testSystem"))
val Right(scrapedData) = response
scrapedData.title should be('empty)
scrapedData.description should be('empty)
scrapedData.url should be("http://www.beachape.com/downloads/code/scala/schwatcher_example.scala")
scrapedData.url should be("https://beachape.com/downloads/code/scala/schwatcher_example.scala")
scrapedData.mainImageUrl should be('empty)
scrapedData.imageUrls should be('empty)
}
32 changes: 31 additions & 1 deletion src/test/scala/com/beachape/metascraper/ScraperSpec.scala
@@ -6,8 +6,9 @@ import java.util

import com.beachape.metascraper.Messages.ScrapeUrl
import com.beachape.metascraper.extractors.{ SchemaFactory, DocsSupport, Schema }
+import com.ning.http.client.AsyncHandler.STATE
import com.ning.http.client.cookie.Cookie
-import com.ning.http.client.{ FluentCaseInsensitiveStringsMap, Response }
+import com.ning.http.client.{ HttpResponseHeaders, FluentCaseInsensitiveStringsMap, Response }

import dispatch.{ Uri, Http }
import org.scalatest._
@@ -42,6 +43,34 @@ class ScraperSpec

}

describe("#handleSupportedTypes handler") {

val htmlHandler = subject.handleSupportedTypes(Seq("text/html"))
val htmlRespHeadersMap = {
val m = new FluentCaseInsensitiveStringsMap()
m.add("Content-Type", "text/html")
}
val nonHtmlRespHeadersMap = {
val m = new FluentCaseInsensitiveStringsMap()
m.add("Content-Type", "application/json")
}

it("should create a handler that continues on supported header types") {
val r = htmlHandler.onHeadersReceived(new HttpResponseHeaders() {
def getHeaders: FluentCaseInsensitiveStringsMap = htmlRespHeadersMap
})
r shouldBe STATE.CONTINUE
}

it("should create a handler that aborts on unsupported header types") {
val r = htmlHandler.onHeadersReceived(new HttpResponseHeaders() {
def getHeaders: FluentCaseInsensitiveStringsMap = nonHtmlRespHeadersMap
})
r shouldBe STATE.ABORT
}

}

describe("#extractData") {

val scraper1 = new Schema {
@@ -73,6 +102,7 @@
}

val factory = new SchemaFactory {
val contentTypes = Seq("text/html")
def apply(s: Response): Seq[Schema] = Seq(scraper1, scraper2)
}
