Skip to content

Commit

Permalink
JavaNetRegistryLookup amendment 1
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed May 19, 2023
1 parent 106c8d7 commit 9991c08
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,22 @@ import com.snowplowanalytics.iglu.core.circe.implicits._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList}

import java.net.UnknownHostException
import java.net.URI
import java.net.http.HttpResponse.BodyHandlers
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.time.Duration

import scala.util.control.NonFatal

object JavaNetRegistryLookup {

private val ReadTimeoutMs = 4000L

private lazy val httpClient = HttpClient
.newBuilder()
.connectTimeout(Duration.ofMillis(1000))
.build()

implicit def ioLookupInstance[F[_]](implicit F: Sync[F]): RegistryLookup[F] =
new RegistryLookup[F] {
def lookup(repositoryRef: Registry, schemaKey: SchemaKey): F[Either[RegistryError, Json]] =
Expand Down Expand Up @@ -60,7 +72,7 @@ object JavaNetRegistryLookup {
case Registry.Http(_, connection) =>
Utils
.stringToUri(RegistryLookup.toPath(connection.uri.toString, schemaKey))
.flatMap(uri => Utils.unsafeGetFromUri(uri, connection.apikey))
.flatMap(uri => unsafeGetFromUri(uri, connection.apikey))
case Registry.Embedded(_, base) =>
val path = RegistryLookup.toPath(base, schemaKey)
Utils.unsafeEmbeddedLookup(path)
Expand All @@ -77,7 +89,7 @@ object JavaNetRegistryLookup {
registry match {
case Registry.Http(_, connection) =>
val subpath = RegistryLookup.toSubpath(connection.uri.toString, vendor, name, model)
Utils.stringToUri(subpath).flatMap(Utils.unsafeHttpList(_, connection.apikey))
Utils.stringToUri(subpath).flatMap(unsafeHttpList(_, connection.apikey))
case Registry.Embedded(_, base) =>
val path = RegistryLookup.toSubpath(base, vendor, name)
Utils.unsafeEmbeddedList(path, model)
Expand All @@ -100,7 +112,7 @@ object JavaNetRegistryLookup {
): F[Either[RegistryError, Json]] =
Utils
.stringToUri(RegistryLookup.toPath(http.uri.toString, key))
.traverse(uri => Utils.getFromUri(uri, http.apikey))
.traverse(uri => getFromUri(uri, http.apikey))
.map { response =>
val result = for {
body <- OptionT(response)
Expand Down Expand Up @@ -129,7 +141,7 @@ object JavaNetRegistryLookup {
): F[Either[RegistryError, SchemaList]] =
Utils
.stringToUri(RegistryLookup.toSubpath(http.uri.toString, vendor, name, model))
.traverse(uri => Utils.getFromUri(uri, http.apikey))
.traverse(uri => getFromUri(uri, http.apikey))
.map { response =>
for {
body <- response
Expand All @@ -139,4 +151,53 @@ object JavaNetRegistryLookup {
} yield list
}

/**
* Read a Json from an URI using optional apikey
* with added optional header, so it is unsafe as well and throws same exceptions
*
* @param uri the URL to fetch the JSON document from
* @param apikey optional apikey UUID to authenticate in Iglu Server
* @return The document at that URL if code is 2xx
*/
def getFromUri[F[_]: Sync](uri: URI, apikey: Option[String]): F[Option[String]] =
Sync[F].blocking(executeCall(uri, apikey))

/** Non-RT analog of [[getFromUri]] */
def unsafeGetFromUri(uri: URI, apikey: Option[String]): Either[RegistryError, Json] =
try {
executeCall(uri, apikey)
.map(parse)
.map(_.leftMap(e => RegistryError.RepoFailure(e.show)))
.getOrElse(RegistryError.NotFound.asLeft)
} catch {
case NonFatal(e) =>
Utils.repoFailure(e).asLeft
}

/** Non-RT analog of [[RegistryLookup.httpList]] */
def unsafeHttpList(uri: URI, apikey: Option[String]): Either[RegistryError, SchemaList] =
for {
json <- unsafeGetFromUri(uri, apikey)
list <- json.as[SchemaList].leftMap(e => RegistryError.RepoFailure(e.show))
} yield list

private def executeCall(uri: URI, apikey: Option[String]): Option[String] = {
val httpRequest = buildLookupRequest(uri, apikey)
val response = httpClient.send(httpRequest, BodyHandlers.ofString())
if (is2xx(response)) response.body.some else None
}

private def buildLookupRequest(uri: URI, apikey: Option[String]): HttpRequest = {
val baseRequest = HttpRequest
.newBuilder(uri)
.timeout(Duration.ofMillis(ReadTimeoutMs))

apikey
.fold(baseRequest)(key => baseRequest.header("apikey", key))
.build()
}

private def is2xx(response: HttpResponse[String]) =
response.statusCode() >= 200 && response.statusCode() <= 299

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import java.io.{File, InputStream}
import java.net.URI
import java.net.http.HttpResponse.BodyHandlers
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.time.Duration
import scala.util.matching.Regex

// Scala
Expand All @@ -30,7 +27,6 @@ import scala.util.control.NonFatal
// Cats
import cats.effect.Sync
import cats.syntax.either._
import cats.syntax.option._
import cats.syntax.show._
import cats.syntax.traverse._

Expand All @@ -41,40 +37,9 @@ import io.circe.{Decoder, DecodingFailure, Json, ParsingFailure}
// Apache Commons
import org.apache.commons.lang3.exception.ExceptionUtils

// scalaj
import com.snowplowanalytics.iglu.core.SchemaList
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._

private[registries] object Utils {
private val ReadTimeoutMs = 4000L

private lazy val httpClient = HttpClient
.newBuilder()
.connectTimeout(Duration.ofMillis(1000))
.build()

/**
* Read a Json from an URI using optional apikey
* with added optional header, so it is unsafe as well and throws same exceptions
*
* @param uri the URL to fetch the JSON document from
* @param apikey optional apikey UUID to authenticate in Iglu Server
* @return The document at that URL if code is 2xx
*/
def getFromUri[F[_]: Sync](uri: URI, apikey: Option[String]): F[Option[String]] =
Sync[F].blocking(executeCall(uri, apikey))

/** Non-RT analog of [[getFromUri]] */
def unsafeGetFromUri(uri: URI, apikey: Option[String]): Either[RegistryError, Json] =
try {
executeCall(uri, apikey)
.map(parse)
.map(_.leftMap(e => RegistryError.RepoFailure(e.show)))
.getOrElse(RegistryError.NotFound.asLeft)
} catch {
case NonFatal(e) =>
repoFailure(e).asLeft
}

def unsafeEmbeddedList(path: String, modelMatch: Int): Either[RegistryError, SchemaList] =
try {
Expand Down Expand Up @@ -149,13 +114,6 @@ private[registries] object Utils {
}
}

/** Non-RT analog of [[RegistryLookup.httpList]] */
def unsafeHttpList(uri: URI, apikey: Option[String]): Either[RegistryError, SchemaList] =
for {
json <- unsafeGetFromUri(uri, apikey)
list <- json.as[SchemaList].leftMap(e => RegistryError.RepoFailure(e.show))
} yield list

/**
* A wrapper around Java's URI.
*
Expand All @@ -180,25 +138,6 @@ private[registries] object Utils {
} yield uri
}

private def executeCall(uri: URI, apikey: Option[String]): Option[String] = {
val httpRequest = buildLookupRequest(uri, apikey)
val response = httpClient.send(httpRequest, BodyHandlers.ofString())
if (is2xx(response)) response.body.some else None
}

private def buildLookupRequest(uri: URI, apikey: Option[String]): HttpRequest = {
val baseRequest = HttpRequest
.newBuilder(uri)
.timeout(Duration.ofMillis(ReadTimeoutMs))

apikey
.fold(baseRequest)(key => baseRequest.header("apikey", key))
.build()
}

private def is2xx(response: HttpResponse[String]) =
response.statusCode() >= 200 && response.statusCode() <= 299

private[resolver] def readResource[F[_]: Sync](path: String): F[Option[InputStream]] =
Sync[F].delay(unsafeReadResource(path))
private[resolver] def unsafeReadResource(path: String): Option[InputStream] =
Expand Down

0 comments on commit 9991c08

Please sign in to comment.