# Kafka Demo Steps

## Install and Import required libraries

In [None]:
!pip install kafka-python

In [None]:
import vastdb

from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
from kafka import KafkaConsumer , TopicPartition

import boto3
from botocore.config import Config

import json
import logging
import pandas as pd
import pyarrow as pa
import urllib.parse
from collections import defaultdict

from datetime import datetime
import time
import re


## SDK Logging

In [None]:
# Configure root logging once: INFO and above is written both to an
# append-mode log file and to the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler('vastdb_sdk.log', mode='a'),  # Keep history across runs
        logging.StreamHandler(),  # Log to console
    ],
    format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
    level=logging.INFO,
)
# `log` is the module-scoped logger; `logger` is the root logger.
log = logging.getLogger(__name__)
logger = logging.getLogger()

## Define Functions

In [None]:
def _classify_tag_value(raw_value):
    """Return (string_value, int_value, float_value, bool_value) for one tag value; exactly one slot is not None."""
    text = raw_value.strip() if isinstance(raw_value, str) else str(raw_value).strip()

    # Boolean literals win over every other interpretation.
    lowered = text.lower()
    if lowered in ('true', 'false'):
        return None, None, None, lowered == 'true'

    # Integer: optional leading '-' followed by digits only.
    digits = text[1:] if text.startswith('-') else text
    if digits.isdigit():
        try:
            number = int(text)
        except ValueError:
            # str.isdigit() accepts characters (e.g. superscripts) int() rejects.
            return text, None, None, None
        # The tag RecordBatch schema in this file declares int_value as int32;
        # a value outside that range cannot be stored, so keep it as text.
        if -(2 ** 31) <= number <= 2 ** 31 - 1:
            return None, number, None, None
        return text, None, None, None

    # Float: anything else float() can parse, as long as it is finite.
    try:
        number = float(text)
    except ValueError:
        return text, None, None, None
    # float() also parses "nan", "inf", "infinity" and overflows such as
    # "1e400"; NaN never compares equal to itself, so storing it would make
    # every later comparison report a change. Keep those as text.
    if number == number and abs(number) != float('inf'):
        return None, None, number, None
    return text, None, None, None


def infer_tag_value_type(tag_map, object_id):
    """
    Infers the data type of each tag value from an S3 object's tag map and
    returns a dictionary of column-aligned lists for insertion into a structured table.

    Parameters:
        tag_map (dict): A dictionary of S3 user-defined tags in the form {key: value}.
                        Non-string values are converted with str() before inference.
        object_id (tuple): A tuple in the format (awsRegion, bucket_name, object_key)
                           that identifies the S3 object associated with the tags.

    Returns:
        dict: A dictionary with the keys 'awsRegion', 's3_bucket_name', 's3_object_key',
              'key', 'string_value', 'int_value', 'float_value' and 'bool_value'.
              Every list has one entry per tag, in tag_map iteration order, and
              exactly one of the four *_value entries is populated per tag.

    Inference order (after stripping surrounding whitespace):
        1. 'true' / 'false' (case-insensitive)  -> bool_value
        2. digits with an optional leading '-'   -> int_value, if it fits in int32
        3. any other finite float() literal      -> float_value
        4. everything else                       -> string_value
    """
    columns = {
        'awsRegion': [],
        's3_bucket_name': [],
        's3_object_key': [],
        'key': [],
        'string_value': [],
        'int_value': [],
        'float_value': [],
        'bool_value': [],
    }

    for tag_key, tag_value in tag_map.items():
        string_value, int_value, float_value, bool_value = _classify_tag_value(tag_value)
        columns['awsRegion'].append(object_id[0])
        columns['s3_bucket_name'].append(object_id[1])
        columns['s3_object_key'].append(object_id[2])
        columns['key'].append(tag_key)
        columns['string_value'].append(string_value)
        columns['int_value'].append(int_value)
        columns['float_value'].append(float_value)
        columns['bool_value'].append(bool_value)

    return columns

