In [1]:
from pyspark.sql import SparkSession

sparkPackages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
    "org.apache.spark:spark-avro_2.12:3.5.1",
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1",
    "org.apache.kafka:kafka-clients:2.8.2",
    "org.apache.kafka:kafka_2.13:2.8.2"
]
spark = (
    SparkSession.builder.appName("ingest-kafka-data")
    .config('spark.jars.packages', ",".join(sparkPackages))
).getOrCreate()

In [2]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9092") \
  .option("subscribe", "sv-uploads-default-topic") \
  .load()

In [18]:
stream = (
    df
    .writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "/tmp/")
    .toTable("default.kafka_stream_events")
)

In [19]:
stream.processAllAvailable()

StreamingQueryException: [STREAM_FAILED] Query [id = 3d22317a-8455-45e7-9765-b30b46da7cb9, runId = 4261f9a5-b1ef-42ca-ab69-dd49ca8611c1] terminated with exception: Failed to create new KafkaAdminClient

In [17]:
spark.sql("select * from default.kafka_stream_events").show()

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



In [41]:
import org.apache.spark.sql.avro._
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
import scala.collection.immutable.Map

In [42]:
val kafka_topic = "sv-uploads-default-topic"
if (kafka_topic == "") {
      throw new IllegalStateException("topic variable is empty!");
}

val kafka_brokers = "kafka:9092"
val kafka_from = "earliest"
val kafka_maxoffsets = "10000"
val kafka_maxfiles = "1"
val s3_bucket = "eventsgateway-local"

val jsonFormatSchema : String = """{
  "namespace": "com.tfgco.eventsgateway",
  "type": "record",
  "name": "Event",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "props",
      "default": {},
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "name": "serverTimestamp",
      "type": "long"
    },
    {
      "name": "clientTimestamp",
      "type": "long"
    }
  ]
}"""

kafka_topic = sv-uploads-default-topic
kafka_brokers = kafka:9092
kafka_from = earliest
kafka_maxoffsets = 10000
kafka_maxfiles = 1
s3_bucket = eventsgateway-local
environment = development
jsonFormatSchema = 


{
  "namespace": "com.tfgco.eventsgateway",
  "type": "record",
  "name": "Event",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "props",
      "default": {},
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "name": "serverTimestamp",
      "type": "long"
    },
    {
      "name": "clientTimestamp",
      "type": "long"
    }
  ]
}


In [43]:
val input_df = spark
  .readStream
  .format("kafka")
  .option("maxFilesPerTrigger", kafka_maxfiles)
  .option("kafka.bootstrap.servers", kafka_brokers)
  .option("subscribe", kafka_topic)
  .option("startingOffsets", kafka_from)
  .option("maxOffsetsPerTrigger", kafka_maxoffsets)
  .load()

input_df = [key: binary, value: binary ... 5 more fields]


[key: binary, value: binary ... 5 more fields]

In [44]:
input_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [45]:
val df = input_df
    .withColumn("event", from_avro($"value", jsonFormatSchema))
    .withColumn("id", $"event.id")
    .withColumn("name", $"event.name")
    .withColumn("props", $"event.props")
    .withColumn("clienttimestamp", $"event.clientTimestamp")
    .withColumn("servertimestamp", $"event.serverTimestamp")
    .withColumn("date", to_date(from_unixtime($"event.clientTimestamp" / 1000)))
    .withColumn("year", date_format($"date", "YYYY"))
    .withColumn("month", date_format($"date", "MM"))
    .withColumn("day", date_format($"date", "dd"))
    .select("id", "name", "props", "clienttimestamp", "servertimestamp", "year", "month", "day")

df = [id: string, name: string ... 6 more fields]


[id: string, name: string ... 6 more fields]

In [46]:
val hadoop_conf = spark.sparkContext.hadoopConfiguration
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
// hadoop_conf.set("fs.s3a.endpoint", "http://localstack:4572/eventsgateway-local")
// hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
hadoop_conf.set("fs.s3a.access.key", "dummy")
hadoop_conf.set("fs.s3a.secret.key", "dummy")
hadoop_conf.set("fs.s3a.proxy.host", "eventsgatewaylocalproxy")
hadoop_conf.set("fs.s3a.proxy.port", "80")

hadoop_conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml


Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml

In [48]:
val query = df
    .writeStream
    .format("orc")
    .outputMode("append")
    .option("path", s"s3a://$s3_bucket/var/standard/$kafka_topic/data/")
    .option("checkpointLocation", "/tmp/checkpoints")
    // .option("checkpointLocation", s"s3a://$s3_bucket/var/standard/checkpoints/$kafka_topic/") // databricks only
    .partitionBy("year", "month", "day")
    .start()

query = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@704f7333


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@704f7333

In [50]:
val dbname = s3_bucket.replace('-', '_')
val tablename = kafka_topic.replace('-', '_')

// RUN THIS INSIDE HIVE ITSELF THROUGH `ocker exec -it hive_hive-server_1 sh -c "/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000"`

println(s"""CREATE DATABASE IF NOT EXISTS $dbname LOCATION 's3a://$s3_bucket/var/standard/databases/' """)
println()
println(s"""CREATE EXTERNAL TABLE IF NOT EXISTS `$dbname.$tablename` (`id` string, `name` string, `props` map<string,string>, `servertimestamp` bigint, `clienttimestamp` bigint) PARTITIONED BY (`year` STRING, `month` STRING, `day` STRING) STORED AS ORC LOCATION 's3a://$s3_bucket/var/standard/$kafka_topic/data/';""")

CREATE DATABASE IF NOT EXISTS eventsgateway_local LOCATION 's3a://eventsgateway-local/var/standard/databases/' 

CREATE EXTERNAL TABLE IF NOT EXISTS `eventsgateway_local.sv_uploads_default_topic` (`id` string, `name` string, `props` map<string,string>, `servertimestamp` bigint, `clienttimestamp` bigint) PARTITIONED BY (`year` STRING, `month` STRING, `day` STRING) STORED AS ORC LOCATION 's3a://eventsgateway-local/var/standard/sv-uploads-default-topic/data/';


dbname = eventsgateway_local
tablename = sv_uploads_default_topic


sv_uploads_default_topic