Skip to content

Commit

Permalink
Merge 6c1d008 into 24a2082
Browse files Browse the repository at this point in the history
  • Loading branch information
mcsherrylabs committed Jan 4, 2021
2 parents 24a2082 + 6c1d008 commit 3e3e125
Show file tree
Hide file tree
Showing 30 changed files with 1,367 additions and 970 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ testconn.script
.settings
.classpath
.cache-*
.bsp
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: scala

scala:
- 2.12.10
- 2.13.4

script: "sbt clean coverage test"
after_success: "sbt coverageReport coveralls"
42 changes: 29 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,38 +1,54 @@

name := "sss-db"

version := "0.9.43-SNAPSHOT"
version := "0.9.45"

val scala213 = "2.13.2"

scalaVersion := scala213

//Can't if using new 2.13 features.
//crossScalaVersions := Seq("2.12.10", scala213)
scalaVersion := "2.13.4"

updateOptions := updateOptions.value.withGigahorse(false)

parallelExecution in Test := false

//needed to retrieve ancillary, publish happens via global.sbt.
resolvers += ("stepsoft" at "http://nexus.mcsherrylabs.com/repository/releases/").withAllowInsecureProtocol(true)
resolvers += "stepsoft" at "https://nexus.mcsherrylabs.com/repository/releases/"

resolvers += "stepsoft-snapshots" at "https://nexus.mcsherrylabs.com/repository/snapshots/"


publishTo := {
val nexus = "https://nexus.mcsherrylabs.com/"
if (isSnapshot.value)
Some("snapshots" at nexus + "repository/snapshots")
else
Some("releases" at nexus + "repository/releases")
}

credentials += sys.env.get("NEXUS_USER").map(userName => Credentials(
"Sonatype Nexus Repository Manager",
"nexus.mcsherrylabs.com",
userName,
sys.env.getOrElse("NEXUS_PASS", ""))
).getOrElse(
Credentials(Path.userHome / ".ivy2" / ".credentials")
)

resolvers += ("stepsoft-snapshots" at "http://nexus.mcsherrylabs.com/repository/snapshots/").withAllowInsecureProtocol(true)

dependencyOverrides += "org.scala-lang" % "scala-compiler" % scalaVersion.value

//libraryDependencies += "joda-time" % "joda-time" % "2.9.9"

// https://mvnrepository.com/artifact/org.scalatest/scalatest
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0-M2" % Test
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.2" % Test

libraryDependencies += "org.hsqldb" % "hsqldb" % "2.5.1" % Test

libraryDependencies += "com.mcsherrylabs" %% "sss-ancillary" % "1.15-SNAPSHOT"
val excludeJetty = ExclusionRule(organization = "org.eclipse.jetty.aggregate")

libraryDependencies += "com.mcsherrylabs" %% "sss-ancillary" % "1.18" excludeAll(excludeJetty)

libraryDependencies += "org.apache.commons" % "commons-dbcp2" % "2.7.0"
libraryDependencies += "org.apache.commons" % "commons-dbcp2" % "2.8.0"

libraryDependencies += "org.apache.commons" % "commons-pool2" % "2.8.0"
libraryDependencies += "org.apache.commons" % "commons-pool2" % "2.9.0"

libraryDependencies += "com.zaxxer" % "HikariCP" % "3.4.5"

Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.3.8
sbt.version=1.4.3
1 change: 1 addition & 0 deletions project/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.1.0-M11")
65 changes: 41 additions & 24 deletions src/main/scala/sss/db/Db.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,32 @@ package sss.db


import com.typesafe.config.Config
import sss.ancillary.{DynConfig, Logging}
import sss.ancillary.{DynConfig, Logging, LoggingFutureSupport}
import sss.db.TxIsolationLevel.TxIsolationLevel
import sss.db.datasource.DataSource
import sss.db.datasource.DataSource._

import scala.language.dynamics
import scala.concurrent.ExecutionContext
import scala.util.Try



