# Cassandra

Dataset : taxi_trip_data.csv

## Connection

In [33]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': './cassandra_needs/secure-connect-taxi-trip.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("./cassandra_needs/taxi_trip-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

row = session.execute("select release_version from system.local").one()
if row:
  print(row[0])
else:
  print("An error occurred.")

4.0.0.6816


## Import and Read

In [19]:
import pandas as pd

taxi_trip_data = pd.read_csv('./datasets/taxi_trip_data.csv', nrows=50_000)

taxi_zone_geo_data = pd.read_csv('./datasets/taxi_zone_geo.csv')

In [20]:
taxi_trip_data.columns

Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'rate_code', 'store_and_fwd_flag', 'payment_type',
       'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
       'imp_surcharge', 'total_amount', 'pickup_location_id',
       'dropoff_location_id'],
      dtype='object')

In [21]:
taxi_zone_geo_data.columns

Index(['zone_id', 'zone_name', 'borough', 'zone_geom'], dtype='object')

## Data Preprocessing

### Drop Unnecessary Columns

In [22]:
taxi_trip_data.drop(
    columns=["store_and_fwd_flag", "rate_code", "total_amount"], inplace=True
)

### Drop rows with missing essential details


In [23]:
essential_columns = [
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "imp_surcharge",
    "pickup_location_id",
    "dropoff_location_id",
]


### Drop Missing Values

In [24]:
taxi_trip_data.dropna(subset=essential_columns, inplace=True)
taxi_zone_geo_data.dropna(inplace=True)

## Join Datasets

In [25]:
joined_data = pd.merge(
    taxi_trip_data,
    taxi_zone_geo_data,
    how="inner",
    left_on="pickup_location_id",
    right_on="zone_id",
)

In [31]:
joined_data.columns

Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'payment_type', 'fare_amount', 'extra', 'mta_tax',
       'tip_amount', 'tolls_amount', 'imp_surcharge', 'pickup_location_id',
       'dropoff_location_id', 'zone_id', 'zone_name', 'borough', 'zone_geom',
       'trip_duration', 'total_trip_cost'],
      dtype='object')

## Calculations

### Trip Duration

In [26]:
joined_data["pickup_datetime"] = pd.to_datetime(joined_data["pickup_datetime"])
joined_data["dropoff_datetime"] = pd.to_datetime(joined_data["dropoff_datetime"])
joined_data["trip_duration"] = (
    joined_data["dropoff_datetime"] - joined_data["pickup_datetime"]
).dt.total_seconds() / 60

### Total Trip Cost

In [27]:
joined_data["total_trip_cost"] = (
    joined_data["fare_amount"]
    + joined_data["extra"]
    + joined_data["mta_tax"]
    + joined_data["tip_amount"]
    + joined_data["tolls_amount"]
    + joined_data["imp_surcharge"]
)

### Save CSV for cassandra usage

In [36]:
# save the data to a csv file
joined_data.to_csv("./datasets/joined_taxi_trip_data.csv", index=False)

## Cassandra Work

### Initiation

In [29]:
session.set_keyspace("taxi_trip_data")
row = cluster.metadata.keyspaces["taxi_trip_data"]

from pprint import pprint
pprint(row)

<cassandra.metadata.KeyspaceMetadata object at 0x12eed6260>


### Table Creation

In [35]:
session.execute(
    """
    CREATE TABLE IF NOT EXISTS taxi_trip_data.trip_data (
        vendor_id TEXT,
        pickup_datetime TIMESTAMP,
        dropoff_datetime TIMESTAMP,
        passenger_count INT,
        trip_distance DOUBLE,
        payment_type TEXT,
        fare_amount DOUBLE,
        extra DOUBLE,
        mta_tax DOUBLE,
        tip_amount DOUBLE,
        tolls_amount DOUBLE,
        imp_surcharge DOUBLE,
        pickup_location_id INT,
        dropoff_location_id INT,
        zone_id INT,
        zone_name TEXT,
        borough TEXT,
        zone_geom TEXT,
        trip_duration DOUBLE,
        total_trip_cost DOUBLE,
        PRIMARY KEY (pickup_datetime, vendor_id)
    )
    """
)

<cassandra.cluster.ResultSet at 0x11a611660>

### Insertion

In [37]:
import csv

ROW_LIMIT = 20_000
row_count = 0

with open("./datasets/joined_taxi_trip_data.csv", "r") as f:
    reader = csv.reader(f)  # Create a reader object.
    next(reader)  # Skip the header row.
    for row in reader:
        if row_count >= ROW_LIMIT:
            break
        vendor_id = row[0]
        pickup_datetime = row[1]
        dropoff_datetime = row[2]
        passenger_count = int(row[3])
        trip_distance = float(row[4])
        payment_type = row[5]
        fare_amount = float(row[6])
        extra = float(row[7])
        mta_tax = float(row[8])
        tip_amount = float(row[9])
        tolls_amount = float(row[10])
        imp_surcharge = float(row[11])
        pickup_location_id = int(row[12])
        dropoff_location_id = int(row[13])
        zone_id = int(row[14])
        zone_name = row[15]
        borough = row[16]
        zone_geom = row[17]
        trip_duration = float(row[18])
        total_trip_cost = float(row[19])
        
        query = (
            f"INSERT INTO taxi_trip_data.trip_data "
            f"(vendor_id, pickup_datetime, dropoff_datetime, passenger_count, trip_distance, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, imp_surcharge, pickup_location_id, dropoff_location_id, zone_id, zone_name, borough, zone_geom, trip_duration, total_trip_cost) "
            f"VALUES ('{vendor_id}', '{pickup_datetime}', '{dropoff_datetime}', {passenger_count}, {trip_distance}, '{payment_type}', {fare_amount}, {extra}, {mta_tax}, {tip_amount}, {tolls_amount}, {imp_surcharge}, {pickup_location_id}, {dropoff_location_id}, {zone_id}, '{zone_name}', '{borough}', '{zone_geom}', {trip_duration}, {total_trip_cost})"
        )

        try:
            session.execute(query)
        except Exception as e:
            print(e)
            break
        row_count += 1

errors={'01ac88f6-b79d-4938-ad9d-5597469bea97-us-east1.db.astra.datastax.com:29042:c39a5bc8-1a0c-4e01-ab3a-883b2afa0063': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=01ac88f6-b79d-4938-ad9d-5597469bea97-us-east1.db.astra.datastax.com:29042:c39a5bc8-1a0c-4e01-ab3a-883b2afa0063


### Closing Connection

In [38]:
# Close the connection
cluster.shutdown()