Skip to content
This repository has been archived by the owner on Dec 12, 2023. It is now read-only.

Commit

Permalink
List, Set, Option of UDT Support
Browse files Browse the repository at this point in the history
  • Loading branch information
nob13 committed Apr 13, 2017
1 parent 1550d12 commit 83a1068
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 83 deletions.
20 changes: 15 additions & 5 deletions Readme.md
Expand Up @@ -45,21 +45,31 @@ And let the Adapter automatically generated
adapter.insert(person, session)
val back: Seq[Person] = adapter.loadAllFromCassandra(session)

Supported
---------
* boiler plate free `fromRow`, `insert`, `loadAllFromCassandra`

Supported Types
---------------
* Most fundamental types
* Option (flaky)
* Set of Primitives
* List of Primitives (as Seq)
* Option
* Set of Primitives and of UDT
* List of Primitives (as Seq) and of UDT
* Primitive UDT handling


Not supported / tested yet
--------------------------
* Map
* Combination of Set/Seq with UDT or Option
* Tuples
* Combination of Set/Seq with more than one UDT
* Adding custom types
* Prepared Statements for fast insert

Not planned (yet)
-----------------
* Query Helpers


Testing
Expand Down
78 changes: 37 additions & 41 deletions src/main/scala/net/reactivecore/cca/CassandraCaseClassAdapter.scala
@@ -1,7 +1,7 @@
package net.reactivecore.cca

import com.datastax.driver.core.{ Row, Session, UserType }
import net.reactivecore.cca.utils.{ CassandraReader, CompiledGroup, OrderedWriter }
import net.reactivecore.cca.utils._
import shapeless._

import scala.collection.mutable
Expand Down Expand Up @@ -58,24 +58,52 @@ private class AutoCassandraCaseClassAdapter[T: CompoundCassandraConversionCodec]
val writer = OrderedWriter.makeCollector()
codec.orderedWrite(instance, writer)

val values = treatUdtValues(writer.result().values, session)
val written = writer.result()

session.execute(query, values: _*)
val firstGroup = written.values match {
case IndexedSeq(CompiledGroup("", values, GroupType.Compound)) => values
case _ =>
// should not happen
throw new EncodingException(s"Expected case class to convert exactly into one group")
}

val treated = treatUdtValues(firstGroup, session)
session.execute(query, treated: _*)
}