def get_s3_object_metadata(object_id, s3_client):
    """
    Look up an S3 object with `head_object` and return a summary of the response.

    Parameters:
        object_id (tuple): (awsRegion, bucket_name, object_key); only the bucket
                           name and object key are used for the lookup.
        s3_client (boto3.client): An initialized boto3 S3 client.

    Returns:
        dict: On success, a dictionary with these keys:
              - 'Size': the response's 'ContentLength'.
              - 'LastModified': the response's 'LastModified'.
              - 'ETag': the response's 'ETag'.
              - 'ContentType': the response's 'ContentType'.
              - 'Metadata': the response's 'Metadata', or {} when absent.
              Fields missing from the response are None.

              On any error a message is printed and an empty dictionary
              is returned.
    """
    # (output key, response key) for the fields that default to None.
    copied_fields = (
        ('Size', 'ContentLength'),
        ('LastModified', 'LastModified'),
        ('ETag', 'ETag'),
        ('ContentType', 'ContentType'),
    )

    try:
        head = s3_client.head_object(Bucket=object_id[1], Key=object_id[2])
        summary = {out_key: head.get(src_key) for out_key, src_key in copied_fields}
        summary['Metadata'] = head.get('Metadata', {})  # Custom metadata if any
        return summary
    except s3_client.exceptions.NoSuchKey:
        print(f"Object '{object_id[2]}' not found in bucket '{object_id[1]}'.")
    except s3_client.exceptions.ClientError as err:
        print(f"Client error occurred: {err}")
    except Exception as err:
        print(f"Unexpected error: {err}")

    # Reached only when one of the handlers above ran.
    return {}
    
def build_idrive_recordbatch(
    object_id: tuple,
    s3_client,
    s3_tags: dict,
    s3_object_ext: str
) -> pa.RecordBatch:
    """
    Builds a one-row PyArrow RecordBatch for the 'idrive' table from an S3 object's metadata and tags.

    Parameters:
        object_id (tuple): A 3-element tuple in the format (awsRegion, bucket_name, object_key)
                           identifying the S3 object.
        s3_client (boto3.client): An initialized Boto3 S3 client used to query the object's metadata.
        s3_tags (dict): A dictionary of S3 user-defined tags associated with the object.
        s3_object_ext (str): The file extension of the S3 object (e.g., 'txt', 'csv'), or None.

    Returns:
        pyarrow.RecordBatch: A single-row batch with the columns defined in the schema below.
                             awsRegion, s3_bucket_name, s3_object_key, s3_object_ext and s3_tags
                             come from the arguments; lastModified and s3_object_size come from
                             get_s3_object_metadata. All event-related columns are None.
                             If the metadata lookup fails (it returns {} in that case),
                             lastModified and s3_object_size are None as well.
    """
    obj_metadata = get_s3_object_metadata(object_id, s3_client)

    # Column order and types of the row being built.
    schema = pa.schema([
        ('eventSource', pa.string()),
        ('awsRegion', pa.string()),
        ('lastModified', pa.timestamp('ms')),
        ('eventName', pa.string()),
        ('userIdentity_principalId', pa.string()),
        ('requestParameters_sourceIPAddress', pa.string()),
        ('responseElements_x_amz_request_id', pa.string()),
        ('responseElements_x_amz_id_2', pa.string()),
        ('s3_configurationId', pa.string()),
        ('s3_bucket_name', pa.string()),
        ('s3_bucket_ownerIdentity_principalId', pa.string()),
        ('s3_bucket_arn', pa.string()),
        ('s3_object_ext', pa.string()),
        ('s3_object_key', pa.string()),
        ('s3_object_size', pa.int64()),
        ('s3_object_sequencer', pa.string()),
        ('s3_tags', pa.map_(pa.string(), pa.string()))
    ])

    # Values available without the original event. .get() is used on the
    # metadata because the lookup helper returns an empty dict on failure;
    # indexing it directly would raise KeyError.
    known_values = {
        'awsRegion': object_id[0],
        's3_bucket_name': object_id[1],
        's3_object_key': object_id[2],
        'lastModified': obj_metadata.get('LastModified'),
        's3_object_size': obj_metadata.get('Size'),
        's3_object_ext': s3_object_ext,
        's3_tags': s3_tags,
    }

    # One single-element column per schema field; fields with no known value
    # (the event-related ones) become None.
    record = [[known_values.get(column_name)] for column_name in schema.names]

    return pa.record_batch(record, schema=schema)

