Convert varbinary MySQL columns to a readable form when sampling them
Summary: EN-160 #close

Test Plan: manual

Reviewers: carl, hurshal, ankur, john, allan

Reviewed By: allan

Subscribers: engineering-list

Differential Revision: https://grizzly.memsql.com/D14946
Wayne Song committed Mar 1, 2016
1 parent e443b97 commit c80241a
Showing 5 changed files with 43 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ project/project/*
project/target/*
tests/target/*
examples/target/*
common/target/*
connectorLib/target/*
connectorLib/src/main/scala/com/memsql/spark/connector/target/*
etlLib/target/*
8 changes: 8 additions & 0 deletions build.sbt
@@ -59,6 +59,12 @@ lazy val commonSettings = Seq(
excludeFilter in unmanagedSources := HiddenFileFilter || "prelude.scala"
)

lazy val common = (project in file("common")).
settings(commonSettings: _*).
settings(
name := "common"
)

lazy val connectorLib = (project in file("connectorLib")).
settings(commonSettings: _*).
settings(
@@ -149,6 +155,7 @@ lazy val hdfsUtils = (project in file("hdfsUtils")).

lazy val samplingUtils = (project in file("samplingUtils")).
settings(commonSettings: _*).
dependsOn(common).
settings(
name := "samplingUtils",
libraryDependencies ++= Seq(
@@ -163,6 +170,7 @@ lazy val samplingUtils = (project in file("samplingUtils")).
lazy val interface = (project in file("interface")).
dependsOn(connectorLib % "test->test;compile->compile").
dependsOn(etlLib % "test->test;compile->compile").
dependsOn(common).
dependsOn(jarInspector).
dependsOn(hdfsUtils).
dependsOn(samplingUtils).
@@ -0,0 +1,18 @@
package com.memsql.spark.util

object StringConversionUtils {
def byteArrayToReadableString(bytes: Array[Byte]): String = {
// Build up a string with hex encoding such that printable ASCII
// characters get added as-is but other characters are added as an
// escape sequence (e.g. \x7f).
val sb = new StringBuilder()
bytes.foreach(b => {
if (b >= 0x20 && b <= 0x7e) {
sb.append(b.toChar)
} else {
sb.append("\\x%02x".format(b))
}
})
sb.toString
}
}
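
For illustration, a small sketch (not part of the commit) of what the new helper in the common project produces — printable ASCII bytes (0x20 through 0x7e) pass through unchanged, everything else is hex-escaped; the demo object and byte values are invented:

object StringConversionUtilsDemo extends App {
  import com.memsql.spark.util.StringConversionUtils

  // "H" (0x48) and "i" (0x69) are printable; 0x00, 0x7f and 0xff are not.
  val bytes = Array[Byte](0x48, 0x69, 0x00, 0x7f, 0xff.toByte)
  println(StringConversionUtils.byteArrayToReadableString(bytes))
  // prints: Hi\x00\x7f\xff
}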
@@ -15,6 +15,7 @@ import com.memsql.spark.etl.utils.Logging
import com.memsql.spark.interface.api._
import com.memsql.spark.interface.api.PipelineBatchType.PipelineBatchType
import com.memsql.spark.interface.util.ErrorUtils._
import com.memsql.spark.util.StringConversionUtils
import ApiActor._
import com.memsql.spark.interface.util.{PipelineLogger, BaseException}
import org.apache.spark.SparkContext
@@ -449,23 +450,10 @@ class DefaultPipelineMonitor(override val api: ActorRef,
row.toSeq.map(record => {
val sb = new StringBuilder()
record match {
case null => sb.append("null")
case bytes: Array[Byte] => {
// Build up a string with hex encoding such that printable ASCII
// characters get added as-is but other characters are added as an
// escape sequence (e.g. \x7f).
bytes.foreach(b => {
if (b >= 0x20 && b <= 0x7e) {
sb.append(b.toChar)
} else {
sb.append("\\x%02x".format(b))
}
})
}
case default => sb.append(record.toString)
case null => "null"
case bytes: Array[Byte] => StringConversionUtils.byteArrayToReadableString(bytes)
case default => record.toString
}

sb.toString
}).toList
} catch {
case e: Exception => List(s"Could not get string representation of record: $e")
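
With the byte-array handling extracted into the shared helper, the record-to-string mapping above collapses to one match per record. Roughly — this is a sketch of the post-change logic reconstructed from the added lines, not the verbatim file:

row.toSeq.map {
  case null => "null"
  case bytes: Array[Byte] => StringConversionUtils.byteArrayToReadableString(bytes)
  case other => other.toString
}.toList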
15 changes: 12 additions & 3 deletions samplingUtils/samplingUtils.scala
@@ -1,14 +1,15 @@
package com.memsql.sampling_utils

import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDDHelper
import com.memsql.spark.util.StringConversionUtils
import java.io.{BufferedReader, InputStreamReader, InputStream, PrintWriter, StringWriter}
import java.util.Properties
import java.sql.{DriverManager, ResultSet}
import java.util.Properties
import org.apache.commons.csv._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.log4j.PropertyConfigurator
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDDHelper
import scala.collection.JavaConversions._
import spray.json._

@@ -224,7 +225,15 @@ object SamplingUtils {
def next() = rs
}
val rows = resultSetIterator.take(SAMPLE_SIZE).map(rs => {
(1 to metadata.getColumnCount).map(i => JsString(rs.getObject(i).toString)).toList
(1 to metadata.getColumnCount).map(i => {
val obj = rs.getObject(i)
val stringValue = obj match {
case null => "null"
case bytes: Array[Byte] => StringConversionUtils.byteArrayToReadableString(bytes)
case default => obj.toString
}
JsString(stringValue)
}).toList
}).toList
SamplingResult(success = true, columns = Some(columns), records = Some(rows))
}
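
The practical effect on sampling output, with made-up byte values (the object hash is also invented): a varbinary column used to serialize through the JDBC driver's byte[] via Object.toString, which yields the JVM's opaque array form; it now comes back readable:

import spray.json._
import com.memsql.spark.util.StringConversionUtils

val raw = Array[Byte](0xde.toByte, 0xad.toByte, 0xbe.toByte, 0xef.toByte)
// Before this change:
val before = JsString(raw.toString)
// => JsString("[B@1b6d3586") (address varies per run)
// After: non-printable bytes are hex-escaped into a stable, readable form.
val after = JsString(StringConversionUtils.byteArrayToReadableString(raw))
// => JsString("\xde\xad\xbe\xef")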
