Scala Stream Collector: integrate the size violation bad row type (closes #4177)
BenFradet authored and benjben committed Sep 19, 2019
1 parent 93ad42f commit b466312
Showing 24 changed files with 266 additions and 389 deletions.
4 changes: 1 addition & 3 deletions 2-collectors/scala-stream-collector/build.sbt
@@ -23,11 +23,9 @@ lazy val commonDependencies = Seq(
  Dependencies.Libraries.prometheusCommon,
  // Scala
  Dependencies.Libraries.scopt,
- Dependencies.Libraries.scalaz7,
  Dependencies.Libraries.akkaHttp,
  Dependencies.Libraries.akkaSlf4j,
- Dependencies.Libraries.json4sJackson,
- Dependencies.Libraries.snowplowCommonEnrich,
+ Dependencies.Libraries.badRows,
  Dependencies.Libraries.collectorPayload,
  Dependencies.Libraries.pureconfig,
  // Scala (test)
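For context, a sketch of what the swapped-in dependency might look like on the Dependencies side (conventionally project/Dependencies.scala in an sbt build). Only the module name badRows appears in this diff; the artifact coordinates and version below are assumptions:

  // project/Dependencies.scala (sketch). Coordinates and version are
  // assumptions; only the badRows module name is taken from the diff.
  object Libraries {
    val badRows = "com.snowplowanalytics" %% "snowplow-badrows" % "0.1.0"
  }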
@@ -12,22 +12,21 @@
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
- package com.snowplowanalytics.snowplow
- package collectors
- package scalastream
+ package com.snowplowanalytics.snowplow.collectors.scalastream

import java.io.File

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
+ import com.snowplowanalytics.snowplow.collectors.scalastream.metrics._
+ import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.typesafe.config.{Config, ConfigFactory}
import org.slf4j.LoggerFactory
import pureconfig._

- import metrics._
- import model._

// Main entry point of the Scala collector.
trait Collector {

@@ -18,9 +18,9 @@ import akka.http.scaladsl.model.{ContentType, HttpResponse, StatusCode, StatusCo
import akka.http.scaladsl.model.headers.HttpCookiePair
import akka.http.scaladsl.server.{Directive1, Route}
import akka.http.scaladsl.server.Directives._
+ import com.snowplowanalytics.snowplow.collectors.scalastream.model.DntCookieMatcher

import monitoring.BeanRegistry
- import model.DntCookieMatcher

trait CollectorRoute {
def collectorService: Service
@@ -12,21 +12,18 @@
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
- package com.snowplowanalytics.snowplow
- package collectors.scalastream
+ package com.snowplowanalytics.snowplow.collectors.scalastream

import java.util.UUID

import scala.collection.JavaConverters._

import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
+ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload
import org.apache.commons.codec.binary.Base64
import org.slf4j.LoggerFactory
- import scalaz._

- import CollectorPayload.thrift.model1.CollectorPayload
- import enrich.common.outputs.BadRow
import generated.BuildInfo
import model._
import utils.SplitBatch
@@ -266,10 +263,7 @@ class CollectorService(
if (canReplace) target.replaceAllLiterally(token, event.networkUserId)
else target
(HttpResponse(StatusCodes.Found).withHeaders(`RawHeader`("Location", replacedTarget)), Nil)
- case None =>
-   val badRow = createBadRow(event, "Redirect failed due to lack of u parameter")
-   (HttpResponse(StatusCodes.BadRequest),
-     sinks.bad.storeRawEvents(List(badRow), partitionKey))
+ case None => (HttpResponse(StatusCodes.BadRequest), Nil)
}

/**
@@ -374,10 +368,4 @@ class CollectorService(
case Some(`Origin`(origin)) => HttpOriginRange.Default(origin)
case _ => HttpOriginRange.`*`
})

- /** Puts together a bad row ready for sinking */
- private def createBadRow(event: CollectorPayload, message: String): Array[Byte] =
-   BadRow(new String(SplitBatch.ThriftSerializer.get().serialize(event)), NonEmptyList(message))
-     .toCompactJson
-     .getBytes("UTF-8")
}
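The deleted createBadRow helper hand-built a scalaz BadRow and serialized it to compact JSON. With the badRows dependency added in build.sbt, the equivalent construction becomes a typed size-violation bad row. A minimal sketch, assuming the constructor shapes of the published snowplow-badrows model (the exact API of the version pinned by this commit should be checked):

  import java.time.Instant
  import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Processor}

  // Sketch only: a typed size-violation bad row replacing the scalaz-based
  // helper. Constructor shapes and the .compact serializer are assumptions.
  def sizeViolationBadRow(event: String, maxBytes: Int): Array[Byte] = {
    val processor = Processor(BuildInfo.name, BuildInfo.version)
    val failure = Failure.SizeViolation(
      Instant.now(),                  // when the violation was detected
      maxBytes,                       // maximum allowed size in bytes
      event.getBytes("UTF-8").length, // actual size of the payload
      "collector payload exceeded the maximum allowed size"
    )
    BadRow.SizeViolation(processor, failure, Payload.RawPayload(event))
      .compact
      .getBytes("UTF-8")
  }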
@@ -15,12 +15,12 @@
package com.snowplowanalytics.snowplow.collectors.scalastream
package metrics

+ import java.io.StringWriter
import java.time.Duration

import akka.http.scaladsl.model.{HttpMethod, StatusCode, Uri}
import io.prometheus.client.exporter.common.TextFormat
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram}
- import org.apache.commons.io.output.StringBuilderWriter

import generated.BuildInfo
import PrometheusMetricsService.Metrics._
@@ -32,11 +32,8 @@ import model.PrometheusMetricsConfig
* and report generation based on collected statistics
*/
trait MetricsService {
-
  def observeRequest(method: HttpMethod, uri: Uri, status: StatusCode, duration: Duration): Unit
-
  def report(): String
-
}

/**
@@ -70,15 +67,13 @@ class PrometheusMetricsService(metricsConfig: PrometheusMetricsConfig) extends M
}

override def report(): String = {
- val writer = new StringBuilderWriter()
+ val writer = new StringWriter()
  TextFormat.write004(writer, registry.metricFamilySamples())
- writer.getBuilder.toString
+ writer.toString
}
-
}

object PrometheusMetricsService {
-
final val NanosecondsInSecond: Double = Math.pow(10, 9)

object Metrics {
Expand All @@ -90,5 +85,4 @@ object PrometheusMetricsService {

final val Labels = Seq("endpoint", "method", "code")
}
-
}
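The report change swaps commons-io's StringBuilderWriter for the JDK's java.io.StringWriter, removing a third-party class with no change in behavior: TextFormat.write004 only requires a java.io.Writer. A standalone sketch of the same rendering path:

  import java.io.StringWriter
  import io.prometheus.client.CollectorRegistry
  import io.prometheus.client.exporter.common.TextFormat

  // Render the default registry in Prometheus text format 0.0.4. Any
  // java.io.Writer works, so StringWriter is a drop-in replacement.
  val writer = new StringWriter()
  TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples())
  println(writer.toString)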
@@ -17,6 +17,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream
import scala.concurrent.duration.FiniteDuration

import akka.http.scaladsl.model.headers.HttpCookiePair
+ import io.circe.Json

import sinks.Sink

@@ -26,7 +27,7 @@ package model {
* Case class for holding both good and
* bad sinks for the Stream Collector.
*/
- case class CollectorSinks(good: Sink, bad: Sink)
+ final case class CollectorSinks(good: Sink, bad: Sink)

/**
* Case class for holding the results of
@@ -35,15 +36,15 @@
* @param good All good results
* @param bad All bad results
*/
- case class EventSerializeResult(good: List[Array[Byte]], bad: List[Array[Byte]])
+ final case class EventSerializeResult(good: List[Array[Byte]], bad: List[Array[Byte]])

/**
* Class for the result of splitting a too-large array of events in the body of a POST request
*
* @param goodBatches List of batches of events
* @param failedBigEvents List of events that were too large
*/
- case class SplitBatchResult(goodBatches: List[List[String]], failedBigEvents: List[String])
+ final case class SplitBatchResult(goodBatches: List[List[Json]], failedBigEvents: List[Json])

final case class CookieConfig(
enabled: Boolean,
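SplitBatchResult now carries io.circe.Json values instead of pre-rendered strings, so events that exceed the size budget keep their structure and can later be wrapped into size-violation bad rows. A sketch of how a size-aware splitter might produce this shape (a hypothetical helper, not the collector's actual SplitBatch logic):

  import io.circe.Json

  // Hypothetical size-aware splitter: events whose serialized form exceeds
  // maxBytes land in failedBigEvents; the rest are greedily packed into
  // batches that each stay under maxBytes.
  def split(events: List[Json], maxBytes: Int): SplitBatchResult = {
    def size(j: Json): Int = j.noSpaces.getBytes("UTF-8").length
    val (big, fits) = events.partition(size(_) > maxBytes)
    val (done, current, _) = fits.foldLeft((List.empty[List[Json]], List.empty[Json], 0)) {
      case ((done, current, bytes), e) =>
        val s = size(e)
        if (bytes + s > maxBytes) (current.reverse :: done, List(e), s)
        else (done, e :: current, bytes + s)
    }
    SplitBatchResult((current.reverse :: done).reverse.filter(_.nonEmpty), big)
  }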
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory
trait Sink {

// Maximum number of bytes that a single record can contain
- val MaxBytes: Long
+ val MaxBytes: Int

lazy val log = LoggerFactory.getLogger(getClass())

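Narrowing MaxBytes from Long to Int lines up with the rest of the JVM: downstream record limits (for example, Kinesis's 1 MB) fit comfortably in an Int, and Array[Byte].length is already an Int, so sinks can compare sizes without widening. A tiny sketch of the resulting check (the 1 MB figure is illustrative, not taken from this diff):

  // With MaxBytes as Int, the size check matches Array[Byte].length
  // directly; no widening to Long is needed.
  object ExampleSink {
    val MaxBytes: Int = 1000000 // illustrative limit only
    def oversized(record: Array[Byte]): Boolean = record.length > MaxBytes
  }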

