Skip to content

Commit

Permalink
Switched over SQL parsing form ZQL to JSQLParser.
Browse files Browse the repository at this point in the history
  • Loading branch information
mardambey committed Sep 3, 2011
1 parent dea5f69 commit f1b0722
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 16 deletions.
Binary file added lib/jsqlparser-0.7.0.jar
Binary file not shown.
Binary file removed lib/zql-20110618.jar
Binary file not shown.
16 changes: 8 additions & 8 deletions src/main/scala/guzzler/Guzzler.scala
Expand Up @@ -21,14 +21,15 @@ package guzzler

import scala.sys.process.Process
import scala.sys.process.ProcessIO
import org.gibello.zql._
import net.lag.logging.Logger
import ssh.{SshdMessage, SshdSubscribe, Sshd}
import java.io.{ByteArrayInputStream, InputStreamReader, BufferedReader}
import akka.actor.Actor
import akka.actor.Actor._
import akka.dispatch.Future
import akka.util.duration._
import net.sf.jsqlparser.parser.CCJSqlParserManager
import java.io.{StringReader, ByteArrayInputStream, InputStreamReader, BufferedReader}
import net.sf.jsqlparser.statement.Statement

/**
* Guzzler - streams binary logs from a remote MySQL
Expand Down Expand Up @@ -113,25 +114,24 @@ class GuzzlerSshdSubscriber extends Actor {
object Util {

val logger = Logger.get
val parser = new ZqlParser()
val parser = new CCJSqlParserManager()

def processSql(sql:String) {
// FIXME: this is an ugly hack
val scrubbedSql = sql.replaceAll("""\\'""", "") + ";"
parser.initParser(new ByteArrayInputStream(scrubbedSql.getBytes))

try {
val statement = parser.readStatement()
Config.consumers.par.foreach(_ ! Statement(statement))
val statement = parser.parse(new StringReader(scrubbedSql))
Config.consumers.par.foreach(_ ! SqlStatement(statement, sql))
} catch {
case e:Exception => logger.error(e, " [guzzler] Exception caught while parsing SQL '" + scrubbedSql)
case ignore => logger.error(" [guzzler] Could not process SQL (unknown error): " + scrubbedSql + " -> " + ignore)
}
}
}

// wraps ZQL's ZStatement
case class Statement(s:ZStatement)
// wraps a Statement
case class SqlStatement(statement:Statement, sql:String)

// pauses the queue, consumers stop getting messages
case class QueuePause()
Expand Down
18 changes: 10 additions & 8 deletions src/main/scala/guzzler/consumers/RabbitMQDeliveryConsumer.scala
Expand Up @@ -21,13 +21,15 @@ package guzzler.consumers

import akka.actor.Actor
import akka.actor.Actor._
import org.gibello.zql._
import guzzler.rabbitmq._
import java.lang.Exception
import net.lag.logging.Logger
import net.lag.configgy.ConfigMap
import guzzler._
import ssh.{SshdMessage, SshdSubscribe}
import net.sf.jsqlparser.statement.delete.Delete
import net.sf.jsqlparser.statement.update.Update
import net.sf.jsqlparser.statement.insert.Insert

/**
* Accepts bin log messages and pushes them into
Expand Down Expand Up @@ -80,16 +82,16 @@ class RabbitMQDeliveryConsumer extends Actor {
case SshdMessage(msg) => {
logger.info("RabbitMQDeliveryConsumer: Got an ssh message: " + msg)
}
case Statement(s) => {
case SqlStatement(s, sql) => {
s match {
case q:ZInsert => {
beamOff(q.toString.getBytes, q.getTable, INSERT)
case q:Insert => {
beamOff(sql.getBytes, q.getTable.getName, INSERT)
}
case q:ZUpdate => {
beamOff(q.toString.getBytes, q.getTable, UPDATE)
case q:Update => {
beamOff(sql.getBytes, q.getTable.getName, UPDATE)
}
case q:ZDelete => {
beamOff(q.toString.getBytes, q.getTable, DELETE)
case q:Delete => {
beamOff(sql.getBytes, q.getTable.getName, DELETE)
}
}
}
Expand Down

0 comments on commit f1b0722

Please sign in to comment.