# Task 1. Processing Data Stream (45%)
## Event Producer 1 (climate_streaming)

In [1]:
import csv
import pymongo
from pymongo import MongoClient
from datetime import datetime
from pprint import pprint

# Connect to the MongoDB server that stores the historic climate records
# (host and port are given explicitly).
client = MongoClient('192.168.100.7', 27017)

# Getting the database and collection
db = client["fit3182_asgn3"]
collection = db["hist_data"]

# Find the most recent date among the historic records. 'date' is stored as a
# 'dd/mm/YYYY' string, so it is converted to a real date before sorting --
# sorting the raw strings would compare the day of month first, not the year.
pipeline = [
    {
        '$addFields': {
            'dateForSorting': {
                '$dateFromString': {'dateString': '$date', 'format': '%d/%m/%Y'}
            }
        }
    },
    {
        '$sort': {
            'dateForSorting': -1  # Sort in DESCENDING order: newest date first
        }
    },
    {
        '$limit': 1  # Only the single newest record is needed
    },
]

# Use the cursor as a context manager so it is closed even if iteration fails.
with collection.aggregate(pipeline) as cursor:
    document = next(cursor, None)

if document is None:
    # Fail here with a clear message instead of later, when the producer cell
    # would try to add a timedelta to an empty placeholder value.
    raise ValueError("No records found in 'hist_data'; cannot determine the latest date.")

latest_date = document['dateForSorting']  # the parsed 'date' of the newest record
print(latest_date)

2023-01-01 00:00:00


In [2]:
from time import sleep
from json import dumps
from kafka3 import KafkaProducer
import csv
import datetime as dt

# Kafka broker host and the CSV file whose rows are streamed as climate events.
hostip, csv_file = "192.168.100.7", "climate_streaming.csv"

def _to_bytes(text):
    """Return ``text`` encoded as UTF-8 bytes; ``None`` and ``bytes`` pass through unchanged."""
    if text is None or isinstance(text, bytes):
        return text
    return bytes(text, encoding='utf-8')


def publish_message(producer_instance, topic_name, key, value, timeout=10):
    """Send one key/value message to a Kafka topic and wait for its acknowledgement.

    Args:
        producer_instance: a connected KafkaProducer, or ``None`` if connecting failed.
        topic_name: name of the topic to publish to.
        key: message key as ``str`` (UTF-8 encoded), ``bytes`` or ``None``.
        value: message payload as ``str`` (UTF-8 encoded), ``bytes`` or ``None``.
        timeout: seconds to wait for the delivery result of the sent record.

    Returns:
        True if the record was delivered, False otherwise. Failures are printed
        rather than raised so a streaming loop can keep running.
    """
    if producer_instance is None:
        print('Exception in publishing message.')
        print('No Kafka producer available (did the connection fail?).')
        return False
    try:
        future = producer_instance.send(topic_name,
                                        key=_to_bytes(key),
                                        value=_to_bytes(value))
        producer_instance.flush()
        # flush() waits for in-flight requests but does not surface a failed
        # delivery; future.get() raises in that case, so the success message
        # below is only printed for records that were actually delivered.
        future.get(timeout=timeout)
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
        return False
    print('Message published successfully. Data: ' + str(value))
    return True

def connect_kafka_producer(api_version=(0, 10)):
    """Create a KafkaProducer for the broker at ``hostip`` on port 9092.

    Args:
        api_version: Kafka API version passed to KafkaProducer (default ``(0, 10)``).

    Returns:
        The producer, or ``None`` if it could not be created (the error is printed).
    """
    # Return from try/except rather than from a ``finally`` block: a ``return``
    # inside ``finally`` would silently discard any exception not caught by
    # ``except Exception`` (e.g. KeyboardInterrupt while connecting).
    try:
        return KafkaProducer(bootstrap_servers=[f'{hostip}:9092'],
                             api_version=api_version)
    except Exception as ex:
        print('Exception while connecting to Kafka.')
        print(str(ex))
        return None


