In [None]:
pip install pandas numpy kafka-python scikit-learn

In [None]:
import pandas as pd
import numpy as np
from kafka import KafkaConsumer  # For data ingestion (requires kafka-python library)
from sklearn.linear_model import LinearRegression
import sqlite3  # For local storage simulation

In [None]:
# 1. Data Ingestion
def ingest_data():
    """
    Build and return a mock sensor DataFrame standing in for a Kafka feed.

    The frame has ten daily rows starting 2025-01-01 with two columns:
    "timestamp" and "sensor_reading" (random integers in [50, 100)).
    The frame is also printed to stdout.
    """
    # Mock values only; no Kafka connection is made here.
    timestamps = pd.date_range(start="2025-01-01", periods=10, freq="D")
    readings = np.random.randint(50, 100, size=10)
    df = pd.DataFrame({"timestamp": timestamps, "sensor_reading": readings})

    print("Data Ingested:")
    print(df)
    return df

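    # To ingest real data from Kafka, replace the mock block above (including
    # its `return df`) with the code below. Two things are required for it to
    # work: `json` must be imported, and `consumer_timeout_ms` must be set so
    # that iteration ends once no message arrives within the timeout; without
    # it the loop blocks forever and the code after it is never reached.
    # import json
    # consumer = KafkaConsumer('your_topic',
    #                          bootstrap_servers=['localhost:9092'],
    #                          value_deserializer=lambda m: json.loads(m.decode('ascii')),
    #                          consumer_timeout_ms=10000)
    #
    # data_list = []
    # for message in consumer:
    #     data_list.append(message.value)
    # consumer.close()  # close before the empty check so it also runs on error
    #
    # if not data_list:
    #     raise ValueError("No data received from Kafka")
    #
    # df = pd.DataFrame(data_list)
    # return df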

# 2. Data Processing (Simple ETL)
def process_data(df):
    """
    Min-max normalize the sensor readings into a "normalized_reading" column.

    The column is added to ``df`` in place and the same DataFrame object is
    returned. Values are scaled to [0, 1] using the minimum and maximum of
    the "sensor_reading" column. When every reading is identical the range
    is zero, and the normalized value is defined as 0.0 rather than the NaN
    that a 0/0 division would produce. The result is printed to stdout.

    Raises:
        ValueError: if ``df`` is empty.
    """
    if df.empty:
        raise ValueError("DataFrame is empty, cannot process data.")

    readings = df["sensor_reading"]
    lowest = readings.min()
    offset = readings - lowest
    span = readings.max() - lowest

    if span == 0:
        # Constant signal: every offset is already 0, so skip the division.
        # Using the offset (instead of a literal 0.0) keeps missing readings
        # as NaN, matching the non-constant path.
        df["normalized_reading"] = offset.astype(float)
    else:
        df["normalized_reading"] = offset / span

    print("\nData Processed (Normalized):")
    print(df)
    return df

# 3. Data Storage
def store_data(df, db_path="data_pipeline.db", table_name="sensor_data"):
    """
    Store a DataFrame in a table of a local SQLite database.

    Any existing table with the same name is replaced, and the DataFrame
    index is not written. A confirmation line is printed on success.

    Args:
        df: DataFrame to persist.
        db_path: Path of the SQLite database file (created if missing).
        table_name: Name of the table to (re)create.

    Raises:
        ValueError: if ``df`` is empty.
    """
    if df.empty:
        raise ValueError("DataFrame is empty, cannot store data.")

    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(table_name, conn, if_exists="replace", index=False)
    finally:
        # Release the database handle even when the write fails.
        conn.close()
    print("\nData Stored in SQLite Database.")

# 4. AI Model Integration (Simple Predictive Model)
def ai_model_prediction(df):
    """
    Fit a linear regression of sensor reading against date and print a
    forecast for the five days following the last observed timestamp.

    The "timestamp" column is parsed with ``pd.to_datetime`` and converted to
    proleptic Gregorian ordinals (whole days) to serve as the single numeric
    feature; "sensor_reading" is the target. The input DataFrame is not
    modified. Nothing is returned; the predictions are printed to stdout.

    If the model fit raises ``ValueError``, the error is printed and the
    function returns without predicting.

    Raises:
        ValueError: if ``df`` is empty.
    """
    if df.empty:
        raise ValueError("DataFrame is empty, cannot make predictions.")

    # Work on derived values instead of adding a helper column to the
    # caller's DataFrame.
    timestamps = pd.to_datetime(df["timestamp"])
    ordinals = timestamps.map(pd.Timestamp.toordinal)

    # Fit on a plain 2-D array so that predicting on a plain array below
    # does not trigger scikit-learn's feature-name mismatch warning.
    X = ordinals.to_numpy().reshape(-1, 1)
    y = df["sensor_reading"]

    model = LinearRegression()
    try:
        model.fit(X, y)
    except ValueError as e:
        print(f"Error in model fitting: {e}")
        return

    # Start one day after the last observation; starting at the last
    # observation itself would "predict" a day that is already in the data.
    first_future_day = timestamps.max() + pd.Timedelta(days=1)
    future_dates = pd.date_range(start=first_future_day, periods=5, freq="D")
    future_ordinals = future_dates.map(pd.Timestamp.toordinal)
    future_readings = model.predict(future_ordinals.to_numpy().reshape(-1, 1))

    print("\nAI Model Predictions (Next 5 Days):")
    for date, reading in zip(future_dates, future_readings):
        print(f"Date: {date}, Predicted Reading: {reading:.2f}")

# Pipeline entry point: ingest -> process -> store -> predict
if __name__ == "__main__":
    try:
        # `data` and `processed_data` are bound at module level, so they stay
        # available after the run (e.g. to later notebook cells).
        data = ingest_data()                    # 1) mock sensor readings
        processed_data = process_data(data)     # 2) add normalized column
        store_data(processed_data)              # 3) persist to SQLite
        ai_model_prediction(processed_data)     # 4) print 5-day forecast
    except Exception as exc:
        # Any failure in a stage is reported as a single line.
        print(f"An error occurred: {exc}")