diff --git a/README.md b/README.md
index 90685e986..73af36e20 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,7 @@ This library includes the following modules.
 - `magnolify-datastore` - conversion between Scala types and [Google Cloud Datastore](https://cloud.google.com/datastore/) `Entity`
 - `magnolify-guava` - type class derivation for [Guava](https://guava.dev) - [`Funnel[T]`](https://guava.dev/releases/snapshot-jre/api/docs/com/google/common/hash/Funnel.html)
+- `magnolify-hbase` - conversion between Scala types and [HBase](https://hbase.apache.org) `Result` and `Put`
 - `magnolify-neo4j` - conversion between Scala types and [Value](https://neo4j.com/docs/driver-manual/1.7/cypher-values/)
 - `magnolify-parquet` - support for [Parquet](http://parquet.apache.org/) columnar storage format.
 - `magnolify-protobuf` - conversion between Scala types and [Google Protocol Buffer](https://developers.google.com/protocol-buffers/docs/overview) `Message`
diff --git a/build.sbt b/build.sbt
index 243386935..cf507b84b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -26,6 +26,7 @@ val bigtableVersion = "2.27.4"
 val catsVersion = "2.10.0"
 val datastoreVersion = "2.17.2"
 val guavaVersion = "32.1.2-jre"
+val hbaseVersion = "2.5.5-hadoop3"
 val hadoopVersion = "3.3.6"
 val jacksonVersion = "2.15.2"
 val jodaTimeVersion = "2.12.5"
@@ -247,6 +248,7 @@ lazy val root = project
     cats,
     datastore,
     guava,
+    hbase,
     parquet,
     protobuf,
     refined,
@@ -519,6 +521,23 @@ lazy val neo4j = project
     )
   )
 
+lazy val hbase = project
+  .in(file("hbase"))
+  .dependsOn(
+    shared,
+    cats % "test->test",
+    scalacheck % "test->test",
+    test % "test->test"
+  )
+  .settings(
+    commonSettings,
+    moduleName := "magnolify-hbase",
+    description := "Magnolia add-on for HBase",
+    libraryDependencies ++= Seq(
+      "org.apache.hbase" % "hbase-client" % hbaseVersion % Provided
+    )
+  )
+
 lazy val tools = project
   .in(file("tools"))
   .dependsOn(
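
For orientation before the new sources, a minimal usage sketch (illustrative, not part of the diff): the module follows the same pattern as the other Magnolify backends. Derive an `HBaseType[T]` for a case class, then use it to build a `Put` mutation and to decode a fetched `Result`. `Record` and the row/family names below are made up for the example.

```scala
import magnolify.hbase._
import java.nio.charset.StandardCharsets.UTF_8

case class Record(b: Boolean, i: Int, s: String)

// derives HBaseField[Record] via Magnolia, then wraps it as a converter
val recordType = HBaseType[Record]

// each case-class field becomes one cell in the given column family
val mutation = recordType(Record(true, 1, "hello"), "row".getBytes(UTF_8), "cf".getBytes(UTF_8))

// and, given a Result fetched from a table:
// val copy: Record = recordType(result, "cf".getBytes(UTF_8))
```
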
diff --git a/hbase/src/main/scala/magnolify/hbase/HBaseType.scala b/hbase/src/main/scala/magnolify/hbase/HBaseType.scala
new file mode 100644
index 000000000..7404a1a67
--- /dev/null
+++ b/hbase/src/main/scala/magnolify/hbase/HBaseType.scala
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2023 Spotify AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package magnolify.hbase
+
+import magnolia1.{CaseClass, Magnolia, SealedTrait}
+import magnolify.shared.*
+import magnolify.shims.FactoryCompat
+import org.apache.hadoop.hbase.client.{Mutation, Put, Result}
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.UUID
+import scala.annotation.implicitNotFound
+import scala.jdk.CollectionConverters.*
+
+sealed trait HBaseType[T] extends Converter[T, Map[String, Array[Byte]], Map[String, Array[Byte]]] {
+  def apply(v: Result, family: Array[Byte]): T =
+    from(
+      v.getFamilyMap(family)
+        .asScala
+        .map { case (qualifier, value) => new String(qualifier, UTF_8) -> value }
+        .toMap
+    )
+
+  def apply(v: T, row: Array[Byte], family: Array[Byte], ts: Long = 0L): Mutation = to(v)
+    .foldLeft(new Put(row)) { case (row, (qualifier, value)) =>
+      row.addColumn(family, qualifier.getBytes(UTF_8), ts, value)
+    }
+}
+
+object HBaseType {
+  implicit def apply[T: HBaseField]: HBaseType[T] = HBaseType(CaseMapper.identity)
+
+  def apply[T](cm: CaseMapper)(implicit f: HBaseField[T]): HBaseType[T] = f match {
+    case r: HBaseField.Record[_] =>
+      new HBaseType[T] {
+        private val caseMapper: CaseMapper = cm
+
+        override def from(xs: Map[String, Array[Byte]]): T = r.get(xs, null)(caseMapper).get
+
+        override def to(v: T): Map[String, Array[Byte]] = r.put(null, v)(caseMapper)
+      }
+    case _ =>
+      throw new IllegalArgumentException(s"HBaseType can only be created from Record. Got $f")
+  }
+}
+
+sealed trait HBaseField[T] extends Serializable {
+  def get(xs: Map[String, Array[Byte]], k: String)(cm: CaseMapper): Value[T]
+  def put(k: String, v: T)(cm: CaseMapper): Map[String, Array[Byte]]
+}
+
+object HBaseField {
+  sealed trait Record[T] extends HBaseField[T]
+
+  sealed trait Primitive[T] extends HBaseField[T] {
+    def size: Option[Int]
+    def fromBytes(v: Array[Byte]): T
+    def toBytes(v: T): Array[Byte]
+
+    override def get(xs: Map[String, Array[Byte]], k: String)(cm: CaseMapper): Value[T] =
+      xs.get(k) match {
+        case Some(v) => Value.Some(fromBytes(v))
+        case None    => Value.None
+      }
+
+    override def put(k: String, v: T)(cm: CaseMapper): Map[String, Array[Byte]] =
+      Map(k -> toBytes(v))
+  }
+
+  // ////////////////////////////////////////////////
+  type Typeclass[T] = HBaseField[T]
+
+  def join[T](caseClass: CaseClass[Typeclass, T]): HBaseField[T] = {
+    if (caseClass.isValueClass) {
+      val p = caseClass.parameters.head
+      val tc = p.typeclass
+      new HBaseField[T] {
+        override def get(xs: Map[String, Array[Byte]], k: String)(cm: CaseMapper): Value[T] =
+          tc.get(xs, k)(cm).map(x => caseClass.construct(_ => x))
+
+        override def put(k: String, v: T)(cm: CaseMapper): Map[String, Array[Byte]] =
+          p.typeclass.put(k, p.dereference(v))(cm)
+      }
+    } else {
+      new Record[T] {
+        private def qualifier(prefix: String, label: String): String =
+          if (prefix == null) label else s"$prefix.$label"
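+
+        // e.g. nested case classes flatten into dot-separated column qualifiers:
+        // given case class Inner(x: Int) and case class Outer(s: String, inner: Inner)
+        // (illustrative names), Outer("a", Inner(1)) is stored as two cells in the
+        // family: qualifier "s" -> UTF-8 bytes of "a", qualifier "inner.x" -> 4-byte
+        // big-endian 1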
+
+        override def get(xs: Map[String, Array[Byte]], k: String)(cm: CaseMapper): Value[T] = {
+          var fallback = true
+          val r = caseClass.construct { p =>
+            val q = qualifier(k, cm.map(p.label))
+            val v = p.typeclass.get(xs, q)(cm)
+            if (v.isSome) {
+              fallback = false
+            }
+            v.getOrElse(p.default)
+          }
+          // result is default if all fields are default
+          if (fallback) Value.Default(r) else Value.Some(r)
+        }
+
+        override def put(k: String, v: T)(cm: CaseMapper): Map[String, Array[Byte]] =
+          caseClass.parameters.flatMap { p =>
+            p.typeclass.put(qualifier(k, cm.map(p.label)), p.dereference(v))(cm)
+          }.toMap
+      }
+    }
+  }
+
+  @implicitNotFound("Cannot derive HBaseField for sealed trait")
+  private sealed trait Dispatchable[T]
+
+  def split[T: Dispatchable](sealedTrait: SealedTrait[Typeclass, T]): HBaseField[T] = ???
+
+  implicit def gen[T]: HBaseField[T] = macro Magnolia.gen[T]
+
+  def apply[T](implicit f: HBaseField[T]): HBaseField[T] = f
+
+  def from[T]: FromWord[T] = new FromWord
+
+  class FromWord[T] {
+    def apply[U](f: T => U)(g: U => T)(implicit hbf: Primitive[T]): Primitive[U] =
+      new Primitive[U] {
+        override def size: Option[Int] = hbf.size
+        override def fromBytes(v: Array[Byte]): U = f(hbf.fromBytes(v))
+        override def toBytes(v: U): Array[Byte] = hbf.toBytes(g(v))
+      }
+  }
+
+  private def primitive[T](
+    capacity: Int
+  )(f: ByteBuffer => T)(g: (ByteBuffer, T) => ByteBuffer): Primitive[T] = new Primitive[T] {
+    override val size: Option[Int] = Some(capacity)
+    override def fromBytes(v: Array[Byte]): T = f(ByteBuffer.wrap(v).asReadOnlyBuffer())
+    override def toBytes(v: T): Array[Byte] = g(ByteBuffer.allocate(capacity), v).array()
+  }
+
+  implicit val hbfByte: Primitive[Byte] =
+    primitive[Byte](java.lang.Byte.BYTES)(_.get)(_.put(_))
+  implicit val hbfChar: Primitive[Char] =
+    primitive[Char](java.lang.Character.BYTES)(_.getChar)(_.putChar(_))
+  implicit val hbfShort: Primitive[Short] =
+    primitive[Short](java.lang.Short.BYTES)(_.getShort)(_.putShort(_))
+  implicit val hbfInt: Primitive[Int] =
+    primitive[Int](java.lang.Integer.BYTES)(_.getInt)(_.putInt(_))
+  implicit val hbfLong: Primitive[Long] =
+    primitive[Long](java.lang.Long.BYTES)(_.getLong)(_.putLong(_))
+  implicit val hbfFloat: Primitive[Float] =
+    primitive[Float](java.lang.Float.BYTES)(_.getFloat)(_.putFloat(_))
+  implicit val hbfDouble: Primitive[Double] =
+    primitive[Double](java.lang.Double.BYTES)(_.getDouble)(_.putDouble(_))
+  implicit val hbfBoolean: Primitive[Boolean] =
+    from[Byte](_ == 1)(if (_) 1 else 0)
+  implicit val hbfUUID: Primitive[UUID] =
+    primitive[UUID](16)(bb => new UUID(bb.getLong, bb.getLong)) { (bb, uuid) =>
+      bb.putLong(uuid.getMostSignificantBits).putLong(uuid.getLeastSignificantBits)
+    }
+
+  implicit val hbfByteString: Primitive[Array[Byte]] = new Primitive[Array[Byte]] {
+    override val size: Option[Int] = None
+    override def fromBytes(v: Array[Byte]): Array[Byte] = v
+    override def toBytes(v: Array[Byte]): Array[Byte] = v
+  }
+
+  implicit val hbfString: Primitive[String] =
+    from[Array[Byte]](new String(_, UTF_8))(_.getBytes(UTF_8))
+  implicit def hbfEnum[T](implicit et: EnumType[T]): Primitive[T] =
+    from[String](et.from)(et.to)
+  implicit def hbfUnsafeEnum[T: EnumType]: Primitive[UnsafeEnum[T]] =
+    from[String](UnsafeEnum.from[T])(UnsafeEnum.to[T])
+
+  implicit val hbfBigInt: Primitive[BigInt] =
+    from[Array[Byte]](bs => BigInt(bs))(_.toByteArray)
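+
+  // wire layout below: a 4-byte big-endian scale followed by the unscaled value's
+  // two's-complement bytes; e.g. BigDecimal("1.25") has scale 2 and unscaled value
+  // 125, so it encodes as 00 00 00 02 7d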
+  implicit val hbfBigDecimal: Primitive[BigDecimal] = from[Array[Byte]] { bs =>
+    val bb = ByteBuffer.wrap(bs).asReadOnlyBuffer()
+    val scale = bb.getInt
+    val unscaled = new Array[Byte](bb.remaining())
+    bb.get(unscaled)
+    BigDecimal(BigInt(unscaled), scale)
+  } { bd =>
+    val scale = bd.bigDecimal.scale()
+    val unscaled = bd.bigDecimal.unscaledValue().toByteArray
+    val bb = ByteBuffer.allocate(java.lang.Integer.BYTES + unscaled.length)
+    bb.putInt(scale).put(unscaled).array()
+  }
+
+  implicit def hbfOption[T](implicit hbf: HBaseField[T]): HBaseField[Option[T]] =
+    new HBaseField[Option[T]] {
+      override def get(xs: Map[String, Array[Byte]], k: String)(
+        cm: CaseMapper
+      ): Value[Option[T]] = {
+        val subset = xs.filter { case (qualifier, _) => qualifier.startsWith(k) }
+        if (subset.isEmpty) Value.Default(None)
+        else hbf.get(subset, k)(cm).toOption
+      }
+
+      override def put(k: String, v: Option[T])(cm: CaseMapper): Map[String, Array[Byte]] =
+        v.map(hbf.put(k, _)(cm)).getOrElse(Map.empty)
+    }
+
+  implicit def hbfIterable[T, C[T]](implicit
+    hbf: Primitive[T],
+    ti: C[T] => Iterable[T],
+    fc: FactoryCompat[T, C[T]]
+  ): Primitive[C[T]] =
+    new Primitive[C[T]] {
+      override val size: Option[Int] = None
+
+      override def fromBytes(v: Array[Byte]): C[T] = {
+        val buf = ByteBuffer.wrap(v).asReadOnlyBuffer()
+        val n = buf.getInt
+        val b = fc.newBuilder
+        hbf.size match {
+          case Some(size) =>
+            val ba = new Array[Byte](size)
+            (1 to n).foreach { _ =>
+              buf.get(ba)
+              b += hbf.fromBytes(ba)
+            }
+          case None =>
+            (1 to n).foreach { _ =>
+              val s = buf.getInt
+              val ba = new Array[Byte](s)
+              buf.get(ba)
+              b += hbf.fromBytes(ba)
+            }
+        }
+        b.result()
+      }
+
+      override def toBytes(v: C[T]): Array[Byte] = {
+        val buf = hbf.size match {
+          case Some(size) =>
+            val bb = ByteBuffer.allocate(java.lang.Integer.BYTES + v.size * size)
+            bb.putInt(v.size)
+            v.foreach(x => bb.put(hbf.toBytes(x)))
+            bb
+          case None =>
+            val vs = v.map(hbf.toBytes)
+            val size =
+              java.lang.Integer.BYTES + vs.iterator.map(_.length + java.lang.Integer.BYTES).sum
+            val bb = ByteBuffer.allocate(size)
+            bb.putInt(v.size)
+            vs.foreach { v =>
+              bb.putInt(v.length)
+              bb.put(v)
+            }
+            bb
+        }
+        buf.array()
+      }
+    }
+}
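
The suite that follows round-trips values entirely in memory: the cells of the generated `Mutation` are turned back into a `Result`, so no running HBase cluster is needed. A self-contained sketch of the core of that property (names are illustrative):

```scala
import magnolify.hbase._
import org.apache.hadoop.hbase.client.Result
import java.nio.charset.StandardCharsets.UTF_8
import scala.jdk.CollectionConverters._

case class Record(b: Boolean, i: Int, s: String)

val recordType = HBaseType[Record]
val record = Record(true, 1, "hello")
val mutation = recordType(record, "row".getBytes(UTF_8), "cf".getBytes(UTF_8))
// collect the mutation's cells and rebuild a Result from them
val cells = mutation.getFamilyCellMap.asScala.values.flatMap(_.asScala).toArray
val copy = recordType(Result.create(cells), "cf".getBytes(UTF_8))
// copy == record, which is what the property test asserts for generated values
```
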
diff --git a/hbase/src/test/scala/magnolify/hbase/HBaseTypeSuite.scala b/hbase/src/test/scala/magnolify/hbase/HBaseTypeSuite.scala
new file mode 100644
index 000000000..4edbb2f4d
--- /dev/null
+++ b/hbase/src/test/scala/magnolify/hbase/HBaseTypeSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2023 Spotify AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import cats.Eq
+import magnolify.cats.TestEq.*
+import magnolify.cats.auto.*
+import magnolify.hbase.{HBaseField, HBaseType}
+import magnolify.scalacheck.TestArbitrary.*
+import magnolify.scalacheck.auto.*
+import magnolify.test.MagnolifySuite
+import magnolify.test.Simple.*
+import org.apache.hadoop.hbase.client.Result
+import org.scalacheck.{Arbitrary, Prop}
+
+import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
+import java.time.Duration
+import java.util.UUID
+import scala.jdk.CollectionConverters.*
+import scala.reflect.ClassTag
+
+class HBaseTypeSuite extends MagnolifySuite {
+
+  private val row: Array[Byte] = "row".getBytes(UTF_8)
+  private val family: Array[Byte] = "family".getBytes(UTF_8)
+
+  private def test[T: Arbitrary: ClassTag](implicit hbt: HBaseType[T], eq: Eq[T]): Unit = {
+    val tpe = ensureSerializable(hbt)
+    property(className[T]) {
+      Prop.forAll { (t: T) =>
+        val mutation = tpe(t, row, family)
+        val cells = mutation.getFamilyCellMap.asScala.values
+          .flatMap(_.asScala)
+          .toArray
+        val result = Result.create(cells)
+        val copy = tpe(result, family)
+
+        Prop.all(eq.eqv(t, copy))
+      }
+    }
+  }
+
+  implicit val eqByteArray: Eq[Array[Byte]] =
+    Eq.by(_.toList)
+  implicit val hbfUri: HBaseField[URI] =
+    HBaseField.from[String](x => URI.create(x))(_.toString)
+  implicit val hbfDuration: HBaseField[Duration] =
+    HBaseField.from[Long](Duration.ofMillis)(_.toMillis)
+
+  test[Numbers]
+  test[Required]
+  test[Nullable]
+  test[Repeated]
+  test[HBaseNested]
+  test[Collections]
+  test[MoreCollections]
+  test[Enums]
+  test[UnsafeEnums]
+  test[Custom]
+  test[HBaseTypes]
+
+}
+
+// Collections are not supported
+case class HBaseNested(b: Boolean, i: Int, s: String, r: Required, o: Option[Required])
+
+case class HBaseTypes(b: Byte, c: Char, s: Short, ba: Array[Byte], uu: UUID)
+
+// Collections are not supported
+case class DefaultInner(i: Int = 1, o: Option[Int] = Some(1))
+case class DefaultOuter(
+  i: DefaultInner = DefaultInner(2, Some(2)),
+  o: Option[DefaultInner] = Some(DefaultInner(2, Some(2)))
+)
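
Beyond the `URI` and `Duration` instances the suite registers via `HBaseField.from`, any type with an existing `Primitive` encoding can be adapted the same way. A minimal sketch; the `Instant`-as-epoch-millis mapping is hypothetical, not part of the module:

```scala
import magnolify.hbase._
import java.time.Instant

// hypothetical: reuse the built-in Long primitive to store Instant as epoch millis
implicit val hbfInstant: HBaseField[Instant] =
  HBaseField.from[Long](Instant.ofEpochMilli)(_.toEpochMilli)

case class Event(id: String, at: Instant)
val eventType = HBaseType[Event] // "at" round-trips as an 8-byte big-endian cell
```
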