def get_s3_object_tags(object_id, s3_client, return_map: bool = False):
    """
    Retrieves the tags associated with a specific S3 object and returns either a dictionary or a PyArrow RecordBatch,
    depending on the `return_map` flag.

    Parameters:
        object_id (tuple): A 3-element tuple representing the S3 object in the form (awsRegion, bucket_name, object_key).
        s3_client (boto3.client): An initialized Boto3 S3 client used to access S3 APIs.
        return_map (bool, optional): If True, returns a dictionary of tag key-value pairs. Defaults to False.

    Returns:
        dict or pyarrow.RecordBatch:
            - If return_map is True: a dictionary mapping tag keys to their string values
              (empty when the object has no tags).
            - If return_map is False: a PyArrow RecordBatch with one row per tag and typed value
              columns (string, int32, float32, bool), along with region, bucket and object key.
            - On any error: a message is printed and an empty dict is returned, regardless of
              return_map. With return_map=True a failure is therefore indistinguishable from
              an object without tags.
    """
    try:
        response = s3_client.get_object_tagging(
            Bucket=object_id[1],
            Key=object_id[2]
        )

        tag_map = {tag['Key']: tag['Value'] for tag in response.get('TagSet', [])}
        if return_map:
            return tag_map

        # Type inference and column population
        tag_columns = infer_tag_value_type(tag_map, object_id)

        # Column order and types of the tag rows
        schema = pa.schema([
            ('awsRegion', pa.string()),
            ('s3_bucket_name', pa.string()),
            ('s3_object_key', pa.string()),
            ('key', pa.string()),
            ('string_value', pa.string()),
            ('int_value', pa.int32()),
            ('float_value', pa.float32()),
            ('bool_value', pa.bool_())
        ])

        return pa.record_batch(tag_columns, schema=schema)

    except s3_client.exceptions.NoSuchKey:
        print(f"Object '{object_id[2]}' not found in bucket '{object_id[1]}'.")
    except s3_client.exceptions.ClientError as e:
        print(f"Client error occurred: {e}")
    except Exception as e:
        # Also covers failures while building the RecordBatch.
        print(f"Unexpected error: {e}")

    return {}  # Return empty dict on failure


def create_db_record(object_id, s3_client, db_session):
    """
    Creates and inserts metadata records for an S3 object into the iDrive VAST database.

    Parameters:
        object_id (tuple): A 3-element tuple representing the S3 object in the form (awsRegion, bucket_name, object_key).
        s3_client (boto3.client): An initialized Boto3 S3 client used to retrieve tags and metadata from the S3 object.
        db_session: A database session used to access and modify tables within the VAST database.

    Returns:
        None

    Functionality:
        - Retrieves the object's tags twice: as a typed PyArrow RecordBatch and as a raw key-value map.
        - If the typed retrieval fails (a dict is returned instead of a RecordBatch), prints a
          failure message and inserts nothing.
        - Extracts the file extension from the last path segment of the object key, if present.
        - Builds a second RecordBatch containing standard S3 object metadata.
        - In one transaction, inserts the tag rows (when there are any) into "tag_metadata" and
          the object row into "idrive", using the globals `vastdb_bucket` and `demo_suffix`
          to locate the schema.
    """
    print(f"\rCREATE: {object_id}")
    pa_recordbatch_tags = get_s3_object_tags(object_id, s3_client)

    # get_s3_object_tags returns {} on failure and a RecordBatch on success.
    # A RecordBatch with zero rows is falsy, so testing truthiness would
    # treat an untagged object as a failure and never record it.
    if not isinstance(pa_recordbatch_tags, pa.RecordBatch):
        print(f"Failed to create database entry for {object_id[1]}\\{object_id[2]}.")
        return

    s3_tags = get_s3_object_tags(object_id, s3_client, return_map=True)

    # Look for the extension in the final path segment only, so a dot in a
    # "directory" part of the key (e.g. "v1.2/readme") is not mistaken for one.
    file_name = object_id[2].rsplit('/', 1)[-1]
    s3_object_ext = file_name.rsplit('.', 1)[-1] if '.' in file_name else None

    pa_recordbatch_idrive = build_idrive_recordbatch(object_id, s3_client, s3_tags, s3_object_ext)

    with db_session.transaction() as tx:
        bucket = tx.bucket(vastdb_bucket)       # Database bucket for this transaction
        schema = bucket.schema(demo_suffix)     # Schema that holds the demo tables
        if pa_recordbatch_tags.num_rows > 0:
            tagmeta = schema.table("tag_metadata")
            tagmeta.insert(pa_recordbatch_tags)  # One row per tag
        idrive = schema.table("idrive")
        idrive.insert(pa_recordbatch_idrive)     # One row for the object itself
        
