In [3]:
# %pip (not !pip) installs into the running kernel's environment; pinned to the version this notebook was run with.
%pip install kafka-python==2.0.2

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 kB 9.6 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [6]:
!pip install pandas 
!pip install psycopg2-binary


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.9-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.0/3.0 MB 94.8 MB/s eta 0:00:00
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.9

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
# %pip (not !pip) installs into the running kernel's environment; pinned to the version this notebook was run with.
%pip install sqlalchemy==2.0.31

Collecting sqlalchemy
  Using cached SQLAlchemy-2.0.31-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
Installing collected packages: sqlalchemy
Successfully installed sqlalchemy-2.0.31

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import boto3
from botocore.exceptions import ClientError
import json

def get_secret(secret_name="DBCreds", region_name="us-east-1"):
    """Read the PostgreSQL connection settings from AWS Secrets Manager.

    The secret's ``SecretString`` is parsed as JSON and must contain the keys
    ``username``, ``password``, ``host``, ``port`` and ``dbname``.

    Args:
        secret_name: Secret to read. Defaults to ``"DBCreds"``, the secret this
            notebook uses.
        region_name: AWS region holding the secret. Defaults to ``"us-east-1"``.

    Returns:
        Tuple ``(username, password, host, port, dbname)``.

    Raises:
        botocore.exceptions.ClientError: if the Secrets Manager call fails; it is
            propagated to the caller unchanged.
        KeyError: if one of the expected keys is missing from the secret JSON.
    """
    # Create a Secrets Manager client for the requested region.
    client = boto3.session.Session().client(
        service_name='secretsmanager',
        region_name=region_name,
    )
    response = client.get_secret_value(SecretId=secret_name)

    # The credentials are stored as a JSON document in SecretString.
    secret_dict = json.loads(response['SecretString'])
    return (
        secret_dict['username'],
        secret_dict['password'],
        secret_dict['host'],
        secret_dict['port'],
        secret_dict['dbname'],
    )


# Unpack the database credentials into module-level names used by the consumer cell below.
user, pswd, host, port, db = get_secret()

In [5]:
import logging
from kafka import KafkaConsumer
import requests
import json
import psycopg2
import uuid
import pandas as pd

# Logging at INFO so kafka-python's connection and consumer-group messages are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Holds [uid, prediction] for the message being processed; the consume loop re-creates it per message.
resp = []

# Kafka bootstrap broker(s): the broker is reached on port 9092 of the database host from the secret.
brokers = ["{}:9092".format(host)]

# Kafka topic to consume.
topic = "three"

# Keyword arguments for psycopg2.connect(), filled from the Secrets Manager values.
db_details = dict(
    dbname=db,
    user=user,
    password=pswd,
    host=host,
    port=port,
)

# Look up the single model flagged in_use; its name and version select the KServe service below.
try:
    conn = psycopg2.connect(**db_details)
    print("Connected to PostgreSQL successfully.")
except Exception as e:
    print(f"Failed to connect to PostgreSQL: {e}")
    # Re-raise instead of calling exit(): in an IPython/Jupyter kernel exit() requests a
    # kernel shutdown rather than cleanly stopping this cell, and nothing below works without `conn`.
    raise

try:
    fetch_query = "SELECT * FROM malware_model_metrics where in_use is true LIMIT 1;"
    # pd.read_sql runs the query on the connection directly, so no cursor is opened.
    # NOTE(review): pandas documents SQLAlchemy connectables (and sqlite3) for read_sql;
    # a raw psycopg2 connection works but emits a UserWarning.
    model = pd.read_sql(fetch_query, conn)
except Exception as e:
    print(f"Failed to fetch data: {e}")
    # Without `model` the KServe URL cannot be built, so stop here with the real error.
    raise
finally:
    conn.close()

if model.empty:
    raise RuntimeError("No model with in_use = true found in malware_model_metrics.")

# KServe inference service URL; the InferenceService is named "<name>-version<version>".
# Index each column positionally so a row-wise dtype upcast cannot alter the formatted values.
model_name = model['name'].iloc[0]
model_version = model['version'].iloc[0]
kserve_service = f"{model_name}-version{model_version}"
kserve_url = f"http://{kserve_service}.kubeflow-user-example-com.svc.cluster.local/v1/models/{kserve_service}:predict"

# Build the Kafka consumer for `topic`. Offsets are committed automatically under the
# "my-group" consumer group.
logger.info("Creating Kafka consumer...")
consumer_options = {
    "bootstrap_servers": brokers,
    # Where to start reading when the group has no committed offset.
    "auto_offset_reset": "earliest",
    "enable_auto_commit": True,
    "group_id": "my-group",
    # bytes -> str only; JSON parsing happens in the consume loop.
    "value_deserializer": lambda raw_value: raw_value.decode("utf-8"),
}
consumer = KafkaConsumer(topic, **consumer_options)

logger.info(f"Subscribed to topic: {topic}")


def send_to_kserve(data, timeout=30):
    """POST a JSON payload to the KServe predict endpoint and return the decoded reply.

    Args:
        data: JSON-serialisable request body, e.g. ``{"instances": [[...]]}``.
        timeout: Seconds to wait for the server (connect and read). Without a timeout,
            ``requests`` may wait indefinitely and stall the Kafka consume loop.

    Returns:
        The parsed JSON response body.

    Raises:
        requests.HTTPError: if KServe answers with a 4xx/5xx status. This surfaces the
            real failure instead of a later KeyError on a missing ``'predictions'`` key.
        requests.RequestException: on connection errors or timeouts.
    """
    headers = {'Content-Type': 'application/json'}
    response = requests.post(kserve_url, headers=headers, data=json.dumps(data), timeout=timeout)
    response.raise_for_status()
    return response.json()

