## Streaming

Perform the data analysis in a “streaming” manner (treat data as streaming). 

Use Kafka as a message broker and write a custom producer (reading data from files) and a custom consumer
(for processing data). 

Since data is (with few exceptions, most likely errors) ordered by the “summons number” you can assume that the lines in CSV files are in chronological order
(important for producing Kafka messages). 

Decide if using different topics can help you. 

Show rolling descriptive statistics (mean, standard deviation, …, think of at least three more) for all data, boroughs, and for the 10 most interesting streets (with highest numbers of tickets overall, or by your qualified choice). 

For the same data, choose, implement, and apply a stream clustering algorithm (preferably spatial clustering) of your choice [7].

Note: stream processing is better not performed in the Arnes cluster.

In [150]:
import confluent_kafka as kafka, socket
import os, socket, uuid, json
import pandas as pd
import faust, time

In [151]:
# Directory that holds the raw ticket CSV files and the street lookup table.
base_path = '../datasets/original_data/'
# File name of the street lookup table stored next to the ticket files.
street_locations_file = 'street_locations.csv'

# Ticket files to stream, in sorted file-name order. The lookup table shares
# the directory and the .csv suffix, so it has to be excluded explicitly or it
# would be treated as ticket data.
csv_files = sorted(
    f for f in os.listdir(base_path)
    if f.endswith('.csv') and f != street_locations_file
)

# Lookup table read by preprocess_chunk (columns used there: street_code,
# street_name, latitude, longitude).
street_locations = pd.read_csv(base_path + street_locations_file)

In [152]:
# Raw `violation_county` spellings -> canonical borough abbreviation.
# Spellings mapped to pd.NA (and spellings missing from the table) are treated
# as unknown; their rows are dropped by preprocess_chunk.
_COUNTY_TO_BOROUGH = {
    "BRONX": "BX",  # Bronx
    "BX": "BX",
    "Bronx": "BX",
    "BK": "K",  # Brooklyn known as Kings
    "K": "K",
    "Kings": "K",
    "KINGS": "K",
    "KING": "K",
    "Q": "Q",  # Queens
    "QN": "Q",
    "Qns": "Q",
    "QUEEN": "Q",
    "QUEENS": "Q",
    "QNS": "Q",
    "QU": "Q",
    "NY": "NY",  # Manhattan known as New York
    "MN": "NY",
    "MAN": "NY",
    "NEW Y": "NY",
    "NEWY": "NY",
    "NYC": "NY",
    "ST": "R",  # Staten Island known as Richmond
    "R": "R",
    "Rich": "R",
    "RICH": "R",
    "RICHM": "R",
    "RC": "R",
    "MH": "NY",
    "MS": "NY",
    "N": "NY",
    "P": "NY",
    "PBX": "NY",
    "USA": "NY",
    "VINIS": "NY",
    "A": pd.NA,
    "F": pd.NA,
    "ABX": pd.NA,
    "108": pd.NA,
    "103": "R",  # Staten Island zip code
    "00000": pd.NA,
    "K   F": pd.NA,
}

# Canonical borough abbreviation -> numeric borough code.
_BOROUGH_TO_CODE = {
    'NY': 1,
    'BX': 2,
    'K': 3,
    'Q': 4,
    'R': 5,
}


def _parse_12h_clock(values):
    """Parse 12-hour clock strings such as '0130P' into datetimes.

    An 'M' is appended so the trailing A/P becomes AM/PM, and a leading '00'
    hour is rewritten to '12' so that '%I' accepts it. Values that still do
    not match '%I%M%p' become NaT.
    """
    text = values.astype(str).str.upper() + "M"
    text = text.str.replace(r'^00', '12', regex=True)
    return pd.to_datetime(text, format="%I%M%p", errors="coerce")


