# Airbnb Data Loading to Cassandra

This notebook loads Airbnb data from CSV files into a Cassandra database.
The data includes listings, neighbourhoods, calendar, and reviews for multiple cities.

In [45]:
# Import required libraries
import os
import pandas as pd
from pathlib import Path
from cassandra.cluster import Cluster
import re
from datetime import datetime
import numpy as np
import time
from tqdm import tqdm

## Connect to Cassandra

First, we'll establish a connection to our local Cassandra cluster and apply the schema statements stored in `schema.cql`.

In [46]:
# Connect to the local node; `session` is the handle used by the cells below.
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

# Read the schema file in one go.
schema_file_path = Path('schema.cql')
with open(schema_file_path, 'r') as f:
    schema_content = f.read()

# Drop whole-line `--` comments BEFORE splitting on ';'. Filtering after the
# split would discard any statement that merely has a comment line above it,
# and a ';' inside a comment would cut a statement in half.
schema_without_comments = '\n'.join(
    line for line in schema_content.splitlines()
    if not line.lstrip().startswith('--')
)
statements = [stmt.strip() for stmt in schema_without_comments.split(';') if stmt.strip()]

# Execute the statements one by one, in file order.
for stmt in statements:
    session.execute(stmt)

InvalidRequest: Error from server: code=2200 [Invalid query] message="Index listings_host_id_idx_1 is a duplicate of existing index listings_host_id_idx"

## Helper Functions

Let's define some helper functions to clean and convert data for Cassandra compatibility.

In [47]:
# Helper functions for data cleaning and format conversion

def clean_price(price_str):
    """Convert a price value to a float.

    Args:
        price_str: A price given as text (e.g. "$1,234.50"), as a Python or
            NumPy number, or as a missing value (None / NaN / empty string).

    Returns:
        The price as a float, or None when the value is missing or no number
        can be extracted from it. For text input every character other than
        digits and '.' is removed first, so any sign is discarded as well.
    """
    if pd.isna(price_str) or price_str == '':
        return None
    # NumPy integer scalars are not `int` subclasses, so list them explicitly.
    if isinstance(price_str, (int, float, np.integer, np.floating)):
        return float(price_str)
    if isinstance(price_str, str):
        # Remove currency symbols, thousands separators and any other noise.
        cleaned = re.sub(r'[^\d.]', '', price_str)
        if not cleaned:
            return None
        try:
            return float(cleaned)
        except ValueError:
            # Leftovers such as "." or "1.2.3" are not valid numbers.
            return None
    return None

def convert_date(date_str):
    """Convert a date-like value to a `datetime.date`.

    Args:
        date_str: A date string, a `pd.Timestamp`, a `datetime`, a `date`, or
            a missing value (None / NaN / NaT / empty string).

    Returns:
        A `datetime.date`, or None when the value is missing, cannot be
        parsed, or is of an unsupported type.
    """
    # Local import: the notebook's import cell only brings in `datetime`.
    from datetime import date

    if pd.isna(date_str) or date_str == '':
        return None
    if isinstance(date_str, str):
        try:
            parsed = pd.to_datetime(date_str)
        except (ValueError, TypeError, OverflowError):
            # Unparseable or out-of-range text counts as "no date".
            return None
        # Text such as "NaT" parses successfully but carries no date.
        return None if pd.isna(parsed) else parsed.date()
    # pd.Timestamp is a datetime subclass, so this branch covers both.
    if isinstance(date_str, datetime):
        return date_str.date()
    if isinstance(date_str, date):
        return date_str
    return None

def convert_bool(value):
    """Convert common boolean representations to a Python bool.

    Args:
        value: A bool (Python or NumPy), a number, a string flag, or a
            missing value (None / NaN / empty string).

    Returns:
        True for truthy bools and numbers and for the strings 'true', 't',
        'yes', 'y', '1' (case-insensitive, surrounding whitespace ignored).
        False for everything else, including missing values and
        unsupported types.
    """
    if pd.isna(value) or value == '':
        return False
    # np.bool_ is not a subclass of bool, so it has to be listed explicitly.
    if isinstance(value, (bool, np.bool_)):
        return bool(value)
    if isinstance(value, str):
        return value.strip().lower() in ('true', 't', 'yes', 'y', '1')
    if isinstance(value, (int, float, np.integer, np.floating)):
        return bool(value)
    return False

