Imports and loading data

In [1]:
import pandas as pd
import numpy as np
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import matplotlib.pyplot as plt
import seaborn as sns

taxitripdata = pd.read_csv(
    r'C:\Users\maraw\Desktop\Assignment 1\datasets\taxi_trip_data.csv', encoding='cp1252')
taxizonegeo = pd.read_csv(
    r'C:\Users\maraw\Desktop\Assignment 1\datasets\taxi_zone_geo.csv', encoding='cp1252')

if (taxitripdata is not None):
    print("taxitripdata loaded")

if (taxizonegeo is not None):
    print("taxizonegeo loaded")

taxitripdata loaded
taxizonegeo loaded


Data preprocessing

In [2]:
taxitripdata = taxitripdata[:100000]
# a) Remove the columns "store_and_fwd_flag", "rate_code", and "total_amount" from taxitripdata
taxitripdata = taxitripdata.drop(columns=["store_and_fwd_flag", "rate_code", "total_amount"])

# b) Drop rows with missing essential details
taxitripdata = taxitripdata.dropna()
taxitripdata = taxitripdata[taxitripdata['trip_distance'] > 0]
taxitripdata = taxitripdata[taxitripdata['passenger_count'] > 0]
taxitripdata = taxitripdata[(taxitripdata['fare_amount'] >= 0) & (taxitripdata['tip_amount'] >= 0)]

taxitripdata['pickup_datetime'] = pd.to_datetime(taxitripdata['pickup_datetime'], errors='coerce')
taxitripdata['dropoff_datetime'] = pd.to_datetime(taxitripdata['dropoff_datetime'], errors='coerce')

# Drop rows with invalid datetime format
taxitripdata.dropna(subset=['pickup_datetime', 'dropoff_datetime'], inplace=True)

# Convert the datetime objects back to strings
taxitripdata['pickup_datetime'] = taxitripdata['pickup_datetime'].dt.strftime('%Y-%m-%d %H:%M:%S')
taxitripdata['dropoff_datetime'] = taxitripdata['dropoff_datetime'].dt.strftime('%Y-%m-%d %H:%M:%S')


Cassandra connection

In [124]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

cloud_config = {
    'secure_connect_bundle': 'secure-connect-ass-1.zip' 
}
auth_provider = PlainTextAuthProvider('vaAAEmqjcceIeSrgisKSNYDr', '_a_Txb5DosFoQy+ZMjweULuwNdfp9LD+7LCd9xRTdr_9n_UJiJQBsXOuvrl+.qBwei32S52-LrCelBjUk,YZxoAFjcH0iT+WeFGddti_A9G7eUuCMRj8F3m0aMeEfBnz')
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

session.set_keyspace('big_data')

Creating tables

In [4]:
session.execute("""
CREATE TABLE IF NOT EXISTS trips (
    vendor_id text,
    pickup_datetime timestamp,
    dropoff_datetime timestamp,
    passenger_count int,
    trip_distance double,
    payment_type text,
    fare_amount double,
    extra double,
    mta_tax double,
    tip_amount double,
    tolls_amount double,
    imp_surcharge double,
    pickup_location_id text,
    dropoff_location_id text,
    PRIMARY KEY (pickup_datetime, vendor_id)
)
""")

session.execute("""
CREATE TABLE IF NOT EXISTS zones (
    zone_id text PRIMARY KEY,
    zone_name text,
    borough text,
    zone_geom text
)
""")

<cassandra.cluster.ResultSet at 0x15f8fbdb100>

Cassandra insertion

In [31]:
session.execute("TRUNCATE trips;")

<cassandra.cluster.ResultSet at 0x15f587903a0>

In [32]:
# Trips table insertion
from datetime import datetime
from cassandra.query import BatchStatement

taxitripdata = taxitripdata[:20000]

batch_size = 2000
batch_count = len(taxitripdata) // batch_size + 1

