### Ensure the following packages are installed (the pipeline below also requires `kafka-python`, `pyspark`, and `requests`).

In [None]:
!pip install influxdb_client # install the influxdb_client to enable streaming to influxDB

In [None]:
!pip install certifi # provides Mozilla's trusted SSL/TLS certificates for secure HTTPS connections in Python

### Code implementing the stream: weather.gov API -> Kafka -> PySpark -> InfluxDB

In [None]:
import json
import os
import threading
import time

import certifi
import requests
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row

# Kafka configuration. The broker address can be overridden with the
# KAFKA_BROKER environment variable; it defaults to a local broker.
KAFKA_BROKER = os.environ.get('KAFKA_BROKER', 'localhost:9092')
TOPIC = 'weatherlogs'

# InfluxDB configuration.
# SECURITY: the API token is read from the environment instead of being
# hard-coded. A token that was ever committed to a notebook or repository must
# be treated as leaked: revoke it in the InfluxDB UI and issue a new one.
token = os.environ.get('INFLUXDB_TOKEN')
if not token:
    raise RuntimeError(
        "INFLUXDB_TOKEN is not set. Export it in your shell, or run "
        "`%env INFLUXDB_TOKEN=<your token>` in a notebook cell before this one."
    )
org = os.environ.get('INFLUXDB_ORG', "MyProjects")
bucket = os.environ.get('INFLUXDB_BUCKET', "weather-forecast")
url = os.environ.get('INFLUXDB_URL', "https://us-east-1-1.aws.cloud2.influxdata.com")

# Kafka producer: message values are Python objects serialized as UTF-8 JSON.
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_BROKER],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# Spark session; getOrCreate() reuses a session already running in this kernel.
spark = (
    SparkSession.builder
    .appName("Weather")
    .getOrCreate()
)

# Schema for incoming forecast records: every column is a nullable string.
schema = StructType([
    StructField("location", StringType(), True),
    StructField("short_forecast", StringType(), True),
    StructField("temperature", StringType(), True),
    StructField("temperature_unit", StringType(), True),
    StructField("wind_speed", StringType(), True),
    StructField("start_time", StringType(), True),
])

# InfluxDB client; TLS is verified against certifi's CA bundle. SYNCHRONOUS
# writes block until the server has answered each write request.
client = InfluxDBClient(url=url, token=token, org=org, ssl_ca_cert=certifi.where())
write_api = client.write_api(write_options=SYNCHRONOUS)

latitude, longitude = 38.8977, -77.0365  # Washington, D.C. coordinates

# Timing knobs for the pipeline.
FETCH_INTERVAL_S = 7 * 24 * 60 * 60  # re-fetch and publish the forecast every 7 days
RETRY_INTERVAL_S = 60 * 60           # back-off after a failed fetch or publish
HTTP_TIMEOUT_S = 30                  # per-request timeout for api.weather.gov
PRODUCER_FLUSH_TIMEOUT_S = 60        # max wait for Kafka to accept a published batch
POLL_TIMEOUT_MS = 1000               # consumer poll period; bounds shutdown latency
READY_TIMEOUT_S = 60                 # max wait for the consumer before producing anyway
JOIN_TIMEOUT_S = 10                  # max wait for the producer thread at shutdown


def fetch_forecast_periods():
    """Return the forecast periods for (latitude, longitude) from api.weather.gov.

    Looks up the forecast URL via the /points endpoint, then fetches that
    forecast and returns its ``properties.periods`` list.

    Raises:
        requests.RequestException: network error, timeout, or non-2xx status.
        KeyError: a response lacks the expected ``properties`` entries.
        ValueError: a response body is not valid JSON.
    """
    points_response = requests.get(
        f"https://api.weather.gov/points/{latitude},{longitude}",
        timeout=HTTP_TIMEOUT_S,
    )
    points_response.raise_for_status()
    forecast_url = points_response.json()['properties']['forecast']

    forecast_response = requests.get(forecast_url, timeout=HTTP_TIMEOUT_S)
    forecast_response.raise_for_status()
    return forecast_response.json()['properties']['periods']


def build_forecast_record(period):
    """Map one api.weather.gov forecast period to the dict published to Kafka.

    Raises:
        KeyError: if ``period`` lacks one of the fields read below.
    """
    return {
        "location": "Washington, D.C.",
        "short_forecast": period['shortForecast'],
        "temperature": period['temperature'],
        "temperature_unit": period['temperatureUnit'],
        "wind_speed": period['windSpeed'],
        "start_time": period['startTime'],
    }


