Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mauricio committed Feb 26, 2012
0 parents commit 1285964
Show file tree
Hide file tree
Showing 18 changed files with 675 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.cache
target/*
bin/*
.idea*
.classpath
.project
.settings/*
project/target/*
project/project/*
54 changes: 54 additions & 0 deletions project/Build.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* User: Maurício Linhares
* Date: 2/18/12
* Time: 6:34 PM
*/

import sbt._
import Keys._

object Build extends sbt.Build {

lazy val project = Project(
"postgresql-netty",
file(".")
).settings(
organization := "org.postgresql",
version := "1.0.0",
scalaVersion := "2.9.1",
scalacOptions := Seq("-deprecation", "-encoding", "utf8"),
resolvers ++= Configuration.resolutionRepos,
libraryDependencies ++= Configuration.dependencies)

}


object Configuration {

object Repos {
val JavaNet = "java-net-maven" at "http://download.java.net/maven/2"
val SonatypeSnapshots = "sonatype-snapshots" at "http://oss.sonatype.org/content/repositories/snapshots"
val SonatypeReleases = "sonatype-releases" at "http://oss.sonatype.org/content/repositories/releases"
val SpringSource = "spring-source" at "http://repository.springsource.com/ivy/bundles/release"
val SpringExternal = "spring-external" at "http://repository.springsource.com/maven/bundles/external"
}

val resolutionRepos = Seq[Resolver](
ScalaToolsReleases,
ScalaToolsSnapshots,
Repos.SonatypeReleases,
Repos.SonatypeSnapshots,
Repos.SpringExternal,
Repos.SpringSource,
Repos.JavaNet
)

val dependencies = Seq(
"io.netty" % "netty" % "3.3.1.Final",
"org.scala-lang" % "scala-compiler" % "2.9.1",
"org.scala-lang" % "scala-library" % "2.9.1",
"org.specs2" % "specs2_2.9.1" % "1.8.1" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.6.3"
)

}
5 changes: 5 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.0.0")

resolvers += "sbt-idea-repo" at "http://mpeltonen.github.com/maven/"

addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")
28 changes: 28 additions & 0 deletions src/main/resources/log4j.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">

<appender name="stdout" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<!-- Pattern to output the caller's file name and line number -->
<param name="ConversionPattern" value="%5p [%t] (%F:%L) - %m%n" />
</layout>
</appender>

<appender name="R" class="org.apache.log4j.RollingFileAppender">
<param name="file" value="logs/jobs.log" />
<param name="MaxFileSize" value="1MB" />
<!-- Keep one backup file -->

<param name="MaxBackupIndex" value="50" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%5p [%t] (%F:%L) - %m%n" />
</layout>
</appender>

<root>
<priority value="DEBUG" />
<appender-ref ref="stdout" />
</root>

</log4j:configuration>
54 changes: 54 additions & 0 deletions src/main/scala/com/github/mauricio/postgresql/Connection.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.github.mauricio.postgresql

import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import java.util.concurrent.Executors
import org.jboss.netty.bootstrap.ClientBootstrap
import java.net.InetSocketAddress
import scala.collection.JavaConversions
import collection.Map
import org.jboss.netty.channel.{ChannelFuture, Channels, ChannelPipeline, ChannelPipelineFactory}

/**
* User: Maurício Linhares
* Date: 2/25/12
* Time: 7:46 PM
*/

object Connection {

val Name = "Netty-PostgreSQL-driver-0.0.1"

}

class Connection(host: String, port: Int, username: String, password: String, database: String) {

val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
val handler = new DatabaseConnectionHandler( username , database)
val bootstrap = new ClientBootstrap(this.factory)
var channelFuture : ChannelFuture = null

def connect : Unit = {

this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

override def getPipeline(): ChannelPipeline = {
return Channels.pipeline(new MessageDecoder(), handler);
}

});

this.bootstrap.setOption("child.tcpNoDelay", true);
this.bootstrap.setOption("child.keepAlive", true);
this.channelFuture = this.bootstrap.connect(new InetSocketAddress( this.host, this.port)).awaitUninterruptibly()

}

def disconnect : Unit = {
this.channelFuture.getChannel.getCloseFuture.awaitUninterruptibly()
}

