In [1]:
!pip install Faker fastavro pandas

Collecting Faker
  Downloading faker-37.0.0-py3-none-any.whl.metadata (15 kB)
Collecting fastavro
  Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Downloading faker-37.0.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m30.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastavro, Faker
Successfully installed Faker-37.0.0 fastavro-1.10.0


- **Passenger Requests and Cancellations**: For passenger requests and cancellations.
- **Driver Availability Updates**: For driver availability.


In [3]:
import random
from faker import Faker
import fastavro
import datetime

## Passenger Requests and Cancellations Schema:

In [4]:
passenger_request_schema = {
    "type": "record",
    "name": "PassengerRequest",
    "fields": [
        {"name": "request_id", "type": "string"},
        {"name": "passenger_id", "type": "string"},
        {"name": "passenger_name", "type": "string"},
        {"name": "pickup_location", "type": "string"},
        {"name": "dropoff_location", "type": "string"},
        {"name": "request_time", "type": "string"},
        {
            "name": "status",
            "type": {
                "type": "enum",
                "name": "Status",
                "symbols": ["requested", "accepted", "canceled", "completed"]
            }
        },
        {
            "name": "cancellation_time",
            "type": ["null", "string"],  # Correct use of null as a type in Python
            "default": None  # Use None (Python's null equivalent)
        },
        {"name": "ride_duration", "type": "int", "default": 30},
        {"name": "vehicle_type", "type": "string", "default": "standard"},
        {"name": "estimated_eta", "type": "string", "default": "20 minutes"},
        {
            "name": "demand_level",
            "type": {
                "type": "enum",
                "name": "DemandLevel",
                "symbols": ["High", "Medium", "Low"]
            },
            "default": "Medium"
        },
        {"name": "price", "type": "float"},
        {"name": "driver_rating", "type": "float"},
        {"name": "passenger_rating", "type": "float"},
        {
            "name": "favorite_location",
            "type": ["null", "string"],  # Correct use of null as a type in Python
            "default": None  # Use None for nullable fields in Python
        },
        {
            "name": "is_wheelchair_accessible",
            "type": "boolean",
            "default": False  # Use False (Python's boolean equivalent)
        },
        {
            "name": "scheduled_time",
            "type": ["null", "string"],
            "default": None  # Use None for nullable fields
        },
        {
            "name": "multiple_stops",
            "type": ["null", {"type": "array", "items": "string"}],
            "default": None
        },
        {
            "name": "donation_amount",
            "type": ["null", "float"],
            "default": None  # Use None for nullable fields
        },
        {"name": "vehicle_license_plate", "type": "string"}
    ]
}


- **request_id**: A unique ID for the request.
- **passenger_id**: A unique ID for the passenger.
- **passenger_name**
- **pickup_location**: Where the passenger is requesting the ride from.
- **dropoff_location**: The destination of the ride.
- **request_time**: The time when the passenger requests the ride.
- **status**: The status of the ride (requested, accepted, canceled, or completed).
- **cancellation_time**: If the request is canceled, this field records the cancellation time.
- **ride_duration**: The duration of the ride in minutes. The default is set to 30 minutes, but this can be updated based on the ride specifics.
- **vehicle_type**: The type of vehicle requested by the passenger
- **estimated_eta**: estimated time of arrival for drivers  based on current traffic conditions and proximity. The default is set to 20 minutes
- **demand_level**: The level of demand for rides in the area. This helps adjust the pricing or availability of vehicles based on whether it's high, medium, or low demand.
- **price**: The price of the ride based on the vehicle type, distance, and demand
- **driver_rating**: The rating given to the driver, which could be between 3.5 and 5 stars. A low rating (below 3.5) could cause the ride to be canceled.
- **passenger_rating**: The rating given by the driver to the passenger. It typically ranges from 4 to 5 stars, as passengers are generally rated positively.
- **favorite_location**: A location the passenger frequently uses (e.g., "Home", "Work"). This field is optional, and if not set, it defaults to None.
- **is_wheelchair_accessible**: A flag indicating whether the passenger requested a wheelchair-accessible vehicle. This is a boolean value (True or False).
- **scheduled_time**: The time at which the passenger wants to schedule the ride in advance. If not scheduled, it defaults to None. This allows passengers to set a pickup time in the future.
- **multiple_stops**: An array of strings representing multiple stops during the ride. This allows for additional destinations to be added along the way.
- **donation_amount**: The optional donation amount the passenger can choose to add to the fare, typically in increments of 5, 10, or 15. This field is nullable (None by default), allowing passengers who do not wish to donate to leave it empty.
- **vehicle_license_plate**