def update_db_record(object_id, s3_client, db_session):
    """
    Updates the metadata and tag records of an S3 object in the iDrive VAST database.

    Parameters:
     - object_id: A tuple (awsRegion, bucket_name, object_key) that identifies an S3 object.
     - s3_client: A Boto3 S3 client instance used to retrieve object metadata and tags.
     - db_session: A session to the VAST database used for reading, updating, and inserting records.

    Returns:
        None

    Functionality:
     - Prints the update action for the specified object.
     - Retrieves the current S3 tags as a map and infers their types (string, int, float, bool).
     - Extracts the file extension from the last path segment of the object key, if present.
     - Builds a PyArrow RecordBatch for the idrive table.
     - In a single transaction:
        - idrive: deletes any rows matching object_id, then inserts the new row.
        - tag_metadata: reads the existing tag rows for the object; for each current tag,
          updates the row when any value column differs, or inserts a row when the key is new.

    Limitations:
     - Tag rows whose key is no longer present on the object are left untouched.
     - A failed tag lookup yields an empty map, so no tag rows are changed in that case.
    """
    print(f"\rUPDATE: {object_id}")

    # Get the user tags from the S3 Object
    s3_tags = get_s3_object_tags(object_id, s3_client, return_map=True)
    # Infer the data types of the tags for storage / comparison with tag_metadata.
    tag_columns = infer_tag_value_type(s3_tags, object_id)
    # Re-shape the column lists into one row dict per tag key so each tag can
    # be compared with its stored row without re-scanning the key column.
    new_rows = {
        tag_key: {column: values[position] for column, values in tag_columns.items()}
        for position, tag_key in enumerate(tag_columns['key'])
    }

    # Look for the extension in the final path segment only, so a dot in a
    # "directory" part of the key (e.g. "v1.2/readme") is not mistaken for one.
    file_name = object_id[2].rsplit('/', 1)[-1]
    s3_object_ext = file_name.rsplit('.', 1)[-1] if '.' in file_name else None

    pa_recordbatch_idrive = build_idrive_recordbatch(object_id, s3_client, s3_tags, s3_object_ext)

    with db_session.transaction() as tx:
        bucket = tx.bucket(vastdb_bucket)
        schema = bucket.schema(demo_suffix)

        # idrive: replace the object's row (delete if present, then insert).
        idrive = schema.table("idrive")
        idrive_reader = idrive.select(columns=[], internal_row_id=True,
            predicate=(idrive['awsRegion'] == object_id[0]) &
                      (idrive['s3_bucket_name'] == object_id[1]) &
                      (idrive['s3_object_key'] == object_id[2]))
        idrive_results = idrive_reader.read_all()
        if idrive_results.num_rows > 0:
            idrive.delete(idrive_results)
        # A missing row is tolerated: the insert below creates it.
        idrive.insert(pa_recordbatch_idrive)

        # tag_metadata: update changed tags, insert new ones.
        tagmeta = schema.table("tag_metadata")
        reader = tagmeta.select(columns=['key', 'string_value', 'int_value', 'float_value', 'bool_value'],
                                internal_row_id=True,
                                predicate=(tagmeta['awsRegion'] == object_id[0]) &
                                          (tagmeta['s3_bucket_name'] == object_id[1]) &
                                          (tagmeta['s3_object_key'] == object_id[2]))
        # Index the stored rows by tag key.
        existing = {r['key']: r for r in reader.read_all().to_pylist()}
        update_cols = ['string_value', 'int_value', 'float_value', 'bool_value']

        for k, new_row in new_rows.items():
            if k in existing:
                existing_row = existing[k]
                # A change in any value column rewrites all four value columns of that tag.
                if any(new_row[col] != existing_row.get(col) for col in update_cols):
                    print(f"Updating tag key: {k}")
                    # Append $row_id to enable update
                    updated = pa.RecordBatch.from_pylist([new_row]).append_column(
                        '$row_id', pa.array([existing_row['$row_id']])
                    )
                    tagmeta.update(updated, update_cols)
            else:
                print(f"Inserting new tag key: {k}")
                tagmeta.insert(pa.Table.from_pylist([new_row]))


