Merge pull request #17 from mmolimar/develop
Release 1.1 with KSQL 5.3.0
mmolimar committed Aug 5, 2019
2 parents 644ab5d + 5cd325f commit a28d17e
Showing 19 changed files with 266 additions and 247 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -46,7 +46,7 @@ or

### Connection URL

-The URL has the form ``jdbc:ksql://<ksql-engine>:<port>[?<property1>=<value>,<property2>=<value>...]``
+The URL has the form ``jdbc:ksql://<ksql-engine>:<port>[?<property1>=<value>&<property2>=<value>...]``

where:

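For illustration, a minimal sketch of opening a connection with a URL of this form. The driver class and the `timeout` property are taken from the integration test changed below; the host, port, query and object name are placeholders, not part of this commit:

import java.sql.DriverManager
import com.github.mmolimar.ksql.jdbc.KsqlDriver

object ConnectionUrlExample extends App {
  // Register the driver and connect; multiple properties are separated with '&'.
  DriverManager.registerDriver(new KsqlDriver)
  val connection = DriverManager.getConnection("jdbc:ksql://localhost:8088?timeout=20000")

  // Run a simple command and print the first column of each row.
  val resultSet = connection.createStatement.executeQuery("SHOW STREAMS")
  while (resultSet.next()) println(resultSet.getString(1))
  resultSet.close()
  connection.close()
}
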
10 changes: 5 additions & 5 deletions build.sbt
@@ -1,6 +1,6 @@
name := "ksql-jdbc-driver"

version := "1.0"
version := "1.1"

initialize := {
assert(Integer.parseInt(sys.props("java.specification.version").split("\\.")(1)) >= 8, "Java 8 or above required")
@@ -12,11 +12,11 @@ resolvers += "Confluent Maven Repo" at "http://packages.confluent.io/maven/"
resolvers += "Confluent Snapshots Maven Repo" at "https://s3-us-west-2.amazonaws.com/confluent-snapshots/"
resolvers += Resolver.mavenLocal

libraryDependencies += "io.confluent.ksql" % "ksql-rest-app" % "5.1.2"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "io.confluent.ksql" % "ksql-rest-app" % "5.3.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % "test"
libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "3.6.0" % "test"
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts (Artifact("javax.ws.rs-api", "jar", "jar"))
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts Artifact("javax.ws.rs-api", "jar", "jar")

assemblyMergeStrategy in assembly := {
case PathList("javax", "inject", xs@_*) => MergeStrategy.first
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version = 0.13.8
+sbt.version = 0.13.9
@@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import com.github.mmolimar.ksql.jdbc.KsqlEntityHeaders._
import com.github.mmolimar.ksql.jdbc.embedded.{EmbeddedKafkaCluster, EmbeddedKsqlEngine, EmbeddedZookeeperServer}
import com.github.mmolimar.ksql.jdbc.utils.TestUtils
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.scalatest._

class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAfterAll {
@@ -16,14 +16,14 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
val kafkaCluster = new EmbeddedKafkaCluster(zkServer.getConnection)
val ksqlEngine = new EmbeddedKsqlEngine(kafkaCluster.getBrokerList)

-lazy val kafkaProducer = TestUtils.buildProducer(kafkaCluster.getBrokerList)
+lazy val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] = TestUtils.buildProducer(kafkaCluster.getBrokerList)

val ksqlUrl = s"jdbc:ksql://localhost:${ksqlEngine.getPort}?timeout=20000"
var ksqlConnection: Connection = _
-val topic = TestUtils.randomString()
+val topic: String = TestUtils.randomString()

val stop = new AtomicBoolean(false)
-val producerThread = new BackgroundOps(stop, () => produceMessages)
+val producerThread = new BackgroundOps(stop, () => produceMessages())

"A KsqlConnection" when {

@@ -35,24 +35,24 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
"create the table properly" in {
val resultSet = createTestTableOrStream(table)
resultSet.next should be(true)
-resultSet.getString(commandStatusEntity(0).name) should be("TABLE")
+resultSet.getString(commandStatusEntity.head.name) should be("TABLE")
resultSet.getString(commandStatusEntity(1).name) should be(table.toUpperCase)
resultSet.getString(commandStatusEntity(2).name) should be("CREATE")
resultSet.getString(commandStatusEntity(3).name) should be("SUCCESS")
resultSet.getString(commandStatusEntity(4).name) should be("Table created")
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}

"list the table already created" in {
val resultSet = ksqlConnection.createStatement.executeQuery(s"SHOW TABLES")
resultSet.next should be(true)
-resultSet.getString(tablesListEntity(0).name) should be(table.toUpperCase)
+resultSet.getString(tablesListEntity.head.name) should be(table.toUpperCase)
resultSet.getString(tablesListEntity(1).name) should be(topic)
resultSet.getString(tablesListEntity(2).name) should be("JSON")
resultSet.getBoolean(tablesListEntity(3).name) should be(false)
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}

"be able to get the execution plan for a query in a table" in {
@@ -61,17 +61,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
resultSet.getString(queryDescriptionEntity(1).name) should be("ROWTIME, ROWKEY, FIELD1, FIELD2, FIELD3")
resultSet.getString(queryDescriptionEntity(2).name) should be(table.toUpperCase)
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}

"be able to query all fields in the table" in {
var counter = 0
val statement = ksqlConnection.createStatement
statement.setMaxRows(maxRecords)
+statement.getMoreResults(1) should be(false)
val resultSet = statement.executeQuery(s"SELECT * FROM $table")
+statement.getMoreResults(1) should be(true)
while (resultSet.next) {
resultSet.getLong(1) should not be (-1)
-Option(resultSet.getString(2)) should not be (None)
+Option(resultSet.getString(2)) should not be None
resultSet.getInt(3) should be(123)
resultSet.getDouble(4) should be(45.4)
resultSet.getString(5) should be("lorem ipsum")
@@ -81,9 +83,10 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
counter += 1
}
counter should be(maxRecords)
+statement.getMoreResults() should be(false)

-resultSet.close
-statement.close
+resultSet.close()
+statement.close()

val metadata = resultSet.getMetaData
metadata.getColumnCount should be(5)
@@ -168,19 +171,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
resultSet.getString(sourceDescriptionEntity(3).name) should be("TABLE")
resultSet.getString(sourceDescriptionEntity(4).name) should be("JSON")
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}

"drop the table" in {
val resultSet = ksqlConnection.createStatement.executeQuery(s"DROP TABLE $table")
resultSet.next should be(true)
-resultSet.getString(commandStatusEntity(0).name) should be("TABLE")
+resultSet.getString(commandStatusEntity.head.name) should be("TABLE")
resultSet.getString(commandStatusEntity(1).name) should be(table.toUpperCase)
resultSet.getString(commandStatusEntity(2).name) should be("DROP")
resultSet.getString(commandStatusEntity(3).name) should be("SUCCESS")
-resultSet.getString(commandStatusEntity(4).name) should be(s"Source ${table.toUpperCase} was dropped. ")
+resultSet.getString(commandStatusEntity(4).name) should be(s"Source ${table.toUpperCase} (topic: $topic) was dropped.")
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}
}

@@ -190,25 +193,25 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
val stream = TestUtils.randomString()

"create the stream properly" in {
-val resultSet = createTestTableOrStream(stream, true)
+val resultSet = createTestTableOrStream(str = stream, isStream = true)
resultSet.next should be(true)
-resultSet.getString(commandStatusEntity(0).name) should be("STREAM")
+resultSet.getString(commandStatusEntity.head.name) should be("STREAM")
resultSet.getString(commandStatusEntity(1).name) should be(stream.toUpperCase)
resultSet.getString(commandStatusEntity(2).name) should be("CREATE")
resultSet.getString(commandStatusEntity(3).name) should be("SUCCESS")
resultSet.getString(commandStatusEntity(4).name) should be("Stream created")
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}

"list the stream already created" in {
val resultSet = ksqlConnection.createStatement.executeQuery(s"SHOW STREAMS")
resultSet.next should be(true)
-resultSet.getString(streamsListEntity(0).name) should be(stream.toUpperCase)
+resultSet.getString(streamsListEntity.head.name) should be(stream.toUpperCase)
resultSet.getString(streamsListEntity(1).name) should be(topic)
resultSet.getString(streamsListEntity(2).name) should be("JSON")
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}

"be able to get the execution plan for a query in a stream" in {
@@ -217,17 +220,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
resultSet.getString(queryDescriptionEntity(1).name) should be("ROWTIME, ROWKEY, FIELD1, FIELD2, FIELD3")
resultSet.getString(queryDescriptionEntity(2).name) should be(stream.toUpperCase)
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}

"be able to query all fields in the stream" in {
var counter = 0
val statement = ksqlConnection.createStatement
statement.setMaxRows(maxRecords)
+statement.getMoreResults(1) should be(false)
val resultSet = statement.executeQuery(s"SELECT * FROM $stream")
+statement.getMoreResults(1) should be(true)
while (resultSet.next) {
resultSet.getLong(1) should not be (-1)
-Option(resultSet.getString(2)) should not be (None)
+Option(resultSet.getString(2)) should not be None
resultSet.getInt(3) should be(123)
resultSet.getDouble(4) should be(45.4)
resultSet.getString(5) should be("lorem ipsum")
@@ -237,9 +242,10 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
counter += 1
}
counter should be(maxRecords)
+statement.getMoreResults(1) should be(false)

-resultSet.close
-statement.close
+resultSet.close()
+statement.close()

val metadata = resultSet.getMetaData
metadata.getColumnCount should be(5)
@@ -323,19 +329,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
resultSet.getString(sourceDescriptionEntity(3).name) should be("STREAM")
resultSet.getString(sourceDescriptionEntity(4).name) should be("JSON")
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}

"drop the stream" in {
val resultSet = ksqlConnection.createStatement.executeQuery(s"DROP STREAM $stream")
resultSet.next should be(true)
-resultSet.getString(commandStatusEntity(0).name) should be("STREAM")
+resultSet.getString(commandStatusEntity.head.name) should be("STREAM")
resultSet.getString(commandStatusEntity(1).name) should be(stream.toUpperCase)
resultSet.getString(commandStatusEntity(2).name) should be("DROP")
resultSet.getString(commandStatusEntity(3).name) should be("SUCCESS")
-resultSet.getString(commandStatusEntity(4).name) should be(s"Source ${stream.toUpperCase} was dropped. ")
+resultSet.getString(commandStatusEntity(4).name) should be(s"Source ${stream.toUpperCase} (topic: $topic) was dropped.")
resultSet.next should be(false)
-resultSet.close
+resultSet.close()
}
}

@@ -344,19 +350,22 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
"show the content of that topic" in {
val statement = ksqlConnection.createStatement
statement.setMaxRows(3)
+statement.getMoreResults(1) should be(false)
val resultSet = statement.executeQuery(s"PRINT '$topic'")
+statement.getMoreResults(1) should be(true)
resultSet.next should be(true)
-resultSet.getString(printTopic(0).name) should be("Format:STRING")
+resultSet.getString(printTopic.head.name) should be("Format:STRING")
resultSet.next should be(true)
resultSet.next should be(true)
resultSet.next should be(false)
-resultSet.close
-statement.close
+statement.getMoreResults() should be(false)
+resultSet.close()
+statement.close()
}
}
}

-private def produceMessages: Unit = {
+private def produceMessages(): Unit = {
val key = TestUtils.randomString().getBytes
val value =
"""
@@ -378,53 +387,53 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
s"WITH (KAFKA_TOPIC='$topic', VALUE_FORMAT='JSON', KEY='FIELD1');")
}

-override def beforeAll = {
-DriverManager.registerDriver(new KsqlDriver);
+override def beforeAll(): Unit = {
+DriverManager.registerDriver(new KsqlDriver)

-zkServer.startup
+zkServer.startup()
TestUtils.waitTillAvailable("localhost", zkServer.getPort, 5000)

-kafkaCluster.startup
+kafkaCluster.startup()
kafkaCluster.getPorts.foreach { port =>
TestUtils.waitTillAvailable("localhost", port, 5000)
}

kafkaCluster.createTopic(topic)
kafkaCluster.existTopic(topic) should be(true)
-producerThread.start
+producerThread.start()

-ksqlEngine.startup
+ksqlEngine.startup()
TestUtils.waitTillAvailable("localhost", ksqlEngine.getPort, 5000)

ksqlConnection = DriverManager.getConnection(ksqlUrl)

}

-override def afterAll = {
+override def afterAll(): Unit = {
info(s"Produced ${producerThread.getNumExecs} messages")
stop.set(true)
-TestUtils.swallow(producerThread.interrupt)
+TestUtils.swallow(producerThread.interrupt())

-TestUtils.swallow(ksqlConnection.close)
-ksqlEngine.shutdown
-TestUtils.swallow(kafkaProducer.close)
+TestUtils.swallow(ksqlConnection.close())
+ksqlEngine.shutdown()
+TestUtils.swallow(kafkaProducer.close())

-kafkaCluster.shutdown
-zkServer.shutdown
+kafkaCluster.shutdown()
+zkServer.shutdown()
}

}

class BackgroundOps(stop: AtomicBoolean, exec: () => Unit) extends Thread {
private var count = 0L

-override def run = {
+override def run(): Unit = {
while (!stop.get) {
exec()
this.count += 1
}
}

-def getNumExecs = this.count
+def getNumExecs: Long = this.count
}

@@ -8,19 +8,21 @@ import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.Logging
import kafka.zk.AdminZkClient

+import scala.collection.Seq

class EmbeddedKafkaCluster(zkConnection: String,
ports: Seq[Int] = Seq(TestUtils.getAvailablePort),
baseProps: Properties = new Properties) extends Logging {

-private val actualPorts: Seq[Int] = ports.map(resolvePort(_))
+private val actualPorts: Seq[Int] = ports.map(resolvePort)

private var brokers: Seq[KafkaServer] = Seq.empty
private var logDirs: Seq[File] = Seq.empty

private lazy val zkClient = TestUtils.buildZkClient(zkConnection)
private lazy val adminZkClient = new AdminZkClient(zkClient)

-def startup = {
+def startup(): Unit = {
info("Starting up embedded Kafka brokers")

for ((port, i) <- actualPorts.zipWithIndex) {
@@ -50,7 +52,7 @@ class EmbeddedKafkaCluster(zkConnection: String,
info(s"Started embedded Kafka brokers: $getBrokerList")
}

-def shutdown = {
+def shutdown(): Unit = {
brokers.foreach(broker => TestUtils.swallow(broker.shutdown))
logDirs.foreach(logDir => TestUtils.swallow(TestUtils.deleteFile(logDir)))
}
@@ -59,7 +61,7 @@

def getBrokerList: String = actualPorts.map("localhost:" + _).mkString(",")

-def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1) = {
+def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1): Unit = {
info(s"Creating topic $topic")
adminZkClient.createTopic(topic, numPartitions, replicationFactor)
}
@@ -69,11 +71,11 @@
adminZkClient.deleteTopic(topic)
}

-def deleteTopics(topics: Seq[String]) = topics.foreach(deleteTopic(_))
+def deleteTopics(topics: Seq[String]): Unit = topics.foreach(deleteTopic)

def existTopic(topic: String): Boolean = zkClient.topicExists(topic)

-def listTopics = zkClient.getAllTopicsInCluster
+def listTopics: Seq[String] = zkClient.getAllTopicsInCluster

private def resolvePort(port: Int) = if (port <= 0) TestUtils.getAvailablePort else port