## Driver Availability Updates Schema

In [5]:
driver_availability_schema = {
    "type": "record",
    "name": "DriverAvailability",
    "fields": [
        {"name": "driver_id", "type": "string"},
        {"name": "driver_name", "type": "string"},
        {
            "name": "status",
            "type": {
                "type": "enum",
                "name": "Status",
                "symbols": ["available", "unavailable"]
            }
        },
        {"name": "update_time", "type": "string"},
        {"name": "driver_rating", "type": "float"},
        {
            "name": "is_wheelchair_accessible",
            "type": "boolean",
            "default": False  # Flag for wheelchair accessible vehicles
        },
        {"name": "vehicle_license_plate", "type": "string"}
    ]
}


- **driver_id**: A unique ID for the driver.
- **driver_name**
- **status**: The status of the driver (either "available" or "unavailable").
- **update_time**: The time when the driver’s availability status is updated.
- **driver_rating**: The rating given to the driver by passengers. This typically ranges from 1 to 5, with a higher rating indicating better service.
- **is_wheelchair_accessible**: A flag indicating whether the driver’s vehicle is wheelchair accessible. The default value is set to False, meaning the vehicle is not accessible unless explicitly marked as True.
- **vehicle_license_plate**

## Generate synthetic data

In [6]:
import random
from faker import Faker
import datetime
import fastavro
import json

In [7]:
# Initialize Faker to generate random data
fake = Faker()

In [33]:
# Function to generate realistic time with random offset
def generate_realistic_time():
    now = datetime.datetime.now()
    time_offset = datetime.timedelta(minutes=random.randint(-30, 30))  # Random offset within 30 minutes
    return (now + time_offset).strftime('%Y-%m-%d %H:%M:%S')

# Define price range based on vehicle type
def get_base_vehicle_price_range(vehicle_type):
    price_range_map = {
        "Black/Executive": (100, 150),  # $100 to $150
        "Van XL": (90, 140),  # $90 to $140
        "Van": (80, 120),  # $80 to $120
        "Priority": (70, 110),  # $70 to $110
        "Baby": (60, 100),  # $60 to $100
        "Kids": (50, 90),  # $50 to $90
        "Comfort": (40, 80),  # $40 to $80
        "Pet": (35, 70),  # $35 to $70
        "Electric": (30, 60),  # $30 to $60
        "Taxi": (20, 30),  # $20 to $30
        "Share": (10, 20),  # $10 to $20
        "Wheelchair": (60, 100)  # Wheelchair accessible vehicle price range
    }
    return price_range_map.get(vehicle_type, (20, 30))  # Default to Taxi price range if not found

# Function to simulate demand and adjust the price dynamically within the range
def adjust_price_for_demand(base_min, base_max):
    # Simulate a demand factor (between 1.0 and 2.0)
    demand_factor = random.uniform(1.0, 2.0)  # This simulates the demand multiplier

    # Categorize demand level
    if demand_factor >= 1.5:
        demand_level = "High"
    elif demand_factor >= 1.2:
        demand_level = "Medium"
    else:
        demand_level = "Low"

    adjusted_price = random.uniform(base_min, base_max) * demand_factor  # Adjust the price based on demand
    return round(adjusted_price, 2), demand_level  # Round to two decimal places for price

