More changes for parquet support
pondzix committed Jul 10, 2023
1 parent 78a3a70 commit ef87621
Showing 7 changed files with 141 additions and 454 deletions.
@@ -40,27 +40,35 @@ import java.util.TimeZone
import scala.jdk.CollectionConverters._
import fs2.io.file.Path

import scala.annotation.nowarn

object ParquetUtils {

val config = ValueCodecConfiguration(TimeZone.getTimeZone(ZoneOffset.UTC))

def readParquetColumns(path: Path): Map[File, List[ColumnDescriptor]] = {
val conf = new Configuration();
val parquetFileFilter = new FileFilter {
override def accept(pathname: File): Boolean = pathname.toString.endsWith(".parquet")
}

new File(path.toString)
.listFiles(parquetFileFilter)
.map { parquetFile =>
@annotation.nowarn("cat=deprecation")
val parquetMetadata = ParquetFileReader.readFooter(conf, new HadoopPath(parquetFile.toString), ParquetMetadataConverter.NO_FILTER)
val columns = parquetMetadata.getFileMetaData.getSchema.getColumns.asScala.toList
(parquetFile, columns)
(parquetFile, readFileColumns(parquetFile))
}
.toMap
}

@nowarn("cat=deprecation")
def readFileColumns(parquetFile: File): List[ColumnDescriptor] =
ParquetFileReader
.readFooter(new Configuration(), new HadoopPath(parquetFile.toString), ParquetMetadataConverter.NO_FILTER)
.getFileMetaData
.getSchema
.getColumns
.asScala
.toList

def extractColumnsFromSchemaString(schema: String) =
MessageTypeParser
.parseMessageType(schema)
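
Editorial aside: a minimal usage sketch of the readFileColumns helper introduced above. It is not part of this commit; the object name, directory path, and printing logic are assumptions for illustration.

import java.io.File
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils

// Editorial sketch: print the column paths of every parquet file in a local directory.
object PrintParquetColumnsSketch {
  def main(args: Array[String]): Unit = {
    val outputDir = new File("/tmp/transformer-output") // assumed path, for illustration only
    Option(outputDir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.getName.endsWith(".parquet"))
      .foreach { parquetFile =>
        val columns = ParquetUtils.readFileColumns(parquetFile)
        // ColumnDescriptor.getPath returns the column path segments
        columns.foreach(column => println(column.getPath.mkString(".")))
      }
  }
}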
@@ -0,0 +1,40 @@
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental
import cats.effect.IO
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.{Blob, DataRow}
import fs2.io.file.{Files, Path}
import io.circe.parser

import java.nio.charset.StandardCharsets

object OutputDataRowReader {

def fromJson(blob: Blob): IO[List[DataRow]] =
fs2.Stream
.emits[IO, Byte](blob)
.through(fs2.text.decodeWithCharset(StandardCharsets.UTF_8))
.through(fs2.text.lines)
.map(parser.parse(_).right.get)
.compile
.toList

// For parquet we fetch all bytes from remote blob storage and store them in a temporary local file.
// Then we use the hadoop API (details in `ParquetUtils`) to decode them and convert them to a human-readable JSON format.
def fromParquet(blob: Blob): IO[List[DataRow]] =
Files[IO].tempFile
.use { tempOutput =>
for {
_ <- saveParquetDataToTemporaryOutput(tempOutput, blob)
outputParquetColumns = ParquetUtils.readFileColumns(tempOutput.toNioPath.toFile)
parquetRows <- ParquetUtils.readParquetRowsAsJsonFrom(tempOutput, outputParquetColumns)
} yield parquetRows
}

private def saveParquetDataToTemporaryOutput(outputPath: Path, blob: Blob): IO[Unit] =
fs2.Stream
.emits(blob)
.through(Files[IO].writeAll(outputPath))
.compile
.drain

}
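
Editorial aside: a small, self-contained sketch of driving OutputDataRowReader.fromJson, not part of this commit. The sample payload, the object name, and the use of IOApp.Simple are assumptions.

package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental

import cats.effect.{IO, IOApp}
import java.nio.charset.StandardCharsets

// Editorial sketch: decode two newline-delimited JSON events the same way the
// specification reads JSON output, then print the resulting circe values.
object OutputDataRowReaderSketch extends IOApp.Simple {

  private val sampleBlob: Array[Byte] =
    """{"event":"page_view"}
      |{"event":"page_ping"}""".stripMargin.getBytes(StandardCharsets.UTF_8)

  def run: IO[Unit] =
    OutputDataRowReader.fromJson(sampleBlob).flatMap { rows =>
      IO.println(s"decoded ${rows.size} rows: ${rows.mkString(", ")}")
    }
}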
@@ -7,13 +7,14 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.Folder
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.{BlobObject, Folder}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification._
import fs2.Stream
import fs2.compression.{Compression => FS2Compression}
import fs2.concurrent.Signal.SignalOps
import fs2.concurrent.{Signal, SignallingRef}
import io.circe.Json
import org.specs2.matcher.MatchResult
import org.specs2.mutable.Specification

@@ -146,8 +147,8 @@ abstract class TransformerSpecification extends Specification with AppDependenci
private def readWindowOutput(blobClient: BlobStorage[IO])(message: LoaderMessage.ShreddingComplete): IO[WindowOutput] =
for {
scMessageInStorage <- readSCMessageFromBlobStorage(message, blobClient)
transformedEvents <- readDataFrom(scMessageInStorage.base.append("output=good"), blobClient)
badEvents <- readDataFrom(scMessageInStorage.base.append("output=bad"), blobClient)
transformedEvents <- readDataRowsFromFolder(scMessageInStorage.base.append("output=good"), blobClient)
badEvents <- readDataRowsFromFolder(scMessageInStorage.base.append("output=bad"), blobClient)
} yield {

message must beEqualTo(scMessageInStorage)
@@ -168,35 +169,49 @@ abstract class TransformerSpecification extends Specification with AppDependenci
WindowOutput(appID, parsedTime, message, transformedEvents, badEvents)
}

private def readDataFrom(
private def readDataRowsFromFolder(
folder: Folder,
blobClient: BlobStorage[IO]
): IO[List[FileData]] =
): IO[List[DataRow]] =
blobClient
.list(folder, recursive = false)
.evalMap(blob => readSingleBlob(blobClient, blob))
.evalMap(blob => readDataRowsFromBlob(blobClient, blob))
.flatMap(dataRows => Stream.emits(dataRows))
.compile
.toList

private def readSingleBlob(
private def readDataRowsFromBlob(
blobClient: BlobStorage[IO],
blob: BlobStorage.BlobObject
): IO[FileData] =
blob: BlobObject
): IO[List[DataRow]] =
blobClient
.getBytes(blob.key)
.through(decompressIfNeeded)
.through(decompressIfNeeded(blob.key))
.compile
.to(Array)
.flatMap(convertBlobToListOfRows)

private def decompressIfNeeded: fs2.Pipe[IO, Byte, Byte] =
private def convertBlobToListOfRows(blob: Blob): IO[List[DataRow]] =
requiredAppConfig.fileFormat match {
case WideRowFormat.JSON => OutputDataRowReader.fromJson(blob)
case WideRowFormat.PARQUET => OutputDataRowReader.fromParquet(blob)
}

// Decompress only for:
// - JSON good/bad output
// - Parquet bad output
// Decompression of good parquet data is handled by the hadoop API in parquet-oriented scenarios
private def decompressIfNeeded(blobKey: BlobStorage.Key): fs2.Pipe[IO, Byte, Byte] =
requiredAppConfig.compression match {
// decompress only for JSON output, for parquet it's handled by hadoop client in parquet-oriented scenarios
case Compression.Gzip if requiredAppConfig.fileFormat == WideRowFormat.JSON =>
case Compression.Gzip if !isBlobGoodParquetData(blobKey) =>
FS2Compression[IO].gunzip().andThen(_.flatMap(_.content))
case _ =>
identity
}

private def isBlobGoodParquetData(blobKey: BlobStorage.Key): Boolean =
requiredAppConfig.fileFormat == WideRowFormat.PARQUET && blobKey.contains("output=good")
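
Editorial aside: the decompression rule above, restated as a standalone predicate. This is an illustrative sketch only; the object and parameter names are assumptions, not code from this commit.

// Editorial sketch of the gating rule: gunzip everything except good parquet output,
// whose decompression is left to the hadoop reader.
object DecompressionRuleSketch {
  def shouldGunzip(gzipEnabled: Boolean, parquetFormat: Boolean, blobKey: String): Boolean =
    gzipEnabled && !(parquetFormat && blobKey.contains("output=good"))

  def main(args: Array[String]): Unit = {
    assert(!shouldGunzip(gzipEnabled = true, parquetFormat = true, blobKey = "run=2023/output=good/part-0.parquet"))
    assert(shouldGunzip(gzipEnabled = true, parquetFormat = true, blobKey = "run=2023/output=bad/part-0.txt.gz"))
    assert(shouldGunzip(gzipEnabled = true, parquetFormat = false, blobKey = "run=2023/output=good/part-0.txt.gz"))
    assert(!shouldGunzip(gzipEnabled = false, parquetFormat = false, blobKey = "run=2023/output=good/part-0.txt"))
  }
}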

private def readSCMessageFromBlobStorage(
message: LoaderMessage.ShreddingComplete,
blobClient: BlobStorage[IO]
@@ -224,7 +239,8 @@ abstract class TransformerSpecification extends Specification with AppDependenci

object TransformerSpecification {

type FileData = Array[Byte]
type Blob = Array[Byte]
type DataRow = Json
type DataAssertion = AggregatedData => MatchResult[Any]

final case class CountExpectations(good: Int, bad: Int) {
@@ -247,13 +263,13 @@ object TransformerSpecification {
appId: String,
producedAt: LocalDateTime,
`shredding_complete.json`: LoaderMessage.ShreddingComplete,
goodEvents: List[FileData],
badEvents: List[FileData]
goodEvents: List[DataRow],
badEvents: List[DataRow]
)

final case class AggregatedData(
good: List[FileData],
bad: List[FileData],
good: List[DataRow],
bad: List[DataRow],
types: List[TypesInfo.WideRow.Type]
)

@@ -7,9 +7,6 @@ import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experim
AzureTransformerSpecification,
InputBatch
}
import io.circe.parser

import java.nio.charset.StandardCharsets

class BadDetailsScenario extends AzureTransformerSpecification {

@@ -21,7 +18,7 @@ class BadDetailsScenario extends AzureTransformerSpecification {
override def countExpectations = CountExpectations(good = 0, bad = 1)

override def customDataAssertion = Some { outputData =>
val badRow = parser.parse(new String(outputData.bad.head, StandardCharsets.UTF_8)).right.get
val badRow = outputData.bad.head
badRow.hcursor.get[String]("schema").right.get must beEqualTo(
"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0"
)
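
Editorial aside: a standalone circe sketch of the same field extraction, for readers unfamiliar with hcursor. The sample JSON literal and object name are assumptions; the scenario above operates on real transformer output instead.

import io.circe.parser

// Editorial sketch: pull the "schema" field out of a bad-row JSON value with circe.
object BadRowSchemaSketch {
  def main(args: Array[String]): Unit = {
    val badRow = parser
      .parse("""{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0","data":{}}""")
      .getOrElse(io.circe.Json.Null)

    // hcursor.get returns Either[DecodingFailure, String]; the spec above unwraps it with .right.get
    val schema = badRow.hcursor.get[String]("schema")
    println(schema) // Right(iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0)
  }
}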
@@ -9,8 +9,6 @@ import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experim
}
import io.circe.parser

import java.nio.charset.StandardCharsets

class JsonOutputDetailsScenario extends AzureTransformerSpecification {

private val goodEvent = Content.TextLines(
@@ -24,7 +22,7 @@ class JsonOutputDetailsScenario extends AzureTransformerSpecification {
override def countExpectations = CountExpectations(good = 1, bad = 0)

override def customDataAssertion = Some { outputData =>
val transformedEvent = parser.parse(new String(outputData.good.head, StandardCharsets.UTF_8)).right.get
val transformedEvent = outputData.good.head
val expectedEvent = parser
.parse("""
|{