In [1]:
! pip install datetime

Collecting datetime
  Downloading DateTime-4.3-py2.py3-none-any.whl (60 kB)
[K     |████████████████████████████████| 60 kB 3.4 MB/s eta 0:00:011
[?25hCollecting zope.interface
  Downloading zope.interface-5.4.0-cp39-cp39-manylinux2010_x86_64.whl (255 kB)
[K     |████████████████████████████████| 255 kB 10.4 MB/s eta 0:00:01
Installing collected packages: zope.interface, datetime
Successfully installed datetime-4.3 zope.interface-5.4.0


In [39]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, lit, window, sum, count, round, max, avg, broadcast
from pyspark.sql.window import Window
from time import sleep
from datetime import datetime 

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Assignment_2")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

date = datetime.now().strftime("%m%d%M")

# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
bucket = "dejads_temp_yk"
spark.conf.set('temporaryGcsBucket', bucket)

# Read the whole dataset as a batch
df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka1:9093") \
        .option("subscribe", "records") \
        .option("startingOffsets", "earliest") \
        .option("failOnDataLoss", 'False') \
        .load()

# split the value
records = df.selectExpr("CAST(value AS STRING)").select(split(col('value'), ",").alias('splitted'))

# define the schema in this way
new_df =  records.selectExpr('splitted[0] as id', 
                             'cast(splitted[1] as timestamp) as event_time',
                             'cast(splitted[2] as long) as cc_num',
                             'splitted[3] as merchant',
                             'splitted[4] as category',
                             'cast(splitted[5] as double) as amt',
                             'splitted[6] as first',
                             'splitted[7] as last',
                             'splitted[8] as gender',
                             'splitted[9] as street',
                             'splitted[10] as city',
                             'splitted[11] as state',
                             'splitted[12] as zip',
                             'splitted[13] as lat',
                             'splitted[14] as long',
                             'cast(splitted[15] as long) as city_pop',
                             'splitted[16] as job',
                             'splitted[17] as dob',
                             'splitted[18] as trans_num',
                             'cast(splitted[19] as long) as unix_time',
                             'splitted[20] as merch_lat',
                             'splitted[21] as merch_long',
                             'cast(splitted[22] as int) as is_fraud')

new_df = new_df.select('event_time', 'category', 'amt', 'gender', 'is_fraud')

# filter out the fraud
#new_df = new_df.where(col('is_fraud') == 1)

window_5min = window(col('event_time'), '5 minutes').alias('time_slot')
sdf = new_df.groupBy(window_5min, 'gender', 'category') \
            .agg(sum('amt').alias('amt'), count('*').alias('count'))

def defined_for_each_batch_function(df, epoch_id):
    temp_df = df.groupBy(col('time_slot').alias('t_time_slot'), 
                      col("gender").alias("t_gender"))\
             .agg(max('amt').alias('max_amt'))
    join_exp = (df['time_slot'] == temp_df['t_time_slot']) & \
               (df['gender'] == temp_df['t_gender']) & \
               (df['amt'] == temp_df['max_amt'])
    df = df.join(broadcast(temp_df), join_exp)\
         .drop('t_gender')\
         .drop('t_time_slot')\
         .drop('max_amt')
    
    df.select('time_slot', 'gender', 'category',
              round('amt', 2).alias('amt'), 'count') \
      .writeStream \
      .queryName("most_fraud_table") \
      .format("memory").outputMode("complete") \
      .start()
    
    spark.sql("SELECT * FROM most_fraud_table").show(truncate=False)
    
    df = df.select(df.time_slot.start.cast('string').alias('start_time'), 
                           df.time_slot.end.cast('string').alias('end_time'), 
                           'gender', 'category',
                           'amt', 'count')
    
    # Saving the data to BigQuery
    df.write.format('bigquery') \
       .option('table', 'jads-de-2021.assignment_2.streaming_table') \
       .mode("overwrite") \
       .save()
            
# Write to a sink - here, the output is memory (only for testing). 
activityQuery = sdf.select('time_slot', 
                           'gender', 'category',
                           round('amt', 2).alias('amt'), 'count') \
                   .writeStream \
                   .foreachBatch(defined_for_each_batch_function) \
                   .outputMode("complete") \
                   .start()

try:
    activityQuery.awaitTermination()
except KeyboardInterrupt:
    activityQuery.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

Stoped the streaming query and the spark context


In [40]:
spark.stop()