diff --git a/src/main/scala/com/memsql/streamliner/examples/Extractors.scala b/src/main/scala/com/memsql/streamliner/examples/Extractors.scala
index e569bfc..7375bb9 100644
--- a/src/main/scala/com/memsql/streamliner/examples/Extractors.scala
+++ b/src/main/scala/com/memsql/streamliner/examples/Extractors.scala
@@ -85,39 +85,3 @@ class SequenceExtractor extends Extractor {
     Some(df)
   }
 }
-
-// Finally, an Extractor can be implemented using any existing DStream that
-// works with Spark Streaming.
-class DStreamExtractor extends Extractor {
-  // InputDStream with type as per StreamingContext.fileStream
-  private var dStream: InputDStream[(LongWritable, Text)] = null
-
-  def schema: StructType = StructType(StructField("word", StringType, false) :: Nil)
-
-  override def initialize(ssc: StreamingContext, sqlContext: SQLContext, config: PhaseConfig, batchInterval: Long,
-                          logger: PhaseLogger): Unit = {
-    val userConfig = config.asInstanceOf[UserExtractConfig]
-    val dataDir = userConfig.getConfigString("dataDir").getOrElse("/tmp")
-
-    // we need an InputDStream to be able to call start(), but textFileStream would return a DStream
-    // we then "re-implement" textFileStream by calling directly fileStream here
-    dStream = ssc.fileStream[LongWritable, Text, TextInputFormat](dataDir)
-    dStream.start()
-  }
-
-  override def cleanup(ssc: StreamingContext, sqlContext: SQLContext, config: PhaseConfig, batchInterval: Long,
-                       logger: PhaseLogger): Unit = {
-    dStream.stop()
-  }
-
-  override def next(ssc: StreamingContext, time: Long, sqlContext: SQLContext, config: PhaseConfig, batchInterval: Long,
-                    logger: PhaseLogger): Option[DataFrame] = {
-    dStream.compute(Time(time)).map(rdd => {
-      // InputDStream[(LongWritable, Text)] = (key, value), retrieve the value lines
-      val lines = rdd.map(_._2.toString)
-      val words = lines.flatMap(_.split(" "))
-      val rowRDD = words.map(Row(_))
-      sqlContext.createDataFrame(rowRDD, schema)
-    })
-  }
-}
diff --git a/src/main/scala/com/memsql/streamliner/examples/S3AccessLogsTransformer.scala b/src/main/scala/com/memsql/streamliner/examples/S3AccessLogsTransformer.scala
index 684adf9..a46a377 100644
--- a/src/main/scala/com/memsql/streamliner/examples/S3AccessLogsTransformer.scala
+++ b/src/main/scala/com/memsql/streamliner/examples/S3AccessLogsTransformer.scala
@@ -37,11 +37,27 @@ class S3AccessLogsTransformer extends Transformer {
     new java.sql.Timestamp(PARSER.parse(s).getTime())
   }
 
+  def parseIntOrDash(s: String) = s match {
+    case "-" => null
+    case str => str.toInt
+  }
+
   override def transform(sqlContext: SQLContext, df: DataFrame, config: PhaseConfig, logger: PhaseLogger): DataFrame = {
     val stringRDD = df.rdd
     val parsedRDD = stringRDD.flatMap(x => S3_LINE_LOGPATS.findAllMatchIn(x(0).asInstanceOf[String]).map(y => y.subgroups))
-    val timestampedRDD = parsedRDD.map(r => r.zipWithIndex.map({case (x, i) => if (i == 2) parseDateTime(x) else x}))
-    val rowRDD = timestampedRDD.map(x => Row.fromSeq(x))
+
+    val typeConvertedRdd = parsedRDD.map(r =>
+      r.zipWithIndex.map({case (x, i) =>
+        if (i == 2) {
+          parseDateTime(x)
+        } else if (Set(9, 11, 12, 13, 14) contains i) {
+          parseIntOrDash(x)
+        } else {
+          x
+        }
+      }))
+
+    val rowRDD = typeConvertedRdd.map(x => Row.fromSeq(x))
     sqlContext.createDataFrame(rowRDD, SCHEMA)
   }
 }
diff --git a/src/test/scala/test/ExtractorsSpec.scala b/src/test/scala/test/ExtractorsSpec.scala
index 63fe97a..7da863d 100644
--- a/src/test/scala/test/ExtractorsSpec.scala
+++ b/src/test/scala/test/ExtractorsSpec.scala
@@ -80,52 +80,6 @@ class ExtractorsSpec extends UnitSpec with LocalSparkContext {
     extract.cleanup(ssc, sqlContext, config, 1, logger)
   }
 
-  "DStreamExtractor" should "produce DataFrames from the InputDStream" in {
-    val extract = new DStreamExtractor
-
-    // create tmp directory
-    val dataDir = System.getProperty("java.io.tmpdir") + "dstream/"
-    try {
-      val dir = new File(dataDir)
-      dir.mkdir()
-    }
-    finally {
-      System.setProperty("java.io.tmpdir", dataDir)
-    }
-
-    // initialize the extractor
-    val config = UserExtractConfig(
-      class_name = "test",
-      value = JsObject(
-        "dataDir" -> JsString(dataDir)
-      )
-    )
-    extract.initialize(ssc, sqlContext, config, 1, logger)
-
-    // wait some time, the check for new files is time sensitive
-    Thread sleep 1000
-
-    // create a new file, write data, rename the file to trigger the new file check
-    val file = File.createTempFile("tmp", ".txt")
-    val writer = new FileWriter(file)
-    try {
-      writer.write("hello world\nhello foo bar\nbar world")
-    }
-    finally writer.close()
-    file.renameTo(new File(dataDir, "testfile.txt"))
-
-    // extract data
-    val maybeDf = extract.next(ssc, System.currentTimeMillis(), sqlContext, config, 1, logger)
-    assert(maybeDf.isDefined)
-
-    // test that the dstream successfully created a dataframe
-    // http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
-    maybeDf.get.registerTempTable("words")
-    val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word order by word")
-    assert(wordCountsDataFrame.count == 4)
-    assert(wordCountsDataFrame.head == Row("bar", 2))
-  }
-
   "FileExtractor" should "produce DataFrame from files" in {
     val extract = new FileExtractor
 
diff --git a/src/test/scala/test/TransformersSpec.scala b/src/test/scala/test/TransformersSpec.scala
index 13aa009..5a7e97c 100644
--- a/src/test/scala/test/TransformersSpec.scala
+++ b/src/test/scala/test/TransformersSpec.scala
@@ -229,5 +229,7 @@ class TransformersSpec extends UnitSpec with LocalSparkContext {
     assert(first.getAs[String]("ip") == "192.0.2.3")
     assert(first.getAs[String]("user_agent") == "S3Console/0.4")
    assert(first.getAs[String]("version_id") == "-")
+    assert(first.getAs[Int]("http_status") == 200)
+    assert(first.getAs[Int]("object_size") === null)
   }
 }
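Reviewer note (illustrative, not part of the patch): the new parseIntOrDash helper maps the S3 access log convention of logging absent numeric fields as "-" to null, so that the presumably nullable integer columns in SCHEMA, such as http_status and object_size named in the new test assertions, are not broken by calling toInt on "-". It is applied to the capture groups at indices 9, 11, 12, 13 and 14 of S3_LINE_LOGPATS. A minimal standalone sketch of that conversion, with hypothetical example values:

object ParseIntOrDashSketch {
  // S3 access logs use "-" for absent numeric fields; map those to null so the
  // corresponding nullable integer columns stay empty instead of failing toInt.
  def parseIntOrDash(s: String): Any = s match {
    case "-" => null
    case str => str.toInt
  }

  def main(args: Array[String]): Unit = {
    println(parseIntOrDash("200"))  // 200, e.g. an http_status field
    println(parseIntOrDash("-"))    // null, e.g. an object_size field logged as "-"
  }
}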