In [1]:
from cassandra.cluster import Cluster

# Re-running this cell (or the whole notebook) would otherwise leave the
# previous Cluster object and its connections open; shut it down first.
if isinstance(globals().get('cluster'), Cluster):
    cluster.shutdown()

cluster = Cluster(['127.0.0.1'])  # Cassandra is running locally
session = cluster.connect()  # no default keyspace; system_schema is queried by full name

# List keyspaces: print every keyspace_name in system_schema.keyspaces, one per line.
rows = session.execute("SELECT keyspace_name FROM system_schema.keyspaces;")
for row in rows:
    print(row.keyspace_name)


system_auth
system_schema
logistics
system_distributed
system
system_traces


In [2]:
from cassandra.cluster import Cluster
import uuid
import random
from datetime import datetime, timedelta

# -----------------------------
# Connect to Cassandra logistics keyspace
# -----------------------------
# Shut down a Cluster left over from an earlier cell or run so that each
# re-run does not leave another set of connections open.
if isinstance(globals().get('cluster'), Cluster):
    cluster.shutdown()

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('logistics')  # use your correct keyspace




In [None]:
### Insert synthetic deliveries without duplicate primary keys

In [None]:
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
import uuid, random
from datetime import datetime, timedelta

# -----------------------------
# Connect to Cassandra
# -----------------------------
# Shut down a Cluster left over from an earlier cell or run so that each
# re-run does not leave another set of connections open.
if isinstance(globals().get('cluster'), Cluster):
    cluster.shutdown()

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('logistics')  # Keyspace: logistics

# -----------------------------
# Configurable options
# -----------------------------
# Number of synthetic delivery rows generated per run of this cell.
NUM_ROWS = 10000
# Upper bound on the number of INSERTs sent in a single BatchStatement.
BATCH_SIZE = 50
# (low, high) bounds for the random generators in the insert loop.
# Integer ranges are drawn with random.randint (both ends inclusive);
# tonnage / percentage ranges are drawn with random.uniform.
BENEFICIARY_RANGE = (50, 600)       # randint -> beneficiaries_count
VEHICLE_LOAD_RANGE = (2, 25)        # uniform -> vehicle_load_tonnes
WAREHOUSE_TAT_RANGE = (1, 24)       # randint -> warehouse_release_time_hours
RETURN_PERCENTAGE_RANGE = (0, 10)   # uniform -> return_percentage
REQUESTED_TONNES_RANGE = (2, 20)    # uniform -> requested_tonnes

# -----------------------------
# Data pools
# -----------------------------
# Branch names. Every entry must also be a key of `subcounties`,
# because choose_route looks the branch up there.
branches = ['Nairobi', 'Mombasa', 'Kisumu', 'Wajir', 'Garissa', 'Turkana', 'Mandera']
# Branch -> sub-counties, used to build "<branch> - <subcounty>" route labels.
subcounties = {
    'Nairobi': ['Ngong', 'Kangemi', 'Kasarani'],
    'Mombasa': ['Kisauni', 'Likoni', 'Nyali'],
    'Kisumu': ['Kisumu East', 'Kisumu West', 'Nyando'],
    'Wajir': ['Wajir East', 'Wajir West', 'Wajir North'],
    'Garissa': ['Garissa Township', 'Balambala', 'Lagdera'],
    'Turkana': ['Turkana North', 'Turkana South', 'Turkana Central'],
    'Mandera': ['Mandera East', 'Mandera West', 'Mandera North']
}

# Vehicle registration plates and driver first names, each sampled uniformly.
vehicles = ['KDB125W', 'KBD126U', 'KBA789I', 'KDA123X', 'KDA124Y', 'KDB567Z', 'KBC234U']
drivers = ['John', 'Mary', 'Ali','Martin', 'Grace', 'Amina', 'Peter', 'Fatuma', 'George']

# Cargo type -> subtypes. choose_cargo assigns a sampling weight to each of
# these cargo types, so keep its weights in sync when adding or renaming one.
cargo_options = {
    'Food': ['RUTF (Therapeutic Food)', 'General Food Rations', 'RUSF (Supplementary Food)', 'Maize Flour', 'Rice', 'Cooking Oil', 'Beans'],
    'WASH': ['Water', 'Hygiene Kits', 'Water Purification Tablets', 'Sanitary Pads', 'Soap'],
    'Medical': ['Medicines', 'Health Kits', 'Vaccines', 'Medical Equipment', 'First Aid Kits'],
    'Shelter / NFI': ['Tent', 'Blanket', 'Sleeping Mat', 'Plastic Sheeting', 'Kitchen Set'],
    'Logistics': ['Fuel', 'Generator', 'Warehousing Equipment', 'Pallets', 'Forklift']
}

