From 381fe440cf37f9aab1e48964cc4c30558133291b Mon Sep 17 00:00:00 2001 From: Bendix Saeltz Date: Wed, 9 Dec 2020 17:17:20 +0100 Subject: [PATCH 1/6] Draft Gzip compression --- .../embeddedhttp/SunEmbeddedHttpServer.scala | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala b/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala index 7ce5860d5..e61718af7 100644 --- a/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala +++ b/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala @@ -16,12 +16,15 @@ package kamon.prometheus.embeddedhttp -import java.net.{InetAddress, InetSocketAddress} -import java.nio.charset.StandardCharsets - import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer} import com.typesafe.config.Config import kamon.prometheus.ScrapeSource +import kamon.prometheus.embeddedhttp.SunEmbeddedHttpServer.shouldUseCompression + +import java.net.{InetAddress, InetSocketAddress} +import java.nio.charset.StandardCharsets +import java.util.zip.GZIPOutputStream +import scala.collection.JavaConverters._ class SunEmbeddedHttpServer(hostname: String, port: Int, scrapeSource: ScrapeSource, config: Config) extends EmbeddedHttpServer(hostname, port, scrapeSource, config) { private val server = { @@ -31,16 +34,19 @@ class SunEmbeddedHttpServer(hostname: String, port: Int, scrapeSource: ScrapeSou override def handle(httpExchange: HttpExchange): Unit = { val data = scrapeSource.scrapeData() val bytes = data.getBytes(StandardCharsets.UTF_8) - httpExchange.sendResponseHeaders(200, bytes.length) val os = httpExchange.getResponseBody try { os.write(bytes) - } - finally - os.close() + if (shouldUseCompression(httpExchange)) { + val gzip = new GZIPOutputStream(os) + httpExchange.sendResponseHeaders(200, 0) + gzip.write(bytes) + gzip.close() + } else httpExchange.sendResponseHeaders(200, bytes.length) + } finally os.close() } - } + s.createContext("/metrics", handler) s.createContext("/", handler) s.start() @@ -49,3 +55,19 @@ class SunEmbeddedHttpServer(hostname: String, port: Int, scrapeSource: ScrapeSou def stop(): Unit = server.stop(0) } + +object SunEmbeddedHttpServer { + def shouldUseCompression(httpExchange: HttpExchange): Boolean = { + val encodingHeaders = httpExchange.getRequestHeaders.asScala.get("Accept-Encoding") + encodingHeaders match { + case Some(headerList) => + val trimmedEncodings = for { + encodingHeader <- headerList.asScala + encodings = encodingHeader.split(",") + encoding <- encodings + } yield encoding.trim().toLowerCase() + trimmedEncodings.contains("gzip") + case None => false + } + } +} \ No newline at end of file From d98f3506d36dbfb309334a1bfe3f61622dca6dc0 Mon Sep 17 00:00:00 2001 From: Bendix Saeltz Date: Wed, 16 Dec 2020 16:50:54 +0100 Subject: [PATCH 2/6] Review changes --- .../embeddedhttp/SunEmbeddedHttpServer.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala b/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala index 37cb8833b..6e605a45b 100644 --- a/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala +++ b/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala @@ -37,13 +37,15 @@ class SunEmbeddedHttpServer(hostname: String, port: Int, scrapeSource: ScrapeSou val bytes = data.getBytes(StandardCharsets.UTF_8) val os = httpExchange.getResponseBody try { - os.write(bytes) if (shouldUseCompression(httpExchange)) { - val gzip = new GZIPOutputStream(os) - httpExchange.sendResponseHeaders(200, 0) - gzip.write(bytes) - gzip.close() - } else httpExchange.sendResponseHeaders(200, bytes.length) + val gzip = new GZIPOutputStream(os) + httpExchange.sendResponseHeaders(200, 0) + httpExchange.getResponseHeaders.set("Content-Encoding", "gzip") + gzip.write(bytes) + } else { + httpExchange.sendResponseHeaders(200, bytes.length) + os.write(bytes) + } } finally os.close() } else httpExchange.sendResponseHeaders(404, -1) } @@ -71,4 +73,4 @@ object SunEmbeddedHttpServer { case None => false } } -} \ No newline at end of file +} From 39827262eb3bb40fe4f3e2cd00f01da765d4a389 Mon Sep 17 00:00:00 2001 From: Bendix Saeltz Date: Tue, 22 Dec 2020 12:14:41 +0100 Subject: [PATCH 3/6] Add test for gzipped metrics --- .../prometheus/EmbeddedHttpServerSpec.scala | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala b/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala index 22b736b55..1f7325791 100644 --- a/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala +++ b/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala @@ -9,6 +9,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} class SunHttpServerSpecSuite extends EmbeddedHttpServerSpecSuite { override def testConfig: Config = ConfigFactory.load() } + //class NanoHttpServerSpecSuite extends EmbeddedHttpServerSpecSuite { // override def testConfig: Config = ConfigFactory.parseString( // """ @@ -22,6 +23,7 @@ class SunHttpServerSpecSuite extends EmbeddedHttpServerSpecSuite { abstract class EmbeddedHttpServerSpecSuite extends WordSpec with Matchers with BeforeAndAfterAll with KamonTestSnapshotSupport { protected def testConfig: Config + protected def port: Int = 9095 private var testee: PrometheusReporter = _ @@ -74,13 +76,25 @@ abstract class EmbeddedHttpServerSpecSuite extends WordSpec with Matchers with B httpGetMetrics("/metricsss") } } + + "gzipped metrics have smaller size" in { + //arrange + testee.reportPeriodSnapshot(counter("jvm.mem")) + //act + val metrics = httpGetMetrics("/metrics") + val gzippedMetrics = httpGetMetrics("/metrics", gzip = true) + //assert + metrics should be < gzippedMetrics + } } - private def httpGetMetrics(endpoint: String): String = { - val src = scala.io.Source.fromURL(new URL(s"http://127.0.0.1:$port$endpoint")) - try - src.mkString - finally - src.close() + private def httpGetMetrics(endpoint: String, gzip: Boolean = false): String = { + val url = new URL(s"http://127.0.0.1:$port$endpoint") + val connection = url.openConnection + if (gzip) connection.setRequestProperty("Accept-Encoding", "gzip") + val src = scala.io.Source.fromInputStream(connection.getInputStream) + if (gzip) connection.getRequestProperty("Accept-Encoding") shouldBe "gzip" + try src.getLines.mkString + finally src.close() } } From ca6d56feea0659dd1ab77b96c42a5ab13da7a14c Mon Sep 17 00:00:00 2001 From: Bendix Saeltz Date: Tue, 22 Dec 2020 12:50:13 +0100 Subject: [PATCH 4/6] Fix test for gzipped metrics --- .../prometheus/EmbeddedHttpServerSpec.scala | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala b/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala index 1f7325791..063fe01bf 100644 --- a/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala +++ b/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala @@ -2,10 +2,11 @@ package kamon.prometheus import java.io.FileNotFoundException import java.net.URL - import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import java.util.zip.GZIPInputStream + class SunHttpServerSpecSuite extends EmbeddedHttpServerSpecSuite { override def testConfig: Config = ConfigFactory.load() } @@ -82,19 +83,27 @@ abstract class EmbeddedHttpServerSpecSuite extends WordSpec with Matchers with B testee.reportPeriodSnapshot(counter("jvm.mem")) //act val metrics = httpGetMetrics("/metrics") - val gzippedMetrics = httpGetMetrics("/metrics", gzip = true) + val gzippedMetrics = httpGetGzippedMetrics("/metrics") //assert - metrics should be < gzippedMetrics + metrics.length should be < gzippedMetrics.length } } - private def httpGetMetrics(endpoint: String, gzip: Boolean = false): String = { + private def httpGetMetrics(endpoint: String): String = { + val url = new URL(s"http://127.0.0.1:$port$endpoint") + val src = scala.io.Source.fromURL(url) + try src.mkString + finally src.close() + } + + private def httpGetGzippedMetrics(endpoint: String): String = { val url = new URL(s"http://127.0.0.1:$port$endpoint") val connection = url.openConnection - if (gzip) connection.setRequestProperty("Accept-Encoding", "gzip") - val src = scala.io.Source.fromInputStream(connection.getInputStream) - if (gzip) connection.getRequestProperty("Accept-Encoding") shouldBe "gzip" + connection.setRequestProperty("Accept-Encoding", "gzip") + val gzipStream = new GZIPInputStream(connection.getInputStream) + val src = scala.io.Source.fromInputStream(gzipStream) + connection.getRequestProperty("Accept-Encoding") shouldBe "gzip" try src.getLines.mkString - finally src.close() + finally gzipStream.close() } } From 16b64bc912ed4999c41f8786181286be3c1a6669 Mon Sep 17 00:00:00 2001 From: SimunKaracic Date: Wed, 13 Jan 2021 19:05:08 +0100 Subject: [PATCH 5/6] Refactor gzip implementation It was not closing connections correctly on gzip requests --- .../embeddedhttp/SunEmbeddedHttpServer.scala | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala b/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala index 6e605a45b..ff0af4c0e 100644 --- a/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala +++ b/reporters/kamon-prometheus/src/main/scala/kamon/prometheus/embeddedhttp/SunEmbeddedHttpServer.scala @@ -21,10 +21,13 @@ import com.typesafe.config.Config import kamon.prometheus.ScrapeSource import kamon.prometheus.embeddedhttp.SunEmbeddedHttpServer.shouldUseCompression +import java.io.OutputStream import java.net.{InetAddress, InetSocketAddress} import java.nio.charset.StandardCharsets +import java.util import java.util.zip.GZIPOutputStream import scala.collection.JavaConverters._ +import scala.collection.mutable class SunEmbeddedHttpServer(hostname: String, port: Int, scrapeSource: ScrapeSource, config: Config) extends EmbeddedHttpServer(hostname, port, scrapeSource, config) { private val server = { @@ -35,18 +38,19 @@ class SunEmbeddedHttpServer(hostname: String, port: Int, scrapeSource: ScrapeSou if (httpExchange.getRequestURI.getPath == "/metrics") { val data = scrapeSource.scrapeData() val bytes = data.getBytes(StandardCharsets.UTF_8) - val os = httpExchange.getResponseBody + var os: OutputStream = null try { if (shouldUseCompression(httpExchange)) { - val gzip = new GZIPOutputStream(os) - httpExchange.sendResponseHeaders(200, 0) - httpExchange.getResponseHeaders.set("Content-Encoding", "gzip") - gzip.write(bytes) - } else { - httpExchange.sendResponseHeaders(200, bytes.length) - os.write(bytes) - } - } finally os.close() + httpExchange.getResponseHeaders.set("Content-Encoding", "gzip") + httpExchange.sendResponseHeaders(200, 0) + os = new GZIPOutputStream(httpExchange.getResponseBody) + os.write(bytes) + } else { + os = httpExchange.getResponseBody + httpExchange.sendResponseHeaders(200, bytes.length) + os.write(bytes) + } + } finally Option(os).map(_.close()) } else httpExchange.sendResponseHeaders(404, -1) } } @@ -61,16 +65,15 @@ class SunEmbeddedHttpServer(hostname: String, port: Int, scrapeSource: ScrapeSou object SunEmbeddedHttpServer { def shouldUseCompression(httpExchange: HttpExchange): Boolean = { - val encodingHeaders = httpExchange.getRequestHeaders.asScala.get("Accept-Encoding") - encodingHeaders match { - case Some(headerList) => - val trimmedEncodings = for { - encodingHeader <- headerList.asScala - encodings = encodingHeader.split(",") - encoding <- encodings - } yield encoding.trim().toLowerCase() - trimmedEncodings.contains("gzip") - case None => false - } + httpExchange.getRequestHeaders + .asScala.get("Accept-Encoding") + .map(extractEncodings) + .exists(_.contains("gzip")) + } + + private def extractEncodings(headerList: util.List[String]): mutable.Buffer[String] = { + headerList.asScala + .flatMap(_.split(",")) + .map(_.trim().toLowerCase()) } } From f99577eca9ebf317fbbcb5caa6b5238e9bad6be6 Mon Sep 17 00:00:00 2001 From: SimunKaracic Date: Wed, 13 Jan 2021 19:13:47 +0100 Subject: [PATCH 6/6] Fix gzip test --- .../test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala b/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala index 063fe01bf..44c7095dd 100644 --- a/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala +++ b/reporters/kamon-prometheus/src/test/scala/kamon/prometheus/EmbeddedHttpServerSpec.scala @@ -78,14 +78,14 @@ abstract class EmbeddedHttpServerSpecSuite extends WordSpec with Matchers with B } } - "gzipped metrics have smaller size" in { + "respect gzip Content-Encoding headers" in { //arrange testee.reportPeriodSnapshot(counter("jvm.mem")) //act val metrics = httpGetMetrics("/metrics") val gzippedMetrics = httpGetGzippedMetrics("/metrics") //assert - metrics.length should be < gzippedMetrics.length + metrics.length should be > gzippedMetrics.length } }