Automatically convert table and field names to camel and underscores
once-ler committed Jul 25, 2020
1 parent ad63974 commit c01ac61
Showing 5 changed files with 118 additions and 64 deletions.
@@ -36,35 +36,35 @@ class CassandraClient[F[_] : Async : Sync: Concurrent : Functor](session: Resour
}
}

def insertManyAsync[A <: AnyRef](records: Chunk[A], keySpace: String = "", tableName: String = ""): F[Vector[ResultSet]] = {
session.use { s =>
// Threaded insert.
def insertManyAsync[A <: AnyRef](records: Chunk[A], keySpace: String = "", tableName: String = ""): F[Vector[ResultSet]] =
session.use { s =>

blockingThreadPool.use { ec: ExecutionContext =>
implicit val cs = ec
blockingThreadPool.use { ec: ExecutionContext =>
implicit val cs = ec

Async[F].async {
(cb: Either[Throwable, Vector[ResultSet]] => Unit) =>
Async[F].async {
(cb: Either[Throwable, Vector[ResultSet]] => Unit) =>

val statements = buildInsertStatements(records, keySpace, tableName)
val statements = buildInsertStatements(records, keySpace, tableName)

val l = statements.map { stmt =>
val f: Future[ResultSet] = s.executeAsync(stmt)
f
}
val l = statements.map { stmt =>
val f: Future[ResultSet] = s.executeAsync(stmt)
f
}

val f = Future.sequence(l)
val f = Future.sequence(l)

f.onComplete {
case Success(s) => cb(Right(s))
case Failure(e) => cb(Left(e))
}
}
f.onComplete {
case Success(s) => cb(Right(s))
case Failure(e) => cb(Left(e))
}
}

}
}
}

}

// Single BatchStatement.
def batchInsertAsync[A <: AnyRef](records: Chunk[A], keySpace: String = "", tableName: String = ""): F[ResultSet] = {
val batchStatement = buildInsertStatements(records, keySpace, tableName)
.asBatchStatement
@@ -73,7 +73,7 @@ class CassandraClient[F[_] : Async : Sync: Concurrent : Functor](session: Resour

}

def createAsync[A: TypeTag](keySpace: String, tableName: Option[String] = None)(partitionKeys: String*)(clusteringKeys: String*)(orderBy: (String, Option[Int])*): F[ResultSet] = {
def createAsync[A: TypeTag](keySpace: String)(partitionKeys: String*)(clusteringKeys: String*)(orderBy: (String, Option[Int])*): F[ResultSet] = {
val simpleStatement = getCreateStmt(keySpace)(partitionKeys:_*)(clusteringKeys:_*)(orderBy:_*)

execAsync(simpleStatement)
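
Note: this hunk removes the explicit tableName: Option[String] parameter from createAsync; with the WithCommon helpers introduced below, the table name now appears to be derived from the case class name (camel case converted to underscores, e.g. CaResourceProcessed -> ca_resource_processed, as exercised by the updated test spec). A minimal call-site sketch, assuming db is the CassandraClient[IO] obtained from createSimpleCassandraClientResource as in the tests:

import cats.effect.IO
import com.datastax.driver.core.ResultSet

// The table name is no longer passed explicitly; it is expected to be
// derived from the type parameter, i.e. CaResourceModified -> dwh.ca_resource_modified.
val create: IO[ResultSet] =
  db.createAsync[CaResourceModified]("dwh")("Environment", "Store", "Type")("StartTime", "Id")(("StartTime" -> None), ("Id" -> None))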
@@ -0,0 +1,14 @@
package com.eztier.datasource
package infrastructure.cassandra

trait WithCommon {
def camelToUnderscores(name: String) = "[A-Z\\d]".r.replaceAllIn(name.charAt(0).toLower.toString + name.substring(1), {m =>
"_" + m.group(0).toLowerCase()
})

def underscoreToCamel(name: String) = "_([a-z\\d])".r.replaceAllIn(name, {m =>
m.group(1).toUpperCase()
})

def camelToSpaces(name: String) = "[A-Z\\d]".r.replaceAllIn(name, (m => " " + m.group(0)))
}
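
The new WithCommon trait centralizes the name conversions shared by the statement builders. A self-contained sketch of the expected behaviour (the demo object is hypothetical, not part of the commit):

import com.eztier.datasource.infrastructure.cassandra.WithCommon

// Hypothetical demo, assuming the WithCommon trait added above is on the classpath.
object NameConversionDemo extends WithCommon {
  def main(args: Array[String]): Unit = {
    println(camelToUnderscores("CaResourceModified")) // ca_resource_modified
    println(camelToUnderscores("StartTime"))          // start_time
    println(underscoreToCamel("start_time"))          // startTime
    println(camelToSpaces("StartTime"))               // " Start Time" (note the leading space)
  }
}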
@@ -9,17 +9,7 @@ import com.datastax.driver.core.utils.UUIDs
import scala.Option
import scala.reflect.runtime.universe._

trait WithCreateStatementBuilder {

def camelToUnderscores(name: String) = "[A-Z\\d]".r.replaceAllIn(name.charAt(0).toLower.toString + name.substring(1), {m =>
"_" + m.group(0).toLowerCase()
})

def underscoreToCamel(name: String) = "_([a-z\\d])".r.replaceAllIn(name, {m =>
m.group(1).toUpperCase()
})

def camelToSpaces(name: String) = "[A-Z\\d]".r.replaceAllIn(name, (m => " " + m.group(0)))
trait WithCreateStatementBuilder extends WithCommon {

private def classAccessors[T: TypeTag]: List[MethodSymbol] = typeOf[T].members.collect {
case m: MethodSymbol if m.isCaseAccessor => m
@@ -76,15 +66,6 @@ trait WithCreateStatementBuilder {
}

private def getCreateStmtString[T](keySpace: String, tableName: String)(implicit typeTag: TypeTag[T]): Seq[String] = {
/*
val o = typeTag.tpe.resultType
val t = tableName match {
case Some(a) => a
case _ => o.typeSymbol.name.toString
}
*/

val m = toCaType[T]

val f = (Array[String]() /: m) {
@@ -126,19 +107,6 @@ trait WithCreateStatementBuilder {

val sb = s" with clustering order by (${ob})"

/*
val sb = orderBy match {
case Some(a) if a.length > 0 && f.find(_ == a) != None =>
val sort = direction match {
case Some(b) => if (b > 0) "asc" else "desc"
case None => "asc"
}
s" with clustering order by (${a} ${sort})"
case None => ""
}
*/

val t0 = t(0)
val trim = t0.substring(0, t0.length - 2)

@@ -7,7 +7,7 @@ import fs2.Chunk

import scala.collection.JavaConverters._

trait WithInsertStatementBuilder {
trait WithInsertStatementBuilder extends WithCommon {

private def zipKV(
in: AnyRef,
@@ -17,7 +17,7 @@ trait WithInsertStatementBuilder {
((Array[String](), Array[AnyRef]()) /: in.getClass.getDeclaredFields.filter(filterFunc)) {
(a, f) =>
f.setAccessible(true)
val k = a._1 :+ formatFunc(f.getName).asInstanceOf[String]
val k = a._1 :+ formatFunc(camelToUnderscores(f.getName)).asInstanceOf[String]

val v = a._2 :+
(formatFunc(f.get(in)) match {
@@ -29,7 +29,12 @@ trait WithInsertStatementBuilder {
case a: Seq[_] => a.asJava
case _ => o
}
case _ => null
case None => null
case a: Map[_, _] => a.asJava
case a: List[_] => a.asJava
case a: Vector[_] => a.asJava
case a: Seq[_] => a.asJava
case _ => formatFunc(f.get(in))
}).asInstanceOf[AnyRef]

(k, v)
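
The widened match above maps None to null and converts Scala Map/List/Vector/Seq values to their Java counterparts before they are bound, presumably because the DataStax Java driver expects java.util collection types for CQL collection columns. The conversion comes from scala.collection.JavaConverters, already imported in this file; for illustration:

import scala.collection.JavaConverters._

val tags: java.util.List[String] = List("a", "b").asJava            // what a CQL list column receives
val attrs: java.util.Map[String, String] = Map("k" -> "v").asJava   // what a CQL map column receives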
81 changes: 74 additions & 7 deletions src/test/scala/TestSimpleCassandraClientSpec.scala
@@ -2,12 +2,14 @@ package com.eztier.datasource
package infrastructure.cassandra
package test

import java.time.Instant
import java.util.{Date, UUID}

import cats.data._
import cats.implicits._
import cats.effect.{IO, Sync}
import fs2.{Pipe, Stream}
import com.datastax.driver.core.utils.UUIDs
import fs2.{Chunk, Pipe, Stream}
import com.datastax.driver.core.{ResultSet, Row, Session, SimpleStatement, Statement}
import org.specs2.mutable.Specification
import com.eztier.datasource.infrastructure.cassandra.{CassandraClient, CassandraSession}
@@ -24,25 +26,77 @@ case class CaResourceModified
Current: String
)

case class CaResourceProcessed
(
Environment: String,
Store: String,
Type: String,
Purpose: String,
StartTime: Date,
Id: String,
Oid: String,
Uid: UUID,
Current: String
)

class TestSimpleCassandraClientSpec extends Specification {
val ec = scala.concurrent.ExecutionContext.global
implicit val timer = IO.timer(ec)
implicit val cs = IO.contextShift(ec)

"Simple Cassandra Client" should {

"Create usable client" in {
"Create a table" in {

val res = createSimpleCassandraClientResource[IO].use {
case db =>

val u = db.createAsync[CaResourceModified]("dwh")("Environment", "Store", "Type")("StartTime", "Id")(("StartTime" -> None), ("Id" -> None))

val r = u.unsafeRunSync()

IO.pure(r)
}.unsafeRunSync()

res.isInstanceOf[ResultSet]
}

"Create another table" in {

val res = createSimpleCassandraClientResource[IO].use {
case db =>

val cqlEndpoints = "127.0.0.1"
val cqlPort: Int = 9042
val user = "cassandra"
val pass = "cassandra"
val u = db.createAsync[CaResourceProcessed]("dwh")("Environment", "Store", "Type", "Purpose")("StartTime", "Id")(("StartTime" -> None), ("Id" -> None))

val r = u.unsafeRunSync()

IO.pure(r)
}.unsafeRunSync()

res.isInstanceOf[ResultSet]
}

"Insert a row" in {
val res = createSimpleCassandraClientResource[IO].use {
case db =>

val u = db.createAsync[CaResourceModified]("dwh", "ca_resource_modified".some)("Environment", "Store", "Type")("StartTime", "Id")(("StartTime" -> None), ("Id" -> None))
val a = CaResourceModified(
Environment = "development",
Store = "IKEA",
Type = "Sales",
StartTime = Date.from(Instant.now),
Id = "ABC123",
Oid = "5dd81233ae6d16d797be3915e52ac94b",
Uid = UUIDs.timeBased(),
Current = <root><inserted><row>
<oid>5dd81233ae6d16d797be3915e52ac94b</oid>
<type>Sales</type>
<ID>ABC123</ID>
<dateModified>1595638722248</dateModified>
</row></inserted></root>.toString()
)

val u = db.batchInsertAsync(Chunk.vector(Vector(a)), "dwh", "ca_resource_modified")

val r = u.unsafeRunSync()

@@ -52,6 +106,19 @@ class TestSimpleCassandraClientSpec extends Specification {
res.isInstanceOf[ResultSet]
}

"Read a table" in {
val res = createSimpleCassandraClientResource[IO].use {
case db =>
val u = db.readAsync("select * from dwh.ca_resource_processed limit 10")

val r = u.compile.toList.unsafeRunSync()

IO.pure(r)
}.unsafeRunSync()

res.isInstanceOf[List[Row]]
}

}

}
