Skip to content

Commit

Permalink
Chain insert after merging in test
Browse files Browse the repository at this point in the history
  • Loading branch information
once-ler committed Jul 26, 2020
1 parent 956bcb7 commit d9f0a87
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 31 deletions.
Expand Up @@ -4,7 +4,7 @@ package infrastructure.cassandra
trait WithCommon {
def camelToUnderscores(name: String) = "[A-Z\\d]".r.replaceAllIn(name.charAt(0).toLower.toString + name.substring(1), {m =>
"_" + m.group(0).toLowerCase()
})
}).replaceAll("__", "_")

def underscoreToCamel(name: String) = "_([a-z\\d])".r.replaceAllIn(name, {m =>
m.group(1).toUpperCase()
Expand Down
86 changes: 56 additions & 30 deletions src/test/scala/TestSimpleCassandraClientSpec.scala
Expand Up @@ -13,6 +13,7 @@ import com.datastax.driver.core.utils.UUIDs
import fs2.{Chunk, Pipe, Stream}
import com.datastax.driver.core.{ResultSet, Row, Session, SimpleStatement, Statement}
import com.weather.scalacass.syntax._
import com.eztier.common.mergeSyntax._
import org.specs2.mutable.Specification
import com.eztier.datasource.infrastructure.cassandra.{CassandraClient, CassandraSession}

Expand Down Expand Up @@ -41,11 +42,62 @@ case class CaResourceProcessed
Current: String = ""
)

class CaResourceManager[F[_]] extends WithCommon {

def fetchNextTasks(db: CassandraClient[F], keyspace: String, environment: String, store: String, dataType: String, purpose: String)(func: CaResourceProcessed => CaResourceProcessed) = {
val ts = LocalDateTime.now().minusDays(2).toInstant(ZoneOffset.UTC).toEpochMilli()
val defaultUuid = UUIDs.endOf(ts)

db.readAsync(
s"""select uid from $keyspace.ca_resource_processed
| where environment = '$environment'
| and store = '$store'
| and type = '$dataType'
| and purpose = '$purpose' limit 1""".stripMargin
) ++ Stream.emit(defaultUuid).covary[F]
.chunkN(2)
//.take(1L)
.flatMap { b =>
val a = b.head.get

val nextStmt =
s"""select * from $keyspace.ca_resource_modified
| where environment = '$environment'
| and store = '$store'
| and type = '$dataType'
| and uid > ${a.toString}

| limit 10""".

stripMargin

// println(nextStmt)

db.readAsync(nextStmt)
}
.map (_.as[CaResourceModified])
.map { a =>
val b = CaResourceProcessed()
(b merge a).copy(Purpose = purpose)
}
.map(func)
//.chunkN(10)
//.flatMap(a => Stream.eval(db.batchInsertAsync(a, keyspace, "ca_resource_processed")))

}

}

object CaResourceManager {
def apply[F[_]]: CaResourceManager[F] = new CaResourceManager[F]()
}

class TestSimpleCassandraClientSpec extends Specification {
val ec = scala.concurrent.ExecutionContext.global
implicit val timer = IO.timer(ec)
implicit val cs = IO.contextShift(ec)

val keyspace = "dwh"
val environment = "development"
val store = "IKEA"
val type_ = "Sales"
Expand Down Expand Up @@ -153,38 +205,12 @@ class TestSimpleCassandraClientSpec extends Specification {
val res = createSimpleCassandraClientResource[IO].use {
case db =>

val ts = LocalDateTime.now().minusDays(2).toInstant(ZoneOffset.UTC).toEpochMilli()
val defaultUuid = UUIDs.endOf(ts)

val u = db.readAsync(
s"""select uid from dwh.ca_resource_processed
| where environment = '${environment}'
| and store = '${store}'
| and type = '${type_}'
| and purpose = '${purpose}' limit 1""".stripMargin
) ++ Stream.emit(defaultUuid).covary[IO]
.take(1L)
.flatMap { a =>
val nextStmt =
s"""select * from dwh.ca_resource_modified
| where environment = '${environment}'
| and store = '${store}'
| and type = '${type_}'
| and uid > ${a.toString}
| limit 10""".stripMargin

println(nextStmt)

db.readAsync(nextStmt)
val u2 = CaResourceManager[IO].fetchNextTasks(db, keyspace, environment, store, type_, purpose) { a =>
println(a)
a
}
.map(_.as[CaResourceModified])
.map { a =>
val b = CaResourceProcessed()

(b merge a).copy(Purpose = purpose)
}

IO.suspend(u.compile.toList)
IO.suspend(u2.compile.toList)
}.unsafeRunSync()

res.isInstanceOf[List[AnyRef]]
Expand Down

0 comments on commit d9f0a87

Please sign in to comment.