Skip to content

Commit

Permalink
Embedded and in-memory lookups should be common across http modules (c…
Browse files Browse the repository at this point in the history
…lose #244)
  • Loading branch information
istreeter authored and oguzhanunlu committed May 30, 2023
1 parent d94f20b commit 5b8f84a
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 236 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (c) 2014-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.iglu.client.resolver.registries

// Java
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import java.io.{File, InputStream}
import scala.util.matching.Regex

// Scala
import scala.io.Source
import scala.util.control.NonFatal

// Cats
import cats.effect.Sync
import cats.effect.implicits._
import cats.implicits._
import cats.data.EitherT

// circe
import io.circe.parser.parse
import io.circe.Json

import com.snowplowanalytics.iglu.core.SchemaList

private[registries] object Embedded {

  /**
   * Retrieves an Iglu Schema from the Embedded Iglu Repo as a JSON
   *
   * The resource path is built with `RegistryLookup.toPath(base, key)` and resolved
   * through `getClass.getResource`. The stream is always closed, whether or not
   * reading succeeds.
   *
   * @param base prefix of the embedded repository, as passed to `RegistryLookup.toPath`
   * @param key The SchemaKey uniquely identifying the schema in Iglu
   * @return either a `Json` on success, or `RegistryError` in case of any failure
   *         (i.e. all exceptions should be swallowed by `RegistryError`);
   *         `RegistryError.NotFound` when the resource does not exist
   */
  def lookup[F[_]: Sync](
    base: String,
    key: SchemaKey
  ): F[Either[RegistryError, Json]] = {
    val path = RegistryLookup.toPath(base, key)
    val is   = readResource[F](path)
    // bracket guarantees the stream (if any) is closed after it has been read
    val schema = is.bracket(_.traverse(fromStream[F]))(_.traverse_(closeStream[F]))
    val result = for {
      stringOption <- schema.attemptT.leftMap(Utils.repoFailure)
      string       <- EitherT.fromOption[F](stringOption, RegistryError.NotFound: RegistryError)
      json         <- EitherT.fromEither[F](parse(string)).leftMap(Utils.invalidSchema)
    } yield json

    result.value
  }

  /**
   * Not-RT analog of [[Embedded.lookup]]
   *
   * @param path full resource path of the schema, resolved through `getClass.getResource`
   * @return the parsed `Json`, `RegistryError.NotFound` when the resource is missing,
   *         or another `RegistryError` for unparsable content or any non-fatal exception
   */
  def unsafeLookup(path: String): Either[RegistryError, Json] =
    try {
      unsafeReadResource(path) match {
        case Some(is) =>
          // Close the stream even when reading fails, so it is never leaked
          val string =
            try unsafeFromStream(is)
            finally unsafeCloseStream(is)
          parse(string).leftMap(Utils.invalidSchema)
        case None =>
          RegistryError.NotFound.asLeft
      }
    } catch {
      // e.g. a null `path`; kept for backward compatibility
      case _: NullPointerException => RegistryError.NotFound.asLeft
      case NonFatal(e)             => Utils.repoFailure(e).asLeft
    }

  /**
   * Lists the schemas of a single MODEL found in a directory of the embedded repository
   *
   * @param path resource path of the directory to scan, resolved through `getClass.getResource`
   * @param modelMatch the MODEL to list; only files whose name starts with `<modelMatch>-`
   *                   are considered
   * @return a `SchemaList` built from the matching schema keys sorted by version;
   *         `RegistryError.NotFound` when the resource is missing, is not a directory
   *         or holds no matching file; `RegistryError.RepoFailure` when a matching file
   *         has an unexpected path; another `RegistryError` for any non-fatal exception
   */
  def unsafeList(path: String, modelMatch: Int): Either[RegistryError, SchemaList] =
    try {
      // A missing entry yields a null URL: handle it explicitly instead of relying on an NPE
      Option(getClass.getResource(path)) match {
        case None =>
          RegistryError.NotFound.asLeft
        case Some(url) =>
          // NOTE(review): URL.getPath is not percent-decoded and is only meaningful for
          // `file:` URLs - confirm embedded repos are never listed from paths with
          // spaces or from inside a jar
          val d = new File(url.getPath)
          val schemaFileRegex: Regex = (".*/schemas/?" + // path to file
            "([a-zA-Z0-9-_.]+)/" +                       // Vendor
            "([a-zA-Z0-9-_]+)/" +                        // Name
            "([a-zA-Z0-9-_]+)/" +                        // Format
            "([1-9][0-9]*)-(\\d+)-(\\d+)$").r            // MODEL, REVISION and ADDITION

          val modelPrefix = s"${modelMatch.toString}-"

          // `listFiles` returns null on I/O error or when `dir` is not a directory
          def getFolderContent(dir: File): List[String] =
            Option(dir.listFiles).toList
              .flatMap(_.toList)
              .filter(f => f.isFile && f.getName.startsWith(modelPrefix))
              .map(_.getAbsolutePath)

          val content =
            if (d.exists && d.isDirectory)
              getFolderContent(d)
            else
              List.empty[String]

          content
            .traverse {
              case schemaFileRegex(vendor, name, format, model, revision, addition)
                  if model == modelMatch.toString =>
                SchemaKey(
                  vendor = vendor,
                  name = name,
                  format = format,
                  version = SchemaVer
                    .Full(model = model.toInt, revision = revision.toInt, addition = addition.toInt)
                ).asRight
              case f => RegistryError.RepoFailure(s"Corrupted schema file name at $f").asLeft
            }
            .map(_.sortBy(_.version))
            .flatMap(s =>
              if (s.isEmpty)
                RegistryError.NotFound.asLeft
              else
                s.asRight
            )
            .map(SchemaList.parseUnsafe)
      }
    } catch {
      // e.g. a null `path`; kept for backward compatibility
      case _: NullPointerException => RegistryError.NotFound.asLeft
      case NonFatal(e)             => Utils.repoFailure(e).asLeft
    }

  /** Suspends [[unsafeReadResource]] in `F` */
  private def readResource[F[_]: Sync](path: String): F[Option[InputStream]] =
    Sync[F].delay(unsafeReadResource(path))

  /** Opens the resource at `path`, or returns `None` when `getResource` finds nothing */
  private def unsafeReadResource(path: String): Option[InputStream] =
    Option(getClass.getResource(path)).map(_.openStream())

  /** Suspends [[unsafeFromStream]] in `F` */
  private def fromStream[F[_]: Sync](is: InputStream): F[String] =
    Sync[F].delay(unsafeFromStream(is))

  /** Reads the whole stream into a string; does not close the stream */
  private def unsafeFromStream(is: InputStream): String =
    Source.fromInputStream(is).mkString

  /** Suspends [[unsafeCloseStream]] in `F` */
  private def closeStream[F[_]: Sync](is: InputStream): F[Unit] =
    Sync[F].delay(unsafeCloseStream(is))

  /** Closes the stream */
  private def unsafeCloseStream(is: InputStream): Unit =
    is.close()

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,20 @@ object JavaNetRegistryLookup {
.build()

implicit def ioLookupInstance[F[_]](implicit F: Sync[F]): RegistryLookup[F] =
new RegistryLookup[F] {
def lookup(repositoryRef: Registry, schemaKey: SchemaKey): F[Either[RegistryError, Json]] =
repositoryRef match {
case Registry.Http(_, connection) => httpLookup(connection, schemaKey)
case Registry.Embedded(_, path) => RegistryLookup.embeddedLookup[F](path, schemaKey)
case Registry.InMemory(_, schemas) =>
F.delay(RegistryLookup.inMemoryLookup(schemas, schemaKey))
}

def list(
registry: Registry,
new RegistryLookup.StdRegistryLookup[F] {
def httpLookup(
registry: Registry.Http,
schemaKey: SchemaKey
): F[Either[RegistryError, Json]] =
lookupImpl(registry.http, schemaKey)

def httpList(
registry: Registry.Http,
vendor: String,
name: String,
model: Int
): F[Either[RegistryError, SchemaList]] =
registry match {
case Registry.Http(_, connection) => httpList(connection, vendor, name, model)
case Registry.Embedded(_, base) =>
val path = toSubpath(base, vendor, name)
Sync[F].delay(Utils.unsafeEmbeddedList(path, model))
case _ => F.pure(RegistryError.NotFound.asLeft)
}
listImpl(registry.http, vendor, name, model)
}

// Id instance also swallows all exceptions into `RegistryError`
Expand All @@ -75,7 +67,7 @@ object JavaNetRegistryLookup {
.flatMap(uri => unsafeGetFromUri(uri, connection.apikey))
case Registry.Embedded(_, base) =>
val path = RegistryLookup.toPath(base, schemaKey)
Utils.unsafeEmbeddedLookup(path)
Embedded.unsafeLookup(path)
case Registry.InMemory(_, schemas) =>
RegistryLookup.inMemoryLookup(schemas, schemaKey)
}
Expand All @@ -88,11 +80,11 @@ object JavaNetRegistryLookup {
): Id[Either[RegistryError, SchemaList]] =
registry match {
case Registry.Http(_, connection) =>
val subpath = toSubpath(connection.uri.toString, vendor, name, model)
val subpath = RegistryLookup.toSubpath(connection.uri.toString, vendor, name, model)
Utils.stringToUri(subpath).flatMap(unsafeHttpList(_, connection.apikey))
case Registry.Embedded(_, base) =>
val path = toSubpath(base, vendor, name)
Utils.unsafeEmbeddedList(path, model)
val path = RegistryLookup.toSubpath(base, vendor, name)
Embedded.unsafeList(path, model)
case _ =>
RegistryError.NotFound.asLeft
}
Expand All @@ -106,7 +98,7 @@ object JavaNetRegistryLookup {
* @return either a `Json` on success, or `RegistryError` in case of any failure
* (i.e. all exceptions should be swallowed by `RegistryError`)
*/
private def httpLookup[F[_]: Sync](
private def lookupImpl[F[_]: Sync](
http: Registry.HttpConnection,
key: SchemaKey
): F[Either[RegistryError, Json]] =
Expand All @@ -124,23 +116,19 @@ object JavaNetRegistryLookup {

result.getOrElseF[Json](RegistryError.NotFound.asLeft)
}
.recover {
case uhe: UnknownHostException =>
val error = s"Unknown host issue fetching: ${uhe.getMessage}"
RegistryError.RepoFailure(error).asLeft
case NonFatal(nfe) =>
val error = s"Unexpected exception fetching: $nfe"
RegistryError.RepoFailure(error).asLeft
.recover { case uhe: UnknownHostException =>
val error = s"Unknown host issue fetching: ${uhe.getMessage}"
RegistryError.RepoFailure(error).asLeft
}

private def httpList[F[_]: Sync](
private def listImpl[F[_]: Sync](
http: Registry.HttpConnection,
vendor: String,
name: String,
model: Int
): F[Either[RegistryError, SchemaList]] =
Utils
.stringToUri(toSubpath(http.uri.toString, vendor, name, model))
.stringToUri(RegistryLookup.toSubpath(http.uri.toString, vendor, name, model))
.traverse(uri => getFromUri(uri, http.apikey))
.map { response =>
for {
Expand Down Expand Up @@ -200,19 +188,4 @@ object JavaNetRegistryLookup {
private def is2xx(response: HttpResponse[String]) =
response.statusCode() >= 200 && response.statusCode() <= 299

private def toSubpath(
prefix: String,
vendor: String,
name: String,
model: Int
): String =
s"${prefix.stripSuffix("/")}/schemas/$vendor/$name/jsonschema/$model"

private def toSubpath(
prefix: String,
vendor: String,
name: String
): String =
s"${prefix.stripSuffix("/")}/schemas/$vendor/$name/jsonschema"

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ package com.snowplowanalytics.iglu.client.resolver.registries
// Java
import java.net.URI

// Cats
import cats.syntax.either._
import cats.syntax.show._

// circe
import io.circe.{Decoder, HCursor, Json}
import io.circe.{Decoder, DecodingFailure, HCursor, Json}

// Iglu Core
import com.snowplowanalytics.iglu.core.SelfDescribingSchema
Expand All @@ -35,7 +39,6 @@ sealed trait Registry extends Product with Serializable {
}

object Registry {
import Utils._

/**
* An embedded repository is one which is embedded inside the calling code,
Expand Down Expand Up @@ -88,6 +91,14 @@ object Registry {
/** Helper class to extract HTTP URI and api key from config JSON */
case class HttpConnection(uri: URI, apikey: Option[String])

/**
 * Decodes a JSON string into a `URI` using `Utils.stringToUri`.
 * A conversion error becomes a `DecodingFailure` carrying the error's `show`
 * output and the cursor history.
 */
private implicit val uriCirceJsonDecoder: Decoder[URI] =
  Decoder.instance { cursor =>
    cursor.as[String].flatMap { raw =>
      Utils
        .stringToUri(raw)
        .leftMap(err => DecodingFailure(err.show, cursor.history))
    }
  }

implicit val httpConnectionDecoder: Decoder[HttpConnection] =
new Decoder[HttpConnection] {
def apply(c: HCursor): Decoder.Result[HttpConnection] =
Expand Down

0 comments on commit 5b8f84a

Please sign in to comment.