In [1]:
import pyspark
from delta import *
from confluent_kafka.schema_registry import SchemaRegistryClient
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import expr

import os
# Maven packages resolved when the Spark driver JVM is launched. This variable
# must be set before the session is created below; the trailing "pyspark-shell"
# token is part of the expected format.
# NOTE(review): these are pinned to Spark 3.0.1 / Delta 0.8.0 artifacts, while
# configure_spark_with_delta_pip() below presumably requests its own delta-core
# version — confirm both agree with the installed pyspark / delta-spark.
SPARK_PACKAGES = [
    "org.apache.spark:spark-avro_2.12:3.0.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1",
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1",
    "io.delta:delta-core_2.12:0.8.0",
    "com.amazonaws:aws-java-sdk:1.11.950",
    "org.apache.hadoop:hadoop-aws:3.2.0",
    "net.java.dev.jets3t:jets3t:0.9.4",
]
os.environ['PYSPARK_SUBMIT_ARGS'] = f"--packages {','.join(SPARK_PACKAGES)} pyspark-shell"

# MinIO (S3-compatible) connection settings. Read from the environment so real
# secrets need not be committed with the notebook; the fallbacks are the values
# this notebook previously hard-coded, so local dev keeps working unchanged.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "crypto-prices")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "crypto-prices")

builder = (
    pyspark.sql.SparkSession.builder
    .appName("kafka-delta-app")
    # Enable Delta Lake SQL support and make the session catalog Delta-aware.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3A settings for MinIO. The "spark.hadoop." prefix is Spark's documented way
    # to pass Hadoop options: Spark strips the prefix and copies the option into
    # the SparkContext's Hadoop Configuration, which the S3A filesystem reads.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    # Ask the AWS SDK to use SigV4 request signing.
    # NOTE(review): as a Spark conf key this is probably inert; the JVM system
    # property passed via extraJavaOptions on the next line is what the SDK is
    # expected to read — confirm before removing either.
    .config("com.amazonaws.services.s3.enableV4", True)
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Adaptive query execution is switched off for this session.
spark.conf.set("spark.sql.adaptive.enabled", False)

In [3]:
# Kafka source and Confluent Schema Registry endpoints.
kafka_topic_name = "crypto-prices"
kafka_bootstrap_servers = 'kafka:9092'
schema_registry_conf = {'url': "http://schema-registry:8081"}

# Fetch the Avro schema registered for the topic's message values. The subject is
# derived from the topic name ("<topic>-value") so the two cannot drift apart.
# NOTE(review): only the latest schema version is fetched, once, at startup;
# messages produced with a different registered version will still be decoded
# with this schema — confirm that is acceptable if the schema can evolve.
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
schema_response = schema_registry_client.get_latest_version(f"{kafka_topic_name}-value").schema
schema = schema_response.schema_str

# Unbounded stream over the topic, starting from the earliest available offset
# when no checkpoint exists; failOnDataLoss=false asks the Kafka source not to
# fail the query when data may have been lost (e.g. offsets no longer retained).
dsraw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

# In PERMISSIVE mode a record that fails to decode presumably yields a null
# `value` struct (so every projected column is null) instead of failing the query.
from_avro_options = {"mode": "PERMISSIVE"}

# Top-level fields of the decoded value record, flattened into table columns
# in this order.
crypto_price_fields = [
    "id", "rank", "symbol", "name", "supply", "maxSupply", "marketCapUsd",
    "volumeUsd24Hr", "priceUsd", "changePercent24Hr", "vwap24Hr", "explorer",
]

# Each message carries a 5-byte prefix ahead of the Avro payload (presumably the
# Confluent wire format: 1 magic byte + 4-byte schema id). Spark's substring is
# 1-based, so starting at position 6 with length(value)-5 skips exactly that prefix.
ds = (
    dsraw
    .select(from_avro(expr("substring(value, 6, length(value)-5)"), schema, from_avro_options).alias("value"))
    .selectExpr(*[f"value.{field}" for field in crypto_price_fields])
)

# Append decoded rows to the Delta table on MinIO; `delta_table` holds the
# handle returned by start().
# NOTE(review): the checkpoint path has no scheme (presumably the driver's local
# filesystem) and is named "etl-from-json" although this query decodes Avro.
# Do not rename or move it casually: with a fresh checkpoint the query would
# re-read from the earliest offsets and append duplicate rows to the table.
delta_table = (
    ds.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
    .start("s3a://crypto-prices/events")
)

In [5]:
# Batch (snapshot) read of the Delta table that the streaming query appends to;
# the count reflects whatever that query has committed at the time this runs.
crypto_prices = spark.read.load("s3a://crypto-prices/events", format="delta")
crypto_prices.count()

3500

In [17]:
# Read the Delta table as a stream and print each micro-batch to the console in real time.
# NOTE(review): "events" below is a relative path, not the s3a://crypto-prices/events
# table written by the streaming cell. The output saved under this cell (a single
# `value` column of {"user_id": ...} JSON) does not match that table's columns
# (id, rank, symbol, ...), so it appears to be stale output from an earlier run —
# clear it, or re-run against the S3 path.
# stream2 = spark.readStream.format("delta").load("events").writeStream.format("console").start()

22/02/17 21:55:08 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e040a711-902a-4a3b-9dc7-c0cf872b43fb. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|{"user_id": 35, "...|
|{"user_id": 94, "...|
|{"user_id": 99, "...|
|{"user_id": 38, "...|
|{"user_id": 24, "...|
|{"user_id": 58, "...|
|{"user_id": 48, "...|
|{"user_id": 69, "...|
|{"user_id": 80, "...|
|{"user_id": 72, "...|
|{"user_id": 91, "...|
|{"user_id": 15, "...|
|{"user_id": 14, "...|
|{"user_id": 44, "...|
|{"user_id": 73, "...|
|{"user_id": 41, "...|
|{"user_id": 25, "...|
|{"user_id": 51, "...|
|{"user_id": 30, "...|
|{"user_id": 25, "...|
+--------------------+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|{"user_id": 54, "...|
+--------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------