In [1]:
!pip install fastavro
!pip install faker
!pip install numpy
!python generate_avro.py

Collecting fastavro
  Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m20.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastavro
Successfully installed fastavro-1.10.0
Collecting faker
  Downloading faker-37.0.0-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.0.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-37.0.0
python3: can't open file '/content/generate_avro.py': [Errno 2] No such file or directory


## DATA GENERATOR:

Two Avro schemas are defined, one for each of the two selected data feeds:
*   Passenger request
*   Driver availability

At first we chose several Spanish cities to test the generator, but afterwards we saw that the results weren't very realistic, so we focused on the Madrid area.


MADRID

In [2]:
import json
import random
import uuid
from datetime import datetime, timezone

import fastavro
from geopy.distance import geodesic

In [3]:
# Avro record schema describing one passenger ride request.
# Each (name, type) pair below becomes one {"name": ..., "type": ...} field entry,
# in the order listed.
passenger_request_schema = {
    "type": "record",
    "name": "PassengerRequest",
    "fields": [
        {"name": field_name, "type": avro_type}
        for field_name, avro_type in (
            ("id", "string"),
            ("vendor_id", "int"),
            ("pickup_datetime", "string"),
            ("passenger_count", "int"),
            ("pickup_longitude", "double"),
            ("pickup_latitude", "double"),
            ("dropoff_longitude", "double"),
            ("dropoff_latitude", "double"),
            ("vehicle_type", "string"),
            ("fare_estimate", "double"),
        )
    ],
}

In [4]:
# Avro record schema describing one driver availability update.
# Each (name, type) pair below becomes one {"name": ..., "type": ...} field entry,
# in the order listed.
driver_availability_schema = {
    "type": "record",
    "name": "DriverAvailability",
    "fields": [
        {"name": field_name, "type": avro_type}
        for field_name, avro_type in (
            ("driver_id", "string"),
            ("available_since", "string"),
            ("vehicle_type", "string"),
            ("current_longitude", "double"),
            ("current_latitude", "double"),
            ("availability_status", "string"),
        )
    ],
}

Once the Avro schemas were defined, we started to create the two main functions of the data generator. However, since we wanted the data to be as realistic as possible, we first had to define the vehicle types and the coordinates of the Madrid area.

In [5]:
# Bounding box of the Madrid area in decimal degrees, obtained with
# http://bboxfinder.com/#0.000000,0.000000,0.000000,0.000000
MADRID_LAT_MIN = 40.30
MADRID_LAT_MAX = 40.50
MADRID_LON_MIN = -3.75
MADRID_LON_MAX = -3.60

def random_madrid_location():
    """Pick a random point inside the Madrid bounding box.

    Returns:
        A ``(latitude, longitude)`` tuple; each coordinate is drawn with
        ``random.uniform`` and rounded to 6 decimal places.
    """
    # Latitude is drawn before longitude so a seeded `random` yields the same points.
    latitude = random.uniform(MADRID_LAT_MIN, MADRID_LAT_MAX)
    longitude = random.uniform(MADRID_LON_MIN, MADRID_LON_MAX)
    return round(latitude, 6), round(longitude, 6)


# Pricing parameters per vehicle type: a fixed base amount, a per-kilometre
# rate and a flat surcharge. The keys double as the list of valid vehicle types.
FARE_CONFIG = {
    "Economy": dict(base=2.5, per_km=1.2, surcharge=0.5),
    "Standard": dict(base=3.0, per_km=1.5, surcharge=1.0),
    "Luxury": dict(base=5.0, per_km=2.5, surcharge=2.0),
}

def calculate_fare(vehicle_type, distance_km):
    """Estimate the fare for a trip.

    Args:
        vehicle_type: A key of ``FARE_CONFIG`` ("Economy", "Standard" or "Luxury").
        distance_km: Trip distance in kilometres.

    Returns:
        ``base + distance_km * per_km + surcharge`` for the given vehicle type,
        rounded to 2 decimal places.

    Raises:
        KeyError: If ``vehicle_type`` is not a key of ``FARE_CONFIG``.
    """
    pricing = FARE_CONFIG[vehicle_type]
    distance_charge = distance_km * pricing["per_km"]
    total = pricing["base"] + distance_charge + pricing["surcharge"]
    return round(total, 2)

Now that we had everything set up, we started to create the two data generators for the two data feeds.



1st function: generates passenger requests, choosing the pickup and dropoff locations at random to make the data more realistic.

Each generated request contains the id of the request, the vendor_id of the vendor that handles the request, the number of passengers, the pickup_datetime, the pickup and dropoff locations, the type of vehicle and the estimated fare for the trip.


---


2nd function: generates driver availability records, which contain the driver_id, the time since which the driver has been available, the vehicle type, the current location of the driver and the availability status.



