In [1]:
from datetime import datetime, date, timedelta
from io import BytesIO
import os
import sys

from avro.datafile import DataFileReader
from avro.io import DatumReader, BinaryDecoder
import avro.schema as AvroSchema
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import boto3
from botocore.client import Config
from pyspark.context import SparkContext
from pyspark.sql.functions import max, min, sum, avg, countDistinct, lit, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
import pytz

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# --- Connection settings -------------------------------------------------
# Endpoints and credentials can be overridden through environment variables so
# that real secrets never have to be committed with the notebook. The defaults
# are the original local-development values.
session = boto3.Session()
s3 = session.resource(
    's3',
    endpoint_url=os.environ.get('S3_ENDPOINT_URL', 'http://minio:9000'),
    config=Config(signature_version='s3v4')
)
glue_client = session.client('glue')
# Bucket whose objects hold the Kafka messages read by KafkaMessagesReader.
kafka_bucket = s3.Bucket('kafka-messages')

# JDBC target used by every spark.read.jdbc / DataFrame.write.jdbc call below.
url = os.environ.get('POSTGRES_JDBC_URL', "jdbc:postgresql://postgres:5432/postgres")
properties = {
    "user" : os.environ.get('POSTGRES_USER', "admin"),
    "password" : os.environ.get('POSTGRES_PASSWORD', "admin")
}

# Avro schema file parsed by KafkaMessagesReader.
schema_file = "schema.avsc"

# Message fields grouped by how they are copied into a statistics row:
# 'string' values are taken as-is, 'double' values are converted with float().
book_fields = {
    'string': ['id', 'name', 'author','category'], 
    'double': ['quantity', 'price', 'final_price']
}

# Column layout of a statistics row: report_time, the string fields, the
# double fields, then the created/updated/deleted dates.
book_schema = StructType(
    [
        StructField("report_time",DateType(),True),
        StructField("book_id",StringType(),True),
        StructField("name",StringType(),True),
        StructField("author",StringType(),True),
        StructField("category",StringType(),True),
        StructField("quantity",DoubleType(),True),
        StructField("price",DoubleType(),True),
        StructField("final_price",DoubleType(),True),
        StructField("created_at",DateType(),True),
        StructField("updated_at",DateType(),True),
        StructField("deleted_at",DateType(),True),
    ]
)

# NOTE(review): these two names are not passed to Report in the visible cells;
# Report falls back to its own defaults (table="book_reports", book_key="id").
book_table_name = 'books'
book_key = 'id'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Kafka messages reader

In [23]:
class KafkaMessagesReader:
    """Read Avro-encoded Kafka messages that are stored as bucket objects.

    Args:
        bucket: Bucket-like object; only ``bucket.objects.all()`` is used, and
            each listed object must expose ``last_modified`` and ``get()``.
        schema_file: Path of the Avro schema file used to decode the messages.
    """

    def __init__(self, bucket, schema_file):
        self.bucket = bucket
        # Use a context manager so the schema file handle is always closed.
        with open(schema_file, "r") as schema_handle:
            self.schema = AvroSchema.Parse(schema_handle.read())
        self.reader = DatumReader(self.schema)

    def read_avro_message(self, message):
        """Decode one raw message (bytes) into a record using the schema."""
        avro_bytes = BytesIO(message)
        # The first 5 bytes are skipped before decoding.
        # NOTE(review): presumably the Confluent wire-format header (magic byte
        # plus 4-byte schema id) - confirm against the producer.
        avro_bytes.seek(5)
        decoder = BinaryDecoder(avro_bytes)
        return self.reader.read(decoder)

    def get_all_messages(self):
        """Return a list of every object yielded by ``bucket.objects.all()``."""
        return list(self.bucket.objects.all())

    def get_new_messages(self, now=None):
        """Return the objects modified after the start of yesterday.

        Args:
            now: Optional ``datetime`` used as "the current time". Defaults to
                the current time in the Asia/Ho_Chi_Minh timezone.

        Returns:
            list: Objects whose ``last_modified`` (with tzinfo dropped) is
            strictly later than yesterday at 00:00.
        """
        if now is None:
            now = datetime.now(pytz.timezone('Asia/Ho_Chi_Minh'))
        # Subtract a timedelta instead of doing ``day - 1``: the latter yields
        # day 0 (a ValueError) on the first day of every month.
        yesterday = now.date() - timedelta(days=1)
        cutoff = datetime(yesterday.year, yesterday.month, yesterday.day)
        # NOTE(review): tzinfo is dropped without converting, so if the store
        # reports last_modified in UTC the cutoff is effectively shifted by the
        # UTC offset - confirm whether that is intended.
        return [
            message
            for message in self.bucket.objects.all()
            if message.last_modified.replace(tzinfo=None) > cutoff
        ]

    def get_new_message_contents(self):
        """Download and decode the body of every new message."""
        kafka_messages = self.get_new_messages()
        return [
            self.read_avro_message(kafka_message.get()['Body'].read())
            for kafka_message in kafka_messages
        ]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Book statistics

