In [1]:
!pip install kafka-python



In [2]:
from kafka import KafkaProducer
import json
import time
import random

In [3]:
# Kafka producer setup.
# The broker address is written as an explicit host:port so it agrees with the
# other Kafka clients in this notebook (9092 is also kafka-python's default port).
producer = KafkaProducer(
    bootstrap_servers=['54.235.34.127:9092'],  # Change to your EC2 Kafka server
    # Serialize each message value (a JSON-compatible object) to UTF-8 JSON bytes.
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

In [4]:
from kafka.admin import KafkaAdminClient

# Connect an admin client to the broker so cluster metadata can be inspected.
admin_client = KafkaAdminClient(
    client_id='test_admin',
    bootstrap_servers=['54.235.34.127:9092'],  # Replace with your broker's address
)

# Fetch the topic names reported by the cluster and display them.
topics = admin_client.list_topics()
print("Topics existants :", topics)

# Report whether the 'sensor-data' topic is among them.
print(
    "The topic 'sensor-data' exists."
    if 'sensor-data' in topics
    else "The topic 'sensor-data' doesn't exists."
)


Topics existants : ['sensor-data']
The topic 'sensor-data' exists.


In [5]:
!pip install xgboost



In [6]:
pip install transformers torch pandas xgboost kafka-python

Note: you may need to restart the kernel to use updated packages.


In [7]:
pip install openai langchain

Note: you may need to restart the kernel to use updated packages.


In [None]:
import xgboost as xgb
import joblib
import boto3
import os
import json
import random
import time
from datetime import datetime
from kafka import KafkaProducer
import pandas as pd
from openai import AzureOpenAI

# Initialize Azure OpenAI client.
# The API key and endpoint are taken from the environment when set, so a real
# credential never has to be stored in the notebook. The original literals are
# kept as fallbacks, so behavior is unchanged when the variables are absent.
client = AzureOpenAI(
    api_key=os.environ.get("AZURE_OPENAI_API_KEY", "my_api_key"),
    api_version="2023-03-15-preview",
    azure_endpoint=os.environ.get(
        "AZURE_OPENAI_ENDPOINT", "https://tuteureopenai.openai.azure.com"
    )
)

# Ask the LLM for an irrigation recommendation
def get_interpretation(prediction_value):
    """Return a short irrigation recommendation for ``prediction_value``.

    The value is first mapped to a soil-condition label. That label and the
    value are then sent as a chat request through the module-level ``client``,
    and the stripped text of the first returned choice is the result.

    If building the request, sending it, or reading the reply raises, the
    error is printed and ``get_fallback_interpretation(prediction_value)`` is
    returned instead.
    """
    # Pick the label; each test is only reached when the previous ones failed,
    # and anything that matches none of them is treated as "very dry".
    if prediction_value <= 0:
        situation = "optimal soil moisture"
    elif prediction_value <= 0.2:
        situation = "slightly dry soil"
    elif prediction_value < 0.8:
        situation = "moderately dry soil"
    else:
        situation = "very dry soil"

    try:
        system_message = {
            "role": "system",
            "content": "You are an irrigation advisor. Give very brief recommendations in 5-7 words.",
        }
        user_message = {
            "role": "user",
            "content": f"Given {situation} with a moisture prediction value of {prediction_value}, provide a very brief irrigation recommendation.",
        }
        reply = client.chat.completions.create(
            model="gpt-35-turbo",
            messages=[system_message, user_message],
        )
        first_choice = reply.choices[0]
        return first_choice.message.content.strip()
    except Exception as err:
        print(f"Error getting LLM interpretation: {err}")
        return get_fallback_interpretation(prediction_value)

# Canned recommendation used when the LLM call fails
def get_fallback_interpretation(prediction_value):
    """Return a fixed recommendation string chosen by ``prediction_value``.

    Ranges and results:
        value <= 0          -> "No irrigation needed now."
        0 < value <= 0.2    -> "Soil slightly dry, monitor only."
        0.2 < value < 0.8   -> "Monitor moisture levels closely."
        anything else       -> "Start irrigation immediately."
    """
    # Guard clauses in ascending order: each check is only reached when the
    # value was above the previous bound (or matched none of the comparisons).
    if prediction_value <= 0:
        return "No irrigation needed now."
    if prediction_value <= 0.2:
        return "Soil slightly dry, monitor only."
    if prediction_value < 0.8:
        return "Monitor moisture levels closely."
    return "Start irrigation immediately."

# Download model from S3
def download_model_from_s3(model_url, local_path):
    """Download a model file from S3 and write it to ``local_path``.

    Args:
        model_url: Location of the object. Recognized forms are
            ``s3://<bucket>/<key>`` and virtual-hosted-style URLs such as
            ``https://<bucket>.s3.<region>.amazonaws.com/<key>`` or
            ``https://<bucket>.s3.amazonaws.com/<key>``. When the value is
            empty or in any other form, the default bucket, key and region
            below are used (and a note is printed for a non-empty value).
        local_path: Filesystem path the object is saved to.

    Returns:
        True if the download succeeded, False otherwise. Download errors are
        printed rather than raised.
    """
    import re
    from urllib.parse import unquote, urlparse

    # Defaults, used whenever ``model_url`` does not supply a usable location.
    bucket_name = 'projectkafkabucket'
    object_key = 'models/xgboost_model.joblib'
    region_name = 'us-east-1'

    parsed = urlparse(model_url or '')
    url_host = parsed.hostname or ''
    url_key = unquote(parsed.path.lstrip('/'))
    recognized = False

    if url_key and url_host:
        if parsed.scheme == 's3':
            bucket_name, object_key = url_host, url_key
            recognized = True
        elif parsed.scheme in ('http', 'https'):
            # Virtual-hosted style: the bucket is the leading host label(s),
            # optionally followed by the region.
            host_match = re.match(
                r'^(?P<bucket>.+?)\.s3(?:[.-](?P<region>[a-z0-9-]+))?\.amazonaws\.com$',
                url_host,
            )
            if host_match:
                bucket_name, object_key = host_match.group('bucket'), url_key
                region_name = host_match.group('region') or region_name
                recognized = True

    if model_url and not recognized:
        print(
            f"Unrecognized S3 URL {model_url!r}; "
            f"using default s3://{bucket_name}/{object_key}"
        )

    s3_client = boto3.client('s3', region_name=region_name)

    try:
        s3_client.download_file(bucket_name, object_key, local_path)
        print(f"Model downloaded successfully to {local_path}")
        return True
    except Exception as e:
        print(f"Error downloading model: {e}")
        return False

# Local file the downloaded model is stored in
local_model_path = '/tmp/xgboost_model.joblib'

# Fetch the model file from S3
download_model_from_s3(
    "https://projectkafkabucket.s3.us-east-1.amazonaws.com/models/xgboost_model.joblib",
    local_model_path,
)

# Deserialize the model from the local file
model = joblib.load(local_model_path)

# Feature names come from the Booster itself, or from the Booster held by a
# wrapper object that exposes get_booster().
feature_names = (
    model.feature_names
    if isinstance(model, xgb.Booster)
    else model.get_booster().feature_names
)

print("Expected features:", feature_names)

# Global list for batch data
data_batch = []

# Generate and send data continuously
def generate_data():
    """Generate random sensor readings in an endless loop and publish them.

    Every iteration (one every 2 seconds):
      1. builds a random reading (soil moisture, temperature, soil humidity);
      2. scores it with the module-level ``model`` and sets ``Status`` to 1
         when the prediction is above 0.5, else 0;
      3. obtains a text recommendation from ``get_interpretation``;
      4. sends the record, including ``Prediction_Value``, to the Kafka topic
         'sensor-data' and to the Kinesis stream 'kafka-stream';
      5. appends the record (without ``Prediction_Value``) to the global
         ``data_batch`` and, once it holds 10 records, uploads the batch with
         ``send_to_s3_csv`` and starts a new one.

    Kafka and Kinesis errors are printed and do not stop the loop. The
    function never returns normally; it ends only when an exception (for
    example ``KeyboardInterrupt``) propagates, and the Kafka producer is
    closed on the way out.
    """
    global data_batch

    # Both clients are created once, on first use, and reused for every
    # record. Creation stays inside the per-record try blocks so an
    # unreachable service is reported and retried on the next iteration.
    kafka_producer = None
    kinesis_client = None

    # The input below is a DMatrix, which is scored through the Booster API.
    # Unwrap the Booster when ``model`` is a wrapper exposing get_booster().
    booster = model if isinstance(model, xgb.Booster) else model.get_booster()

    try:
        while True:
            # Generate random data
            data = {
                "Soil Moisture": random.randint(0, 100),
                "Temperature": random.randint(0, 50),
                "Soil Humidity": round(random.uniform(10.0, 50.0), 1),
            }
            print(f"Generating data: {data}")

            # Convert to DMatrix (a single row, columns in the order listed)
            dmatrix_input = xgb.DMatrix(
                [[data["Soil Moisture"], data["Temperature"], data["Soil Humidity"]]],
                feature_names=["Soil Moisture", "Temperature", "Soil Humidity"]
            )

            # Get prediction and interpretation
            prediction = booster.predict(dmatrix_input)[0]
            status = 1 if prediction > 0.5 else 0
            interpretation = get_interpretation(prediction)

            # Add additional fields
            data["Status"] = status
            data["Time"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            data["Interpretation"] = interpretation

            # Create a copy for Kafka/Kinesis with Prediction_Value
            # (float() makes the prediction JSON-serializable).
            data_with_prediction = data.copy()
            data_with_prediction["Prediction_Value"] = float(prediction)

            print(f"Complete data with interpretation: {data_with_prediction}")

            # Send to Kafka
            try:
                if kafka_producer is None:
                    kafka_producer = KafkaProducer(
                        bootstrap_servers='54.235.34.127:9092',
                        value_serializer=lambda v: json.dumps(v).encode('utf-8')
                    )
                # send() only queues the record; waiting on the returned
                # future surfaces delivery errors before success is reported.
                kafka_producer.send('sensor-data', value=data_with_prediction).get(timeout=10)
                print(f"Sent to Kafka: {data_with_prediction}")
            except Exception as e:
                print(f"Error sending to Kafka: {e}")

            # Send to Kinesis
            try:
                if kinesis_client is None:
                    kinesis_client = boto3.client('kinesis', region_name='us-east-1')
                stream_name = 'kafka-stream'
                response = kinesis_client.put_record(
                    StreamName=stream_name,
                    Data=json.dumps(data_with_prediction),
                    PartitionKey=str(random.randint(1, 1000))
                )
                print(f"Sent to Kinesis: {response}")
            except Exception as e:
                print(f"Error sending to Kinesis: {e}")

            # Add to batch (without Prediction_Value)
            data_batch.append(data)

            # Send to S3 if batch is full
            if len(data_batch) >= 10:
                send_to_s3_csv(data_batch)
                data_batch = []

            time.sleep(2)
    finally:
        # Release the producer's connections however the loop ends; a failure
        # here must not replace the exception that ended the loop.
        if kafka_producer is not None:
            try:
                kafka_producer.close(timeout=5)
            except Exception as e:
                print(f"Error closing Kafka producer: {e}")

# Upload one batch of records to S3 as a CSV object
def send_to_s3_csv(batch_data):
    """Write ``batch_data`` to S3 as a single CSV file.

    ``batch_data`` is turned into a pandas DataFrame and rendered as CSV
    without the index column. The object is stored in the
    'projectkafkabucket' bucket under
    ``sensor_data_2/sensor_data_<YYYYmmdd_HHMMSS>.csv``, with the timestamp
    taken when the function is called. Upload errors are printed, not raised.
    """
    bucket_name = 'projectkafkabucket'

    # Object key carries the current local time at one-second resolution.
    uploaded_at = datetime.now().strftime('%Y%m%d_%H%M%S')
    s3_key = f"sensor_data_2/sensor_data_{uploaded_at}.csv"

    # Render the whole batch as CSV text.
    csv_data = pd.DataFrame(batch_data).to_csv(index=False)

    s3_client = boto3.client('s3', region_name='us-east-1')

    try:
        upload_args = {
            "Bucket": bucket_name,
            "Key": s3_key,
            "Body": csv_data.encode('utf-8'),
            "ContentType": 'text/csv',
        }
        s3_client.put_object(**upload_args)
        print(f"Successfully uploaded to S3 as CSV: {s3_key}")
    except Exception as err:
        print(f"Error uploading CSV to S3: {err}")

# Entry point: run the generator until it is interrupted or fails
if __name__ == "__main__":
    try:
        generate_data()
    except Exception as e:
        # Any ordinary error escaping the loop is reported instead of raised.
        print(f"Error in main loop: {e}")
    except KeyboardInterrupt:
        # Manual interruption ends the run with a short notice.
        print("Stopping data generation...")

Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.


Model downloaded successfully to /tmp/xgboost_model.joblib
Expected features: ['Soil Moisture', 'Temperature', 'Soil Humidity']
Generating data: {'Soil Moisture': 65, 'Temperature': 2, 'Soil Humidity': 12.6}


configuration generated by an older version of XGBoost, please export the model by calling
`Booster.save_model` from that version first, then load it back in current version. See:

    https://xgboost.readthedocs.io/en/stable/tutorials/saving_model.html

for more details about differences between saving model and serializing.



Complete data with interpretation: {'Soil Moisture': 65, 'Temperature': 2, 'Soil Humidity': 12.6, 'Status': 1, 'Time': '2024-11-27 20:34:05', 'Interpretation': 'Irrigate immediately to avoid drought stress.', 'Prediction_Value': 0.9556980133056641}
Sent to Kafka: {'Soil Moisture': 65, 'Temperature': 2, 'Soil Humidity': 12.6, 'Status': 1, 'Time': '2024-11-27 20:34:05', 'Interpretation': 'Irrigate immediately to avoid drought stress.', 'Prediction_Value': 0.9556980133056641}
Sent to Kinesis: {'ShardId': 'shardId-000000000001', 'SequenceNumber': '49658034342854651653229049389141406758740230229731573778', 'ResponseMetadata': {'RequestId': 'fe5ef3d8-9801-5bbc-a19f-77a5631734c0', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fe5ef3d8-9801-5bbc-a19f-77a5631734c0', 'x-amz-id-2': 'Se6oO9VV7LGSHBz3qqfAhwgBqxtEo+PPpXGtK2LXU0BCevaOeeQYQwLNE/c4zspH7kcNhectQeMbz94rqrHtNRZKY/a/LDXj', 'date': 'Wed, 27 Nov 2024 20:34:05 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length':