Skip to content

Commit

Permalink
using ASM and ByteBuddy to create GetRequest's and Scanner's subclasses
Browse files Browse the repository at this point in the history
  • Loading branch information
jongwook committed Sep 27, 2016
1 parent c50d43a commit 770917e
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 6 deletions.
3 changes: 2 additions & 1 deletion s2core/build.sbt
Expand Up @@ -40,7 +40,8 @@ libraryDependencies ++= Seq(
"com.github.danielwegener" % "logback-kafka-appender" % "0.0.4",
"com.stumbleupon" % "async" % "1.4.1",
"io.netty" % "netty" % "3.9.4.Final" force(),
"org.hbase" % "asynchbase" % "1.7.2-S2GRAPH" from "https://github.com/SteamShon/asynchbase/raw/mvn-repo/org/hbase/asynchbase/1.7.2-S2GRAPH/asynchbase-1.7.2-S2GRAPH.jar"
"org.hbase" % "asynchbase" % "1.7.2",
"net.bytebuddy" % "byte-buddy" % "1.4.26"
)

libraryDependencies := {
Expand Down
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.s2graph.core.storage.hbase

import java.lang.Integer.valueOf
import java.nio.charset.StandardCharsets

import net.bytebuddy.ByteBuddy
import net.bytebuddy.description.modifier.Visibility.PUBLIC
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy
import net.bytebuddy.implementation.FieldAccessor
import net.bytebuddy.implementation.MethodDelegation.to
import net.bytebuddy.matcher.ElementMatchers._
import org.apache.commons.io.IOUtils
import org.hbase.async.GetRequestExtra.GetRequestExtra
import org.hbase.async.ScannerExtra.ScannerExtra
import org.hbase.async._
import org.objectweb.asm.Opcodes.{ACC_FINAL, ACC_PRIVATE, ACC_PROTECTED, ACC_PUBLIC}
import org.objectweb.asm._

import scala.collection.JavaConversions._

/**
* Upon initialization, it loads a patched version of Asynchbase's GetRequest and Scanner class,
* modified using ASM to make the classes non-final and their methods are all public,
* so that ByteBuddy can create subclasses of them.
*
* This object has to be initialized before any access to (i.e. any classloading of) Asynchbase,
* since the ClassLoader does not allow redefining already loaded classes unless we use instrumentation.
*/
object AsynchbasePatcher {

/** invoking this method will force loading this class, thus triggering the patch mechanism below */
def init(): Unit = {
assert(getRequestClass.getMethod("setStoreLimit", classOf[Int]) != null)
assert(getRequestClass.getMethod("setStoreOffset", classOf[Int]) != null)
assert(scannerClass.getMethod("setRpcTimeout", classOf[Int]) != null)
}

/** instantiates a new GetRequest, patched to support storeLimit/storeOffset */
def newGetRequest(table: Array[Byte], key: Array[Byte], family: Array[Byte]): GetRequestExtra = {
getRequestClass.getConstructor(BA, BA, BA)
.newInstance(table, key, family).asInstanceOf[GetRequestExtra]
}

/** instantiates a new GetRequest, patched to support storeLimit/storeOffset */
def newGetRequest(table: Array[Byte], key: Array[Byte], family: Array[Byte], qualifier: Array[Byte]): GetRequestExtra = {
getRequestClass.getConstructor(BA, BA, BA, BA)
.newInstance(table, key, family, qualifier).asInstanceOf[GetRequestExtra]
}

/** instantiate a new Scanner, patched to support RPC timeout */
def newScanner(client: HBaseClient, table: Array[Byte]): ScannerExtra = {
val constructor = scannerClass.getConstructor(classOf[HBaseClient], BA)
constructor.setAccessible(true)
constructor.newInstance(client, table).asInstanceOf[ScannerExtra]
}

/** instantiate a new Scanner, patched to support RPC timeout */
def newScanner(client: HBaseClient, table: String): ScannerExtra = {
newScanner(client, table.getBytes(StandardCharsets.UTF_8))
}


private val BA = classOf[Array[Byte]]
private val classLoader = getClass.getClassLoader
private val defineClass = classOf[ClassLoader].getDeclaredMethod("defineClass", classOf[String], classOf[Array[Byte]], classOf[Int], classOf[Int])
defineClass.setAccessible(true)

/** loads Asynchbase classes from s2core's classpath
* *MUST* be called before any access to those classes,
* otherwise the classloading will fail with an "attempted duplicate class definition" error.
**/
private def loadClass(name: String): Class[_] = {
classLoader.getResources(s"org/hbase/async/$name.class").toSeq.headOption match {
case Some(url) =>
val stream = url.openStream()
val bytes = try { IOUtils.toByteArray(stream) } finally { stream.close() }

// patch the bytecode so that the class is no longer final and the methods are all accessible
val cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES)
new ClassReader(bytes).accept(new ClassAdapter(cw) {
override def visit(version: Int, access: Int, name: String, signature: String, superName: String, interfaces: Array[String]): Unit = {
super.visit(version, access & ~ACC_FINAL, name, signature, superName, interfaces)
}
override def visitMethod(access: Int, name: String, desc: String, signature: String, exceptions: Array[String]): MethodVisitor = {
super.visitMethod(access & ~ACC_PRIVATE & ~ACC_PROTECTED & ~ACC_FINAL | ACC_PUBLIC, name, desc, signature, exceptions)
}
}, 0)
val patched = cw.toByteArray

defineClass.invoke(classLoader, s"org.hbase.async.$name", patched, valueOf(0), valueOf(patched.length)).asInstanceOf[Class[_]]
case None =>
throw new ClassNotFoundException(s"Could not find Asynchbase class: $name")
}
}

/** a java.lang.Class instance for the patched GetRequest class */
private val getRequestClass: Class[_] = {
new ByteBuddy()
.subclass(loadClass("GetRequest"))
.name("org.hbase.async.GetRequestEx")
.implement(classOf[GetRequestExtra.IsGetRequest])
.implement(classOf[GetRequestExtra.StoreLimitOffset]).intercept(FieldAccessor.ofBeanProperty())
.defineField("storeLimit", classOf[Int], PUBLIC)
.defineField("storeOffset", classOf[Int], PUBLIC)
.method(named("serialize")).intercept(to(GetRequestExtra))
.make.load(classLoader, ClassLoadingStrategy.Default.INJECTION).getLoaded
}

/** a java.lang.Class instance for the patched Scanner class */
private val scannerClass = {
new ByteBuddy()
.subclass(loadClass("Scanner"))
.name("org.hbase.async.ScannerEx")
.implement(classOf[ScannerExtra.RpcTimeout]).intercept(FieldAccessor.ofBeanProperty())
.defineField("rpcTimeout", classOf[Int], PUBLIC)
.method(named("getNextRowsRequest")).intercept(to(ScannerExtra))
.make.load(classLoader, ClassLoadingStrategy.Default.INJECTION).getLoaded
}

}
Expand Up @@ -51,6 +51,7 @@ object AsynchbaseStorage {
val edgeCf = Serializable.edgeCf
val emptyKVs = new util.ArrayList[KeyValue]()

AsynchbasePatcher.init()

def makeClient(config: Config, overrideKv: (String, String)*) = {
val asyncConfig: org.hbase.async.Config =
Expand Down Expand Up @@ -198,7 +199,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte

label.schemaVersion match {
case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
val scanner = client.newScanner(label.hbaseTableName.getBytes)
val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
scanner.setFamily(edgeCf)

/*
Expand Down Expand Up @@ -247,13 +248,13 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
scanner
case _ =>
val get =
if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
else new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
if (queryParam.tgtVertexInnerIdOpt.isDefined) AsynchbasePatcher.newGetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
else AsynchbasePatcher.newGetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)

get.maxVersions(1)
get.setFailfast(true)
get.setMaxResultsPerColumnFamily(queryParam.limit)
get.setRowOffsetPerColumnFamily(queryParam.offset)
get.setStoreLimit(queryParam.limit)
get.setStoreOffset(queryParam.offset)
get.setMinTimestamp(minTs)
get.setMaxTimestamp(maxTs)
get.setTimeout(queryParam.rpcTimeoutInMillis)
Expand Down
96 changes: 96 additions & 0 deletions s2core/src/main/scala/org/hbase/async/GetRequestExtra.scala
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.hbase.async

import net.bytebuddy.implementation.bind.annotation.This
import org.hbase.async.generated.HBasePB.TimeRange
import org.hbase.async.generated.{FilterPB, ClientPB}
import org.jboss.netty.buffer.ChannelBuffer

object GetRequestExtra {

trait IsGetRequest {
def isGetRequest: Boolean
}

trait StoreLimitOffset {
def getStoreLimit: Int
def getStoreOffset: Int
def setStoreLimit(storeLimit: Int): Unit
def setStoreOffset(storeOffset: Int): Unit
}

type GetRequestExtra = org.hbase.async.GetRequest with IsGetRequest with StoreLimitOffset

def serialize(server_version: Byte, @This request: GetRequestExtra): ChannelBuffer = {
val getpb = ClientPB.Get.newBuilder.setRow(Bytes.wrap(request.key()))

if (request.family != null) {
val column = ClientPB.Column.newBuilder
column.setFamily(Bytes.wrap(request.family))
if (request.qualifiers != null) {
for (qualifier <- request.qualifiers) {
column.addQualifier(Bytes.wrap(qualifier))
}
}
getpb.addColumn(column.build)
}

// Filters
val filter = request.getFilter
if (filter != null) {
getpb.setFilter(FilterPB.Filter.newBuilder.setNameBytes(Bytes.wrap(filter.name)).setSerializedFilter(Bytes.wrap(filter.serialize)).build)
}

// TimeRange
val min_ts = request.getMinTimestamp
val max_ts = request.getMaxTimestamp
if (min_ts != 0 || max_ts != Long.MaxValue) {
val time = TimeRange.newBuilder
if (min_ts != 0) {
time.setFrom(min_ts)
}
if (max_ts != Long.MaxValue) {
time.setTo(max_ts)
}
getpb.setTimeRange(time.build)
}

if (request.maxVersions != 1) {
getpb.setMaxVersions(request.maxVersions)
}
if (!request.isGetRequest) {
getpb.setExistenceOnly(true)
}

// storeOffset and storeLimit
if (request.getStoreLimit >= 0) {
getpb.setStoreLimit(request.getStoreLimit)
}
if (request.getStoreOffset != 0) {
getpb.setStoreOffset(request.getStoreOffset)
}

val get = ClientPB.GetRequest.newBuilder.setRegion(request.region.toProtobuf).setGet(getpb.build)

HBaseRpc.toChannelBuffer(GetRequest.GGET, get.build)
}

}
44 changes: 44 additions & 0 deletions s2core/src/main/scala/org/hbase/async/ScannerExtra.scala
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.hbase.async

import java.util.concurrent.Callable

import net.bytebuddy.implementation.bind.annotation.{SuperCall, This}

object ScannerExtra {

trait RpcTimeout {
def getRpcTimeout: Int
def setRpcTimeout(timeout: Int): Unit
}

type ScannerExtra = Scanner with RpcTimeout

def getNextRowsRequest(@This scanner: ScannerExtra, @SuperCall getNextRowsRequest: Callable[HBaseRpc]): HBaseRpc = {
val request = getNextRowsRequest.call()
val rpcTimeout = scanner.getRpcTimeout
if (rpcTimeout > 0) {
request.setTimeout(rpcTimeout)
}
request
}

}

0 comments on commit 770917e

Please sign in to comment.