In [None]:
import json
import os
import time

import geopandas as gpd
import matplotlib.pyplot as plt
import pandas as pd
from kafka import KafkaConsumer, KafkaProducer
from pykafka import KafkaClient
from river import linear_model
from sklearn.linear_model import LinearRegression

from src.environment import KAFKA_BROKER, KAFKA_TOPIC
from src.weather_api import fetch_history

In [None]:
# Kafka setup: the broker address is taken from KAFKA_BROKER, and each weather
# feed (realtime, forecast, history) has its own topic name.
kafka_host = KAFKA_BROKER
kafka_topic_realtime = 'weather_realtime'
kafka_topic_forecast = 'weather_forecast'
kafka_topic_history = 'weather_history'

# pykafka client plus one topic handle per feed, looked up by topic name.
client = KafkaClient(hosts=kafka_host)
topic_realtime = client.topics[kafka_topic_realtime]
topic_forecast = client.topics[kafka_topic_forecast]
topic_history = client.topics[kafka_topic_history]

# Batch regression model using scikit-learn
batch_model = LinearRegression()

# WeatherAPI.com API key. Read it from the WEATHERAPI_KEY environment variable
# so a real key never has to be saved inside the notebook; the original
# placeholder is kept as the fallback so the name is always defined.
api_key = os.environ.get('WEATHERAPI_KEY', 'YOUR_WEATHERAPI.COM_API_KEY')

In [None]:
# How long (in milliseconds) the consumer may wait for a new message before
# iteration stops. Without a timeout, iterating a KafkaConsumer blocks forever
# and nothing placed after the loop would ever run.
CONSUMER_IDLE_TIMEOUT_MS = 10_000

# Create a consumer instance for the weather topic
consumer = KafkaConsumer(
    kafka_topic_realtime,  # The topic name
    bootstrap_servers=[kafka_host],  # The bootstrap server URL
    auto_offset_reset="earliest",  # Start from the beginning when the group has no committed offset
    enable_auto_commit=True,  # Commit the offsets automatically
    group_id="weather-group",  # The consumer group id
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),  # Deserialize each message as JSON
    consumer_timeout_ms=CONSUMER_IDLE_TIMEOUT_MS,  # End iteration once the topic has gone quiet
)

# Consume the messages from the topic and keep every decoded payload.
# NOTE(review): offsets are auto-committed for "weather-group", so re-running
# this cell only returns messages that arrived since the previous run.
online_data = [message.value for message in consumer]

# Create the online_model instance and train it on the consumed data.
# The estimator is bound to its own name first and trained in a separate call,
# so online_model is always the estimator regardless of what learn_many returns.
online_model = linear_model.LinearRegression()
if online_data:
    # learn_many takes a DataFrame of features and a Series of targets.
    # Assumes every message has numeric 'realtime', 'forecast' and 'history'
    # fields -- TODO confirm against the producer's message schema.
    online_frame = pd.DataFrame(online_data)
    online_model.learn_many(
        online_frame[['realtime', 'forecast']],
        online_frame['history'],
    )
else:
    print("No messages consumed; online_model was left untrained.")


In [None]:
# Define the perform_batch_regression function
def perform_batch_regression(X, y):
    """Fit the module-level ``batch_model`` and print its learned parameters.

    Args:
        X: Feature matrix, passed unchanged to ``batch_model.fit``.
        y: Target values, passed unchanged to ``batch_model.fit``.

    Returns:
        None. The fitted state lives on ``batch_model``; the coefficients and
        the intercept are written to standard output.
    """
    # Fit the batch_model to the data
    batch_model.fit(X, y)

    # Print the coefficients and the intercept
    print(f"Coefficients: {batch_model.coef_}")
    print(f"Intercept: {batch_model.intercept_}")

# NOTE: perform_batch_regression is intentionally not called in this cell.
# X and y are only built in the final cell, so calling it here raised a
# NameError on a fresh kernel; the final cell makes the call once they exist.


In [None]:
# Plot temperature analysis for each city
def plot_temperature_analysis(cities):
    """Draw max and min temperature lines for every city on one shared figure.

    For each city, ``fetch_history`` is called with the city and today's date
    (formatted ``YYYY-MM-DD``). Cities for which it returns a falsy value are
    skipped. The response is read as
    ``history_data['forecast']['forecastday']``, where each entry supplies
    ``'date'`` and ``'day'`` -> ``'maxtemp_c'`` / ``'mintemp_c'``.

    Args:
        cities: Iterable of city names to fetch and plot.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    for city in cities:
        history_data = fetch_history(city, time.strftime('%Y-%m-%d'))
        if not history_data:
            # Nothing came back for this city; move on to the next one.
            continue

        forecast_days = history_data['forecast']['forecastday']
        days = [entry['date'] for entry in forecast_days]
        highs = [entry['day']['maxtemp_c'] for entry in forecast_days]
        lows = [entry['day']['mintemp_c'] for entry in forecast_days]

        # Two labelled lines per city: daily maximum and daily minimum.
        plt.plot(days, highs, label=f'{city} - Max Temperature')
        plt.plot(days, lows, label=f'{city} - Min Temperature')

    # Decorate the shared axes once every city has been drawn.
    plt.title('Temperature Analysis')
    plt.xlabel('Date')
    plt.ylabel('Temperature (Celsius)')
    plt.legend()
    plt.show()

In [None]:
if __name__ == "__main__":
    cities = ['New York', 'London', 'Tokyo']  # Add more cities if needed

    # Example batch regression with scikit-learn
    # Replace X and y with actual features and target variables
    # The samples come from the consumed messages in online_data; online_model
    # is the river estimator, not a collection of records, so it cannot be
    # iterated to build features.
    # Assumes each message has data['realtime']['temp_c'] and a numeric
    # data['timestamp'] -- TODO confirm against the producer's message schema.
    X = [[data['realtime']['temp_c']] for data in online_data]
    y = [data['timestamp'] for data in online_data]
    if X:
        perform_batch_regression(X, y)
    else:
        # Fitting on zero samples would raise, so report and carry on to the plots.
        print("No consumed messages available; skipping batch regression.")

    # Perform comparisons and discussions of online vs batch regression
    # Add your comparison and discussion code here

    # Plot temperature analysis for each city
    plot_temperature_analysis(cities)