def safe_int(value):
    """Convert a value to an int, falling back to 0.

    Args:
        value: An integer, a float, numeric text, or a missing value
            (None / NaN / empty string).

    Returns:
        The value as an int (fractions are truncated), or 0 when the value is
        missing or cannot be converted.
    """
    if pd.isna(value) or value == '':
        return 0
    # Integers are returned exactly; a round-trip through float would corrupt
    # anything above 2**53 (e.g. long numeric ids).
    if isinstance(value, (int, np.integer)):
        return int(value)
    try:
        if isinstance(value, str):
            try:
                # Exact path for integer text of any length.
                return int(value)
            except ValueError:
                pass  # e.g. "12.7" or "1e3" -> handled by the float path below
        return int(float(value))
    except (ValueError, TypeError, OverflowError):
        # Non-numeric text, unsupported types, or +/-inf.
        return 0

def safe_float(value):
    """Convert a value to a float, falling back to 0.0.

    Args:
        value: A number, numeric text, or a missing value
            (None / NaN / empty string).

    Returns:
        The value as a float, or 0.0 when the value is missing or cannot be
        converted.
    """
    if pd.isna(value) or value == '':
        return 0.0
    try:
        return float(value)
    except (ValueError, TypeError, OverflowError):
        # Only conversion failures are absorbed; KeyboardInterrupt and
        # SystemExit must still be able to stop a long-running load.
        return 0.0

def safe_text(value):
    """Convert a value to a string suitable for binding to a text column.

    Args:
        value: Any scalar, or a missing value (None / NaN / empty string).

    Returns:
        None for missing values, the string itself for `str` input, and
        `str(value)` for anything else.

    Note:
        The text is deliberately NOT quote-escaped. Every loader in this
        notebook binds values to prepared statements, so doubling single
        quotes would be stored literally ("O'Brien" would become "O''Brien").
        Escaping is only needed when splicing a value into a CQL string by
        hand.
    """
    if pd.isna(value) or value == '':
        return None
    return value if isinstance(value, str) else str(value)

In [48]:
# Functions to load data into Cassandra

def load_neighbourhoods(file_path):
    """Insert the rows of a neighbourhoods CSV into my_keyspace.neighbourhoods.

    Uses the notebook-level `session` for all database calls.

    Args:
        file_path: Path of the CSV file. It must contain the columns
            'neighbourhood' and 'city'; 'neighbourhood_group' is optional.

    Returns:
        The number of rows inserted. 0 is returned when the required columns
        are missing or when any exception is raised during the load (the
        error is printed, not re-raised).
    """
    try:
        print(f"Loading neighbourhoods from: {file_path}")
        frame = pd.read_csv(file_path)

        # Both key columns are mandatory.
        if not {'neighbourhood', 'city'}.issubset(frame.columns):
            print("Required columns missing in neighbourhoods data")
            return 0

        # The group column may be absent from the file; fill it with nulls.
        if 'neighbourhood_group' not in frame.columns:
            frame['neighbourhood_group'] = None

        cql = """
            INSERT INTO my_keyspace.neighbourhoods (city, neighbourhood_group, neighbourhood)
            VALUES (?, ?, ?)
        """
        statement = session.prepare(cql)

        inserted = 0
        progress = tqdm(frame.iterrows(), total=len(frame), desc="Inserting neighbourhoods")
        for _, record in progress:
            params = (
                safe_text(record['city']),
                safe_text(record.get('neighbourhood_group', '')),
                safe_text(record['neighbourhood']),
            )
            # Rows without a city or a neighbourhood are skipped silently.
            if not (params[0] and params[2]):
                continue
            session.execute(statement, params)
            inserted += 1

        print(f"Inserted {inserted} neighbourhood records")
        return inserted
    except Exception as e:
        print(f"Error loading neighbourhoods: {e}")
        return 0