# Status and urgency values with parallel sampling weights (each list sums to 1).
statuses = ['Planned', 'In Transit', 'Delivered', 'Delayed']
status_weights = [0.05, 0.4, 0.5, 0.05]
urgency_levels = ['High', 'Medium', 'Low']
urgency_weights = [0.1, 0.3, 0.6]
# NOTE(review): the insert loop draws region independently of branch, so a row
# can pair e.g. 'Mombasa' with 'North Eastern'.
regions = ['Central', 'Coast', 'Western', 'North Eastern']
distribution_centers = ['DC_1', 'DC_2', 'DC_3', 'Dagahaley Camp', 'Ifo Camp', 'Kakuma Camp']
# Labels that appear to bucket days of the month ('1-7' = days 1 through 7, ...).
week_ranges = ['1-7', '8-14', '15-21', '22-28', '29-31']
# Weekday names, Monday first.
days_of_week = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

# -----------------------------
# Pre-prepare insert statement
# -----------------------------
# CQL for one delivery row. The 22 bind markers follow the column list below,
# and that is the order in which row tuples are handed to batch.add().
INSERT_DELIVERY_CQL = """
    INSERT INTO humanitarian_deliveries (
        branch, route, delivery_id, vehicle, driver_name,
        cargo_type, cargo_subtype, vehicle_load_tonnes, beneficiaries_count,
        created_date, arrival_date, status, urgency_level, region,
        distribution_center, week_range, day_of_week, requested_tonnes,
        released_tonnes, return_percentage, capacity_utilisation,
        warehouse_release_time_hours
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
# Prepare once; the server parses the statement a single time and every row
# only ships its bound values.
insert_stmt = session.prepare(INSERT_DELIVERY_CQL)

# -----------------------------
# Helper functions
# -----------------------------
# Areas that get the arid, WASH-heavy cargo mix. The first four are *branch*
# names. 'North Eastern' is the matching entry in `regions`, added so that a
# region value can also trigger the arid mix.
_ARID_AREAS = frozenset({'Wajir', 'Garissa', 'Turkana', 'Mandera', 'North Eastern'})

# Sampling weights keyed by cargo type name, not by position, so reordering
# or extending `cargo_options` cannot silently shift a weight onto the wrong type.
_ARID_CARGO_WEIGHTS = {'Food': 0.3, 'WASH': 0.4, 'Medical': 0.1, 'Shelter / NFI': 0.1, 'Logistics': 0.1}
_DEFAULT_CARGO_WEIGHTS = {'Food': 0.4, 'WASH': 0.2, 'Medical': 0.2, 'Shelter / NFI': 0.1, 'Logistics': 0.1}


def choose_cargo(region, options=None, rng=random):
    """Pick one cargo type, weighting the draw by the kind of area.

    Arid areas get a WASH-heavy mix; every other area gets a Food-heavy mix.

    Args:
        region: Area name. The arid mix applies to the branch names 'Wajir',
            'Garissa', 'Turkana' and 'Mandera', and to the 'North Eastern' region.
        options: Mapping whose keys are the candidate cargo types. Defaults to
            the module-level ``cargo_options``.
        rng: Anything exposing ``choices`` with the ``random`` signature, e.g.
            the ``random`` module (the default) or a seeded ``random.Random``
            for reproducible draws.

    Returns:
        One key of ``options``.

    Raises:
        KeyError: if ``options`` contains a cargo type that has no weight.
    """
    if options is None:
        options = cargo_options
    weight_table = _ARID_CARGO_WEIGHTS if region in _ARID_AREAS else _DEFAULT_CARGO_WEIGHTS
    cargo_types = list(options)
    # Fail loudly on an unweighted type rather than silently mis-weighting it.
    weights = [weight_table[cargo_type] for cargo_type in cargo_types]
    return rng.choices(cargo_types, weights=weights, k=1)[0]

def choose_route(branch):
    """Return a route label of the form "<branch> - <subcounty>".

    The sub-county is picked uniformly from ``subcounties[branch]``; an
    unknown branch raises KeyError from that lookup.
    """
    return "{} - {}".format(branch, random.choice(subcounties[branch]))

# -----------------------------
# Generate and insert rows in batches
# -----------------------------
# BatchType comes from cassandra.query, which this cell already imports from.
from cassandra.query import BatchType


def _build_delivery_row():
    """Generate one synthetic delivery as a tuple in ``insert_stmt`` column order.

    Reads the data pools and *_RANGE constants defined earlier in this cell,
    and uses choose_route / choose_cargo for the route and cargo type.
    """
    branch = random.choice(branches)
    route = choose_route(branch)
    # A fresh uuid4 per row: the (branch, route, delivery_id) key written here
    # cannot already exist, so no read-before-write duplicate check is needed.
    delivery_id = uuid.uuid4()
    vehicle = random.choice(vehicles)
    driver_name = random.choice(drivers)
    region = random.choice(regions)
    # choose_cargo's arid weighting is keyed on branch names (Wajir, Garissa, ...).
    # Passing the randomly drawn region never matched, so the arid mix was unused.
    cargo_type = choose_cargo(branch)
    cargo_subtype = random.choice(cargo_options[cargo_type])
    vehicle_load_tonnes = round(random.uniform(*VEHICLE_LOAD_RANGE), 2)
    beneficiaries_count = random.randint(*BENEFICIARY_RANGE)
    # NOTE(review): naive local time; the driver presumably treats naive
    # datetimes as UTC when storing them - confirm this is intended.
    created_date = datetime.now() - timedelta(days=random.randint(0, 30))
    arrival_date = created_date + timedelta(days=random.randint(1, 5))
    status = random.choices(statuses, weights=status_weights, k=1)[0]
    urgency_level = random.choices(urgency_levels, weights=urgency_weights, k=1)[0]
    distribution_center = random.choice(distribution_centers)
    # Derive the calendar fields from created_date so they agree with it,
    # instead of drawing them independently at random.
    # Days 1-7 -> '1-7', 8-14 -> '8-14', ..., 29-31 -> '29-31'.
    week_range = week_ranges[(created_date.day - 1) // 7]
    # date.weekday() is 0 for Monday, matching the order of days_of_week.
    day_of_week = days_of_week[created_date.weekday()]
    requested_tonnes = round(random.uniform(*REQUESTED_TONNES_RANGE), 2)
    # Never release more than the vehicle carries, so capacity_utilisation stays
    # within 0-100 %. Uncapped, 20 t on a 2 t vehicle gave 1000 %.
    released_tonnes = round(random.uniform(1, min(requested_tonnes, vehicle_load_tonnes)), 2)
    return_percentage = round(random.uniform(*RETURN_PERCENTAGE_RANGE), 2)
    capacity_utilisation = round((released_tonnes / vehicle_load_tonnes) * 100, 2)
    warehouse_release_time_hours = random.randint(*WAREHOUSE_TAT_RANGE)
    return (
        branch, route, delivery_id, vehicle, driver_name,
        cargo_type, cargo_subtype, vehicle_load_tonnes, beneficiaries_count,
        created_date, arrival_date, status, urgency_level, region,
        distribution_center, week_range, day_of_week, requested_tonnes,
        released_tonnes, return_percentage, capacity_utilisation,
        warehouse_release_time_hours
    )


# Each row has random (branch, route) partition values, so one batch spans many
# partitions. UNLOGGED skips the batchlog that a LOGGED batch would write for
# cross-partition atomicity, which this bulk seed load does not need.
batch = BatchStatement(batch_type=BatchType.UNLOGGED)

for _ in range(NUM_ROWS):
    batch.add(insert_stmt, _build_delivery_row())
    # Flush on the batch's actual size rather than on the loop counter.
    if len(batch) >= BATCH_SIZE:
        session.execute(batch)
        batch.clear()

# Send the final, partially filled batch.
if len(batch) > 0:
    session.execute(batch)

print(f"✅ Successfully inserted {NUM_ROWS} rows in batches into logistics.humanitarian_deliveries!")


In [2]:
# Export this notebook as a plain Python script (writes lg_data_insert.py).
!jupyter nbconvert --to script lg_data_insert.ipynb


[NbConvertApp] Converting notebook lg_data_insert.ipynb to script
[NbConvertApp] Writing 6780 bytes to lg_data_insert.py


In [4]:
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from datetime import datetime
import uuid

# -----------------------------
# Connect to Cassandra
# -----------------------------
# Shut down a Cluster left over from an earlier cell or run so that each
# re-run does not leave another set of connections open.
if isinstance(globals().get('cluster'), Cluster):
    cluster.shutdown()

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('logistics')

# -----------------------------
# Create summary table if not exists
# -----------------------------
# The double parentheses make all six grouping columns one composite partition
# key (no clustering columns): one partition, and one row, per group.
SUMMARY_TABLE_CQL = """
CREATE TABLE IF NOT EXISTS humanitarian_summary (
    branch text,
    route text,
    cargo_type text,
    cargo_subtype text,
    distribution_center text,
    week_range text,
    total_deliveries int,
    total_released_tonnes decimal,
    avg_capacity_utilisation decimal,
    avg_return_percentage decimal,
    total_beneficiaries int,
    status_delivered int,
    status_in_transit int,
    status_planned int,
    status_delayed int,
    PRIMARY KEY ((branch, route, cargo_type, cargo_subtype, distribution_center, week_range))
)
"""
session.execute(SUMMARY_TABLE_CQL)

# -----------------------------
# Aggregate original deliveries table
# -----------------------------
# Fetch all rows (can also filter by date if needed)
DELIVERY_COLUMNS_QUERY = "SELECT branch, route, cargo_type, cargo_subtype, distribution_center, week_range, released_tonnes, capacity_utilisation, return_percentage, beneficiaries_count, status FROM humanitarian_deliveries"
rows = session.execute(DELIVERY_COLUMNS_QUERY)

# Delivery status -> the per-group counter it increments. Any other status
# value is not counted.
STATUS_COUNTER = {
    'Delivered': 'status_delivered',
    'In Transit': 'status_in_transit',
    'Planned': 'status_planned',
    'Delayed': 'status_delayed',
}


def _new_group_totals():
    """Return a zeroed accumulator for one summary group."""
    return {
        'total_deliveries': 0,
        'total_released_tonnes': 0.0,
        'capacity_sum': 0.0,
        'return_sum': 0.0,
        'total_beneficiaries': 0,
        'status_delivered': 0,
        'status_in_transit': 0,
        'status_planned': 0,
        'status_delayed': 0,
    }


# Group key (the six summary-table key columns) -> running totals, in first-seen order.
summary = {}

for row in rows:
    group_key = (row.branch, row.route, row.cargo_type, row.cargo_subtype, row.distribution_center, row.week_range)
    totals = summary.get(group_key)
    if totals is None:
        totals = summary[group_key] = _new_group_totals()

    totals['total_deliveries'] += 1
    totals['total_released_tonnes'] += float(row.released_tonnes)
    totals['capacity_sum'] += float(row.capacity_utilisation)
    totals['return_sum'] += float(row.return_percentage)
    totals['total_beneficiaries'] += int(row.beneficiaries_count)

    counter_name = STATUS_COUNTER.get(row.status)
    if counter_name is not None:
        totals[counter_name] += 1

# -----------------------------
# Prepare insert statement
# -----------------------------
# One summary row. The 15 bind markers follow the column list: the six group
# key columns first, then the aggregates and the four status counters.
SUMMARY_INSERT_CQL = """
INSERT INTO humanitarian_summary (
    branch, route, cargo_type, cargo_subtype, distribution_center, week_range,
    total_deliveries, total_released_tonnes, avg_capacity_utilisation,
    avg_return_percentage, total_beneficiaries,
    status_delivered, status_in_transit, status_planned, status_delayed
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
insert_stmt = session.prepare(SUMMARY_INSERT_CQL)

# -----------------------------
# Insert aggregated data
# -----------------------------
from decimal import Decimal

# BatchType comes from cassandra.query, which this cell already imports from.
from cassandra.query import BatchType

# Maximum number of summary INSERTs per BatchStatement.
SUMMARY_BATCH_SIZE = 50


def _as_decimal(value, places=2):
    """Round ``value`` to ``places`` decimals and return it as a ``Decimal``.

    The target columns are CQL ``decimal``. In cassandra-driver, DecimalType
    serialises non-Decimal input via ``Decimal(value)``, so a raw float such as
    0.1 would be stored as its full binary expansion
    (0.1000000000000000055511151231...). Going through ``str`` keeps exactly
    the rounded digits.
    """
    return Decimal(str(round(value, places)))


# Every summary row is its own partition, so each batch spans many partitions.
# UNLOGGED skips the batchlog, which this recomputable aggregate does not need.
batch = BatchStatement(batch_type=BatchType.UNLOGGED)

# NOTE(review): these writes are upserts only. A group that disappears from
# humanitarian_deliveries keeps its old summary row until it is deleted separately.
for key, agg in summary.items():
    branch, route, cargo_type, cargo_subtype, distribution_center, week_range = key
    # total_deliveries >= 1 for every key, since a key is only created with its first row.
    total_deliveries = agg['total_deliveries']
    total_released_tonnes = _as_decimal(agg['total_released_tonnes'])
    avg_capacity_utilisation = _as_decimal(agg['capacity_sum'] / total_deliveries)
    avg_return_percentage = _as_decimal(agg['return_sum'] / total_deliveries)
    total_beneficiaries = agg['total_beneficiaries']

    batch.add(insert_stmt, (
        branch, route, cargo_type, cargo_subtype, distribution_center, week_range,
        total_deliveries, total_released_tonnes, avg_capacity_utilisation,
        avg_return_percentage, total_beneficiaries,
        agg['status_delivered'], agg['status_in_transit'], agg['status_planned'], agg['status_delayed']
    ))

    if len(batch) >= SUMMARY_BATCH_SIZE:
        session.execute(batch)
        batch.clear()

# Execute remaining batch if any
if len(batch) > 0:
    session.execute(batch)

print("✅ Humanitarian summary table created and populated successfully!")


✅ Humanitarian summary table created and populated successfully!