# Function to generate a driver rating (between 3.5 and 5 for most drivers, below 3.5 is rare)
def generate_driver_rating():
    rating = random.uniform(3.5, 5)  # Default range for drivers is 3.5 to 5
    if random.random() < 0.1:  # 10% chance to have a rating below 3.5
        rating = random.uniform(1, 3.5)  # Ratings below 3.5 are rare
    return round(rating, 1)

# Function to generate a passenger rating (between 4 and 5 stars)
def generate_passenger_rating():
    return round(random.uniform(4, 5), 1)  # Random rating between 4 and 5 stars

# Function to generate a passenger request event with multiple stops set to None or empty list
def generate_passenger_request(request_id, driver_rating):
    passenger_id = fake.uuid4()  # Generate a unique passenger ID
    passenger_name = fake.name()  # Generate a random name for the passenger
    pickup_location = fake.city()  # Random pickup location
    dropoff_location = fake.city()  # Random dropoff location
    request_time = generate_realistic_time()  # Generate a more varied time


    # Update probability distribution for the status
    status_probs = [("requested", 0.2), ("accepted", 0.7), ("canceled", 0.1)]
    status = random.choices([x[0] for x in status_probs], [x[1] for x in status_probs])[0]

    cancellation_time = "null"
    if status == "canceled":
        cancellation_time = generate_realistic_time()  # Realistic cancellation time

    # Edge case for long ride duration
    ride_duration = random.randint(5, 60)  # Random ride duration between 5 and 60 minutes
    if random.random() < 0.05:  # 5% chance to simulate a long ride
        ride_duration = 120  # Simulate an anomalously long ride duration (2 hours)

    vehicle_type = random.choice(["Taxi", "Black/Executive", "Van", "Van XL", "Comfort",
                                  "Share", "Electric", "Kids", "Baby", "Priority", "Pet", "Wheelchair"])  # Vehicle type options
    estimated_eta = f"{random.randint(2, 25)} minutes"  # Random ETA between 2 and 25 minutes

    # Get the base price range based on vehicle type
    base_min, base_max = get_base_vehicle_price_range(vehicle_type)
    # Adjust the price based on demand within the given range
    price, demand_level = adjust_price_for_demand(base_min, base_max)

    # Ride completion check based on driver rating
    if driver_rating < 3.5:
        status = "canceled"  # If the driver rating is below 3.5, the ride cannot be completed

    # Generate a passenger rating for the driver (between 4 and 5)
    passenger_rating = generate_passenger_rating()

    # Favorite location
    favorite_location = random.choice([None, "Home", "Work"])

    # Is wheelchair accessible
    is_wheelchair_accessible = random.choice([True, False])

    # Advanced scheduling
    scheduled_time = "null"
    if random.random() < 0.2:
        scheduled_time = generate_realistic_time()  # 20% chance to schedule in advance

    # Multiple stops: Ensure stops are different from pickup and dropoff locations
    stop_locations = set([fake.city() for _ in range(5)])  # Generate a set of random city names for stops
    stop_locations.discard(pickup_location)  # Remove pickup location if present
    stop_locations.discard(dropoff_location)  # Remove dropoff location if present

    # Convert the set to a list before using random.sample
    multiple_stops = random.sample(list(stop_locations), random.randint(1, 2)) if random.random() < 0.5 else None  # 50% chance of having stops

    # Donation option
    donation_amount = random.choice([None, 5, 10, 15])

    # Generate license plate number (realistic format)
    vehicle_license_plate = f"{random.choice(['AB', 'XY', 'GH', 'DC'])}{random.randint(1000, 9999)}"  # Example format

    # Explicitly convert request_id to string
    event = {
        "request_id": str(request_id),
        "passenger_id": passenger_id,
        "passenger_name": passenger_name,  # Add passenger name
        "pickup_location": pickup_location,
        "dropoff_location": dropoff_location,
        "request_time": request_time,
        "status": status,
        "cancellation_time": cancellation_time,
        "ride_duration": ride_duration,
        "vehicle_type": vehicle_type,
        "estimated_eta": estimated_eta,
        "price": price,
        "demand_level": demand_level,
        "driver_rating": driver_rating,
        "passenger_rating": passenger_rating,
        "favorite_location": favorite_location,
        "is_wheelchair_accessible": is_wheelchair_accessible,
        "scheduled_time": scheduled_time,
        "multiple_stops": multiple_stops,  # Can be None or an array
        "donation_amount": donation_amount,
        "vehicle_license_plate": vehicle_license_plate  # Add vehicle license plate
    }
    return event

