In [2]:
%pip install python-schema-registry-client

Defaulting to user installation because normal site-packages is not writeable
Collecting python-schema-registry-client
  Downloading python_schema_registry_client-2.5.0-py3-none-any.whl (22 kB)
Collecting fastavro<2.0.0,>=1.7.3
  Downloading fastavro-1.8.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting httpx<0.25.0,>=0.24.0
  Downloading httpx-0.24.1-py3-none-any.whl (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.4/75.4 kB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
Collecting aiofiles<24.0.0,>=23.1.0
  Downloading aiofiles-23.1.0-py3-none-any.whl (14 kB)
Collecting httpcore<0.18.0,>=0.15.0
  Downloading httpcore-0.17.3-py3-none-any.whl (74 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m74.5/74.5 kB[0m [31m12.4 MB/s[0m eta [36m0:00:00[0m
Collecting h11<0.15,>=0.

In [3]:
import os
import io
import boto3

# S3-compatible resource pointed at MinIO; endpoint and credentials come from the environment.
_minio_options = {
    "endpoint_url": os.getenv("MINIO_PUBLIC_ENDPOINT"),
    "aws_access_key_id": os.getenv("MINIO_ACCESS_KEY"),
    "aws_secret_access_key": os.getenv("MINIO_SECRET_KEY"),
    # Fixed region; presumably only needed so botocore can sign requests — confirm for your MinIO setup.
    "region_name": "us-east-1",
}
s3 = boto3.resource("s3", **_minio_options)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer

# Avro schema for an order: scalar header fields, a nested customer record and an
# array of line-item records. Avro encodes record fields in declaration order, so
# the order of the "fields" lists below is significant.
schema_ = {
    "name": "order",
    "type": "record",
    "namespace": "order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "userId", "type": "string"},
        {
            "name": "customerInfo",
            "type": {
                "name": "customerInfoFields",
                "type": "record",
                "fields": [
                    {"name": "firstName", "type": "string"},
                    {"name": "lastName", "type": "string"},
                    {"name": "email", "type": "string"},
                    {"name": "tel", "type": "string"},
                    {"name": "address", "type": "string"},
                ],
            },
        },
        {"name": "createdDate", "type": "string"},
        {"name": "status", "type": "string"},
        {
            "name": "items",
            "type": {
                "type": "array",
                "items": {
                    "name": "item",
                    "type": "record",
                    "fields": [
                        {"name": "productId", "type": "string"},
                        {"name": "quantity", "type": "int"},
                        {"name": "price", "type": "float"},
                        {"name": "title", "type": "string"},
                    ],
                },
            },
        },
    ],
}

# Connect to the schema registry (URL from SCHEMA_REGISTRY_ENDPOINT), register the
# order schema under the "order" subject and build a serializer bound to that client.
client = SchemaRegistryClient(os.getenv("SCHEMA_REGISTRY_ENDPOINT"))
avro_schema = schema.AvroSchema(schema_)
schema_id = client.register("order", avro_schema)  # kept for reference; not used below
avro_message_serializer = AvroMessageSerializer(client)


def decode(raw_bytes):
    """Decode one serialized Avro message into a Python object.

    Delegates to ``avro_message_serializer.decode_message``. The writer schema is
    resolved through ``client`` — presumably via the schema id embedded in the
    message header rather than the schema registered above; confirm.
    """
    return avro_message_serializer.decode_message(raw_bytes)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
from pyspark.sql import Row

rows = []
bucket_name = "order"
prefix = "new/"
bucket = s3.Bucket(bucket_name)
# Materialise the listing once. A boto3 collection re-lists the bucket every time
# it is iterated, so a lazy `objects` would let the archive step pick up objects
# that arrived after this cell ran and mark them processed without ingesting them.
objects = list(bucket.objects.filter(Prefix=prefix))


def _to_row(record):
    """Map one decoded book-request record to a Spark Row for the Postgres table.

    NOTE(review): the keys read here (action, id, title, image, ...) are not fields
    of the "order" schema registered in the previous cell; presumably the producer
    writes with a different (book request) schema — confirm.
    """
    return Row(
        action=record['action'],
        book_id=record['id'],
        title=record['title'],
        image=record['image'],
        quantity=record['quantity'],
        price=record['price'],
        description=record['description'],
        categoryId=record['categoryId'],
        userId=record['userId'],
    )


for obj in objects:
    # Folder placeholder keys (e.g. "new/") are not messages; the archive step skips them too.
    if obj.key.endswith("/"):
        continue
    decoded_body = decode(obj.get()['Body'].read())
    if decoded_body:
        rows.append(_to_row(decoded_body))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

import os

# Append the decoded book requests to Postgres over JDBC. Connection settings may be
# overridden from the environment; the defaults are the values previously hard-coded.
JDBC_URL = os.getenv("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/tc_book_store")
JDBC_TABLE = os.getenv("POSTGRES_TABLE", "tc_book_requests")
JDBC_USER = os.getenv("POSTGRES_USER", "postgres")
JDBC_PASSWORD = os.getenv("POSTGRES_PASSWORD", "postgres")

spark = SparkSession \
    .builder \
    .appName("Tabcorp book store") \
    .getOrCreate()

# Errors are deliberately not swallowed: the next cell archives the source objects,
# so a failed write must stop the run rather than be printed and ignored.
if rows:
    df = spark.createDataFrame(rows)
    (
        df
        .select('action', 'book_id', 'title', 'image', 'quantity', 'price',
                'description', 'categoryId', 'userId')
        .write
        .mode("append")
        .format("jdbc")
        .options(
            url=JDBC_URL,
            dbtable=JDBC_TABLE,
            user=JDBC_USER,
            password=JDBC_PASSWORD,
            driver='org.postgresql.Driver',
        )
        .save()
    )
else:
    # createDataFrame cannot infer a schema from an empty list, so there is nothing to write.
    print("No decoded rows to write.")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Copy every listed object from "new/" into "processed/" (same bucket).
new_folder_key = "processed"
# Set to True to turn the copy into a move. While False, the originals stay under
# "new/" and the ingestion cell will read and append them to Postgres again next run.
delete_source = False

for obj in objects:
    src_key = obj.key
    if src_key.endswith("/"):
        continue  # folder placeholder key, nothing to archive
    # NOTE: only the last path segment is kept, so objects in sub-folders of "new/"
    # with the same file name overwrite each other in "processed/".
    file_name = src_key.split("/")[-1]
    dest_file_key = new_folder_key + "/" + file_name
    s3.Object(bucket_name, dest_file_key).copy_from(
        CopySource={"Bucket": bucket_name, "Key": src_key}
    )
    if delete_source:
        s3.Object(bucket_name, src_key).delete()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': '1775FD189C0A8550', 'HostId': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8', 'HTTPStatusCode': 200, 'HTTPHeaders': {'accept-ranges': 'bytes', 'content-length': '232', 'content-type': 'application/xml', 'etag': '"499c91c6fae399d9fc95264e51d33031"', 'server': 'MinIO', 'strict-transport-security': 'max-age=31536000; includeSubDomains', 'vary': 'Origin, Accept-Encoding', 'x-amz-id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8', 'x-amz-request-id': '1775FD189C0A8550', 'x-content-type-options': 'nosniff', 'x-xss-protection': '1; mode=block', 'date': 'Fri, 28 Jul 2023 09:11:57 GMT'}, 'RetryAttempts': 0}, 'CopyObjectResult': {'ETag': '"499c91c6fae399d9fc95264e51d33031"', 'LastModified': datetime.datetime(2023, 7, 28, 9, 11, 57, 287000, tzinfo=tzlocal())}}
{'ResponseMetadata': {'RequestId': '1775FD18A0CE9E9F', 'HostId': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8', 'HTTPStatusCode': 200, 'HTTPH