def parameterStatuses : Map[String, String] = {
JavaConversions.mapAsScalaMap(this.handler.parameterStatus)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.github.mauricio.postgresql

import org.jboss.netty.channel.SimpleChannelHandler
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.buffer.ChannelBuffer
import java.util.concurrent.{ConcurrentHashMap}
import util.Log

object DatabaseConnectionHandler {
val log = Log.get[DatabaseConnectionHandler]
}

class DatabaseConnectionHandler(val user: String, val database: String) extends SimpleChannelHandler {

private val log = DatabaseConnectionHandler.log

val properties = List(
"user" -> user,
"database" -> database,
"application_name" -> Connection.Name,
"client_encoding" -> "UTF8",
"DateStyle" -> "ISO",
"extra_float_digits" -> "2")

val parameterStatus = new ConcurrentHashMap[String, String]()

override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {

val buffer = new OutputBuffer()

buffer.writeInteger2(3)
buffer.writeInteger2(0)

properties.foreach {
entry =>
buffer.writeCString(entry._1)
buffer.writeCString(entry._2)
}

buffer.writeByte(0)

e.getChannel().write(buffer.toBuffer)
}

override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {

e.getMessage() match {
case m: Message => {

log.debug( "name -> %s -> %s", m.name, m.content )

m.name match {
case Message.ParameterStatus => {
val pair = m.content.asInstanceOf[(String, String)]
this.parameterStatus.put( pair._1, pair._2 )
}
case _ => {
throw new IllegalStateException("Handler not implemented for message %s".format( m.name ))
}
}

}
case buffer: ChannelBuffer => {
if (buffer.readableBytes() > 0) {
val result = new Array[Byte](buffer.readableBytes())
buffer.readBytes(result)
log.debug( "message result is => %s", new String(result) )
}
}
case _ => {
throw new IllegalArgumentException( "Unknown message type - %s".format( e.getMessage() ) )
}

}

}

override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
e.getCause().printStackTrace()
e.getChannel().close()
}

}
23 changes: 23 additions & 0 deletions src/main/scala/com/github/mauricio/postgresql/Message.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.github.mauricio.postgresql

object Message {

val AuthenticationOk = 'R'
val ParameterStatus = 'S'
val BackendKeyData = 'K'
val CommandComplete = 'C'
val ReadyForQuery = 'Z'
val RowDescription = 'T'
val DataRow = 'D'
val Error = 'R'
val Notice = 'N'
val ParseComplete = '1'
val BindComplete = '2'
val Notification = 'A'
val NoData = 'n'
val EmptyQuery = 'I'
val PortalSuspended = 's'

}

class Message ( val name : Char, val content : Any )
46 changes: 46 additions & 0 deletions src/main/scala/com/github/mauricio/postgresql/MessageDecoder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.github.mauricio.postgresql

import org.jboss.netty.handler.codec.frame.FrameDecoder
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.Channel
import org.jboss.netty.buffer.ChannelBuffers
import parsers.ParserR

class MessageDecoder extends FrameDecoder {

protected def decode(ctx: ChannelHandlerContext, c: Channel, b: ChannelBuffer): Object = {

var code : Char = 0
var length : Int = 0

if ( b.readableBytes() >= 5 ) {

b.markReaderIndex()

code = b.readByte().asInstanceOf[Char]
length = b.readInt() - 4

code match {
case 'R' => {
ParserR.Instance.parseMessage( b )
}
case _ => {

if ( b.readableBytes() >= length ) {
MessageParser.parse( code, b.readSlice( length ) )
} else {
return null
}

}
}


} else {
return null
}

}

}
33 changes: 33 additions & 0 deletions src/main/scala/com/github/mauricio/postgresql/MessageParser.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.github.mauricio.postgresql

import org.jboss.netty.buffer.ChannelBuffer
import parsers.{ParserS, ParserE, ParserR}

object MessageParser {

private val parsers = Map(
'E' -> new ParserE(),
'R' -> ParserR.Instance,
'S' -> new ParserS()
)

def parserFor( t : Char ) : MessageParser = {
this.parsers.get(t).getOrElse {
throw new ParserNotAvailableException(t)
}
}

def parse( t : Char, b : ChannelBuffer ) : Message = {
this.parserFor( t ).parseMessage( b )
}

}

trait MessageParser {

def parseMessage( buffer : ChannelBuffer ) : Message

}

class ParserNotAvailableException ( t : Char )
extends RuntimeException( "There is no parser available for message type '%s'".format(t) )
Loading

0 comments on commit 1285964

Please sign in to comment.