Skip to content

Commit

Permalink
remove ByteBuffer coder, fix CassandraIO warnings (#1380)
Browse files Browse the repository at this point in the history
  • Loading branch information
nevillelyh committed Sep 19, 2018
1 parent 7200eff commit b56a83f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package com.spotify.scio.cassandra

import java.lang.management.ManagementFactory
import java.nio.ByteBuffer

import com.datastax.driver.core.{Cluster, ProtocolVersion}
import com.google.protobuf.ByteString
import org.apache.cassandra.db.marshal.CompositeType
import org.apache.cassandra.hadoop.cql3._
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -75,17 +75,17 @@ private[cassandra] class BulkOperations(val opts: CassandraOptions, val parallel
BulkConfig(protocol, partitioner, numOfNodes, tableSchema, partitionKeyIndices, dataTypes)
}

val serializeFn: Seq[Any] => Array[ByteBuffer] = (values: Seq[Any]) => {
val b = Array.newBuilder[ByteBuffer]
val serializeFn: Seq[Any] => Array[ByteString] = (values: Seq[Any]) => {
val b = Array.newBuilder[ByteString]
val i = values.iterator
val j = config.dataTypes.iterator
while (i.hasNext && j.hasNext) {
b += CompatUtil.serialize(j.next().get, i.next(), config.protocol)
b += ByteString.copyFrom(CompatUtil.serialize(j.next().get, i.next(), config.protocol))
}
b.result()
}

val partitionFn: Array[ByteBuffer] => Int = {
val partitionFn: Array[ByteString] => Int = {
// Partition tokens equally across workers regardless of cluster token distribution
// This may not create 1-to-1 mapping between partitions and C* nodes but handles multi-DC
// clusters better
Expand All @@ -95,23 +95,23 @@ private[cassandra] class BulkOperations(val opts: CassandraOptions, val parallel
val (q, mod) = (maxToken - minToken + 1) /% numPartitions
val rangePerGroup = (if (mod != 0) q + 1 else q).bigInteger

(values: Array[ByteBuffer]) =>
values: Array[ByteString] =>
{
val key = if (config.partitionKeyIndices.length == 1) {
values(config.partitionKeyIndices.head)
values(config.partitionKeyIndices.head).asReadOnlyByteBuffer()
} else {
val keys = config.partitionKeyIndices.map(values)
val keys = config.partitionKeyIndices.map(values).map(_.asReadOnlyByteBuffer())
CompositeType.build(keys: _*)
}
val token = CompatUtil.getToken(config.partitioner, key)
token.divide(rangePerGroup).intValue()
}
}

val writeFn: ((Int, Iterable[Array[ByteBuffer]])) => Unit =
(kv: (Int, Iterable[Array[ByteBuffer]])) => {
val writeFn: ((Int, Iterable[Array[ByteString]])) => Unit =
(kv: (Int, Iterable[Array[ByteString]])) => {
val w = newWriter
kv._2.foreach(row => w.write(null, row.toList.asJava))
kv._2.foreach(row => w.write(null, row.map(_.asReadOnlyByteBuffer()).toList.asJava))
w.close(null: TaskAttemptContext)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package com.spotify.scio.cassandra

import java.lang.management.ManagementFactory
import java.nio.ByteBuffer

import com.datastax.driver.core.{Cluster, ProtocolVersion}
import com.google.protobuf.ByteString
import org.apache.cassandra.db.marshal.CompositeType
import org.apache.cassandra.hadoop.cql3._
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -75,17 +75,17 @@ private[cassandra] class BulkOperations(val opts: CassandraOptions, val parallel
BulkConfig(protocol, partitioner, numOfNodes, tableSchema, partitionKeyIndices, dataTypes)
}

val serializeFn: Seq[Any] => Array[ByteBuffer] = (values: Seq[Any]) => {
val b = Array.newBuilder[ByteBuffer]
val serializeFn: Seq[Any] => Array[ByteString] = (values: Seq[Any]) => {
val b = Array.newBuilder[ByteString]
val i = values.iterator
val j = config.dataTypes.iterator
while (i.hasNext && j.hasNext) {
b += CompatUtil.serialize(j.next().get, i.next(), config.protocol)
b += ByteString.copyFrom(CompatUtil.serialize(j.next().get, i.next(), config.protocol))
}
b.result()
}

val partitionFn: Array[ByteBuffer] => Int = {
val partitionFn: Array[ByteString] => Int = {
// Partition tokens equally across workers regardless of cluster token distribution
// This may not create 1-to-1 mapping between partitions and C* nodes but handles multi-DC
// clusters better
Expand All @@ -95,23 +95,23 @@ private[cassandra] class BulkOperations(val opts: CassandraOptions, val parallel
val (q, mod) = (maxToken - minToken + 1) /% numPartitions
val rangePerGroup = (if (mod != 0) q + 1 else q).bigInteger

(values: Array[ByteBuffer]) =>
values: Array[ByteString] =>
{
val key = if (config.partitionKeyIndices.length == 1) {
values(config.partitionKeyIndices.head)
values(config.partitionKeyIndices.head).asReadOnlyByteBuffer()
} else {
val keys = config.partitionKeyIndices.map(values)
val keys = config.partitionKeyIndices.map(values).map(_.asReadOnlyByteBuffer())
CompositeType.build(keys: _*)
}
val token = CompatUtil.getToken(config.partitioner, key)
token.divide(rangePerGroup).intValue()
}
}

val writeFn: ((Int, Iterable[Array[ByteBuffer]])) => Unit =
(kv: (Int, Iterable[Array[ByteBuffer]])) => {
val writeFn: ((Int, Iterable[Array[ByteString]])) => Unit =
(kv: (Int, Iterable[Array[ByteString]])) => {
val w = newWriter
kv._2.foreach(row => w.write(null, row.toList.asJava))
kv._2.foreach(row => w.write(null, row.map(_.asReadOnlyByteBuffer()).toList.asJava))
w.close(null: TaskAttemptContext)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,6 @@ package com.spotify.scio.coders
import org.apache.beam.sdk.{coders => bcoders}
import org.apache.beam.sdk.coders.{Coder => _, _}
import org.apache.beam.sdk.values.KV
import java.nio.ByteBuffer
import java.io.{InputStream, OutputStream}

final class ByteBufferCoder private[coders] () extends AtomicCoder[ByteBuffer] {
val bac = ByteArrayCoder.of()
def encode(value: ByteBuffer, os: OutputStream): Unit = {
val array =
if (value.hasArray) {
value.array()
} else {
value.clear()
val a = new Array[Byte](value.capacity())
value.get(a, 0, a.length)
a
}
bac.encode(array, os)
}

def decode(is: InputStream): ByteBuffer = {
val bytes = bac.decode(is)
ByteBuffer.wrap(bytes)
}
}

//
// Java Coders
Expand Down Expand Up @@ -101,10 +78,6 @@ trait JavaCoders {
implicit def messageCoder: Coder[org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage] =
Coder.beam(org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder.of())

import java.nio.ByteBuffer
implicit def byteBufferCoder: Coder[ByteBuffer] =
Coder.beam(new ByteBufferCoder())

implicit def beamKVCoder[K: Coder, V: Coder]: Coder[KV[K, V]] =
Coder.kv(Coder[K], Coder[V])
}

0 comments on commit b56a83f

Please sign in to comment.