def delete_db_record(object_id, s3_client, db_session):
    """
    Remove every row belonging to one S3 object from the "tag_metadata" and "idrive" tables.

    Parameters:
        object_id (tuple): (awsRegion, bucket_name, object_key) identifying the S3 object.
        s3_client (boto3.client): Not used by this function; kept so the create/update/delete
                                  functions share one signature.
        db_session: A database session object used to interact with the VAST database.

    Returns:
        None

    Functionality:
        - Opens one transaction and resolves the schema through the globals
          `vastdb_bucket` and `demo_suffix`.
        - For "tag_metadata" and then "idrive": selects the internal row ids of the rows whose
          region, bucket and key match object_id, deletes them if any were found, and prints
          either the number of rows deleted or that none were found.
    """
    print(f"\rDELETE: {object_id}")
    with db_session.transaction() as tx:
        db_schema = tx.bucket(vastdb_bucket).schema(demo_suffix)

        # Both tables are keyed by the same three columns, so one loop serves both.
        for table_name in ("tag_metadata", "idrive"):
            table = db_schema.table(table_name)
            matches = table.select(
                columns=[],
                internal_row_id=True,
                predicate=(table['awsRegion'] == object_id[0]) &
                          (table['s3_bucket_name'] == object_id[1]) &
                          (table['s3_object_key'] == object_id[2]),
            ).read_all()  # Only the row ids are needed for the delete.

            if matches.num_rows > 0:
                print(f"Deleting {matches.num_rows} rows from {table.name}.")
                table.delete(matches)
            else:
                print(f"No {table.name} records found for S3 Object.")