def load_listings(file_path):
    """Insert the rows of a listings CSV into my_keyspace.listings.

    Uses the notebook-level `session` for all database calls. Only the known
    listing columns that are present in the file are written.

    Args:
        file_path: Path of the CSV file. It must contain the columns 'id' and
            'city'. 'neighbourhood' is taken from 'neighbourhood_cleansed'
            when absent, or set to 'Unknown' when neither column exists.

    Returns:
        The number of rows successfully inserted. A row that fails to convert
        or insert is reported and skipped. 0 is returned when the required
        columns are missing or when the load as a whole fails (the error is
        printed, not re-raised).
    """
    try:
        print(f"Loading listings from: {file_path}")
        df = pd.read_csv(file_path)

        # Validate first: dropna(subset=...) raises KeyError on a missing
        # column, which would hide this clearer message.
        if 'id' not in df.columns or 'city' not in df.columns:
            print("Required columns missing in listings data")
            return 0

        # Drop rows with NA in required columns
        original_count = len(df)
        df = df.dropna(subset=['id', 'city'])
        dropped_count = original_count - len(df)
        print(f"Dropped {dropped_count} rows with NA values in required columns from listings data")

        # Ensure neighbourhood column exists for primary key
        if 'neighbourhood' not in df.columns and 'neighbourhood_cleansed' in df.columns:
            df['neighbourhood'] = df['neighbourhood_cleansed']
        elif 'neighbourhood' not in df.columns:
            df['neighbourhood'] = 'Unknown'

        columns = [
            'id', 'name', 'description', 'host_id', 'host_name', 'host_since',
            'host_is_superhost', 'neighbourhood', 'neighbourhood_group', 'city',
            'latitude', 'longitude', 'property_type', 'room_type', 'accommodates',
            'bedrooms', 'beds', 'price', 'minimum_nights', 'maximum_nights',
            'number_of_reviews', 'review_scores_rating', 'calculated_host_listings_count'
        ]

        # Only insert the columns that the file actually provides.
        existing_cols = [col for col in columns if col in df.columns]

        placeholders = ', '.join(['?'] * len(existing_cols))
        col_str = ', '.join(existing_cols)

        insert_query = f"""
            INSERT INTO my_keyspace.listings ({col_str})
            VALUES ({placeholders})
        """

        prepared = session.prepare(insert_query)

        # Column -> converter; any column not listed here is bound as text.
        converters = {
            'host_since': convert_date,
            'host_is_superhost': convert_bool,
            'price': clean_price,
        }
        for col in ('id', 'host_id', 'accommodates', 'bedrooms', 'beds', 'minimum_nights',
                    'maximum_nights', 'number_of_reviews', 'calculated_host_listings_count'):
            converters[col] = safe_int
        for col in ('latitude', 'longitude', 'review_scores_rating'):
            converters[col] = safe_float

        count = 0
        for idx, row in tqdm(df.iterrows(), total=len(df), desc="Preparing listings"):
            # Each row is converted and inserted on its own, so one bad row
            # is reported and skipped without affecting later rows.
            try:
                values = [converters.get(col, safe_text)(row.get(col)) for col in existing_cols]
                session.execute(prepared, values)
                count += 1
            except Exception as row_err:
                print(f"Error processing row {idx}: {row_err}")

        print(f"Inserted {count} listing records")
        return count
    except Exception as e:
        print(f"Error loading listings: {e}")
        return 0
    
