Fixed S3AccessLogsTransformer and removed DStreamExtractor
Summary:
* Fixed S3AccessLogsTransformer by adding explicit type conversions
* Removed DStreamExtractor pending EN-142
jwbowler committed Feb 23, 2016
1 parent 8b3f61d commit 15159e4
Showing 4 changed files with 20 additions and 84 deletions.
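Note (not part of the commit): the transformer fix below matters because sqlContext.createDataFrame(rowRDD, SCHEMA) does not coerce values to the schema's types, so regex captures left as raw Strings in IntegerType or TimestampType columns fail when the rows are read, typically with a ClassCastException. A minimal sketch of the idea, assuming an existing SparkContext sc and SQLContext sqlContext, and borrowing the http_status and object_size column names from the TransformersSpec assertions below (the real SCHEMA has more columns):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Two of the typed columns exercised by the test; the real SCHEMA has more fields.
val miniSchema = StructType(Seq(
  StructField("http_status", IntegerType, true),
  StructField("object_size", IntegerType, true)))

// Same shape as the commit's helper: "-" marks an absent numeric field.
def parseIntOrDash(s: String) = s match {
  case "-" => null
  case str => str.toInt
}

val raw = Seq(Seq("200", "-"))  // regex captures arrive as Strings
val rows = sc.parallelize(raw.map(fields => Row.fromSeq(fields.map(parseIntOrDash))))
val df = sqlContext.createDataFrame(rows, miniSchema)

// Prints 200; with the raw Strings left in place, reading the IntegerType column
// throws a ClassCastException instead.
println(df.head.getAs[Int]("http_status"))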
36 changes: 0 additions & 36 deletions src/main/scala/com/memsql/streamliner/examples/Extractors.scala
@@ -85,39 +85,3 @@ class SequenceExtractor extends Extractor {
Some(df)
}
}

// Finally, an Extractor can be implemented using any existing DStream that
// works with Spark Streaming.
class DStreamExtractor extends Extractor {
// InputDStream with type as per StreamingContext.fileStream
private var dStream: InputDStream[(LongWritable, Text)] = null

def schema: StructType = StructType(StructField("word", StringType, false) :: Nil)

override def initialize(ssc: StreamingContext, sqlContext: SQLContext, config: PhaseConfig, batchInterval: Long,
logger: PhaseLogger): Unit = {
val userConfig = config.asInstanceOf[UserExtractConfig]
val dataDir = userConfig.getConfigString("dataDir").getOrElse("/tmp")

// we need an InputDStream to be able to call start(), but textFileStream would return a DStream
// we then "re-implement" textFileStream by calling fileStream directly here
dStream = ssc.fileStream[LongWritable, Text, TextInputFormat](dataDir)
dStream.start()
}

override def cleanup(ssc: StreamingContext, sqlContext: SQLContext, config: PhaseConfig, batchInterval: Long,
logger: PhaseLogger): Unit = {
dStream.stop()
}

override def next(ssc: StreamingContext, time: Long, sqlContext: SQLContext, config: PhaseConfig, batchInterval: Long,
logger: PhaseLogger): Option[DataFrame] = {
dStream.compute(Time(time)).map(rdd => {
// the InputDStream yields (LongWritable, Text) = (key, value) pairs; keep only the value lines
val lines = rdd.map(_._2.toString)
val words = lines.flatMap(_.split(" "))
val rowRDD = words.map(Row(_))
sqlContext.createDataFrame(rowRDD, schema)
})
}
}
20 changes: 18 additions & 2 deletions src/main/scala/com/memsql/streamliner/examples/Transformers.scala
@@ -37,11 +37,27 @@ class S3AccessLogsTransformer extends Transformer {
new java.sql.Timestamp(PARSER.parse(s).getTime())
}

def parseIntOrDash(s: String) = s match {
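// S3 access logs use "-" for numeric fields with no value; map it to null, anything else to Int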
case "-" => null
case str => str.toInt
}

override def transform(sqlContext: SQLContext, df: DataFrame, config: PhaseConfig, logger: PhaseLogger): DataFrame = {
val stringRDD = df.rdd
val parsedRDD = stringRDD.flatMap(x => S3_LINE_LOGPATS.findAllMatchIn(x(0).asInstanceOf[String]).map(y => y.subgroups))
val timestampedRDD = parsedRDD.map(r => r.zipWithIndex.map({case (x, i) => if (i == 2) parseDateTime(x) else x}))
val rowRDD = timestampedRDD.map(x => Row.fromSeq(x))

val typeConvertedRdd = parsedRDD.map(r =>
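// column 2 is the timestamp; columns 9, 11, 12, 13 and 14 are the numeric fields
// (http_status, bytes_sent, object_size and the two timing columns in the standard
// S3 access log layout), parsed here as Int or null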
r.zipWithIndex.map({case (x, i) =>
if (i == 2) {
parseDateTime(x)
} else if (Set(9, 11, 12, 13, 14) contains i) {
parseIntOrDash(x)
} else {
x
}
}))

val rowRDD = typeConvertedRdd.map(x => Row.fromSeq(x))
sqlContext.createDataFrame(rowRDD, SCHEMA)
}
}
46 changes: 0 additions & 46 deletions src/test/scala/test/ExtractorsSpec.scala
@@ -80,52 +80,6 @@ class ExtractorsSpec extends UnitSpec with LocalSparkContext {
extract.cleanup(ssc, sqlContext, config, 1, logger)
}

"DStreamExtractor" should "produce DataFrames from the InputDStream" in {
val extract = new DStreamExtractor

// create tmp directory
val dataDir = System.getProperty("java.io.tmpdir") + "dstream/"
try {
val dir = new File(dataDir)
dir.mkdir()
}
finally {
System.setProperty("java.io.tmpdir", dataDir)
}

// initialize the extractor
val config = UserExtractConfig(
class_name = "test",
value = JsObject(
"dataDir" -> JsString(dataDir)
)
)
extract.initialize(ssc, sqlContext, config, 1, logger)

// wait some time; the check for new files is time-sensitive
Thread sleep 1000

// create a new file, write data, rename the file to trigger the new file check
val file = File.createTempFile("tmp", ".txt")
val writer = new FileWriter(file)
try {
writer.write("hello world\nhello foo bar\nbar world")
}
finally writer.close()
file.renameTo(new File(dataDir, "testfile.txt"))

// extract data
val maybeDf = extract.next(ssc, System.currentTimeMillis(), sqlContext, config, 1, logger)
assert(maybeDf.isDefined)

// test that the dstream successfully created a dataframe
// http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
maybeDf.get.registerTempTable("words")
val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word order by word")
assert(wordCountsDataFrame.count == 4)
assert(wordCountsDataFrame.head == Row("bar", 2))
}

"FileExtractor" should "produce DataFrame from files" in {
val extract = new FileExtractor

2 changes: 2 additions & 0 deletions src/test/scala/test/TransformersSpec.scala
@@ -229,5 +229,7 @@ class TransformersSpec extends UnitSpec with LocalSparkContext {
assert(first.getAs[String]("ip") == "192.0.2.3")
assert(first.getAs[String]("user_agent") == "S3Console/0.4")
assert(first.getAs[String]("version_id") == "-")
assert(first.getAs[Int]("http_status") == 200)
assert(first.getAs[Int]("object_size") === null)
}
}
