In [82]:
#Config APIs

import os
import json
import requests
from dotenv import load_dotenv

script_dir = os.getcwd()
config_path = (script_dir + '/pipeline-climate/ignore/config.json')

with open(config_path) as config_file:
    config = json.load(config_file)
    APIs = config.get('APIs').rstrip('/')

env_path = os.path.join(APIs, ".env")
load_dotenv(dotenv_path=env_path)

api_keys ={
    "openweather": os.getenv("OPENWEATHER_API_KEY", ""),
    "weatherapi": os.getenv("WEATHERAPI_API_KEY", ""),
    "weatherbit": os.getenv("WEATHERBIT_API_KEY", ""),
    "openaq": os.getenv("OPENAQ_API_KEY", ""),
    "bigdata": os.getenv("BIGDATA_API_KEY", ""),
}

api_configs = {
    "openweather": {
        "url": "http://api.openweathermap.org/data/2.5/weather",
        "params": {
            "q": "", 
            "appid": api_keys["openweather"],
            "units": "metric"
        }
    },
    "weatherapi": {
        "url": "http://api.weatherapi.com/v1/current.json",
        "params": {
            "q": "",
            "key": api_keys["weatherapi"]
        }
    },
    "weatherbit": {
        "url": "https://api.weatherbit.io/v2.0/current",
        "params": {
            "city": ""  ,
            "key": api_keys["weatherbit"]
        }
    },
    "openaq": {
        "url": "https://api.openaq.org/v2/locations",
        "params": {
            "city": "",
            "key": api_keys["openaq"]
        }
    },
    "bigdata": {
        "url": "https://api.bigdatacloud.net/data/reverse-geocode-client",
        "params": {
            "latitude": "",
            "longitude": "",
            "key": api_keys["bigdata"]
        }
    }
}

param_info = {
    'q': 'location'
}

#Reading data 
def read_api_data(api_name, *args):
    """Request of data based in wanted API"""
    if api_name in api_configs:
        config = api_configs[api_name]
        
        params = config["params"].copy()
        params_keys = list(params.keys())
        
        for i, key in enumerate(params_keys):
            if i < len(args):
                params[key] = args[i]

        response = requests.get(config["url"], params=params)

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error to request data from API: {api_name}: {response.status_code}")
    else:
        print(f"API '{api_name}' not found")
        return None


In [83]:
#Kafka config

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient

prod_conf = {
    'bootstrap.servers': 'localhost:9092, localhost:9094',
    'client.id': 'py-producer'
}

prod = Producer(**prod_conf) 

def msg_report(err, msg):
    if err is not None:
        print(f'Msg error: {msg} - {err}')
    else:
        print(f'Msg delivered to topic: {msg.topic()}, partition: [{msg.partition()}]')


admin_client = AdminClient({'bootstrap.servers': 'localhost:9092, localhost:9094'})

message_count = 0
msg_limit = 50


def create_partitions(topic, partition_count):
    fs = admin_client.alter_partitions([{
        'topic':topic,
        'new_count': partition_count
    }])
    for topic, f in fs.items():
        try:
            f.result()
            print(f'Partitions from topic: {topic} increased to: {partition_count}.')
        except Exception as e:
            print(f'Error in alter partitions from topic {topic}: {str(e)}.')

def get_partitions(topic):
    topic_metadata = admin_client.list_topics(topic)
    if topic_metadata.topics.get(topic):
        return len(topic_metadata.topics[topic].partitions)
    return 0

In [86]:
#Code execution
def produce_msg(topic, data):
    global message_count

    js_data = json.dumps(data).encode('utf-8')
    try:
        prod.produce(topic, key=str(data.get('id', 'unknown')), value = js_data)
        prod.poll(0)
        message_count += 1

        if message_count >= msg_limit:
            current = get_partitions(topic)
            create_partitions(topic, current + 1)
            message_count = 0
    except Exception as e:
        print(f'Error producing message: {str(e)}')
        
#def user_input():

            
def main():
    while True:
        options = [k for k,v in api_configs.items()]
        api_name = input(f"Options: {options}\n\nWrite the name API wanted or type 'quit' to exit: ").strip().lower()

        if api_name == 'quit'.lower():
            print(f'Exiting...')
            break

        if api_name in api_configs:
            params = []
            ocult_params = ["appid","key","units"]
            for param in api_configs[api_name]["params"]:
                if param not in ocult_params:
                    param_desc = param_info.get(param, param)
                    param_input = input(f"Write the {param_desc}").strip()
                    params.append(param_input)
        else:
            print(f'API not recognized: {api_name}')
            continue
        data = read_api_data(api_name, *params)

        if data:
            print(f'Getting data from {api_name}')
            global topic
            topic = api_name
            produce_msg(topic, data)
        prod.flush()
        
if __name__ == '__main__':
    main()

Getting data from openaq
Exiting...


In [None]:
#AWS S3 storage
import json
import os
import boto3
from confluent_kafka import Consumer, KafkaException
from dotenv import load_dotenv

cons_config = {
    'bootstrap.servers': 'localhost:9092, localhost:9094',
    'group.id': 's3-consumer',
    'auto.offset.reset': 'earliest'
}

cons = Consumer(cons_config)
cons.subscribe([topic])


script_dir = os.getcwd()
config_path = (script_dir + '/pipeline-climate/ignore/config.json')

with open(config_path) as config_file:
    config = json.load(config_file)
    aws = config.get('AWS').rstrip('/')
env_file = os.path.join(aws,'.env')
load_dotenv(dotenv_path=env_file)

s3_client = boto3.client(
    's3',
    aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY'),
    region_name = os.getenv('AWS_REGION_NAME') 
)

def upload_s3(file_name, data):
    bucket_name = 's3bucketsz'

    try:
        s3_client.put_object(Bucket=bucket_name, Key=file_name, Body=data)
        print(f'Uploading {file_name} to S3')
    except Exception as e:
        print(f'Error uploading {file_name} to S3: {str(e)}')

def consumer_msg():
    i = 0
    while True:
        try:
            msg = cons.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            
            file_name = f'data/{topic}-{i+1}.json'

            data = msg.value().decode('utf-8')

            upload_s3(file_name, data)
            i += 1

        except KeyboardInterrupt:
            break
        except Exception as e:
            print(f'Error in upload to S3: {str(e)}')

    cons.close()

if __name__ == '__main__':
    consumer_msg()