def preprocess_chunk(chunk, locations=None):
    """Clean one chunk of raw ticket rows and attach street coordinates.

    Steps, in order:
      1. drop rows without a summons number and exact duplicate rows;
      2. collapse street_code1/2/3 into one string column `street_code`
         (the first non-zero code wins, '0' becomes missing);
      3. merge `issue_date` and `violation_time` into one
         'YYYY-MM-DD HH:MM:SS' string and drop rows where that fails;
      4. merge `date_first_observed` and `time_first_observed` the same way
         (rows are kept; the value is missing when either part is unusable);
      5. map `violation_county` to a numeric borough code (1-5) and drop rows
         whose county is unknown;
      6. look up `street_name`, `latitude` and `longitude` by `street_code`
         and drop rows without coordinates.

    Args:
        chunk: DataFrame of raw ticket rows. The caller's frame is not
            modified.
        locations: optional street lookup DataFrame with the columns
            `street_code`, `street_name`, `latitude` and `longitude`.
            Defaults to the module-level `street_locations` table.

    Returns:
        A new DataFrame with the cleaned rows.
    """
    if locations is None:
        locations = street_locations

    # .copy() detaches the result from the caller's frame so the column
    # assignments below cannot trigger chained-assignment warnings.
    chunk = chunk.dropna(subset=['summons_number']).drop_duplicates().copy()

    # First non-zero street code among street_code1..3.
    primary = chunk['street_code1']
    fallback = chunk['street_code2'].where(chunk['street_code2'] != 0, chunk['street_code3'])
    street_code = primary.where(primary != 0, fallback).astype("string")
    chunk['street_code'] = street_code.replace('0', pd.NA)
    chunk = chunk.drop(columns=['street_code1', 'street_code2', 'street_code3'])

    # Combine the issue date with the 12-hour violation time.
    issue_day = pd.to_datetime(chunk['issue_date'], format="mixed")
    violation_clock = _parse_12h_clock(chunk['violation_time'])
    chunk['issue_date'] = (
        issue_day.dt.strftime('%Y-%m-%d') + ' ' + violation_clock.dt.strftime('%H:%M:%S')
    )
    # An unparseable time makes the combined string missing; drop those rows.
    chunk = chunk.dropna(subset=['issue_date']).drop(columns=['violation_time']).copy()

    # vehicle_expiration_date is passed through unparsed.

    # Placeholder values such as '0' or '0001-01-03T12:00:00.000' do not match
    # '%Y%m%d', so errors='coerce' already turns them into NaT.
    observed_clock = _parse_12h_clock(chunk['time_first_observed'])
    observed_day = pd.to_datetime(chunk['date_first_observed'], format='%Y%m%d', errors='coerce')
    chunk['date_first_observed'] = (
        observed_day.dt.strftime('%Y-%m-%d') + ' ' + observed_clock.dt.strftime('%H:%M:%S')
    )
    chunk = chunk.drop(columns=['time_first_observed'])

    # County spelling -> borough abbreviation -> numeric borough code.
    borough = chunk['violation_county'].map(_COUNTY_TO_BOROUGH)
    chunk['violation_county'] = borough.map(_BOROUGH_TO_CODE)
    chunk = chunk.dropna(subset=['violation_county']).copy()

    # Attach the street name and coordinates row by row, keyed on street_code.
    # Assigning the filtered lookup rows positionally (isin(...).values) would
    # neither match the chunk length nor keep rows aligned.
    # NOTE(review): keys are compared as strings; this assumes the lookup
    # table's street_code renders the same way as the chunk's (e.g. '10010',
    # not '10010.0') - confirm against the actual CSV.
    lookup = locations.dropna(subset=['street_code']).drop_duplicates(subset='street_code')
    lookup_keys = lookup['street_code'].astype("string")
    for column in ('street_name', 'latitude', 'longitude'):
        by_code = dict(zip(lookup_keys, lookup[column]))
        chunk[column] = chunk['street_code'].map(by_code)

    chunk = chunk.dropna(subset=['latitude', 'longitude'])

    return chunk

In [153]:
# Settings shared by both Kafka clients: the broker address and a client id
# derived from this machine's host name.
_client_settings = {
    'bootstrap.servers': "localhost:29092",
    'client.id': socket.gethostname(),
}

# Producer used to publish the preprocessed ticket records.
producer = kafka.Producer(dict(_client_settings))

# Consumer in group 'test_group', configured with auto.offset.reset='earliest'.
consumer = kafka.Consumer({
    **_client_settings,
    'group.id': 'test_group',
    'auto.offset.reset': 'earliest',
})

# Name of the topic the ticket records are written to.
topic = "parking_violations"

%4|1723198788.786|TERMINATE|cristian-pc#producer-13| [thrd:app]: Producer terminating with 8 messages (8689 bytes) still in queue or transit: use flush() to wait for outstanding message delivery


In [154]:
%%writefile parking_violations.py

import faust