In [8]:
class BaseActionStatistics:
    """Turn decoded messages into statistics rows keyed by a message field.

    A row is a list laid out as: report time, the 'string' fields, the
    'double' fields, then three date slots (created, updated, deleted).

    Args:
        fields: Dict with 'string' and 'double' lists of message field names.
        messages: Iterable of decoded messages (dict-like objects).
        key: Name of the message field used as the key in ``data``.
        report_time: Value stored in the first slot of every generated row.
        data: Dict of existing rows; it is updated in place by ``generate``.
    """

    def __init__(self, fields, messages, key, report_time, data):
        self.fields = fields
        self.messages = messages
        self.key = key
        self.report_time = report_time
        self.data = data

    def get_field_index(self, field):
        """Return the position of a 'double' field inside a statistics row.

        Entries of ``fields['double']`` may be plain names or dicts carrying
        the name under the 'field' key. Plain names previously failed because
        ``.get`` was called on a string.

        Raises:
            IndexError: If ``field`` is not one of the 'double' fields.
        """
        # +1 accounts for the report time stored in slot 0.
        offset = len(self.fields.get('string')) + 1
        for position, entry in enumerate(self.fields.get('double')):
            name = entry.get('field') if isinstance(entry, dict) else entry
            if name == field:
                return position + offset
        raise IndexError("unknown double field: {!r}".format(field))

    def get_created_message_date(self, message):
        """Parse the message's 'date' value (e.g. 'Fri Jul 21 2023') to a date."""
        return datetime.strptime(message.get('date'), '%a %b %d %Y').date()

    def generate_string_statistics_item_field(self, message):
        """Build the leading part of a row: report time, strings, doubles."""
        values = [self.report_time]
        values.extend([message.get(field) for field in self.fields.get('string')])
        # float() raises if a 'double' field is missing or not numeric.
        values.extend([float(message.get(field)) for field in self.fields.get('double')])
        return values

    def generate_time_statistics_item_field(self, message):
        """Return the [created, updated, deleted] slots; all empty here."""
        return [None, None, None]

    def generate_statistics_item(self, message):
        """Build the complete row for one message."""
        item = self.generate_string_statistics_item_field(message)
        item.extend(self.generate_time_statistics_item_field(message))
        return item

    def generate(self):
        """Store a row for every message under its key and return ``data``."""
        for message in self.messages:
            self.data.update({message.get(self.key): self.generate_statistics_item(message)})
        return self.data
    
class CreateActionStatistics(BaseActionStatistics):
    """Statistics rows for messages whose 'action' is 'CREATE'."""

    def __init__(self, fields, messages, key, report_time, data):
        # Keep only the CREATE messages before handing over to the base class.
        created = []
        for candidate in messages:
            if candidate['action'] == 'CREATE':
                created.append(candidate)
        super().__init__(fields, created, key, report_time, data)

    def generate_time_statistics_item_field(self, message):
        """Fill the created slot with the message date; leave the others empty."""
        created_on = self.get_created_message_date(message)
        return [created_on, None, None]
    
