In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
import base64

In [2]:
# Build (or reuse) the Hive-enabled SparkSession that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("KafkaSparkStreaming")
    .master("local[*]")
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    # Fix: the key used to be spelled "spark.jars.packeges", which Spark silently
    # ignores, so the Kafka source package was never requested. The coordinate
    # also pointed at a Scala 2.13 build, which is not published for Spark 3.1.2;
    # the 2.12 artifact is used instead.
    # NOTE(review): the Scala suffix and version must match the installed Spark
    # build - confirm against the cluster / pyspark version in use.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    # Placeholder URI: replace with the real metastore host and port before running.
    .config("hive.metastore.uris", "thrift://your-hive-metastore-uri:port")
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
# Options forwarded verbatim to the Kafka source when the input stream is opened.
kafka_input_config = {
    # Broker address; adjust as necessary.
    "kafka.bootstrap.servers": "localhost:9092",
    # Name of the topic to subscribe to.
    "subscribe": "fraudtraindata",
    "startingOffsets": "latest",
    "failOnDataLoss": "false",
}

In [5]:
# Schema applied to the JSON payload of each Kafka message.
# Each pair is (field name, Spark type class); every field is declared nullable.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("trans_date_trans_time", StringType),
        ("cc_num", StringType),
        ("merchant", StringType),
        ("category", StringType),
        ("amt", DoubleType),
        ("first", StringType),
        ("last", StringType),
        ("gender", StringType),
        ("street", StringType),
        ("city", StringType),
        ("state", StringType),
        ("zip", StringType),
        ("lat", DoubleType),
        ("long", DoubleType),
        ("city_pop", IntegerType),
        ("job", StringType),
        ("dob", StringType),
        ("trans_num", StringType),
        ("unix_time", LongType),
        ("merch_lat", DoubleType),
        ("merch_long", DoubleType),
        ("is_fraud", IntegerType),
    )
])

In [6]:
def hiveInsert(df, epoch_id):
    """Append one micro-batch to the ``fraud_detection.streamtransactions`` table.

    Args:
        df: The batch DataFrame to insert.
        epoch_id: Identifier of the micro-batch; accepted but not used here.

    Returns:
        None.
    """
    target_table = "fraud_detection.streamtransactions"
    batch_writer = df.write.mode("append")
    # NOTE(review): insertInto is documented as matching columns by position,
    # not by name - confirm the table's column order matches the DataFrame.
    batch_writer.insertInto(target_table)

In [7]:
# Open a streaming read against Kafka using the options in kafka_input_config.
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_input_config)
    .load()
)

In [8]:
# Keep only the message payload, converted to a string column named "value".
value_df = df.selectExpr("CAST(value AS STRING) as value")

# Parse the JSON text into a struct using the schema defined earlier.
json_df = value_df.withColumn("value", from_json(col("value"), schema))

# Promote the struct's fields to top-level columns, then convert selected
# columns to the timestamp / integer / decimal / date types given below.
flattened_df = (
    json_df.select("value.*")
    .withColumn(
        "trans_date_trans_time",
        to_timestamp("trans_date_trans_time", "yyyy-MM-dd HH:mm:ss"),
    )
    .withColumn("cc_num", col("cc_num").cast("BIGINT"))
    .withColumn("amt", col("amt").cast("DECIMAL(20,15)"))
    .withColumn("lat", col("lat").cast("DECIMAL(9,6)"))
    .withColumn("long", col("long").cast("DECIMAL(9,6)"))
    .withColumn("merch_lat", col("merch_lat").cast("DECIMAL(20,13)"))
    .withColumn("merch_long", col("merch_long").cast("DECIMAL(20,13)"))
    .withColumn("dob", col("dob").cast("DATE"))
)

In [None]:
# Start the streaming query: each micro-batch is passed to hiveInsert, with
# query progress checkpointed under /Data/hive_checkpoint.
myStream = (
    flattened_df.writeStream
    .foreachBatch(hiveInsert)
    .option("checkpointLocation", "/Data/hive_checkpoint")
    .outputMode("append")
    .start()
)
# Block this cell until the query terminates.
myStream.awaitTermination()