def outcome_to_database(values):
    """Store one prediction in PostgreSQL.

    Inserts ``(uid, outcome)`` into ``malware_outcomes`` and sets ``outcome`` on the
    matching ``malware_data`` row. Both statements are committed together, or rolled
    back together on failure. A new connection is opened and closed on every call.

    Args:
        values: Sequence whose first item is the record ``uid`` (taken from the Kafka
            message) and whose second item is the model prediction for it
            (presumably 0 or 1 — TODO confirm against the deployed model).

    Raises:
        Exception: whatever ``psycopg2.connect`` raises when the database is
            unreachable. INSERT/UPDATE failures are printed and rolled back, not raised.
    """
    uid, outcome = values[0], values[1]

    # Connect with the module-level settings rather than rebuilding an identical dict.
    try:
        conn = psycopg2.connect(**db_details)
    except Exception as e:
        print(f"Failed to connect to PostgreSQL: {e}")
        # Re-raise rather than exit(): this runs once per Kafka message, and exit()
        # would take down the whole notebook/process instead of letting the consume
        # loop log the error and carry on.
        raise
    print("Connected to PostgreSQL successfully.")

    try:
        insert_query = """
        INSERT INTO malware_outcomes (uid, outcome)
        VALUES (%s, %s)
        """
        update_query = """
        UPDATE malware_data
        SET outcome = %s
        WHERE uid = %s
        """
        # The cursor is closed when the with-block exits; the transaction stays open
        # until the explicit commit/rollback below.
        with conn.cursor() as cursor:
            cursor.execute(insert_query, (uid, outcome))
            cursor.execute(update_query, (outcome, uid))
        conn.commit()
        print("Data inserted and updated successfully.")
    except Exception as e:
        print(f"Failed to insert/update data: {e}")
        conn.rollback()
    finally:
        conn.close()
        print("PostgreSQL connection closed.")

        
        

# Iterate the consumer until interrupted. Each record is expected to be a JSON object
# carrying a 'uid' plus feature values. The features are sent to KServe in the object's
# key order, and the prediction is written back to PostgreSQL.
try:
    for message in consumer:
        # The consumer's value_deserializer only decodes bytes to str, so parse JSON here.
        # Keep this try narrow and keep `message` bound to the Kafka record: requests'
        # JSONDecodeError (from response.json(), requests >= 2.27) subclasses
        # json.JSONDecodeError and must not be mistaken for a bad Kafka message.
        try:
            features = json.loads(message.value)
        except json.JSONDecodeError:
            print(f"Received non-JSON message: {message.value}")
            continue

        try:
            print(f"Received message: {features}")
            uid = features.pop('uid', None)
            # KServe v1 "instances" is a 2-D array: here a single row of feature values.
            kserve_payload = {"instances": [list(features.values())]}
            kserve_response = send_to_kserve(kserve_payload)
            print(f"KServe response: {kserve_response}")
            resp = [uid, kserve_response['predictions'][0]]
            print(resp)
            outcome_to_database(resp)
        except Exception as e:
            # KServe, response-shape and database errors all land here; log and keep consuming.
            logger.error(f"Error processing message: {e}")
except KeyboardInterrupt:
    logger.info("Consumer stopped manually.")
finally:
    consumer.close()
    logger.info("Consumer closed.")


INFO:__main__:Creating Kafka consumer...
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=3.81.167.121:9092 <connecting> [IPv4 ('3.81.167.121', 9092)]>: connecting to 3.81.167.121:9092 [('3.81.167.121', 9092) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=3.81.167.121:9092 <connecting> [IPv4 ('3.81.167.121', 9092)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('three',)
INFO:__main__:Subscribed to topic: three
INFO:kafka.cluster:Group coordinator for my-group is BrokerMetadata(nodeId='coordinator-2', host='ec2-3-81-167-121.compute-1.amazonaws.com', port=9094, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-2 for group my-group
INFO:kafka.coordinator:Starting new heartbeat thread
INFO:

Connected to PostgreSQL successfully.


INFO:kafka.coordinator:(Re-)joining group my-group
INFO:kafka.coordinator:Elected group leader -- performing partition assignments using range
INFO:kafka.conn:<BrokerConnection node_id=1 host=ec2-3-81-167-121.compute-1.amazonaws.com:9093 <connecting> [IPv4 ('3.81.167.121', 9093)]>: connecting to ec2-3-81-167-121.compute-1.amazonaws.com:9093 [('3.81.167.121', 9093) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=1 host=ec2-3-81-167-121.compute-1.amazonaws.com:9093 <connecting> [IPv4 ('3.81.167.121', 9093)]>: Connection complete.
INFO:kafka.coordinator:Successfully joined group my-group with generation 1
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='three', partition=0)]
INFO:kafka.coordinator.consumer:Setting newly assigned partitions {TopicPartition(topic='three', partition=0)} for group my-group
INFO:__main__:Consumer stopped manually.
INFO:kafka.coordinator:Stopping heartbeat thread
INFO:kafka.coordinator:Leaving consumer group (my-group)