class UpdateActionStatistics(BaseActionStatistics):
    """Statistics rows for messages whose 'action' is 'UPDATE'."""

    def __init__(self, fields, messages, key, report_time, data):
        # Keep only the UPDATE messages before handing over to the base class.
        updated = []
        for candidate in messages:
            if candidate['action'] == 'UPDATE':
                updated.append(candidate)
        super().__init__(fields, updated, key, report_time, data)

    def generate_time_statistics_item_field(self, message):
        """Carry over the known creation date and set the updated slot."""
        existing = self.data.get(message.get(self.key))
        if existing:
            # Third slot from the end is the created date of the stored row.
            created_at = existing[-3]
        else:
            created_at = None
        return [created_at, self.get_created_message_date(message), None]
    
class DeleteActionStatistics(BaseActionStatistics):
    """Mark existing statistics rows as deleted for 'DELETE' messages."""

    def __init__(self, fields, messages, key, report_time, data):
        delete_messages = [message for message in messages if message['action'] == 'DELETE']
        super().__init__(fields, delete_messages, key, report_time, data)

    def generate_statistics_item(self, message):
        """Set the deleted slot of the stored row and return that row.

        Returns:
            The stored row (modified in place), or ``None`` when ``data`` has
            no row for the message key. Previously that case raised a
            ``TypeError`` on item assignment.
        """
        item = self.data.get(message.get(self.key))
        if item is None:
            return None
        # The last slot of a row is the deletion date.
        item[-1] = self.get_created_message_date(message)
        return item

    def generate(self):
        """Apply every DELETE message and return ``data``.

        Messages that refer to a key without a stored row are skipped, so a
        deletion of an unknown record no longer aborts the whole run and no
        ``None`` row is ever stored.
        """
        for message in self.messages:
            item = self.generate_statistics_item(message)
            if item is None:
                continue
            self.data.update({message.get(self.key): item})
        return self.data

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
class BookStatistics:
    """Build the per-book statistics table from decoded messages.

    Relies on the notebook-level ``spark`` session and the ``url`` /
    ``properties`` JDBC settings.

    Args:
        fields: Dict with 'string' and 'double' lists of message field names.
        schema: Spark schema used to create the resulting DataFrame.
        table: Name of the JDBC table that is read from and written to.
        key: Name of the message field that identifies a book.
        messages: Decoded messages to process.
        mode: JDBC write mode; with "append" the previous day's rows are
            loaded first and carried over.
        report_time: Value stored in the first column of every row.
    """

    def __init__(self, fields, schema, table, key, messages, mode, report_time):
        self.fields = fields
        self.schema = schema
        self.table = table
        self.key = key
        self.messages = messages
        self.mode = mode
        self.report_time = report_time
        self.data = self.get_exist_data(table) if mode == "append" else dict()

    def get_latest_data(self, table):
        """Collect the rows of ``table`` whose report_time is yesterday."""
        yesterday = datetime.now(pytz.timezone('Asia/Ho_Chi_Minh')).date() - timedelta(days=1)
        records = spark.read.jdbc(url, table, properties=properties)
        return records.filter(records.report_time == yesterday).collect()

    def get_exist_data(self, table):
        """Return yesterday's rows keyed by book id, re-stamped for this run.

        The stored report time (assumed to be the first column) is replaced by
        ``self.report_time`` so carried-over rows and newly generated rows
        share exactly the same report time. A fresh ``datetime.now()`` was
        used before, which could differ from the run's report time.
        """
        data = dict()
        for item in self.get_latest_data(table):
            values = [self.report_time]
            values.extend(list(item.asDict().values())[1:])
            data[item.book_id] = values
        return data

    def generate(self):
        """Apply CREATE, UPDATE and DELETE messages, then persist the rows.

        Returns:
            The Spark DataFrame that was written to ``self.table``.
        """
        # Order matters: updates and deletes look up rows stored by creates.
        for action_statistics in (CreateActionStatistics, UpdateActionStatistics, DeleteActionStatistics):
            self.data = action_statistics(
                self.fields, self.messages, self.key, self.report_time, self.data.copy()
            ).generate()
        rows = list(self.data.values())
        df = spark.createDataFrame(data=rows, schema=self.schema)
        df.write.jdbc(url=url, table=self.table, mode=self.mode, properties=properties)
        df.show(truncate=False)
        return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Category statistics