In [6]:
def generate_passenger_request():
    """Generate one synthetic passenger request.

    Pickup and dropoff points are drawn independently with
    ``random_madrid_location``; the fare estimate is computed from the geodesic
    distance between them and a randomly chosen vehicle type.

    Returns:
        A dict whose keys match the fields of ``passenger_request_schema``.
        ``pickup_datetime`` is the current UTC time as a naive ISO-8601 string
        (no UTC offset suffix).
    """
    pickup_lat, pickup_lon = random_madrid_location()
    dropoff_lat, dropoff_lon = random_madrid_location()
    # Points are passed to geodesic as (latitude, longitude) pairs.
    distance_km = geodesic((pickup_lat, pickup_lon), (dropoff_lat, dropoff_lon)).km
    vehicle_type = random.choice(list(FARE_CONFIG))
    fare_estimate = calculate_fare(vehicle_type, distance_km)

    return {
        "id": str(uuid.uuid4()),
        "vendor_id": random.randint(1, 3),
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # timestamp and drop tzinfo so the emitted string keeps the same format.
        "pickup_datetime": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
        "passenger_count": random.randint(1, 4),
        "pickup_longitude": pickup_lon,
        "pickup_latitude": pickup_lat,
        "dropoff_longitude": dropoff_lon,
        "dropoff_latitude": dropoff_lat,
        "vehicle_type": vehicle_type,
        "fare_estimate": fare_estimate
    }

def generate_driver_availability():
    """Generate one synthetic driver availability record.

    Returns:
        A dict whose keys match the fields of ``driver_availability_schema``.
        ``available_since`` is the current UTC time as a naive ISO-8601 string
        (no UTC offset suffix); ``availability_status`` is either "Available"
        or "Busy".
    """
    driver_lat, driver_lon = random_madrid_location()
    return {
        "driver_id": str(uuid.uuid4()),
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # timestamp and drop tzinfo so the emitted string keeps the same format.
        "available_since": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
        "vehicle_type": random.choice(list(FARE_CONFIG)),
        "current_longitude": driver_lon,
        "current_latitude": driver_lat,
        "availability_status": random.choice(["Available", "Busy"])
    }

To continue, once the data generators were created, we had to serialize the data in order to convert the records from JSON to Avro format (we used fastavro for this). As seen in class, this step is key because it compacts the data into a binary form that reduces storage size and allows faster streaming and processing.

In [7]:
def serialize_to_avro(data, schema, file_path):
    """Write records to an Avro file with fastavro.

    Args:
        data: The records to write (this notebook passes a list of dicts).
        schema: The Avro schema dict the records are written with.
        file_path: Destination path; the file is opened in binary write mode,
            so an existing file at that path is overwritten.
    """
    with open(file_path, mode="wb") as avro_sink:
        fastavro.writer(avro_sink, schema, data)

Finally, we tested the data generators with 10 passenger requests and 10 drivers to see how they were working.

In [9]:
# Sample sizes for this smoke test
num_requests, num_drivers = 10, 10

# Passenger requests are generated first, then driver availabilities
passenger_requests = [generate_passenger_request() for _ in range(num_requests)]
driver_availabilities = [generate_driver_availability() for _ in range(num_drivers)]

# Write each feed to its own Avro file
serialize_to_avro(
    data=passenger_requests,
    schema=passenger_request_schema,
    file_path="passenger_requests.avro",
)
serialize_to_avro(
    data=driver_availabilities,
    schema=driver_availability_schema,
    file_path="driver_availability.avro",
)

# Show every generated record as indented JSON, one feed after the other
print("Sample passenger requests data:")
for request in passenger_requests:
    print(json.dumps(obj=request, indent=4))

print("\nSample driver availability data:")
for driver in driver_availabilities:
    print(json.dumps(obj=driver, indent=4))

def serialize_to_json(data, file_path):
    """Write ``data`` to ``file_path`` as JSON indented by 4 spaces.

    Args:
        data: Any JSON-serializable object (this notebook passes a list of dicts).
        file_path: Destination path; the file is opened in text write mode with
            UTF-8 encoding, so an existing file at that path is overwritten.
    """
    rendered = json.dumps(data, indent=4)
    with open(file_path, mode="w", encoding="utf-8") as sink:
        sink.write(rendered)

# Also keep a plain-JSON copy of each feed next to the Avro files
serialize_to_json(data=passenger_requests, file_path="passenger_requests.json")
serialize_to_json(data=driver_availabilities, file_path="driver_availability.json")

Sample passenger requests data:
{
    "id": "ff1dd60a-8b93-4a25-8aa1-058bf40671cd",
    "vendor_id": 2,
    "pickup_datetime": "2025-03-16T21:11:25.464139",
    "passenger_count": 2,
    "pickup_longitude": -3.610393,
    "pickup_latitude": 40.336669,
    "dropoff_longitude": -3.703716,
    "dropoff_latitude": 40.488569,
    "vehicle_type": "Economy",
    "fare_estimate": 25.36
}
{
    "id": "cc6e5134-5730-4717-a550-03817b225283",
    "vendor_id": 3,
    "pickup_datetime": "2025-03-16T21:11:25.464346",
    "passenger_count": 1,
    "pickup_longitude": -3.600997,
    "pickup_latitude": 40.345571,
    "dropoff_longitude": -3.72291,
    "dropoff_latitude": 40.33415,
    "vehicle_type": "Standard",
    "fare_estimate": 19.65
}
{
    "id": "f0a608b9-9513-4afc-8ba0-a1119cf28cac",
    "vendor_id": 3,
    "pickup_datetime": "2025-03-16T21:11:25.464534",
    "passenger_count": 3,
    "pickup_longitude": -3.697209,
    "pickup_latitude": 40.389272,
    "dropoff_longitude": -3.73874,
    "dropoff