In [1]:
!pip install pyspark



In [1]:
import logging
from json import dumps
from random import uniform
from time import sleep

from kafka import KafkaProducer

# Configure the root logger: INFO and above, rendered as
# "<level> <timestamp>: <message>" (with a leading space).
logging.basicConfig(
    format=" %(levelname)s %(asctime)s: %(message)s",
    level=logging.INFO,
)
# Logger named after the current module; used by the cells below.
log = logging.getLogger(__name__)

In [3]:
# Probe localhost:9092 over HTTP (the same address is used further down as the
# Kafka bootstrap server).
# NOTE(review): the recorded output of this cell is
# "curl: (52) Empty reply from server". /v3/clusters looks like a REST path,
# so it presumably belongs to a separate REST endpoint on another port --
# confirm the intended target or drop this check.
!curl http://localhost:9092/v3/clusters

curl: (52) Empty reply from server


In [None]:
!curl https://raw.githubusercontent.com/f0xtek/covidcab/master/yellow_tripdata_2020-04.csv -o yellow_tripdata_2020-04.csv

In [5]:
from tqdm import tqdm

def _parse_trip(fields):
    """Convert one split CSV row into the record that is sent to Kafka.

    Values are taken by position (columns 0-17) and cast to int/float where
    the record expects a number; the two datetime columns and the
    store-and-forward flag stay as strings.

    Raises:
        IndexError: the row has fewer than 18 columns.
        ValueError: a numeric column is empty or not a number.
    """
    return {
        'vendor_id': int(fields[0]),
        'tpep_pickup_datetime': fields[1],
        'tpep_dropoff_datetime': fields[2],
        'passenger_count': int(fields[3]),
        'trip_distance': float(fields[4]),
        'ratecode_id': int(fields[5]),
        'store_and_fwd_flag': fields[6],
        'pulocation_id': int(fields[7]),
        'dolocation_id': int(fields[8]),
        'payment_type': int(fields[9]),
        'fare_amount': float(fields[10]),
        'extra': float(fields[11]),
        'mta_tax': float(fields[12]),
        'tip_amount': float(fields[13]),
        'tolls_amount': float(fields[14]),
        'improvement_surcharge': float(fields[15]),
        'total_amount': float(fields[16]),
        'congestion_surcharge': float(fields[17]),
    }


def produce(csv_file: str, bootstrap_servers: str, topic: str,
            total_rows: int = 52750, delay_range: tuple = (0.01, 0.1)):
    """Stream the rows of a trip-data CSV file to a Kafka topic as JSON.

    The first line of the file is treated as a header: it is logged and not
    sent. Every following non-blank line is split on commas, converted to a
    record and sent, with a random pause before each send to emulate a live
    feed.

    Args:
        csv_file: path of the CSV file to read.
        bootstrap_servers: broker address, e.g. ``"localhost:9092"``.
        topic: name of the topic to publish to.
        total_rows: expected number of data rows; only used as the progress
            bar total.
        delay_range: ``(low, high)`` bounds in seconds for the random pause
            before each send.

    Raises:
        IndexError, ValueError: a row is malformed (see ``_parse_trip``).
            Records sent before the failure are still flushed.
    """
    producer = KafkaProducer(
        security_protocol="PLAINTEXT",
        bootstrap_servers=[bootstrap_servers],
        value_serializer=lambda x: dumps(x).encode('utf-8'),
        acks="all",
        retries=3,
    )
    pbar = tqdm(total=total_rows)

    try:
        # Open the file for reading.
        with open(csv_file, 'r') as data_file:
            # Skip the header line.
            header = data_file.readline()
            log.info(f'Header is [{header}]')

            for raw_line in data_file:
                line = raw_line.strip()
                if not line:
                    # A blank line is skipped rather than treated as end of file.
                    continue

                sleep(uniform(*delay_range))  # emulate the interval between events
                producer.send(topic=topic, value=_parse_trip(line.split(',')))
                pbar.update(1)

        log.info("File ended")
    finally:
        pbar.close()
        # send() only queues the record; flush so buffered records reach the
        # broker, then release the producer's connections.
        try:
            producer.flush()
        finally:
            producer.close()

In [None]:
# Run parameters: input CSV, destination topic and broker address.
DATA_FILE = 'yellow_tripdata_2020-04.csv'
TOPIC_NAME = 'taxi'
BOOTSTRAP_SERVERS = 'localhost:9092'

# Stream the CSV rows into the topic.
produce(
    csv_file=DATA_FILE,
    bootstrap_servers=BOOTSTRAP_SERVERS,
    topic=TOPIC_NAME,
)

 INFO 2025-02-22 10:35:24,041: <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
 INFO 2025-02-22 10:35:24,042: Probing node bootstrap-0 broker version
 INFO 2025-02-22 10:35:24,061: <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
 INFO 2025-02-22 10:35:24,176: Broker version identified as 2.5.0
 INFO 2025-02-22 10:35:24,176: Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
  0%|                                                 | 0/52750 [00:00<?, ?it/s] INFO 2025-02-22 10:35:24,218: Header is [VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
]
 INFO 2025-02-22 10:35:24