# Function to generate a driver availability event
def generate_driver_availability(driver_id):
    status = random.choices(
        ["available", "on_trip", "offline"],
        [0.6, 0.3, 0.1]  # 60% chance driver is available, 30% on a trip, 10% offline
    )[0]
    update_time = generate_realistic_time()  # Use realistic time for update time

    # Generate the driver's rating
    driver_rating = generate_driver_rating()

    # Generate driver's name
    driver_name = fake.name()

    # Generate license plate number
    vehicle_license_plate = f"{random.choice(['AB', 'XY', 'GH', 'DC'])}{random.randint(1000, 9999)}"  # Example format

    # Explicitly convert driver_id to string
    event = {
        "driver_id": str(driver_id),# Convert to string
        "driver_name": driver_name,
        "status": status,
        "update_time": update_time,
        "driver_rating": driver_rating,  # Add driver's rating
        "is_wheelchair_accessible": random.choice([True, False]), # Wheelchair accessible vehicles
        "vehicle_license_plate": vehicle_license_plate  # Add vehicle license plate
    }
    return event

# Generate configurable number of events
def generate_events(passenger_count, driver_count):
    passenger_events = [generate_passenger_request(i, generate_driver_rating()) for i in range(passenger_count)]
    driver_availability_events = [generate_driver_availability(i) for i in range(driver_count)]
    return passenger_events, driver_availability_events

# Example usage: Generate 200 passenger requests and 100 driver availability events
passenger_events, driver_availability_events = generate_events(200, 100)

# Serialize the data to JSON format
with open('passenger_requests.json', 'w') as json_file:
    json.dump(passenger_events, json_file, indent=4)

with open('driver_availability.json', 'w') as json_file:
    json.dump(driver_availability_events, json_file, indent=4)

# AVRO Serialization
passenger_request_schema

driver_availability_schema


# Serialize the data to AVRO format
with open('passenger_requests.avro', 'wb') as avro_file:
    fastavro.writer(avro_file, passenger_request_schema, passenger_events)

with open('driver_availability.avro', 'wb') as avro_file:
    fastavro.writer(avro_file, driver_availability_schema, driver_availability_events)


Check the Generated Files and Verify the Data

In [34]:
import json

# Check the first few records of the passenger_requests.json file
with open('passenger_requests.json', 'r') as json_file:
    passenger_data = json.load(json_file)
    print("First 5 Passenger Requests:")
    print(passenger_data[:5])  # Show the first 5 entries

# Check the first few records of the driver_availability.json file
with open('driver_availability.json', 'r') as json_file:
    driver_data = json.load(json_file)
    print("\nFirst 5 Driver Availability Entries:")
    print(driver_data[:5])  # Show the first 5 entries


