# Data Collection and Cleaning (A3 and A4)

In [1]:
import json
from datetime import datetime
from Kafka import KafkaWriter, KafkaReader
from OpenWeatherMap import OpenWeatherMap
import datetime
import time
from CheckDuplicates import check_if_duplicate, check_if_duplicate_in_list
import uuid

In [2]:
openWeatherMap = OpenWeatherMap(api_key="2265d775d28c3c75d22edcb7126ca08f") # enter your api key here
kafka_prod1 = KafkaWriter(bootstrap_servers='kafka-1,kafka-2', topic='weather.forecast.raw') # enter bootstrap servers and topic

def load_locations() -> json:
    """
    Loads 'locations.json' into a json-object and returns it.
    """
    with open('locations.json', mode='r') as file:
        return json.load(file)


def collect_forecast_data() -> None:
    """
    Queries OpenWeatherMap for each location for the 5-day forecast and stores the the returned values in Kafka.
    """
    locs = load_locations()
    dt_format = "%Y%m%d%H%M%S"
    for loc in locs: # for each location
        city = locs[loc]
        data = openWeatherMap.get_forecast(city=city) # query OpenWeatherMap
        for element in data['list']: # for each forecast
            element['city'] = {'name': loc, 'coords': {'latitude': city['latitude'], 'longitude': city['longitude']}} # add city (name + coords)
            element['fetched_at'] = int(datetime.datetime.now().strftime(dt_format)) # create own timestamp and add it to the forecast
            key = uuid.uuid4().hex
            kafka_prod1.store(message_key=key, data=element) # publish forecast data with unique key to Kafka topic

In [3]:
kafka_con1 = KafkaReader(bootstrap_servers='kafka-1,kafka-2', topic='weather.forecast.raw', group_id='con_group1', client_id='con1', auto_offset_reset='latest')
kafka_prod2 = KafkaWriter(bootstrap_servers='kafka-1,kafka-2', topic='weather.forecast.clean')

def clean_forecast_data() -> None:
    """
    Consumes new raw forecast data from corresponding topic, compares it to the data that is already stored in the "cleaned" topic and adds missing forecasts to the "cleaned" topic.
    This process leads to the "cleaned" topic containing no duplicates.
    """
    raw_data = kafka_con1.retrieve() # consume new raw data
    old_clean_data = KafkaReader(bootstrap_servers='kafka-1,kafka-2', topic='weather.forecast.clean', group_id=uuid.uuid4().hex, client_id=uuid.uuid4().hex).retrieve() # use new consumer in each iteration to always fetch all old "cleaned" data
    new_clean_data = [x for x in raw_data if not check_if_duplicate_in_list(new=x, old=old_clean_data)] # compare data in cleaned topic and new raw data, add only to list if raw forecast not already in cleaned topic
    for element in new_clean_data:
        key = uuid.uuid4().hex
        kafka_prod2.store(message_key=key, data=element) # publish each new (unique) forecast to Kafka topic (cleaned)

In [4]:
while True: # endless loop
    collect_forecast_data() # collect raw data
    clean_forecast_data()  # clean datas
    time.sleep(15*60)  # e.g. every 15 Minutes

KeyboardInterrupt: 