class ParkingViolation(faust.Record, validation=True):
    """Faust record describing one parking-ticket message on the stream.

    Used as the `value_type` of the `parking_violations` topic in
    faust_app.py.

    NOTE(review): the agent in faust_app.py reads `violation.issue_date`, but
    no `issue_date` field is declared here - confirm that Faust exposes
    undeclared payload keys as attributes, or declare the field.
    NOTE(review): every field except `summons_number` is annotated `str`,
    while preprocess_chunk turns `violation_county` into a numeric borough
    code and takes `latitude`/`longitude` from the street lookup table -
    confirm these annotations against the produced payload, since
    `validation=True` is set.
    """
    # Ticket identifier; preprocess_chunk drops rows where it is missing.
    summons_number: int
    plate_id: str
    registration_state: str
    plate_type: str
    violation_code: str
    vehicle_body_type: str
    vehicle_make: str
    issuing_agency: str
    # Single street code derived from street_code1/2/3 in preprocess_chunk.
    street_code: str
    vehicle_expiration_date: str
    violation_location: str
    violation_precinct: str
    issuer_precinct: str
    issuer_code: str
    issuer_command: str
    issuer_squad: str
    # Borough code 1-5 after preprocess_chunk (see the class note on its type).
    violation_county: str
    violation_in_front_of_or_opposite: str
    house_number: str
    # Overwritten from the street lookup table in preprocess_chunk.
    street_name: str
    intersecting_street: str
    # 'YYYY-MM-DD HH:MM:SS' string built in preprocess_chunk; may be missing.
    date_first_observed: str
    law_section: str
    sub_division: str
    violation_legal_code: str
    days_parking_in_effect: str
    from_hours_in_effect: str
    to_hours_in_effect: str
    vehicle_color: str
    unregistered_vehicle: str
    vehicle_year: str
    meter_number: str
    feet_from_curb: str
    violation_post_code: str
    violation_description: str
    no_standing_or_stopping_violation: str
    hydrant_violation: str
    double_parking_violation: str
    # Street coordinates taken from the street lookup table.
    latitude: str
    longitude: str

Overwriting parking_violations.py


In [155]:
%%writefile stream_stats.py

class StreamStats():
    """Running count, sum, mean, standard deviation, minimum and maximum.

    Values are folded in one at a time with `update`; no value is stored, so
    memory use does not grow with the number of observations.

    Attributes:
        count: number of values seen since the last `clear`.
        sum / squared_sum: running sum of the values and of their squares.
        mean: arithmetic mean of the values (0.0 while empty).
        std: population standard deviation of the values (0.0 while empty).
        min / max: smallest / largest value seen, or None while empty.
    """

    def __init__(self):
        self.clear()

    def update(self, value):
        """Fold one numeric observation into the running statistics."""
        self.count += 1

        self.sum += value
        self.squared_sum += value ** 2
        self.mean = self.sum / self.count

        # The first observation defines both extremes. Seeding them with a
        # constant would cap the minimum and put a floor under the maximum.
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)

        # E[x^2] - E[x]^2 can come out slightly negative through rounding;
        # clamp it so the square root stays a real number.
        variance = self.squared_sum / self.count - self.mean ** 2
        self.std = max(variance, 0.0) ** 0.5

    def clear(self):
        """Forget every observation and return to the empty state."""
        self.count = 0
        self.sum = 0
        self.squared_sum = 0
        self.min = None
        self.max = None
        self.mean = 0.0
        self.std = 0.0

    def __str__(self):
        return f"mean: {self.mean} +/- {self.std}, min: {self.min}, max: {self.max}"

# Counts events per year, month and day and keeps statistics of those counts
class StreamDateStats():
    """Event counters for the running year, month and day.

    `increase` bumps all running counters. `reset_<period>` closes a period:
    its running count is fed into the matching StreamStats and the counter
    starts again at zero. `clear_<period>` additionally wipes the statistics
    collected for that period. `total_count` is never reset by this class.
    """

    def __init__(self):
        # Statistics over the counts of closed periods.
        self.year, self.month, self.day = StreamStats(), StreamStats(), StreamStats()

        # Counts of the periods that are still open.
        self.year_count = self.month_count = self.day_count = 0

        self.total_count = 0

    def _close(self, period):
        """Feed the running count of `period` into its stats and zero it."""
        counter_name = f"{period}_count"
        getattr(self, period).update(getattr(self, counter_name))
        setattr(self, counter_name, 0)

    def _wipe(self, period):
        """Clear the stats of `period` and zero its running count."""
        getattr(self, period).clear()
        setattr(self, f"{period}_count", 0)

    def increase(self):
        """Count one event in every running counter and in the total."""
        for counter_name in ("year_count", "month_count", "day_count", "total_count"):
            setattr(self, counter_name, getattr(self, counter_name) + 1)

    def reset_year(self):
        self._close("year")

    def reset_month(self):
        self._close("month")

    def reset_day(self):
        self._close("day")

    def clear_day(self):
        self._wipe("day")

    def clear_month(self):
        self._wipe("month")
        
