Merge pull request #1 from tikal-fuseday/mybranch
Mybranch
yinondn committed Mar 5, 2020
2 parents 8d5edac + 2e86b99 commit d76a2f9
Showing 4 changed files with 150 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -18,6 +18,7 @@ See medium post for more details
- compose: Docker-Compose configuration that deploys the Debezium stack (Kafka, Zookeeper and Kafka-Connect) in containers, reads changes from the source databases and streams them to S3
- voter-processing: Notebook with PySpark code that transforms Debezium messages to INSERT, UPDATE and DELETE operations
- fake_it: a simulator of a voters book application's database with live input, used for the end-to-end example
- analytics: a Spark job that reads all historical versions from the Delta Lake table and stores the most up-to-date data for each poll (see the sketch below)
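
For context, the core of the analytics job is Delta Lake's time travel: every version recorded in the table's transaction log can be loaded with the `versionAsOf` read option. A minimal sketch, assuming a Spark session with delta-core 0.5.0 on the classpath and the same silver path the job uses:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("time-travel-sketch").getOrCreate()
val silverPath = "s3a://delta-data.fuze/voter/silver/"

// List every version recorded in the table's transaction log.
DeltaTable.forPath(spark, silverPath).history()
  .select("version", "timestamp", "operation")
  .show(false)

// Load the table as it looked at a specific version (here: version 0).
val v0 = spark.read.format("delta")
  .option("versionAsOf", "0")
  .load(silverPath)
v0.show(false)
```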

## Instructions
### Start up docker compose
56 changes: 56 additions & 0 deletions analytics/pom.xml
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tikalk.delta-lake</groupId>
    <artifactId>analytics</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.10</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.6.7</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.7</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.7</version>
        </dependency>

        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-core_2.12</artifactId>
            <version>0.5.0</version>
        </dependency>
    </dependencies>

</project>
27 changes: 27 additions & 0 deletions analytics/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# Example
log4j.logger.example=DEBUG
66 changes: 66 additions & 0 deletions analytics/src/main/scala/deltalake/AnalyticsSparkJob.scala
@@ -0,0 +1,66 @@
package deltalake

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import io.delta.tables._
import org.apache.spark.SparkConf


object AnalyticsSparkJob {
  def main(args: Array[String]): Unit = {

    // AWS credentials are read from JVM system properties
    // (-DAWS_ACCESS_KEY_ID=... -DAWS_SECRET_ACCESS_KEY=...).
    val conf = new SparkConf()
      .setMaster("local[2]")
      .set("fs.s3.awsAccessKeyId", sys.props("AWS_ACCESS_KEY_ID"))
      .set("fs.s3.awsSecretAccessKey", sys.props("AWS_SECRET_ACCESS_KEY"))
    // val conf = new SparkConf().setMaster("local[2]").set("fs.s3.access.key", sys.props("AWS_ACCESS_KEY_ID")).set("fs.s3.secret.key", sys.props("AWS_SECRET_ACCESS_KEY"))

    val deltaLakeInputPath = "s3a://delta-data.fuze/voter/silver/"
    val deltaLakeOutputPath = "s3a://delta-data.fuze/voter/gold"
    val spark = SparkSession.builder
      .config(conf)
      .appName("Spark SQL basic example")
      .getOrCreate()

    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", sys.props("AWS_ACCESS_KEY_ID"))
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", sys.props("AWS_SECRET_ACCESS_KEY"))

    import spark.implicits._

    // Read the silver table's full commit history and collect every version it contains.
    val deltaTable = DeltaTable.forPath(spark, deltaLakeInputPath)
    val fullHistoryDf = deltaTable.history()
    printDf("fullHistoryDf", fullHistoryDf)
    val allVersions = fullHistoryDf.select($"version").collect()

    for (version <- allVersions) {
      // Time-travel read: load the silver table as it looked at this version.
      val versionedDf = spark.read.format("delta").option("versionAsOf", version.get(0).toString).load(deltaLakeInputPath)
      versionedDf.show(false)
      // versionedDf.write.partitionBy("pollId").mode(SaveMode.Append).save(deltaLakeOutputPath)

      // Upsert this version into the gold table (expected to already exist at the output path):
      // matched records with a newer ts update the vote, unmatched records are inserted.
      DeltaTable.forPath(spark, deltaLakeOutputPath)
        .as("gold")
        .merge(
          versionedDf.as("updates"),
          "gold.pollId = updates.pollId and gold.voterId = updates.voterId and gold.ts < updates.ts")
        .whenMatched
        .updateExpr(
          Map("vote" -> "updates.vote"))
        .whenNotMatched
        .insertExpr(
          Map(
            "pollId" -> "updates.pollId",
            "voterId" -> "updates.voterId",
            "ts" -> "updates.ts",
            "vote" -> "updates.vote"))
        .execute()
    }
  }

  private def printDf(dfName: String, df: DataFrame, printSchema: Boolean = false): Unit = {
    println(s"$dfName's df")
    df.show(false)
    if (printSchema) {
      println(s"$dfName's schema:")
      df.printSchema()
    }
  }

}
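
After the job completes, the gold table can be inspected like any other Delta table. A minimal verification sketch, assuming the same `s3a://delta-data.fuze/voter/gold` output path and AWS credentials provided as system properties, as in the job above:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").appName("gold-check").getOrCreate()
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", sys.props("AWS_ACCESS_KEY_ID"))
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", sys.props("AWS_SECRET_ACCESS_KEY"))

// Each (pollId, voterId) pair should appear once, carrying its latest vote.
val gold = spark.read.format("delta").load("s3a://delta-data.fuze/voter/gold")
gold.orderBy("pollId", "voterId").show(false)

// Vote counts per poll.
gold.groupBy("pollId").count().show(false)
```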