private def treatUdtValues(values: IndexedSeq[AnyRef], session: Session): IndexedSeq[AnyRef] = {
/**
* Handles the conversion of compound elements into UDTValues.
* This depends on a live session connection (as there is no way of creating UDTValues without a
* UserType, which in turn cannot be created without a DB connection).
*/
private def treatUdtValues(values: IndexedSeq[AnyRef], session: Session, typeHint: Option[UserType] = None): IndexedSeq[AnyRef] = {
// TODO: This is messy and only supports depth of one...
values.map {
case CompiledGroup(columnName, values) =>
val subValuesTreated = treatUdtValues(values, session)
case CompiledGroup(columnName, subValues, GroupType.ListGroup) =>
val userType = userTypeCache.getUserType(columnName, session)
val subValuesTreated = treatUdtValues(subValues, session, Some(userType))

val converted = subValuesTreated.asJava
converted
case CompiledGroup(columnName, subValues, GroupType.SetGroup) =>
val userType = userTypeCache.getUserType(columnName, session)
val subValuesTreated = treatUdtValues(subValues, session, Some(userType))

val converted = subValuesTreated.toSet.asJava
converted
case CompiledGroup(columnName, subValues, GroupType.Compound) =>
val subValuesTreated = treatUdtValues(subValues, session)
val userType = typeHint.getOrElse(userTypeCache.getUserType(columnName, session))
val value = userType.newValue()

val fieldNames = userType.getFieldNames.asScala.toIndexedSeq
if (fieldNames.size != values.size) {
throw new EncodingException(s"Writing UDT in ${columnName} failed for ${tableName}, expected ${fieldNames.size}, found ${values.size}")
if (fieldNames.size != subValues.size) {
throw new EncodingException(s"Writing UDT in ${columnName} failed for ${tableName}, expected ${fieldNames.size}, found ${subValues.size}")
}
// TODO: Converter operation could be cached
fieldNames.zip(subValuesTreated).foreach {
case (fieldName, singleFieldValue) if singleFieldValue == null =>
value.setToNull(fieldName)
case (fieldName, singleFieldValue) =>
value.set(fieldName, singleFieldValue, singleFieldValue.getClass.asInstanceOf[Class[AnyRef]])
}
Expand All @@ -90,39 +118,7 @@ private class AutoCassandraCaseClassAdapter[T: CompoundCassandraConversionCodec]
session.execute(query).all().asScala.map(fromRow)
}

/**
 * Caches the Cassandra [[UserType]] of UDT columns of this table, so the
 * cluster metadata is only queried once per column.
 */
private object userTypeCache {

  /** Guards all access to [[cache]]. */
  object lock
  // val instead of var: only the Map's contents change, never the reference itself.
  val cache: mutable.Map[String, UserType] = mutable.Map.empty

  /**
   * Returns the [[UserType]] backing the given column, fetching and caching it on first use.
   *
   * The (potentially slow) metadata lookup is deliberately performed outside the lock;
   * a duplicate fetch on a rare race is harmless as the result is identical.
   */
  def getUserType(columnName: String, session: Session): UserType = {
    val candidate = lock.synchronized(cache.get(columnName))
    candidate match {
      case Some(v) => v
      case None =>
        val userType = fetchUserType(columnName, session)
        lock.synchronized {
          cache.put(columnName, userType)
        }
        userType
    }
  }

  /**
   * Looks up the column's type in the cluster metadata.
   *
   * Throws [[EncodingException]] when the keyspace/table/column cannot be resolved,
   * or when the column's type is not a user defined type.
   */
  private def fetchUserType(columnName: String, session: Session): UserType = {
    val meta = session.getCluster.getMetadata
    val column = (for {
      keyspace <- Option(meta.getKeyspace(session.getLoggedKeyspace))
      table <- Option(keyspace.getTable(tableName))
      column <- Option(table.getColumn(columnName))
    } yield column).getOrElse {
      throw new EncodingException(s"Could not find type for column ${columnName} in ${tableName}")
    }
    column.getType match {
      // fixed typo in error message: "UserTYpe" -> "UserType"
      case user: UserType => user
      case somethingElse => throw new EncodingException(s"Expected UserType for ${columnName} in ${tableName}, found ${somethingElse}")
    }
  }
}
val userTypeCache = new UserTypeCache(tableName)
}

object CassandraCaseClassAdapter {
Expand Down
52 changes: 37 additions & 15 deletions src/main/scala/net/reactivecore/cca/CassandraConversionCodec.scala
@@ -1,6 +1,6 @@
package net.reactivecore.cca

import net.reactivecore.cca.utils.{ CassandraReader, OrderedWriter }
import net.reactivecore.cca.utils.{ CassandraReader, GroupType, OrderedWriter }
import shapeless._

import scala.annotation.implicitNotFound
Expand Down Expand Up @@ -54,18 +54,14 @@ case class CompoundCassandraConversionCodec[T](
}

def orderedWrite(instance: T, writer: OrderedWriter): Unit = {
val group = writer.startGroup(GroupType.Compound)
val deconstructed = deconstructor(instance)
deconstructed.zip(fields).foreach {
case (subFieldValue, (columnName, subCodec)) =>
subCodec match {
case c: CompoundCassandraConversionCodec[_] =>
val subWriter = writer.startGroup(columnName)
c.forceOrderedWrite(subFieldValue, subWriter)
writer.endGroup(subWriter)
case other =>
other.forceOrderedWrite(subFieldValue, writer)
}
group.setColumnName(columnName)
subCodec.forceOrderedWrite(subFieldValue, group)
}
writer.endGroup(group)
}
}

Expand All @@ -86,6 +82,21 @@ case class SetCodec[SubType, CassandraType](subCodec: PrimitiveCassandraConversi
}
}

/**
 * Codec for a set column whose elements are user defined types, delegating
 * element conversion to the compound codec of the element case class.
 */
case class SetUdtCodec[SubType, CassandraType](subCodec: CompoundCassandraConversionCodec[SubType]) extends IterableCodec[SubType, Set[SubType]] {

  override def decodeFrom(reader: CassandraReader): Set[SubType] = {
    // Decode each UDT element through the compound codec, then collapse into a Set.
    val decodedElements = reader.getUdtSet.map(element => subCodec.decodeFrom(element))
    decodedElements.toSet
  }

  override def orderedWrite(instance: Set[SubType], writer: OrderedWriter): Unit = {
    // Each element of the set is written as a compound value into a dedicated set group.
    val setGroup = writer.startGroup(GroupType.SetGroup)
    instance.foreach(subCodec.orderedWrite(_, setGroup))
    writer.endGroup(setGroup)
  }
}

case class SeqCodec[SubType, CassandraType](subCodec: PrimitiveCassandraConversionCodec[SubType, CassandraType]) extends IterableCodec[SubType, Seq[SubType]] {

override def decodeFrom(reader: CassandraReader): Seq[SubType] = {
Expand All @@ -98,7 +109,22 @@ case class SeqCodec[SubType, CassandraType](subCodec: PrimitiveCassandraConversi
}
}

case class OptionalCodec[SubType, CassandraType](subCodec: PrimitiveCassandraConversionCodec[SubType, CassandraType]) extends CassandraConversionCodec[Option[SubType]] {
/**
 * Codec for a list column whose elements are user defined types; the Scala
 * side is represented as a Seq. Element conversion is delegated to the
 * compound codec of the element case class.
 */
case class SeqUdtCodec[SubType, CassandraType](subCodec: CompoundCassandraConversionCodec[SubType]) extends IterableCodec[SubType, Seq[SubType]] {

  override def decodeFrom(reader: CassandraReader): Seq[SubType] = {
    // Decode each UDT element through the compound codec, preserving list order.
    val decodedElements = reader.getUdtList.map(element => subCodec.decodeFrom(element))
    decodedElements.toSeq
  }

  override def orderedWrite(instance: Seq[SubType], writer: OrderedWriter): Unit = {
    // Each element of the sequence is written as a compound value into a dedicated list group.
    val listGroup = writer.startGroup(GroupType.ListGroup)
    instance.foreach(subCodec.orderedWrite(_, listGroup))
    writer.endGroup(listGroup)
  }
}

case class OptionalCodec[SubType](subCodec: CassandraConversionCodec[SubType]) extends CassandraConversionCodec[Option[SubType]] {
override def decodeFrom(reader: CassandraReader): Option[SubType] = {
if (reader.isNull) {
None
Expand Down Expand Up @@ -138,9 +164,7 @@ case class PrimitiveCassandraConversionCodec[T, CassandraType: ClassTag](

private[cca] def forceScalaToCassandra(o: Any): CassandraType = scalaToCassandra(o.asInstanceOf[T])

override def decodeFrom(reader: CassandraReader): T = cassandraToScala(reader.get[CassandraType].getOrElse {
throw new DecodingException(s"Expected non null value while reading ${reader.position}")
})
override def decodeFrom(reader: CassandraReader): T = cassandraToScala(reader.get[CassandraType])

override def orderedWrite(instance: T, writer: OrderedWriter): Unit = {
writer.write(scalaToCassandra(instance))
Expand All @@ -163,8 +187,6 @@ object PrimitiveCassandraConversionCodec {

object CassandraConversionCodec extends LabelledProductTypeClassCompanion[CassandraConversionCodec] with DefaultCodecs {

def generate[T <: Product](implicit ct: Lazy[CassandraConversionCodec[T]]): CompoundCassandraConversionCodec[T] = apply[T].asInstanceOf[CompoundCassandraConversionCodec[T]]

object typeClass extends LabelledProductTypeClass[CassandraConversionCodec] {

override def product[H, T <: HList](name: String, headCodec: CassandraConversionCodec[H], tailCodec: CassandraConversionCodec[T]): CassandraConversionCodec[::[H, T]] = {
Expand Down
21 changes: 16 additions & 5 deletions src/main/scala/net/reactivecore/cca/DefaultCodecs.scala
Expand Up @@ -5,9 +5,7 @@ import java.net.InetAddress
import java.sql.Timestamp
import java.util.{ Date, UUID }

import shapeless.the

import scala.reflect.ClassTag
import shapeless.{ LabelledGeneric, LabelledProductTypeClass, the }

trait DefaultCodecs {

Expand Down Expand Up @@ -53,6 +51,19 @@ trait DefaultCodecs {
implicit def primitiveSeqCodec[T, CassandraType](implicit primitiveCodec: PrimitiveCassandraConversionCodec[T, CassandraType]): CassandraConversionCodec[Seq[T]] =
SeqCodec(primitiveCodec)

implicit def primitiveOptionalCodec[T, CassandraType](implicit primitiveCodec: PrimitiveCassandraConversionCodec[T, CassandraType]): CassandraConversionCodec[Option[T]] =
OptionalCodec(primitiveCodec)
implicit def udtSeqCodec[T](implicit compoundCassandraConversionCodec: CompoundCassandraConversionCodec[T]): CassandraConversionCodec[Seq[T]] =
SeqUdtCodec(compoundCassandraConversionCodec)

implicit def udtSeqCodec[T <: Product](implicit codec: CassandraConversionCodec[T]): CassandraConversionCodec[Seq[T]] = {
val upcasted = codec.asInstanceOf[CompoundCassandraConversionCodec[T]]
SeqUdtCodec(upcasted)
}

implicit def udtSetCodec[T <: Product](implicit codec: CassandraConversionCodec[T]): CassandraConversionCodec[Set[T]] = {
val upcasted = codec.asInstanceOf[CompoundCassandraConversionCodec[T]]
SetUdtCodec(upcasted)
}

implicit def optionalCodec[T, CassandraType](implicit subCodec: CassandraConversionCodec[T]): CassandraConversionCodec[Option[T]] =
OptionalCodec(subCodec)
}

0 comments on commit 83a1068

Please sign in to comment.