class CounterDateStats():
    """Per-key collection of StreamDateStats (e.g. one per borough or street).

    `counter` maps each key to its own StreamDateStats; entries are created
    the first time a key is counted. The reset/clear methods forward the call
    to every entry.
    """

    def __init__(self):
        self.counter = {}

    def increase(self, key):
        """Count one event for `key`; missing-value keys are ignored.

        None, the strings "None" and "nan", and float NaN all count as
        missing. NaN needs its own check: it is unequal to everything,
        itself included, so the membership test cannot catch it and every
        NaN object could end up as a separate dictionary key.
        """
        if key in (None, "None", "nan") or (isinstance(key, float) and key != key):
            return

        if key not in self.counter:
            self.counter[key] = StreamDateStats()

        self.counter[key].increase()

    def items(self):
        """Return (key, stats) pairs ordered by total count, largest first."""
        return sorted(self.counter.items(), key=lambda x: x[1].total_count, reverse=True)

    def most_commons(self, n):
        """Return the `n` (key, stats) pairs with the largest total count."""
        return self.items()[:n]

    def reset_year(self):
        for stats in self.counter.values():
            stats.reset_year()

    def reset_month(self):
        for stats in self.counter.values():
            stats.reset_month()

    def reset_day(self):
        for stats in self.counter.values():
            stats.reset_day()

    def clear_month(self):
        for stats in self.counter.values():
            stats.clear_month()

    def clear_day(self):
        for stats in self.counter.values():
            stats.clear_day()

Overwriting stream_stats.py


In [156]:
%%writefile faust_app.py

from typing import List
import faust
import numpy as np
import math
from datetime import datetime
from parking_violations import ParkingViolation
from stream_stats import StreamStats, StreamDateStats, CounterDateStats
from collections import Counter

topic = "parking_violations"

# The topic name is also passed as the first argument (the app id) of faust.App.
app = faust.App(topic, broker='kafka://localhost:29092')
print(f"App is {app}")

violations_topic = app.topic(topic, value_type=ParkingViolation)


def _print_period_report(header, period, overall, boroughs, streets):
    """Print the rolling statistics of one finished period.

    `period` names the StreamStats attribute to show: 'day' for the
    per-day counts of a finished month, 'month' for the per-month counts of
    a finished year.
    """
    print(header)
    print(f"- Overall stats: total: {overall.total_count}, {getattr(overall, period)}")
    print("- Borough stats:")
    for borough, stats in boroughs.items():
        print(f".      {borough} stats: total: {stats.total_count}, {getattr(stats, period)}")
    print("- Most common streets:")
    for idx, (street, stats) in enumerate(streets.most_commons(10), start=1):
        print(f".      Street {idx} ({street}) stats: total: {stats.total_count}, {getattr(stats, period)}")
    print()


def _print_overall_report(overall, boroughs, streets):
    """Print the statistics accumulated since the start of the stream."""
    print("Overall stats from beginning:")
    print(f"- Overall stats: total: {overall.total_count}")
    print(f".      yearly: {overall.year}")
    print(f".      monthly: {overall.month}")
    print(f".      daily: {overall.day}")
    print("- Borough stats:")
    for borough, stats in boroughs.items():
        print(f".      {borough} stats: {stats.total_count}")
        print(f".             yearly: {stats.year}")
        print(f".             monthly: {stats.month}")
        print(f".             daily: {stats.day}")
    print("- Most common streets:")
    for idx, (street, stats) in enumerate(streets.most_commons(10), start=1):
        print(f".      Street {idx} ({street}) stats: total: {stats.total_count}")
        print(f".             yearly: {stats.year}")
        print(f".             monthly: {stats.month}")
        print(f".             daily: {stats.day}")
    print()