object Db {

def apply(dbConfig: DbConfig)(ds:CloseableDataSource) = {
new Db(dbConfig)(ds)
def apply(dbConfig: DbConfig)(ds:CloseableDataSource,
executionContext: ExecutionContext): Db = {
new Db(dbConfig)(ds, executionContext)
}

def apply(dbConfig: Config)(ds:CloseableDataSource): Db = {
apply(DynConfig[DbConfig](dbConfig))(ds)
def apply(dbConfig: Config)(ds:CloseableDataSource,
executionContext: ExecutionContext): Db = {
apply(DynConfig[DbConfig](dbConfig))(ds, executionContext)
}

def apply(dbConfigName: String = "database", ds:CloseableDataSource = DataSource()): Db = {
apply(DynConfig[DbConfig](dbConfigName))(ds)
def apply(dbConfigName: String = "database",
ds:CloseableDataSource = DataSource(),
executionContext: ExecutionContext = ExecutionContextHelper.ioExecutionContext): Db = {
apply(DynConfig[DbConfig](dbConfigName))(ds, executionContext)
}
}

Expand All @@ -31,15 +39,20 @@ trait DbConfig {
val createSqlOpt: Option[java.lang.Iterable[String]]
}

class Db(dbConfig: DbConfig)(private[db] val ds:CloseableDataSource) extends Logging with Dynamic with Tx {
class Db(dbConfig: DbConfig)(closeableDataSource:CloseableDataSource, ec: ExecutionContext)
extends Logging
with LoggingFutureSupport{


if(dbConfig.useShutdownHook) sys addShutdownHook shutdown

DbInitialSqlExecutor(dbConfig: DbConfig, executeSql _)
DbInitialSqlExecutor(dbConfig: DbConfig, executeSql)(closeableDataSource)

def selectDynamic(tableName: String) = table(tableName)
implicit val runContext: RunContext = new RunContext(closeableDataSource, ec)

def table(name: String): Table = new Table(name, ds, dbConfig.freeBlobsEarly)
def runContext(level: TxIsolationLevel): RunContext = new RunContext(closeableDataSource, ec, Option(level))

def table(name: String): Table = new Table(name, runContext, dbConfig.freeBlobsEarly)

/*
Views - they're great!
Expand All @@ -53,17 +66,17 @@ class Db(dbConfig: DbConfig)(private[db] val ds:CloseableDataSource) extends Log
TL;DR for blockchain type applications views are a good solution.
*/
def view(name: String): View = new View(name, ds, dbConfig.freeBlobsEarly)
def view(name: String): View = new View(name, runContext, dbConfig.freeBlobsEarly)

def dropView(viewName: String): FutureTx[Int] = executeSql(s"DROP VIEW ${viewName}")

def dropView(viewName: String) = executeSql(s"DROP VIEW ${viewName}")
def createView(createViewSql: String): FutureTx[Int] = executeSql(createViewSql)

def createView(createViewSql: String) = executeSql(createViewSql)
def select(sql: String): Query = new Query(sql, runContext, dbConfig.freeBlobsEarly)

def select(sql: String): Query = new Query(sql, ds, dbConfig.freeBlobsEarly)

def shutdown = {
executeSql("SHUTDOWN")
ds.close
def shutdown: FutureTx[Int] = {
executeSql("SHUTDOWN") //.map{case x => Try(closeableDataSource.close()); x}
}

/**
Expand All @@ -73,7 +86,11 @@ class Db(dbConfig: DbConfig)(private[db] val ds:CloseableDataSource) extends Log
* @return
* @note This is a gateway for sql injection attacks, use with extreme caution.
*/
def executeSqls(sqls: Seq[String]): Seq[Int] = inTransaction(sqls.map(executeSql))
def executeSqls(sqls: Seq[String]): FutureTx[Seq[Int]] = {
sqls.foldLeft(FutureTx.unit(Seq.empty[Int])) {
(acc,e) => acc.flatMap(seqInt => executeSql(e).map(i => seqInt :+ i))
}
}

/**
* Execute any sql you give it on a db connection in a transaction
Expand All @@ -82,11 +99,11 @@ class Db(dbConfig: DbConfig)(private[db] val ds:CloseableDataSource) extends Log
* @return
* @note This is a gateway for sql injection attacks, use with extreme caution.
*/
def executeSql(sql: String):Int = inTransaction {
val st = conn.createStatement()
def executeSql(sql: String):FutureTx[Int] = { context =>
val st = context.conn.createStatement()
try {
st.executeUpdate(sql)
} finally st.close
LoggingFuture(st.executeUpdate(sql))(context.ec)
} finally st.close()
}


Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/sss/db/DbInitialSqlExecutor.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package sss.db

import java.sql.SQLException

import javax.sql.DataSource
import sss.ancillary.Logging

import util.{Failure, Success, Try}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

object DbInitialSqlExecutor extends Logging {

def apply(dbConfig: DbConfig, executeSql: String => Int): Unit = {
def apply(dbConfig: DbConfig, executeSql: String => FutureTx[Int])(implicit ds: DataSource, executor: FutureTxExecutor = FutureTxExecutor): Unit = {

dbConfig.deleteSqlOpt foreach { deleteSqlAry =>

deleteSqlAry.asScala.filter(_.nonEmpty) foreach { deleteSql =>
Try(executeSql(deleteSql)) match {
Try(executeSql(deleteSql).runSync) match {
case Failure(e: SQLException) => log.warn(s"${deleteSql} failed, maybe object doesn't exist?!", e)
case Failure(e) => throw e
case Success(deleted) => log.info(s"${deleteSql} Deleted count ${deleted}")
Expand All @@ -23,7 +25,7 @@ object DbInitialSqlExecutor extends Logging {

dbConfig.createSqlOpt foreach { createSqlAry =>
createSqlAry.asScala.filter(_.nonEmpty) foreach { createSql =>
Try(executeSql(createSql)) match {
Try(executeSql(createSql).runSync) match {
case Failure(e: SQLException) => log.warn(s"Failed to create ${createSql}")
case Failure(e) => throw e //fail fast
case Success(created) => log.info(s"${createSql} Created count ${created}")
Expand Down
17 changes: 17 additions & 0 deletions src/main/scala/sss/db/ExecutionContextHelper.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sss.db


import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext

object ExecutionContextHelper {

implicit val synchronousExecutionContext = ExecutionContext.fromExecutor(task => task.run())

implicit val ioExecutionContext = ExecutionContext
.fromExecutorService(
Executors.newCachedThreadPool()
)

}
44 changes: 44 additions & 0 deletions src/main/scala/sss/db/FutureTx.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package sss.db
import java.sql.Connection

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

case class TransactionContext(conn: Connection, implicit val ec: ExecutionContext)

trait FutureTx[+T] extends (TransactionContext => Future[T]) {

def flatMap[C](t: T => FutureTx[C]): FutureTx[C] = context => {
import context.ec
val g: Future[TransactionContext => Future[C]] = apply(context) map t
g flatMap (_(context))
}

def map[C](t: T => C): FutureTx[C] = context => {
import context.ec
apply(context) map t
}

@deprecated("You are about to filter a future?")
def withFilter(f: T => Boolean): FutureTx[T] = context => {
import context.ec
apply(context) filter f
}


def andAfter[U](pf: PartialFunction[Try[T], U]): FutureTx[T] = context => {
import context.ec
apply(context).andThen(pf)
}

}

object FutureTx {
def unit[A](a: A): FutureTx[A] = conn => Future.successful(a)

def sequence[T](seqT: Seq[FutureTx[T]]): FutureTx[Seq[T]] = {
seqT.foldLeft[FutureTx[Seq[T]]](FutureTx.unit(Seq.empty[T])) {
(acc,e) => acc.flatMap(sq => e.map (_ +: sq))
}
}
}
67 changes: 67 additions & 0 deletions src/main/scala/sss/db/FutureTxExecutor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package sss.db

import sss.ancillary.FutureOps.AwaitReady
import sss.db.TxIsolationLevel.TxIsolationLevel

import java.sql.Connection
import javax.sql.DataSource
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

trait FutureTxExecutor {

def execute[T](fTx: FutureTx[T],
conn: Connection,
isRollback: Boolean,
ec: ExecutionContext,
isolationLevel: Option[TxIsolationLevel]): Future[T] = {

implicit val ecImplicit = ec

Try {
isolationLevel.foreach(l => conn.setTransactionIsolation(l.id))
fTx(TransactionContext(conn, ec))
} match {
case Failure(e) =>

try conn.rollback()
finally conn.close()

Future.failed(e)

case Success(result) =>
result map { r =>
try {
if (isRollback) conn.rollback()
else conn.commit()
r
} finally conn.close()

} recoverWith { case e =>

try conn.rollback()
finally conn.close()

Future.failed[T](e)
}

}
}

def execute[T](fTx: FutureTx[T], runContext: RunContext, isRollback: Boolean): Future[T] = {
execute(fTx, runContext.ds.getConnection, isRollback, runContext.ec, runContext.isolationLevel)
}


def executeSync[T](fTx: FutureTx[T],
ds: DataSource,
isRollback: Boolean, isolationLevel: Option[TxIsolationLevel]): Try[T] = {
val ec = ExecutionContextHelper.synchronousExecutionContext
execute(fTx, ds.getConnection, isRollback, ec, isolationLevel).toTry(1.second)
}


}

object FutureTxExecutor extends FutureTxExecutor
14 changes: 14 additions & 0 deletions src/main/scala/sss/db/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package sss.db

object Main {

def main(args: Array[String]): Unit = {
val plan = for {
i <- 0 to 10
if i > 100
j <- i to 100
} yield j

println(plan)
}
}

0 comments on commit 3e3e125

Please sign in to comment.