Skip to content

Commit

Permalink
Merge pull request AbsaOSS#1 from hotels-hde/adding_sensor_for_headers
Browse files Browse the repository at this point in the history
EGUAS-499 adding headers to dataframe
  • Loading branch information
Randal Boyle authored and GitHub Enterprise committed Sep 6, 2019
2 parents adbedc0 + 70aedac commit a771551
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions src/main/scala/za/co/absa/abris/avro/AvroSerDeWithKeyColumn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.security.InvalidParameterException
import org.apache.avro.Schema
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{BinaryType, MapType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
import org.slf4j.LoggerFactory
import za.co.absa.abris.avro.format.SparkAvroConversions
Expand Down Expand Up @@ -676,8 +676,12 @@ object AvroSerDeWithKeyColumn {
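* This method converts the key into a string by invoking .toString()
* If the input schema contains a column named "headers", the value at row index 2 is carried over unchanged as a third output column typed Map[String, String].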
*/
private def toAvroWithPlainStringKey(rows: Dataset[Row], valueSchemas: SchemasProcessor)(keySchemaId: Option[Int], valueSchemaId: Option[Int]) = {

val resultingRowSchema = StructType(List(StructField(KEY_COLUMN_NAME, StringType, false),StructField(VALUE_COLUMN_NAME, BinaryType, false)))
val includeHeader = rows.toDF().schema.exists(_.name == "headers")
val resultingRowSchema =
if (includeHeader)
StructType(List(StructField(KEY_COLUMN_NAME, StringType, false),StructField(VALUE_COLUMN_NAME, BinaryType, false),StructField("headers", MapType(StringType, StringType), false)))
else
StructType(List(StructField(KEY_COLUMN_NAME, StringType, false),StructField(VALUE_COLUMN_NAME, BinaryType, false)))

implicit val recEncoder: Encoder[Row] = AvroToRowEncoderFactory.createRowEncoder(resultingRowSchema)

Expand All @@ -693,8 +697,10 @@ object AvroSerDeWithKeyColumn {
val value = row.get(1).asInstanceOf[Row]

val binaryValueRecord = SparkAvroConversions.rowToBinaryAvro(value, valueSparkSchema, valueAvroSchema, valueSchemaId)

new GenericRowWithSchema(Array(key, binaryValueRecord), resultingRowSchema).asInstanceOf[Row]
if (includeHeader)
new GenericRowWithSchema(Array(key, binaryValueRecord, row.get(2)), resultingRowSchema).asInstanceOf[Row]
else
new GenericRowWithSchema(Array(key, binaryValueRecord), resultingRowSchema).asInstanceOf[Row]
}
)
})
Expand Down

0 comments on commit a771551

Please sign in to comment.