if __name__ == '__main__':
    topic = 'Climate_Hotspot_Stream'
    # Real-time gap between records; each published record represents one new day.
    SEND_INTERVAL_SECONDS = 10

    print('Publishing records..')
    producer = connect_kafka_producer()
    if producer is None:
        # connect_kafka_producer() already printed the cause; stop here instead of
        # looping over the whole CSV with a producer that cannot send anything.
        raise RuntimeError(f'Could not connect to Kafka at {hostip}:9092; no records were published.')

    # Start from the day after the latest historic record. A separate variable is
    # used so that re-running this cell restarts from that day instead of
    # continuing from wherever a previous (possibly interrupted) run left off.
    stream_date = latest_date + dt.timedelta(days=1)

    try:
        # Read from climate_streaming.csv (newline='' as the csv module recommends)
        with open(csv_file, 'r', newline='') as file:
            reader = csv.DictReader(file)

            # Add 2 fields ('producer', 'date') to each climate record.
            # NOTE(review): the CSV header's 'precipitation ' column has a trailing
            # space (visible in the published messages); consumers see that exact key.
            for row in reader:
                row['producer'] = 'Climate'
                row['date'] = stream_date.strftime("%d/%m/%Y")
                publish_message(producer, topic, 'parsed', dumps(row))

                # Advance the simulated date by one day every SEND_INTERVAL_SECONDS
                sleep(SEND_INTERVAL_SECONDS)
                stream_date += dt.timedelta(days=1)
    finally:
        # Release the producer's connections even when the stream is stopped
        # with KeyboardInterrupt.
        producer.close()

Publishing records..
Message published successfully. Data: {"latitude": "-37.623", "longitude": "149.323", "air_temperature_celcius": "19", "relative_humidity": "56.8", "windspeed_knots": "7.9", "max_wind_speed": "11.1", "precipitation ": " 0.00I", "GHI_w/m2": "154", "producer": "Climate", "date": "02/01/2023"}
Message published successfully. Data: {"latitude": "-38.038", "longitude": "142.986", "air_temperature_celcius": "15", "relative_humidity": "50.7", "windspeed_knots": "9.2", "max_wind_speed": "13", "precipitation ": " 0.02G", "GHI_w/m2": "128", "producer": "Climate", "date": "03/01/2023"}
Message published successfully. Data: {"latitude": "-37.95", "longitude": "142.366", "air_temperature_celcius": "16", "relative_humidity": "53.6", "windspeed_knots": "8.1", "max_wind_speed": "15", "precipitation ": " 0.00G", "GHI_w/m2": "133", "producer": "Climate", "date": "04/01/2023"}
Message published successfully. Data: {"latitude": "-38.231", "longitude": "147.172", "air_temperature_celci

Message published successfully. Data: {"latitude": "-37.642", "longitude": "149.263", "air_temperature_celcius": "20", "relative_humidity": "55.8", "windspeed_knots": "10.5", "max_wind_speed": "15.9", "precipitation ": " 0.01G", "GHI_w/m2": "163", "producer": "Climate", "date": "31/01/2023"}
Message published successfully. Data: {"latitude": "-37.634", "longitude": "149.237", "air_temperature_celcius": "16", "relative_humidity": "48.4", "windspeed_knots": "8.1", "max_wind_speed": "15.9", "precipitation ": " 0.00G", "GHI_w/m2": "139", "producer": "Climate", "date": "01/02/2023"}
Message published successfully. Data: {"latitude": "-37.384", "longitude": "149.336", "air_temperature_celcius": "16", "relative_humidity": "48.9", "windspeed_knots": "5.4", "max_wind_speed": "8.9", "precipitation ": " 0.00I", "GHI_w/m2": "138", "producer": "Climate", "date": "02/02/2023"}
Message published successfully. Data: {"latitude": "-37.389", "longitude": "149.311", "air_temperature_celcius": "18", "rela

Message published successfully. Data: {"latitude": "-38.141", "longitude": "143.183", "air_temperature_celcius": "23", "relative_humidity": "54.1", "windspeed_knots": "5.7", "max_wind_speed": "14", "precipitation ": " 0.00I", "GHI_w/m2": "190", "producer": "Climate", "date": "01/03/2023"}
Message published successfully. Data: {"latitude": "-36.952", "longitude": "144.972", "air_temperature_celcius": "21", "relative_humidity": "57.3", "windspeed_knots": "5.4", "max_wind_speed": "9.9", "precipitation ": " 0.00I", "GHI_w/m2": "169", "producer": "Climate", "date": "02/03/2023"}
Message published successfully. Data: {"latitude": "-34.965", "longitude": "142.031", "air_temperature_celcius": "20", "relative_humidity": "62.6", "windspeed_knots": "10.1", "max_wind_speed": "15", "precipitation ": " 0.00I", "GHI_w/m2": "153", "producer": "Climate", "date": "03/03/2023"}
Message published successfully. Data: {"latitude": "-37.978", "longitude": "145.623", "air_temperature_celcius": "21", "relative

KeyboardInterrupt: 