def stream_forecast(stop_event=None):
    """Fetch the forecast and publish one Kafka message per period, repeatedly.

    Publishes every FETCH_INTERVAL_S seconds; after a failed fetch or publish it
    retries after RETRY_INTERVAL_S seconds instead.

    Args:
        stop_event: optional ``threading.Event``. Setting it makes the loop exit
            promptly, even mid-wait. When None, the loop runs forever.
    """
    if stop_event is None:
        stop_event = threading.Event()
    while not stop_event.is_set():
        try:
            # Build every record before sending anything, so a malformed
            # period does not leave a half-published batch behind.
            records = [build_forecast_record(p) for p in fetch_forecast_periods()]
        except (requests.RequestException, KeyError, ValueError) as e:
            print("Failed to fetch data:", e)
            stop_event.wait(RETRY_INTERVAL_S)
            continue
        if stop_event.is_set():
            # Shutdown was requested during the HTTP calls; the producer may
            # already be closed, so do not touch it.
            break
        try:
            for record in records:
                producer.send(TOPIC, value=record)
            # send() is asynchronous; flush so delivery failures surface here.
            producer.flush(timeout=PRODUCER_FLUSH_TIMEOUT_S)
        except KafkaError as e:
            print("Failed to publish to Kafka:", e)
            stop_event.wait(RETRY_INTERVAL_S)
            continue
        # Event.wait instead of time.sleep so shutdown does not wait for days.
        stop_event.wait(FETCH_INTERVAL_S)


def store_forecast(data):
    """Write one decoded Kafka message to InfluxDB as a ``weather.forecast`` point.

    Non-dict payloads are ignored. A failure for a single message (missing key,
    Spark validation error, InfluxDB write error) is printed and swallowed, so
    one bad record or a transient outage does not stop the consumer.
    """
    if not isinstance(data, dict):
        return
    try:
        row = Row(
            location=data['location'],
            short_forecast=data['short_forecast'],
            temperature=str(data['temperature']),
            temperature_unit=data['temperature_unit'],
            wind_speed=data['wind_speed'],
            start_time=data['start_time'],
        )
        # The resulting DataFrame is not used further. NOTE(review): with
        # createDataFrame's default verifySchema=True this acts as a type check
        # against `schema`; confirm that is the intended PySpark stage.
        spark.createDataFrame([row], schema=schema)
        # NOTE(review): temperature is written as a *string* field and
        # short_forecast as a tag (a new series per forecast text). Switching
        # temperature to a number would hit InfluxDB field-type conflicts with
        # data already in the bucket, so that needs a new field/measurement.
        point = (
            Point("weather.forecast")
            .tag("location", row['location'])
            .tag("short_forecast", row['short_forecast'])
            .field("temperature", row['temperature'])
            .field("wind_speed", row['wind_speed'])
            .time(row['start_time'])
        )
        write_api.write(bucket=bucket, org=org, record=point)
    except Exception as e:  # per-message boundary: log and keep consuming
        print(f"Failed to store message {data!r}: {e}")
        return
    print(f"Data written to InfluxDB: {row['start_time']}, {row['temperature']}°{row['temperature_unit']} , {row['short_forecast']}")


def consume_and_store(stop_event=None, ready_event=None):
    """Consume forecast messages from TOPIC and store each one in InfluxDB.

    Args:
        stop_event: optional ``threading.Event``. The loop checks it about every
            POLL_TIMEOUT_MS and returns once it is set. When None, the loop
            runs forever.
        ready_event: optional ``threading.Event``. It is set once the consumer
            has partitions assigned and its 'latest' start offsets resolved. A
            producer that waits for it will not publish messages this consumer
            then skips.
    """
    if stop_event is None:
        stop_event = threading.Event()
    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=KAFKA_BROKER,
        auto_offset_reset='latest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    )
    try:
        while not stop_event.is_set():
            batches = consumer.poll(timeout_ms=POLL_TIMEOUT_MS)
            if ready_event is not None and not ready_event.is_set():
                assigned = consumer.assignment()
                if assigned:
                    # position() resolves the 'latest' reset now, before the
                    # producer has published anything.
                    for partition in assigned:
                        consumer.position(partition)
                    ready_event.set()
            for records in batches.values():
                for message in records:
                    store_forecast(message.value)
    finally:
        consumer.close()


if __name__ == "__main__":
    stop = threading.Event()
    consumer_ready = threading.Event()

    def _produce_when_consumer_ready():
        # Without this wait, the first batch is usually published before the
        # 'latest' consumer pins its offsets, is skipped, and the next batch
        # only arrives FETCH_INTERVAL_S later.
        consumer_ready.wait(timeout=READY_TIMEOUT_S)
        stream_forecast(stop_event=stop)

    # Daemon thread plus stop event: the producer loop no longer keeps the
    # process (or notebook kernel) alive after the consumer exits.
    producer_thread = threading.Thread(target=_produce_when_consumer_ready, daemon=True)
    try:
        producer_thread.start()
        consume_and_store(stop_event=stop, ready_event=consumer_ready)
    except KeyboardInterrupt:
        print("Streaming stopped.")
    finally:
        stop.set()
        consumer_ready.set()  # release the producer thread if it is still waiting
        producer_thread.join(timeout=JOIN_TIMEOUT_S)
        producer.close()
        client.close()