First 5 Passenger Requests:
[{'request_id': '0', 'passenger_id': '18f07ee7-ad11-4841-9cd0-60b64aa0369c', 'passenger_name': 'Terry Brooks', 'pickup_location': 'Port Matthew', 'dropoff_location': 'New Marystad', 'request_time': '2025-03-16 20:57:42', 'status': 'accepted', 'cancellation_time': 'null', 'ride_duration': 41, 'vehicle_type': 'Van', 'estimated_eta': '11 minutes', 'price': 128.15, 'demand_level': 'Medium', 'driver_rating': 3.5, 'passenger_rating': 4.4, 'favorite_location': 'Work', 'is_wheelchair_accessible': True, 'scheduled_time': 'null', 'multiple_stops': None, 'donation_amount': None, 'vehicle_license_plate': 'AB6569'}, {'request_id': '1', 'passenger_id': 'a6f7f1d8-fcbd-4b36-84ce-292bc4b5c7b5', 'passenger_name': 'Jason Lopez', 'pickup_location': 'Andreamouth', 'dropoff_location': 'Johnfort', 'request_time': '2025-03-16 20:33:42', 'status': 'canceled', 'cancellation_time': 'null', 'ride_duration': 42, 'vehicle_type': 'Taxi', 'estimated_eta': '3 minutes', 'price': 52.43, 'dema

JUST TO SEE IT CLEARLY IN PANDAS (CHANGED FROM JSON FILE TO PANDAS)

In [35]:
import json
import pandas as pd

# Check the first few records of the passenger_requests.json file
with open('passenger_requests.json', 'r') as json_file:
    passenger_data = json.load(json_file)
    print("First 5 Passenger Requests:")
    passenger_df = pd.DataFrame(passenger_data)  # Convert to pandas DataFrame
    display(passenger_df.head())  # Display the first 5 rows in table format

# Check the first few records of the driver_availability.json file
with open('driver_availability.json', 'r') as json_file:
    driver_data = json.load(json_file)
    print("\nFirst 5 Driver Availability Entries:")
    driver_df = pd.DataFrame(driver_data)  # Convert to pandas DataFrame
    display(driver_df.head())  # Display the first 5 rows in table format


First 5 Passenger Requests:


Unnamed: 0,request_id,passenger_id,passenger_name,pickup_location,dropoff_location,request_time,status,cancellation_time,ride_duration,vehicle_type,...,price,demand_level,driver_rating,passenger_rating,favorite_location,is_wheelchair_accessible,scheduled_time,multiple_stops,donation_amount,vehicle_license_plate
0,0,18f07ee7-ad11-4841-9cd0-60b64aa0369c,Terry Brooks,Port Matthew,New Marystad,2025-03-16 20:57:42,accepted,,41,Van,...,128.15,Medium,3.5,4.4,Work,True,,,,AB6569
1,1,a6f7f1d8-fcbd-4b36-84ce-292bc4b5c7b5,Jason Lopez,Andreamouth,Johnfort,2025-03-16 20:33:42,canceled,,42,Taxi,...,52.43,High,2.3,4.0,Work,False,,,15.0,DC7000
2,2,d0ad664e-f1f3-494a-81a1-1bc570962154,Joshua Long,Stonefurt,Greenfort,2025-03-16 20:37:42,requested,,120,Pet,...,79.38,High,4.5,4.4,,False,2025-03-16 20:40:42,[North Mason],10.0,XY4109
3,3,a57596ca-5da8-405e-83c7-8b984c9f71a6,Michael Trujillo,Port Eric,West Jessicafort,2025-03-16 21:01:42,accepted,,57,Share,...,14.2,Medium,4.7,4.2,Home,False,,[Kimberlyfort],5.0,GH4577
4,4,0b9a2bbf-cce4-4c34-bb34-282855747617,Nicholas Johnson,New Michaelville,Jeffreyside,2025-03-16 20:56:42,requested,,31,Priority,...,107.75,Medium,3.6,4.2,,True,2025-03-16 20:40:42,"[Toniview, Jacobstown]",5.0,GH7386



First 5 Driver Availability Entries:


Unnamed: 0,driver_id,driver_name,status,update_time,driver_rating,is_wheelchair_accessible,vehicle_license_plate
0,0,Kevin Jimenez,on_trip,2025-03-16 20:40:43,4.4,True,GH8959
1,1,Amber King,available,2025-03-16 20:38:43,3.6,True,AB1062
2,2,Aaron Chambers,available,2025-03-16 21:06:43,4.7,True,GH9215
3,3,Ashley Prince,on_trip,2025-03-16 20:30:43,4.9,True,DC9620
4,4,Mitchell Higgins,available,2025-03-16 20:31:43,4.4,True,AB1558


JUST TO SEE IT CLEARLY IN PANDAS (CHANGED FROM JSON FILE TO PANDAS)

In [36]:
import fastavro

# Check the first few records of the passenger_requests.avro file
with open('passenger_requests.avro', 'rb') as avro_file:
    reader = fastavro.reader(avro_file)
    print("First 5 Passenger Requests (AVRO):")
    for i, record in enumerate(reader):
        if i < 5:
            print(record)

# Check the first few records of the driver_availability.avro file
with open('driver_availability.avro', 'rb') as avro_file:
    reader = fastavro.reader(avro_file)
    print("\nFirst 5 Driver Availability Entries (AVRO):")
    for i, record in enumerate(reader):
        if i < 5:
            print(record)


First 5 Passenger Requests (AVRO):
{'request_id': '0', 'pickup_location': 'Port Matthew', 'dropoff_location': 'New Marystad', 'request_time': '2025-03-16 20:57:42', 'status': 'accepted', 'estimated_eta': '11 minutes', 'cancellation_reason': None}
{'request_id': '1', 'pickup_location': 'Andreamouth', 'dropoff_location': 'Johnfort', 'request_time': '2025-03-16 20:33:42', 'status': 'canceled', 'estimated_eta': '3 minutes', 'cancellation_reason': None}
{'request_id': '2', 'pickup_location': 'Stonefurt', 'dropoff_location': 'Greenfort', 'request_time': '2025-03-16 20:37:42', 'status': 'requested', 'estimated_eta': '17 minutes', 'cancellation_reason': None}
{'request_id': '3', 'pickup_location': 'Port Eric', 'dropoff_location': 'West Jessicafort', 'request_time': '2025-03-16 21:01:42', 'status': 'accepted', 'estimated_eta': '23 minutes', 'cancellation_reason': None}
{'request_id': '4', 'pickup_location': 'New Michaelville', 'dropoff_location': 'Jeffreyside', 'request_time': '2025-03-16 20:56

In [37]:
import fastavro
import pandas as pd

# Check the first few records of the passenger_requests.avro file
with open('passenger_requests.avro', 'rb') as avro_file:
    reader = fastavro.reader(avro_file)
    passenger_records = [record for i, record in enumerate(reader) if i < 5]  # Collect first 5 records

    print("First 5 Passenger Requests (AVRO):")
    passenger_df = pd.DataFrame(passenger_records)  # Convert to pandas DataFrame
    display(passenger_df)  # Display as a table

# Check the first few records of the driver_availability.avro file
with open('driver_availability.avro', 'rb') as avro_file:
    reader = fastavro.reader(avro_file)
    driver_records = [record for i, record in enumerate(reader) if i < 5]  # Collect first 5 records

    print("\nFirst 5 Driver Availability Entries (AVRO):")
    driver_df = pd.DataFrame(driver_records)  # Convert to pandas DataFrame
    display(driver_df)  # Display as a table


First 5 Passenger Requests (AVRO):


Unnamed: 0,request_id,pickup_location,dropoff_location,request_time,status,estimated_eta,cancellation_reason
0,0,Port Matthew,New Marystad,2025-03-16 20:57:42,accepted,11 minutes,
1,1,Andreamouth,Johnfort,2025-03-16 20:33:42,canceled,3 minutes,
2,2,Stonefurt,Greenfort,2025-03-16 20:37:42,requested,17 minutes,
3,3,Port Eric,West Jessicafort,2025-03-16 21:01:42,accepted,23 minutes,
4,4,New Michaelville,Jeffreyside,2025-03-16 20:56:42,requested,5 minutes,



First 5 Driver Availability Entries (AVRO):


Unnamed: 0,driver_id,update_time,status
0,0,2025-03-16 20:40:43,on_trip
1,1,2025-03-16 20:38:43,available
2,2,2025-03-16 21:06:43,available
3,3,2025-03-16 20:30:43,on_trip
4,4,2025-03-16 20:31:43,available
