In [None]:
import json
import time
import logging
import socket
from datetime import datetime
import requests
from confluent_kafka import Producer

# Kafka bootstrap server address and the topic the station snapshots are
# published to.
KAFKA_BROKER = 'broker:9092'
TRANSACTION_TOPIC = 'streaming'
# Pause between successful polls of the bike-share API, in seconds (passed to
# time.sleep after each poll). The previous value of 0.5 s drew HTTP 429
# (Too Many Requests) responses, as recorded in this notebook's output.
# 30 s is a conservative choice — TODO confirm against the API's published
# rate limit.
LAG = 30


def create_producer():
    """Build the Kafka producer used by this notebook.

    Returns:
        A ``Producer`` configured for ``KAFKA_BROKER``, or ``None`` when
        construction raised. The failure is logged with its traceback.
    """
    try:
        # The settings are assembled inside the ``try`` so that an error while
        # resolving the hostname is handled the same way as a constructor error.
        settings = {
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000,
        }
        return Producer(settings)
    except Exception:
        logging.exception("Nie mogę utworzyć producenta")
        return None

def _parse_station(station):
    """Flatten one station entry from the API payload into the record schema.

    ``name``, ``latitude`` and ``longitude`` are required (``KeyError`` when
    absent). Counts default to 0 when missing, a missing or null ``extra``
    block is treated as empty, and the timestamp is reduced to its
    time-of-day part with fractional seconds dropped (empty string when the
    station carries no timestamp).
    """
    extra = station.get('extra') or {}
    timestamp = station.get('timestamp') or ''
    return {
        'Station Name': station['name'],
        'Latitude': station['latitude'],
        'Longitude': station['longitude'],
        'Number of empty slots': station.get('empty_slots', 0),
        'Number of free bikes': station.get('free_bikes', 0),
        'Number of e-bikes': extra.get('ebikes', 0),
        # "2023-06-01T12:34:56.789Z" -> "12:34:56"
        'Timestamp': str(timestamp).split('.')[0].split('T')[-1],
    }


def fetch_data(url='http://api.citybik.es/v2/networks/velib', timeout=10,
               session=None):
    """Download the current station list and flatten it into records.

    Args:
        url: Endpoint returning a JSON document with the stations under
            ``network.stations``.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block the polling loop forever.
        session: Optional object exposing ``get(url, timeout=...)`` (for
            example a ``requests.Session``); the ``requests`` module itself
            is used when omitted.

    Returns:
        A list with one dict per station (see ``_parse_station``), or ``None``
        when the request failed, the status code was not 200, or the body was
        not valid JSON. A message is printed in each failure case so the
        caller can simply retry on ``None``.

    Raises:
        KeyError: If the decoded payload lacks ``network``/``stations`` or a
            station lacks one of its required fields.
    """
    http = requests if session is None else session

    try:
        response = http.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors and timeouts are reported like HTTP errors
        # instead of propagating out of the polling loop.
        print("Request failed:", exc)
        return None

    if response.status_code != 200:
        print("Request failed. Status code:", response.status_code)
        return None

    try:
        data = response.json()
    except ValueError as exc:
        print("Request failed. Invalid JSON:", exc)
        return None

    return [_parse_station(station) for station in data['network']['stations']]

# Errors reported by the delivery callback for the most recent produce call.
_delivery_errors = []


def _on_delivery(err, msg):
    """Delivery callback: remember the error when a record was not delivered.

    ``err`` is ``None`` when the broker acknowledged the record.
    """
    if err is not None:
        _delivery_errors.append(err)


_id = 0
producer = create_producer()
previous_data = None

if producer is not None:
    while True:
        station_data = fetch_data()
        if station_data is None:
            print("Failed to fetch data. Retrying...")
            time.sleep(60)
            continue

        # Serialized snapshot, used only to detect changes between polls.
        stations_json = json.dumps(station_data).encode("utf-8")

        if stations_json != previous_data:
            # Build the record; the timestamp is naive UTC (no offset suffix).
            current_time = datetime.utcnow().isoformat()
            record = {
                "id": _id,
                "data": station_data,
                "current_time": current_time
            }
            record = json.dumps(record).encode("utf-8")

            # Send to Kafka and wait for the delivery report, so that success
            # is only announced for a record that was actually delivered.
            _delivery_errors.clear()
            producer.produce(topic=TRANSACTION_TOPIC, value=record,
                             on_delivery=_on_delivery)
            producer.flush()

            if _delivery_errors:
                # Leave previous_data and _id untouched so the same snapshot
                # is sent again on the next poll.
                print("Failed to deliver data to Kafka:", _delivery_errors[-1])
            else:
                print("Sent data to Kafka")
                previous_data = stations_json
                _id += 1
        time.sleep(LAG)

Request failed. Status code: 429
Failed to fetch data. Retrying...
Sent data to Kafka
Request failed. Status code: 429
Failed to fetch data. Retrying...


In [None]:

    %%file producer1.py
import json
import time 
import logging
import socket
from datetime import datetime
from numpy.random import uniform, choice, randn
from random import random as r
import numpy as np
from confluent_kafka import Producer

# Kafka bootstrap server address and the topic the generated records are sent to.
KAFKA_BROKER = 'broker:9092'
TRANSACTION_TOPIC = 'transactions'
# Pause between consecutive records, in seconds (passed to time.sleep).
LAG = 0.5
# Chance that a record is drawn from the uniform "outlier" distribution
# instead of one of the two Gaussian clusters.
PROBABILITY_OUTLIER = 0.05
def create_producer():
    """Build the Kafka producer used by this script.

    Returns:
        A ``Producer`` configured for ``KAFKA_BROKER``, or ``None`` when
        construction raised. The failure is logged with its traceback.
    """
    try:
        # Invisible zero-width characters that followed the braces here were
        # removed; Python rejects them as a syntax error.
        producer = Producer({
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000
        })
    except Exception:
        logging.exception("nie mogę utworzyć producenta")
        producer = None
    return producer

def _log_delivery_failure(err, msg):
    """Delivery callback: log records that were not delivered.

    ``err`` is ``None`` when the broker acknowledged the record.
    """
    if err is not None:
        logging.error("Message delivery failed: %s", err)


_id = 0
producer = create_producer()
if producer is not None:
    while True:
        if r() <= PROBABILITY_OUTLIER:
            # Outlier: a point drawn uniformly from [-4, 4) on both axes.
            X_test = uniform(low=-4, high=4, size=(1, 2))
        else:
            # Inlier: Gaussian noise (scale 0.3) shifted by +2 or -2; the
            # same shift is applied to both coordinates.
            X = 0.3 * randn(1, 2)
            X_test = (X + choice(a=[2, -2], size=1, p=[0.5, 0.5]))
        X_test = np.round(X_test, 3).tolist()
        current_time = datetime.utcnow().isoformat()
        # Invisible zero-width characters that followed the braces of this
        # dict were removed; Python rejects them as a syntax error.
        record = {
            "id": _id,
            "data": X_test,
            "current_time": current_time
        }
        record = json.dumps(record).encode("utf-8")
        producer.produce(topic=TRANSACTION_TOPIC, value=record,
                         on_delivery=_log_delivery_failure)
        producer.flush()
        _id += 1
        time.sleep(LAG)