def load_calendar(file_path):
    """Insert the rows of a calendar CSV into my_keyspace.calendar.

    Uses the notebook-level `session` for all database calls.

    Args:
        file_path: Path of the CSV file. It must contain the columns
            'listing_id' and 'date'. The optional columns fall back to
            defaults when absent: available=True, price=0,
            adjusted_price=price, minimum_nights=1, maximum_nights=365.

    Returns:
        The number of rows successfully inserted. A row that fails to convert
        or insert is reported and skipped. 0 is returned when the required
        columns are missing or when the load as a whole fails (the error is
        printed, not re-raised).
    """
    try:
        print(f"Loading calendar from: {file_path}")
        df = pd.read_csv(file_path)

        # Validate first: dropna(subset=...) raises KeyError on a missing
        # column, which would hide this clearer message.
        if 'listing_id' not in df.columns or 'date' not in df.columns:
            print("Required columns missing in calendar data")
            return 0

        # Drop rows with NA in required columns
        original_count = len(df)
        df = df.dropna(subset=['listing_id', 'date'])
        dropped_count = original_count - len(df)
        print(f"Dropped {dropped_count} rows with NA values in required columns from calendar data")

        # Convert the date column to datetime.date values once, up front.
        df['date'] = pd.to_datetime(df['date']).dt.date

        # price / adjusted_price / available are converted per row in the
        # loop below; a separate column-wide pass would only repeat that work.

        insert_query = """
            INSERT INTO my_keyspace.calendar
            (listing_id, date, available, price, adjusted_price, minimum_nights, maximum_nights)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """

        prepared = session.prepare(insert_query)
        count = 0

        for idx, row in tqdm(df.iterrows(), total=len(df), desc="Inserting calendar data"):
            # Each row is converted and inserted on its own, so one bad row
            # is reported and skipped without affecting later rows.
            try:
                price = clean_price(row.get('price', 0))
                params = (
                    safe_int(row['listing_id']),
                    row['date'],
                    convert_bool(row.get('available', True)),
                    price,
                    clean_price(row.get('adjusted_price', price)),
                    safe_int(row.get('minimum_nights', 1)),
                    safe_int(row.get('maximum_nights', 365)),
                )
                session.execute(prepared, params)
                count += 1
            except Exception as row_err:
                print(f"Error processing calendar row {idx}: {row_err}")

        print(f"Inserted {count} calendar records")
        return count
    except Exception as e:
        print(f"Error loading calendar: {e}")
        return 0

def load_reviews(file_path):
    """Insert the rows of a reviews CSV into my_keyspace.reviews.

    Uses the notebook-level `session` for all database calls.

    Args:
        file_path: Path of the CSV file. It must contain the columns
            'listing_id', 'date' and 'id'. 'reviewer_id', 'reviewer_name'
            and 'comments' are optional.

    Returns:
        The number of rows successfully inserted. A row that fails to convert
        or insert is reported and skipped. 0 is returned when the required
        columns are missing or when the load as a whole fails (the error is
        printed, not re-raised).
    """
    try:
        print(f"Loading reviews from: {file_path}")
        df = pd.read_csv(file_path)

        # Validate first: dropna(subset=...) raises KeyError on a missing
        # column, which would hide this clearer message.
        if 'listing_id' not in df.columns or 'date' not in df.columns or 'id' not in df.columns:
            print("Required columns missing in reviews data")
            return 0

        # Drop rows with NA in required columns
        original_count = len(df)
        df = df.dropna(subset=['listing_id', 'date', 'id'])
        dropped_count = original_count - len(df)
        print(f"Dropped {dropped_count} rows with NA values in required columns from reviews data")

        # Convert the date column to datetime.date values once, up front.
        df['date'] = pd.to_datetime(df['date']).dt.date

        insert_query = """
            INSERT INTO my_keyspace.reviews
            (id, listing_id, date, reviewer_id, reviewer_name, comments)
            VALUES (?, ?, ?, ?, ?, ?)
        """

        prepared = session.prepare(insert_query)
        count = 0

        for idx, row in tqdm(df.iterrows(), total=len(df), desc="Inserting reviews"):
            # Each row is converted and inserted on its own, so one bad row
            # is reported and skipped without affecting later rows.
            try:
                params = (
                    safe_int(row['id']),
                    safe_int(row['listing_id']),
                    row['date'],
                    safe_int(row.get('reviewer_id', 0)),
                    safe_text(row.get('reviewer_name', '')),
                    safe_text(row.get('comments', '')),
                )
                session.execute(prepared, params)
                count += 1
            except Exception as row_err:
                print(f"Error processing review row {idx}: {row_err}")

        print(f"Inserted {count} review records")
        return count
    except Exception as e:
        print(f"Error loading reviews: {e}")
        return 0