@app.agent(violations_topic)
async def process_violations(violations: List[ParkingViolation]):
    """Consume the ticket stream and print descriptive statistics.

    Two sets of trackers are kept for all data, per borough
    (`violation_county`) and per street (`street_code`):

    * cumulative trackers, never cleared, reported every 2,000,000 records;
    * rolling (`tmp_*`) trackers: their per-day statistics are printed and
      cleared when the month changes, their per-month statistics when the
      year changes.

    Period changes are detected by comparing each record's issue date with
    the previous record's, so the stream is expected to arrive in
    chronological order. A record whose `issue_date` is not formatted as
    'YYYY-MM-DD HH:MM:SS' raises ValueError.
    """
    all_data_stats = StreamDateStats()
    borough_stats = CounterDateStats()
    streets_stats = CounterDateStats()

    tmp_all_data_stats = StreamDateStats()
    tmp_borough_stats = CounterDateStats()
    tmp_streets_stats = CounterDateStats()

    rolling = (tmp_all_data_stats, tmp_borough_stats, tmp_streets_stats)
    trackers = rolling + (all_data_stats, borough_stats, streets_stats)

    # Calendar date of the previously processed record.
    current = None

    async for violation in violations:
        issued = datetime.strptime(violation.issue_date, '%Y-%m-%d %H:%M:%S').date()

        if current is None:
            current = issued

        # A new day starts: close the running day count everywhere. Comparing
        # whole dates also catches a jump to the same day number of another month.
        if issued != current:
            for tracker in trackers:
                tracker.reset_day()

        # A new month starts: report the per-day statistics of the month that
        # just ended, then clear the rolling daily statistics.
        if (issued.year, issued.month) != (current.year, current.month):
            for tracker in trackers:
                tracker.reset_month()

            _print_period_report(
                f"Daily stats of month {current.month} of year {current.year}:",
                "day", tmp_all_data_stats, tmp_borough_stats, tmp_streets_stats,
            )

            for tracker in rolling:
                tracker.clear_day()

        # A new year starts: report the per-month statistics of the year that
        # just ended, then clear the rolling monthly statistics.
        if issued.year != current.year:
            for tracker in trackers:
                tracker.reset_year()

            _print_period_report(
                f"Monthly stats of year {current.year}:",
                "month", tmp_all_data_stats, tmp_borough_stats, tmp_streets_stats,
            )

            for tracker in rolling:
                tracker.clear_month()

        current = issued

        # Report the cumulative statistics every 2000000 records. These come
        # from the cumulative trackers, not the rolling ones that get cleared.
        if all_data_stats.total_count % 2000000 == 0 and all_data_stats.total_count != 0:
            _print_overall_report(all_data_stats, borough_stats, streets_stats)

        # Count the current violation in every tracker.
        all_data_stats.increase()
        borough_stats.increase(violation.violation_county)
        streets_stats.increase(violation.street_code)

        tmp_all_data_stats.increase()
        tmp_borough_stats.increase(violation.violation_county)
        tmp_streets_stats.increase(violation.street_code)

Overwriting faust_app.py


The data is in chronological order only by issue_date, not by violation_time, so no statistics are computed on the time of day.

In [None]:
# Stream every ticket file to Kafka, one JSON message per cleaned row.
total_count = 0

for csv_file in csv_files:
    # The street lookup table lives next to the ticket files and also ends in
    # .csv; it is not ticket data and must not be streamed.
    if csv_file == 'street_locations.csv':
        continue

    # Read the file of this iteration (not always the first one), in chunks so
    # a whole file never has to fit in memory. The context manager closes the
    # reader when the file is done.
    with pd.read_csv(base_path + csv_file, chunksize=10000) as reader:
        for chunk in reader:
            chunk = preprocess_chunk(chunk)

            # Keep only tickets issued between 2013 and 2024; anything else is
            # treated as a data error.
            years = pd.to_datetime(chunk['issue_date']).dt.year
            chunk = chunk[(years >= 2013) & (years <= 2024)]
            if chunk.empty:
                continue

            for record_value in chunk.to_dict(orient='records'):
                record_key = str(uuid.uuid4())
                payload = json.dumps(record_value)
                try:
                    producer.produce(topic, key=record_key, value=payload)
                except BufferError:
                    # The local producer queue is full: give pending
                    # deliveries up to a second to drain, then retry once.
                    producer.poll(1)
                    producer.produce(topic, key=record_key, value=payload)
                # Serve delivery callbacks without blocking.
                producer.poll(0)
                total_count += 1

            # Progress is reported once per chunk; two prints per record
            # would dominate the run time.
            print(f"Produced {total_count} records")
            print(f"Date: {record_value['issue_date']}")

# Wait until every queued message has been delivered before shutting down.
producer.flush()
consumer.close()