In [19]:
class CategoryStatistics:
    """Aggregate the non-deleted books per report time and category.

    Args:
        book_df: Book statistics DataFrame; rows with a deletion date are
            filtered out on construction.
        mode: JDBC write mode used for the 'category_reports' table.
    """

    def __init__(self, book_df, mode):
        self.mode = mode
        self.book_df = book_df.filter("deleted_at is null")

    def generate(self):
        """Compute the category report, write it via JDBC and return it."""
        grouped = self.book_df.groupBy('report_time', 'category')
        aggregations = [
            countDistinct('book_id').alias('books'),
            countDistinct('author').alias('authors'),
            sum('price').alias('price'),
            avg('price').alias('average_price'),
            sum('final_price').alias('final_price'),
            avg('final_price').alias('average_final_price'),
            sum('quantity').alias('quantity'),
        ]
        category_df = grouped.agg(*aggregations).sort('category')
        category_df.write.jdbc(
            url=url,
            table='category_reports',
            mode=self.mode,
            properties=properties,
        )
        category_df.show(truncate=False)
        return category_df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Author statistics

In [20]:
class AuthorStatistics:
    """Aggregate the non-deleted books per report time, author and category.

    Args:
        book_df: Book statistics DataFrame; rows with a deletion date are
            filtered out on construction.
        mode: JDBC write mode used for the 'author_reports' table.
    """

    def __init__(self, book_df, mode):
        self.mode = mode
        self.book_df = book_df.filter("deleted_at is null")

    def generate(self):
        """Compute the author report, write it via JDBC and return it."""
        grouped = self.book_df.groupBy('report_time', 'author', 'category')
        aggregations = [
            countDistinct('book_id').alias('books'),
            sum('price').alias('price'),
            avg('price').alias('average_price'),
            sum('final_price').alias('final_price'),
            avg('final_price').alias('average_final_price'),
            sum('quantity').alias('quantity'),
        ]
        author_df = grouped.agg(*aggregations).sort('author')
        author_df.write.jdbc(
            url=url,
            table='author_reports',
            mode=self.mode,
            properties=properties,
        )
        author_df.show(truncate=False)
        return author_df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Report