class ObjectStateManager:
    """
    A class that tracks and manages the state of S3 objects based on a stream of event records.

    The class processes a batch of S3 event data, groups events by object, and determines whether
    an object should be created, updated, deleted, or ignored in the database based on the sequence of events.
    """
    def __init__(self, s3_client, db_session):
        """
        :param s3_client: boto3 S3 client passed through to the database record functions
        :param db_session: VAST database session passed through to the database record functions
        :raises ValueError: if either argument is None
        """
        if s3_client is None:
            raise ValueError("s3 boto client must be provided")
        if db_session is None:
            raise ValueError("Vast Database Session must be provided")
        self.s3_client = s3_client
        self.db_session = db_session
        # object_id -> event names in processing order; only populated while
        # process_stream is running.
        self.events_by_object = defaultdict(list)

    def process_stream(self, df):
        """
          Process a DataFrame of event records to determine final object states.

        :param df: Pandas DataFrame with S3 event records. Each row must contain eventTime, eventName,
                   awsRegion, s3.bucket.name, and s3.object.key columns. None or an empty
                   DataFrame is accepted and results in no work.
        """
        if df is None or df.empty:
            print("No events to process.")
            return

        # Stable sort: events that share an eventTime keep their incoming order.
        df = df.sort_values('eventTime', kind='mergesort')

        try:
            # Group events by unique resource
            for _, row in df.iterrows():
                self.events_by_object[self._get_object_id(row)].append(row['eventName'])

            # Resolve final state per resource
            for object_id, events in self.events_by_object.items():
                self._resolve_object_events(object_id, events)
        finally:
            # Always reset, even if a handler raised, so a retry does not
            # see this batch's events mixed into the next one.
            self.events_by_object.clear()

    def _resolve_object_events(self, object_id, events):
        """
          Determine the final state of an S3 object based on its list of events,
          and apply the appropriate database operation.

          Transitions (state starts as None):
            ObjectCreated:*  None/Deleted -> Created
            ObjectTagging*   None -> Updated (Created stays Created)
            ObjectRemoved:*  Created -> None, None/Updated -> Deleted

        :param object_id: A tuple (awsRegion, bucket_name, object_key) identifying the S3 object
        :param events: A list of event names for the given object, oldest first
        """
        state = None
        for event in events:
            if not isinstance(event, str):
                # e.g. a missing eventName surfaced as NaN by the DataFrame
                continue
            if event.startswith('ObjectCreated:'):
                # NOTE(review): a Deleted -> Created sequence only runs the
                # create path; rows stored before this batch are not removed
                # first. Confirm whether that can leave duplicates.
                if state is None or state == "Deleted":
                    state = "Created"
            elif event.startswith('ObjectTagging'):
                if state is None:
                    state = "Updated"  # Creation subsumes update, so Created is kept
            elif event.startswith('ObjectRemoved:'):
                if state == "Created":
                    state = None  # Created + Deleted cancels out
                elif state is None or state == "Updated":
                    # A removal after a tag change must win; otherwise an
                    # update would be attempted on an object that is gone.
                    state = "Deleted"

        # Execute the correct function
        if state == "Created":
            create_db_record(object_id, self.s3_client, self.db_session)
        elif state == "Updated":
            update_db_record(object_id, self.s3_client, self.db_session)
        elif state == "Deleted":
            delete_db_record(object_id, self.s3_client, self.db_session)
        else:
            print(f"Ignoring object: {object_id}.")

    def _get_object_id(self, row):
        """
          Generate a unique object identifier tuple from a row of event data.

        :param row: A pandas Series (row) representing an event and object
        :return: Tuple of (awsRegion, bucket_name, object_key)
        """
        return (row['awsRegion'], row['s3.bucket.name'], row['s3.object.key'])


## Load Demo State and define variables

In [None]:
# Load the saved demo settings from disk.
with open("demo_state.json", "r") as f:
    data = json.loads(f.read())
# Publish every top-level entry as a module-level variable so the cells
# below can refer to the settings by name.
for key, value in data.items():
    globals().update({key: value})

## Create Kafka Consumer

In [None]:
# Suppress log output (CRITICAL and below) while the Kafka client is in use.
logging.disable(logging.CRITICAL)
consumer = KafkaConsumer(
    kafka_topic,
    group_id='consumer-group',
    bootstrap_servers=[vip_pool_ip],
    value_deserializer=lambda payload: json.loads(payload.decode('utf-8')),  # UTF-8 bytes -> parsed JSON
    auto_offset_reset='earliest',  # Start from the beginning of the topic
    enable_auto_commit=False,      # Disable to avoid background thread
    consumer_timeout_ms=2000,      # Iteration gives up after 2000 ms without a message
)

## Listen to topic for events

In [None]:

print(f"Listening to topic '{kafka_topic}'...\n")
messages_data = []
# Stop once this many seconds pass without a message that carries 'Records'.
inactivity_timeout = 5
last_message_time = time.time()