## Load Preprocessed Data into Cassandra

Now let's load the preprocessed data into our Cassandra database. We'll use the `preprocessed_*.csv` files produced by the preprocessing notebook.

In [49]:
# Folder that holds the preprocessed CSV files
BASE_DIR = Path('../Airbnb Data')

# One preprocessed input file per target table
listings_path = BASE_DIR / 'preprocessed_listings.csv'
neighbourhoods_path = BASE_DIR / 'preprocessed_neighbourhoods.csv'
calendar_path = BASE_DIR / 'preprocessed_calendar.csv'
reviews_path = BASE_DIR / 'preprocessed_reviews.csv'

# Report every missing input up front; nothing is loaded unless all four exist.
files_exist = True
for file_path in (listings_path, neighbourhoods_path, calendar_path, reviews_path):
    if file_path.exists():
        continue
    print(f"Warning: {file_path} does not exist")
    files_exist = False

if not files_exist:
    print("Some preprocessed files are missing. Please run the preprocessing notebook first.")
else:
    print("All preprocessed files found. Starting data loading...\n")

    # 1) Neighbourhoods
    start_time = time.time()
    neighbourhood_count = load_neighbourhoods(neighbourhoods_path)
    print(f"Loaded {neighbourhood_count} neighbourhoods in {time.time() - start_time:.2f} seconds\n")

    # 2) Listings
    start_time = time.time()
    listings_count = load_listings(listings_path)
    print(f"Loaded {listings_count} listings in {time.time() - start_time:.2f} seconds\n")

    # 3) Calendar (can be very large, so expect this step to dominate the run time)
    start_time = time.time()
    calendar_count = load_calendar(calendar_path)
    print(f"Loaded {calendar_count} calendar entries in {time.time() - start_time:.2f} seconds\n")

    # 4) Reviews
    start_time = time.time()
    reviews_count = load_reviews(reviews_path)
    print(f"Loaded {reviews_count} reviews in {time.time() - start_time:.2f} seconds\n")

    # Summary across all four tables
    print("\nData loading complete!")
    print(f"Total records loaded: {neighbourhood_count + listings_count + calendar_count + reviews_count}")

All preprocessed files found. Starting data loading...

Loading neighbourhoods from: ../Airbnb Data/preprocessed_neighbourhoods.csv


Inserting neighbourhoods: 100%|██████████| 481/481 [00:00<00:00, 1468.14it/s]


Inserted 481 neighbourhood records
Loaded 481 neighbourhoods in 0.58 seconds

Loading listings from: ../Airbnb Data/preprocessed_listings.csv
Dropped 0 rows with NA values in required columns from listings data


Preparing listings: 100%|██████████| 62771/62771 [00:55<00:00, 1133.59it/s]


Inserted 62771 listing records
Loaded 62771 listings in 56.39 seconds

Loading calendar from: ../Airbnb Data/preprocessed_calendar.csv


  df = pd.read_csv(file_path)


Dropped 0 rows with NA values in required columns from calendar data


Inserting calendar data: 100%|██████████| 22902530/22902530 [2:29:06<00:00, 2559.84it/s]  


Inserted 22902530 calendar records
Loaded 22902530 calendar entries in 9019.98 seconds

Loading reviews from: ../Airbnb Data/preprocessed_reviews.csv
Dropped 0 rows with NA values in required columns from reviews data


Inserting reviews: 100%|██████████| 3003791/3003791 [17:47<00:00, 2813.37it/s]

Inserted 3003791 review records
Loaded 3003791 reviews in 1087.55 seconds


Data loading complete!
Total records loaded: 25969573





## Verify Data in Cassandra

Let's run some queries to verify that our data loaded correctly.

In [44]:
# NOTE(review): this repeats the reviews load that the full-load cell already
# performs, and its execution count is out of sequence with the neighbouring
# cells. It looks like a leftover manual re-run; consider removing it so a
# Restart & Run All does not load the reviews twice.
# Relies on `reviews_path` and `load_reviews` from the cells above.
start_time = time.time()
reviews_count = load_reviews(reviews_path)
print(f"Loaded {reviews_count} reviews in {time.time() - start_time:.2f} seconds\n")