In [21]:
class Report:
    """Orchestrate the book, category, author and overall reports.

    Args:
        bucket: Bucket handed to KafkaMessagesReader.
        schema_file: Avro schema file handed to KafkaMessagesReader.
        book_fields: Field-name groups used to build book rows.
        book_schema: Spark schema of the book statistics DataFrame.
        table: JDBC table for the book statistics.
        book_key: Message field that identifies a book.
    """

    def __init__(self, bucket, schema_file, book_fields, book_schema, table="book_reports", book_key="id"):
        self.reader = KafkaMessagesReader(bucket, schema_file)
        self.book_fields = book_fields
        self.book_schema = book_schema
        self.table = table
        self.book_key = book_key
        # The write mode is decided once, when the report object is created.
        self.mode = self.get_mode()

    def get_mode(self):
        """Return "overwrite" when every message is new, otherwise "append"."""
        new_messages = self.reader.get_new_messages()
        all_messages = self.reader.get_all_messages()
        if new_messages == all_messages:
            return "overwrite"
        return "append"

    def report(self, book_df):
        """Aggregate non-deleted books per report time and write 'reports'."""
        active_books = book_df.filter("deleted_at is null")
        grouped = active_books.groupBy('report_time')
        aggregations = [
            countDistinct('book_id').alias('books'),
            countDistinct('author').alias('authors'),
            countDistinct('category').alias('categories'),
            sum('price').alias('price'),
            avg('price').alias('average_price'),
            sum('final_price').alias('final_price'),
            avg('final_price').alias('average_final_price'),
            sum('quantity').alias('quantity'),
            min('final_price').alias('cheapest'),
            max('final_price').alias('most_expensive'),
        ]
        report_df = grouped.agg(*aggregations).sort('report_time')
        report_df.write.jdbc(url=url, table='reports', mode=self.mode, properties=properties)
        return report_df

    def generate(self):
        """Run every report in sequence and return the overall report."""
        report_time = datetime.now(pytz.timezone('Asia/Ho_Chi_Minh'))
        messages = self.reader.get_new_message_contents()
        book_statistics = BookStatistics(
            self.book_fields,
            self.book_schema,
            self.table,
            self.book_key,
            messages,
            self.mode,
            report_time,
        )
        book_df = book_statistics.generate()
        # These two are run for their side effects (JDBC write and display).
        CategoryStatistics(book_df, self.mode).generate()
        AuthorStatistics(book_df, self.mode).generate()
        return self.report(book_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Build every report from the messages in `kafka_bucket`, then display the
# schema and content of the overall report. Report.generate() also writes the
# report tables through JDBC, so re-running this cell repeats those writes.
report_df = Report(kafka_bucket, schema_file, book_fields, book_schema).generate()
report_df.printSchema()
report_df.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------------------------+-----------------------------+-----------------------------+--------+--------+-----+-----------+----------+----------+----------+
|report_time|book_id                 |name                         |author                       |category|quantity|price|final_price|created_at|updated_at|deleted_at|
+-----------+------------------------+-----------------------------+-----------------------------+--------+--------+-----+-----------+----------+----------+----------+
|2023-07-27 |64ba0673fa834134be56ffd4|Test new kafka message schema|Test new kafka message schema|Comedy  |100.0   |60.5 |60.5       |2023-07-21|null      |null      |
|2023-07-27 |64ba067ffa834134be56ffd6|Test new kafka message schema|Test new kafka message schema|Sport   |100.0   |100.0|100.0      |2023-07-21|null      |null      |
|2023-07-27 |64ba068bfa834134be56ffd8|Test new kafka message schema|Test new kafka message schema|Sport   |10.0    |79.0 |79.0       |2023-07-21|null      |null

#### AWS Glue

In [10]:
class GlueJob:
    """Wrap the report run in a Glue job (init, optional report, commit)."""

    def __init__(self):
        self.context = GlueContext(SparkContext.getOrCreate())
        self.job = Job(self.context)
        args = self.get_args()
        # Fall back to the name 'test' when no JOB_NAME argument was resolved.
        if 'JOB_NAME' in args:
            self.job_name = args['JOB_NAME']
        else:
            self.job_name = 'test'
        self.job.init(self.job_name, args)

    def get_args(self):
        """Resolve the job options, requesting JOB_NAME only when it was passed."""
        if '--JOB_NAME' in sys.argv:
            params = ['JOB_NAME']
        else:
            params = []
        return getResolvedOptions(sys.argv, params)

    def run(self):
        """Generate and display the report for the 'report' job, then commit."""
        is_report_job = self.job_name == 'report'
        if is_report_job:
            report_df = Report(kafka_bucket, schema_file, book_fields, book_schema).generate()
            report_df.printSchema()
            report_df.show(truncate=False)
        self.job.commit()
    
# Create and run the Glue job only when this code executes as the main module.
# NOTE(review): in an interactive kernel __name__ is presumably '__main__' as
# well, so executing this cell would start the job - confirm.
if __name__ == '__main__':
    GlueJob().run()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…