try:
    while True:
        message_pack = consumer.poll(timeout_ms=2000)
        new_message_received = False

        for messages in message_pack.values():
            for message in messages:
                # The consumer's value_deserializer has already parsed the
                # payload. Anything that is not a JSON object with a
                # 'Records' entry carries no S3 events and is skipped.
                msg_value = message.value
                if not isinstance(msg_value, dict) or 'Records' not in msg_value:
                    continue
                try:
                    # A missing key (None) ends up as the string 'None'.
                    kafka_key = (message.key.decode('utf-8')
                                 if isinstance(message.key, bytes)
                                 else str(message.key))
                    # Flatten: one entry per S3 record, tagged with its Kafka origin.
                    for record in msg_value['Records']:
                        messages_data.append({
                            **record,
                            'kafka_offset': message.offset,
                            'kafka_key': kafka_key,
                        })
                    new_message_received = True
                except (AttributeError, TypeError) as e:
                    # e.g. 'Records' is not a list of JSON objects
                    print(f"Failed to parse message: {message.value} - Error: {e}")

        if new_message_received:
            last_message_time = time.time()

        if time.time() - last_message_time > inactivity_timeout:
            print(f"\nNo new messages for {inactivity_timeout} seconds. Stopping listener.")
            break

    # Commit only after the loop ended normally, so an interrupted run does
    # not mark messages as consumed.
    consumer.commit()
finally:
    # Runs on errors and interrupts too: release the consumer and undo the
    # logging.disable() issued when the consumer was created.
    consumer.close()
    logging.disable(logging.NOTSET)
print("\nStopped listening.")

# Convert to a pandas DataFrame (nested fields become dotted column names)
df1 = pd.json_normalize(messages_data)

In [None]:
if not df1.empty:
    # Parse the event timestamps and percent-decode the object keys.
    # NOTE(review): AWS S3 notifications encode spaces in keys as '+', which
    # unquote leaves untouched - confirm whether unquote_plus is needed for
    # this endpoint.
    df1['eventTime'] = pd.to_datetime(df1['eventTime'])
    df1['s3.object.key'] = df1['s3.object.key'].apply(urllib.parse.unquote)
    # Append to the events gathered by earlier runs of this cell, if any.
    result = pd.concat([result, df1], ignore_index=True) if 'result' in locals() else df1
else:
    print('No new messages processed.')

In [None]:
# Show the (rows, columns) size of the accumulated events.
# NOTE: fails if `result` has not been created yet or has been reset to None.
result.shape

## Process Events

In [None]:
endpoint_url = f"http://{vip_pool_ip}"
#
# Create an S3 Client used to query Objects
#
s3_client = boto3.client(
    's3',
    endpoint_url=endpoint_url,
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
    config=Config(signature_version='s3v4',
                  parameter_validation=False,
                  s3={'payload_signing_enabled':False,'addressing_style':'path','checksum_algorithm': None}
                 ),
    verify=False  # Set to False if the endpoint doesn't use SSL (http)
)

#
# Establish Session with VAST database
#
# Start from None rather than {}: ObjectStateManager rejects None, so a
# failed connection stops here with a clear error instead of failing later
# when the first transaction is attempted on a dict.
db_session = None
try:
    db_session = vastdb.connect(
              endpoint=endpoint_url,
              access=S3_ACCESS_KEY,
              secret=S3_SECRET_KEY
             )
except Exception as e:
    log.critical(e)
if db_session is not None:
    log.info("VAST DB Session started")
else:
    log.critical("Unable to connect to VAST DB.")

#
# Instantiate OSM Class (raises ValueError when the connection failed)
#
osm = ObjectStateManager(s3_client, db_session)


In [None]:
#
# Apply the accumulated events in "result" to the database.
#
# NOTE: Once the events have been processed, set the "result" variable to None
#       (done in the next cell). This ensures that events are not processed
#       more than once.
#
osm.process_stream(result)

In [None]:
# Discard the events that were just processed so they are not applied again.
result = None