for i in range(batch_count):
    batch_start = i * batch_size
    batch_end = (i + 1) * batch_size
    batch = taxitripdata.iloc[batch_start:batch_end]
    batch_statement = BatchStatement()
    for index, row in batch.iterrows():
        try:
            statement = """
            INSERT INTO trips (vendor_id, pickup_datetime, dropoff_datetime, passenger_count, trip_distance, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, imp_surcharge, pickup_location_id, dropoff_location_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            batch_statement.add(statement, (
                str(row['vendor_id']),
                datetime.strptime(row['pickup_datetime'], '%Y-%m-%d %H:%M:%S'),
                datetime.strptime(row['dropoff_datetime'], '%Y-%m-%d %H:%M:%S'),
                int(row['passenger_count']),
                row['trip_distance'],
                str(row['payment_type']),
                row['fare_amount'],
                row['extra'],
                row['mta_tax'],
                row['tip_amount'],
                row['tolls_amount'],
                row['imp_surcharge'],
                str(row['pickup_location_id']),
                str(row['dropoff_location_id'])
            ))
        except Exception as e:
            print(f"Error at index {index}: {e}")
            print(row)
            continue
    session.execute(batch_statement)


In [27]:
result = session.execute("SELECT COUNT(*) FROM trips")
count = result.one()[0]
print(f"Total number of rows in trips table: {count}")

Total number of rows in trips table: 19994


In [34]:
result = session.execute("SELECT * FROM trips LIMIT 5")
for row in result:
    print(row)

Row(pickup_datetime=datetime.datetime(2018, 1, 28, 10, 42, 35), vendor_id='1', dropoff_datetime=datetime.datetime(2018, 1, 28, 10, 50, 4), dropoff_location_id='161', extra=0.0, fare_amount=7.0, imp_surcharge=0.3, mta_tax=0.5, passenger_count=1, payment_type='1', pickup_location_id='237', tip_amount=2.3, tolls_amount=0.0, trip_distance=1.3, trip_durations=None)
Row(pickup_datetime=datetime.datetime(2018, 5, 30, 20, 46, 20), vendor_id='1', dropoff_datetime=datetime.datetime(2018, 5, 30, 20, 52, 13), dropoff_location_id='148', extra=0.5, fare_amount=6.0, imp_surcharge=0.3, mta_tax=0.5, passenger_count=1, payment_type='2', pickup_location_id='113', tip_amount=0.0, tolls_amount=0.0, trip_distance=1.1, trip_durations=None)
Row(pickup_datetime=datetime.datetime(2018, 11, 3, 3, 34, 5), vendor_id='2', dropoff_datetime=datetime.datetime(2018, 11, 3, 3, 40, 17), dropoff_location_id='148', extra=0.5, fare_amount=5.5, imp_surcharge=0.3, mta_tax=0.5, passenger_count=5, payment_type='2', pickup_locat

In [6]:
session.execute("TRUNCATE zones;")

<cassandra.cluster.ResultSet at 0x15f160921d0>

In [10]:
# Zones table insertion
from cassandra.query import BatchStatement

batch_size = 20
batch = BatchStatement()
for index, row in taxizonegeo.iterrows():
    batch.add("""
        INSERT INTO zones (zone_id, zone_name, borough, zone_geom)
        VALUES (%s, %s, %s, %s)
        """, (str(row['zone_id']), row['zone_name'], row['borough'], row['zone_geom']))
    if index % batch_size == 0:
        session.execute(batch)
        batch.clear()

if batch:
    session.execute(batch)

In [11]:
result = session.execute("SELECT * FROM zones LIMIT 5")
for row in result:
    print(row)

Row(zone_id='255', borough='Brooklyn', zone_geom='POLYGON((-73.9619023962157 40.7253205009055, -73.9621146158342 40.7251480631995, -73.9616598430067 40.7248604542315, -73.9618942670608 40.7241067020505, -73.9619025615696 40.7240982902404, -73.9620074479949 40.7239919013039, -73.9620727192193 40.7238803007014, -73.9622350498215 40.7235702591326, -73.9622534627705 40.723532297773, -73.9627566541133 40.722399015063, -73.9629812449123 40.7220316763952, -73.9634536531554 40.7215788900316, -73.9634771575288 40.7215563612848, -73.9634781016271 40.7215565130286, -73.9635795096711 40.7215727887158, -73.9636020182392 40.7215386669427, -73.9639979372447 40.7209384380727, -73.9648942203191 40.721475120882, -73.9650491454935 40.7213573442518, -73.9640876914787 40.7207555295223, -73.9641477183037 40.7207062794526, -73.9642027448413 40.7206537571889, -73.9642524639679 40.7205982558891, -73.9642965981865 40.7205400853375, -73.9643250960372 40.7204723693472, -73.9643509064015 40.7204040323719, -73.9645

Questions d-h

In [39]:
session.execute("ALTER TABLE trips add trip_duration int;")

<cassandra.cluster.ResultSet at 0x15f080d49d0>

In [43]:
# d) Calculate the duration for each trip and add it as a new field in your database
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, BatchStatement
import time

def unix_timestamp(dt):
    return int(time.mktime(dt.timetuple()))

fetch_size = 2000

query = SimpleStatement("SELECT vendor_id, pickup_datetime, dropoff_datetime FROM trips", fetch_size=fetch_size)
rows = session.execute(query)

batch = BatchStatement()

for row in rows:
    vendor_id = row.vendor_id
    pickup_datetime = row.pickup_datetime
    dropoff_datetime = row.dropoff_datetime
    
    trip_duration = unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)
    
    update_query = SimpleStatement("""
        UPDATE trips
        SET trip_duration = %s
        WHERE pickup_datetime = %s AND vendor_id = %s
    """)
    
    batch.add(update_query, (trip_duration, pickup_datetime, vendor_id))

session.execute(batch)

<cassandra.cluster.ResultSet at 0x15f5860cdf0>

In [109]:
result = session.execute("SELECT * FROM trips LIMIT 5")
for row in result:
    print(row)

Row(pickup_datetime=datetime.datetime(2018, 1, 28, 10, 42, 35), vendor_id='1', dropoff_borough=None, dropoff_datetime=datetime.datetime(2018, 1, 28, 10, 50, 4), dropoff_location_id='161', dropoff_zone_name=None, extra=0.0, fare_amount=7.0, imp_surcharge=0.3, mta_tax=0.5, passenger_count=1, payment_type='1', pickup_borough=None, pickup_location_id='237', pickup_zone_name=None, tip_amount=2.3, tolls_amount=0.0, total_cost=10.100000000000001, trip_distance=1.3, trip_duration=449)
Row(pickup_datetime=datetime.datetime(2018, 5, 30, 20, 46, 20), vendor_id='1', dropoff_borough=None, dropoff_datetime=datetime.datetime(2018, 5, 30, 20, 52, 13), dropoff_location_id='148', dropoff_zone_name=None, extra=0.5, fare_amount=6.0, imp_surcharge=0.3, mta_tax=0.5, passenger_count=1, payment_type='2', pickup_borough=None, pickup_location_id='113', pickup_zone_name=None, tip_amount=0.0, tolls_amount=0.0, total_cost=7.3, trip_distance=1.1, trip_duration=353)
Row(pickup_datetime=datetime.datetime(2018, 11, 3,

In [None]:
session.execute("ALTER TABLE trips ADD total_cost double")

In [45]:
# e) Calculate the total trip cost and add it as a new field in your database
query = SimpleStatement("SELECT vendor_id, pickup_datetime, fare_amount, extra, mta_tax, tip_amount, tolls_amount, imp_surcharge FROM trips", fetch_size=fetch_size)
rows = session.execute(query)

batch = BatchStatement()

for row in rows:
    vendor_id = row.vendor_id
    pickup_datetime = row.pickup_datetime
    total_cost = row.fare_amount + row.extra + row.mta_tax + row.tip_amount + row.tolls_amount + row.imp_surcharge
    
    update_query = SimpleStatement("""
        UPDATE trips
        SET total_cost = %s
        WHERE pickup_datetime = %s AND vendor_id = %s
    """)
    
    batch.add(update_query, (total_cost, pickup_datetime, vendor_id))

session.execute(batch)

<cassandra.cluster.ResultSet at 0x15f5b0a2560>

In [57]:
from collections import defaultdict
from datetime import datetime, time, timedelta

time_ranges = [
    (time(0, 0), time(6, 0), "Night"),
    (time(6, 0), time(12, 0), "Morning"),
    (time(12, 0), time(18, 0), "Afternoon"),
    (time(18, 0), time(23, 59, 59), "Evening")
]

query = SimpleStatement("SELECT pickup_datetime, payment_type FROM trips", fetch_size=fetch_size)
rows = session.execute(query)

payment_counts = {label: defaultdict(int) for _, _, label in time_ranges}

for row in rows:
    pickup_time = row.pickup_datetime.time()
    payment_type = row.payment_type

    for start_time, end_time, label in time_ranges:
        if start_time <= pickup_time <= end_time or (end_time < start_time and (pickup_time <= end_time or pickup_time >= start_time)):
            payment_counts[label][payment_type] += 1
            break

for _, _, label in time_ranges:
    most_common_payment = max(payment_counts[label], key=payment_counts[label].get)
    print(f"{label}: {most_common_payment} with {payment_counts[label][most_common_payment]} occurrences")

Night: 1 with 1284 occurrences
Morning: 1 with 3449 occurrences
Afternoon: 1 with 4291 occurrences
Evening: 1 with 4879 occurrences


In [59]:
# g) What is the average tip amount per passenger count?
from collections import defaultdict

query = SimpleStatement("SELECT passenger_count, tip_amount FROM trips", fetch_size=fetch_size)
rows = session.execute(query)

tip_sums = defaultdict(float)
tip_counts = defaultdict(int)

for row in rows:
    passenger_count = row.passenger_count
    tip_amount = row.tip_amount

    tip_sums[passenger_count] += tip_amount
    tip_counts[passenger_count] += 1

for passenger_count in tip_sums:
    avg_tip = tip_sums[passenger_count] / tip_counts[passenger_count]
    print(f"Passenger Count {passenger_count}: Average Tip Amount = {avg_tip}")

Passenger Count 1: Average Tip Amount = 1.849161772930645
Passenger Count 5: Average Tip Amount = 1.8550947603121506
Passenger Count 2: Average Tip Amount = 1.8219293568810384
Passenger Count 3: Average Tip Amount = 1.7779391100702588
Passenger Count 4: Average Tip Amount = 1.961300813008131
Passenger Count 6: Average Tip Amount = 1.9800175746924427


In [141]:
# h) What are the best 5 locations for drivers to pick up passengers from?

# Get trips data and count the pickups per location
trips_query = "SELECT pickup_location_id FROM trips;"
trips_result = session.execute(trips_query)

pickup_counts = {}
for row in trips_result:
    location_id = row.pickup_location_id
    if location_id in pickup_counts:
        pickup_counts[location_id] += 1
    else:
        pickup_counts[location_id] = 1


In [142]:
# Get zone data and store it in a dict
zones_query = "SELECT zone_id, zone_name, borough FROM zones;"
zones_result = session.execute(zones_query)

zone_data = {}
for row in zones_result:
    zone_data[row.zone_id] = {
        'zone_name': row.zone_name,
        'borough': row.borough
    }


In [143]:
best_pickup_locations = sorted(pickup_counts.items(), key=lambda x: x[1], reverse=True)[:5]

for location_id, count in best_pickup_locations:
    location_data = zone_data.get(location_id, {'zone_name': 'Unknown', 'borough': 'Unknown'})
    print(f"Zone ID: {location_id}, Zone Name: {location_data['zone_name']}, Borough: {location_data['borough']}, Pickups: {count}")


Zone ID: 237, Zone Name: Upper East Side South, Borough: Manhattan, Pickups: 807
Zone ID: 161, Zone Name: Midtown Center, Borough: Manhattan, Pickups: 757
Zone ID: 236, Zone Name: Upper East Side North, Borough: Manhattan, Pickups: 730
Zone ID: 230, Zone Name: Times Sq/Theatre District, Borough: Manhattan, Pickups: 711
Zone ID: 162, Zone Name: Midtown East, Borough: Manhattan, Pickups: 683