Loading reviews from: ../Airbnb Data/preprocessed_reviews.csv
Dropped 0 rows with NA values in required columns from reviews data


Inserting reviews: 100%|██████████| 3003791/3003791 [17:37<00:00, 2841.16it/s]


Inserted 3003791 review records
Loaded 3003791 reviews in 1074.36 seconds



In [50]:
# Verify data in each table
def count_records(table_name, timeout=120.0):
    """Count the rows of a table in my_keyspace.

    Uses the notebook-level `session`.

    Args:
        table_name: Name of the table inside my_keyspace. It is interpolated
            into the query text, so pass trusted identifiers only.
        timeout: Client-side request timeout in seconds, passed through to
            `session.execute`. A full-table COUNT(*) on the large tables
            exceeded the driver's default client timeout, hence the longer
            default here. NOTE(review): server-side read timeouts are
            configured in Cassandra itself and are not affected by this
            value.

    Returns:
        The row count, or 0 when the query fails (the error is printed). A 0
        can therefore mean "query failed" rather than "table is empty".
    """
    query = f"SELECT COUNT(*) FROM my_keyspace.{table_name}"
    try:
        result = session.execute(query, timeout=timeout)
        return result.one()[0]
    except Exception as e:
        print(f"Error counting {table_name}: {e}")
        return 0

# Row count per table, queried in a fixed order. A value of 0 may also mean
# that the COUNT query itself failed (count_records reports 0 on error).
neighbourhoods_count, listings_count, calendar_count, reviews_count = (
    count_records(table) for table in ('neighbourhoods', 'listings', 'calendar', 'reviews')
)

print("Records in Cassandra database:")
print(f"Neighbourhoods: {neighbourhoods_count}")
print(f"Listings: {listings_count}")
print(f"Calendar entries: {calendar_count}")
print(f"Reviews: {reviews_count}")

# Spot-check a handful of rows from every table
print("\nSample data from each table:")

sample_queries = [
    ("\nSample neighbourhoods:",
     "SELECT * FROM my_keyspace.neighbourhoods LIMIT 5"),
    ("\nSample listings:",
     "SELECT id, name, city, neighbourhood, room_type FROM my_keyspace.listings LIMIT 5"),
    ("\nSample calendar entries:",
     "SELECT listing_id, date, available, price FROM my_keyspace.calendar LIMIT 5"),
    ("\nSample reviews:",
     "SELECT id, listing_id, date, reviewer_name FROM my_keyspace.reviews LIMIT 5"),
]

for heading, cql in sample_queries:
    print(heading)
    rows = session.execute(cql)
    for row in rows:
        print(row)

Error counting calendar: errors={'127.0.0.1:9042': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1:9042
Error counting reviews: Error from server: code=1300 [Replica(s) failed to execute read] message="Operation failed - received 0 responses and 1 failures: UNKNOWN from localhost/127.0.0.1:7000" info={'consistency': 'LOCAL_ONE', 'required_responses': 1, 'received_responses': 0, 'failures': 1, 'error_code_map': {'127.0.0.1': '0x0000'}}
Records in Cassandra database:
Neighbourhoods: 481
Listings: 62771
Calendar entries: 0
Reviews: 0

Sample data from each table:

Sample neighbourhoods:
Row(city='Salem', neighbourhood='Ward 1', neighbourhood_group=None)
Row(city='Salem', neighbourhood='Ward 2', neighbourhood_group=None)
Row(city='Salem', neighbourhood='Ward 3', neighbourhood_group=None)
Row(city='Salem', neighbourhood='Ward 4', neighbourhood_group=None)
Row(city='Salem', neighbourhood='Ward 5', neighbourhood_group=None)

Sample listings:
Row(id=4318176

In [None]:
# Close the Cassandra connection when done.
# NOTE(review): presumably `session` is unusable after this call; re-run the
# connection cell before executing any further queries.
cluster.shutdown()