In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, hour, dayofweek, month, year, date_format, rand
import pymongo
import happybase
import uuid
from datetime import datetime
import time
import json



# Initialize Spark with proper configurations
spark = SparkSession.builder \
    .appName("NYC Taxi Data Pipeline") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/bigdata_demo.raw_taxi_data") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/bigdata_demo.raw_taxi_data") \
    .getOrCreate()
    
spark.sparkContext.setLogLevel("ERROR")


your 131072x1 screen size is bogus. expect trouble
25/04/15 21:29:01 WARN Utils: Your hostname, DESKTOP-U7R862J resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/04/15 21:29:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/suman/.ivy2/cache
The jars for the packages stored in: /home/suman/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f88bd66f-f108-4745-a9f2-4577ba5f2729;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 305ms :: artifacts dl 14ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts  

In [2]:
taxi_df  = spark.read.csv("yellow_tripdata_2025-01.csv", header=True)

In [3]:
# Display the schema
print("Dataset Schema:")
taxi_df.printSchema()

Dataset Schema:
root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- Airport_fee: string (nullable = true)



In [None]:


print("Performing data cleaning and transformations...")
cleaned_df = taxi_df \
    .withColumn("pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
    .withColumn("dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime"))) \
    .withColumn("pickup_hour", hour(col("pickup_datetime"))) \
    .withColumn("pickup_day", dayofweek(col("pickup_datetime"))) \
    .withColumn("pickup_month", month(col("pickup_datetime"))) \
    .withColumn("pickup_year", year(col("pickup_datetime"))) \
    .withColumn("trip_duration_minutes", (col("dropoff_datetime").cast("long") - col("pickup_datetime").cast("long")) / 60) \
    .withColumn("date", date_format(col("pickup_datetime"), "yyyy-MM-dd")) \
    .drop("tpep_pickup_datetime", "tpep_dropoff_datetime")  # Drop original timestamp columns
    
    
# select random 100000 samples to handle the large dataset on convservative system
cleaned_df = cleaned_df.orderBy(rand()).limit(100000)

Performing data cleaning and transformations...


In [5]:
# Show sample data
print("Sample of transformed data:")
cleaned_df.show(5)

# Write to MongoDB as raw data
print("Writing data to MongoDB...")
cleaned_df.write.format("mongo").mode("overwrite").save()

print("Data successfully loaded into MongoDB collection: raw_taxi_data")

Sample of transformed data:


                                                                                

+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+-------------------+-----------+----------+------------+-----------+---------------------+----------+
|VendorID|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|    pickup_datetime|   dropoff_datetime|pickup_hour|pickup_day|pickup_month|pickup_year|trip_duration_minutes|      date|
+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+-------------------+-----------+----------+------------

[Stage 3:>                                                          (0 + 1) / 1]

Data successfully loaded into MongoDB collection: raw_taxi_data


                                                                                

In [None]:
# Load data from MongoDB
print("Loading data from MongoDB...")
mongo_df = spark.read.format("mongo").load()

Loading data from MongoDB...


                                                                                

In [7]:
# Show sample data from MongoDB
print("Sample data from MongoDB:")
mongo_df.show(5)

Sample data from MongoDB:
+-----------+------------+------------+----------+--------+--------------------+--------------------+----------+-------------------+-----+-----------+---------------------+-------+---------------+------------+-------------------+----------+-----------+------------+-----------+------------------+----------+------------+------------+-------------+---------------------+
|Airport_fee|DOLocationID|PULocationID|RatecodeID|VendorID|                 _id|congestion_surcharge|      date|   dropoff_datetime|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|    pickup_datetime|pickup_day|pickup_hour|pickup_month|pickup_year|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|trip_distance|trip_duration_minutes|
+-----------+------------+------------+----------+--------+--------------------+--------------------+----------+-------------------+-----+-----------+---------------------+-------+---------------+------------+-------------------+-----

In [None]:
#!/usr/bin/env python3
# MongoDB to HBase Data Transfer

def connect_to_mongodb():
    """Connect to MongoDB"""
    try:
        # Update with your MongoDB connection details if different
        client = pymongo.MongoClient("mongodb://localhost:27017/")
        # Assuming you have a database called bigdata_demo with raw_taxi_data collection
        db = client["bigdata_demo"]
        collection = db["raw_taxi_data"]
        
        print(f"Successfully connected to MongoDB")
        print(f"Collection count: {collection.count_documents({})}")
        return collection
    except Exception as e:
        print(f"Error connecting to MongoDB: {e}")
        return None


In [None]:
import happybase
import pymongo
import uuid
import time
import concurrent.futures
from datetime import datetime
import multiprocessing
from threading import Lock
import queue
import itertools
# Global variables for thread-safe counters
processed_count = 0
processed_lock = Lock()
progress_update_interval = 5  # seconds
last_progress_time = time.time()

def connect_to_hbase():
    """Connect to HBase running in Docker"""
    try:
        # Connect to HBase Thrift server running on default port 9090
        connection = happybase.Connection('localhost', port=9090, timeout=300000, autoconnect=True)
        print("Successfully connected to HBase")
        return connection
    except Exception as e:
        print(f"Error connecting to HBase: {e}")
        return None

def create_hbase_table(connection):
    """Create HBase table for taxi data"""
    try:
        # Define the table schema
        families = {
            'trips': dict(max_versions=1),     # Trip details
            'payment': dict(max_versions=1),   # Payment information 
            'location': dict(max_versions=1)   # Location information
        }
        
        # Check if table exists, if so delete it for demo purposes
        if b'taxi_trips' in connection.tables():
            print("Dropping existing taxi_trips table for demo...")
            connection.delete_table('taxi_trips', disable=True)
        
        # Create the table
        print("Creating taxi_trips table in HBase...")
        connection.create_table('taxi_trips', families)
        print("Table created successfully!")
        
        return True
    except Exception as e:
        print(f"Error creating HBase table: {e}")
        return False

def mongodb_to_hbase_record(mongo_doc):
    """Convert a MongoDB document to HBase record format"""
    # Create a row key using date from the MongoDB document and a UUID
    # Assuming the document has a 'date' field
    date_str = mongo_doc.get('date', datetime.now().strftime('%Y-%m-%d'))
    date_formatted = date_str.replace('-', '')
    row_key = f"{date_formatted}-{str(uuid.uuid4())}"
    
    # Prepare the HBase record
    hbase_record = {
        'row_key': row_key,
        'trips': {},
        'payment': {},
        'location': {}
    }
    
    # Map MongoDB fields to HBase column families
    # Trips related fields
    trip_fields = ['pickup_datetime', 'dropoff_datetime', 'pickup_hour', 'pickup_day',
                  'pickup_month', 'pickup_year', 'trip_duration_minutes', 'passenger_count']
    
    # Payment related fields
    payment_fields = ['fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
                     'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'payment_type']
    
    # Location related fields
    location_fields = ['PULocationID', 'DOLocationID', 'trip_distance']
    
    # Fill the HBase record from MongoDB document
    for field in trip_fields:
        if field in mongo_doc:
            hbase_record['trips'][field] = str(mongo_doc[field])
    
    for field in payment_fields:
        if field in mongo_doc:
            hbase_record['payment'][field] = str(mongo_doc[field])
    
    for field in location_fields:
        if field in mongo_doc:
            hbase_record['location'][field] = str(mongo_doc[field])
    
    return hbase_record

def insert_batch_to_hbase(batch, host='localhost', port=9090):
    """
    Insert a batch of records into HBase
    Each worker thread/process will have its own connection
    """
    try:
        # Create a new connection for this thread/process
        connection = happybase.Connection(host=host, port=port, timeout=30000)
        table = connection.table('taxi_trips')
        
        for record in batch:
            # Prepare data in HBase format
            data = {}
            
            # Add trip details
            for column, value in record['trips'].items():
                data[f'trips:{column}'.encode()] = str(value).encode()
                
            # Add payment details
            for column, value in record['payment'].items():
                data[f'payment:{column}'.encode()] = str(value).encode()
                
            # Add location details
            for column, value in record['location'].items():
                data[f'location:{column}'.encode()] = str(value).encode()
            
            # Insert record
            table.put(record['row_key'].encode(), data)
        
        # Update progress counter
        global processed_count, processed_lock, last_progress_time
        with processed_lock:
            processed_count += len(batch)
            current_time = time.time()
            if current_time - last_progress_time >= progress_update_interval:
                print(f"Progress: {processed_count} records processed")
                last_progress_time = current_time
        
        # Close the connection
        connection.close()
        return len(batch)
    except Exception as e:
        print(f"Error in worker thread/process: {e}")
        return 0

def chunker(iterable, chunk_size):
    """Yield chunks from an iterable"""
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            break
        yield chunk

def transfer_data_async(mongo_collection, hbase_connection, batch_size=100, num_workers=None):
    """Transfer data from MongoDB to HBase using multiple threads/processes"""
    try:
        import itertools
        
        # Determine number of workers (default to CPU count)
        if num_workers is None:
            num_workers = multiprocessing.cpu_count()
        
        print(f"Using {num_workers} worker threads for data transfer")
        
        # Get total document count
        total_docs = mongo_collection.count_documents({})
        print(f"Transferring {total_docs} documents from MongoDB to HBase...")
        
        # Reset global counter
        global processed_count
        processed_count = 0
        
        # Start time for performance tracking
        start_time = time.time()
        
        # Process all documents in batches
        cursor = mongo_collection.find({})
        
        # Convert MongoDB documents to HBase format
        print("Converting MongoDB documents to HBase records...")
        
        # Create a thread pool for processing batches
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
            # Submit batches for processing
            futures = []
            
            # Process in chunks to avoid memory issues
            for chunk in chunker(cursor, 10000):
                # Convert documents to HBase records
                hbase_records = [mongodb_to_hbase_record(doc) for doc in chunk]
                
                # Process in batches
                for i in range(0, len(hbase_records), batch_size):
                    batch = hbase_records[i:i + batch_size]
                    futures.append(executor.submit(insert_batch_to_hbase, batch))
            
            # Wait for all futures to complete
            print(f"Waiting for {len(futures)} batch operations to complete...")
            total_processed = 0
            for future in concurrent.futures.as_completed(futures):
                try:
                    batch_count = future.result()
                    total_processed += batch_count
                except Exception as exc:
                    print(f"Batch processing generated an exception: {exc}")
        
        # Final statistics
        total_time = time.time() - start_time
        print(f"Transfer complete! {total_processed} records transferred to HBase in {total_time:.2f} seconds")
        if total_time > 0:
            print(f"Transfer rate: {total_processed/total_time:.2f} records/second")
        
        return True
    except Exception as e:
        print(f"Error transferring data: {e}")
        return False

def transfer_data_with_queue(mongo_collection, hbase_connection, batch_size=100, num_workers=None):
    """
    Transfer data using a producer-consumer pattern with a queue
    This is an alternative implementation that can be more memory efficient
    """
    try:
        import threading
        
        # Determine number of workers (default to CPU count)
        if num_workers is None:
            num_workers = multiprocessing.cpu_count()
        
        print(f"Using producer-consumer pattern with {num_workers} workers")
        
        # Get total document count
        total_docs = mongo_collection.count_documents({})
        print(f"Transferring {total_docs} documents from MongoDB to HBase...")
        
        # Reset global counter
        global processed_count
        processed_count = 0
        
        # Create a queue for batches
        batch_queue = queue.Queue(maxsize=num_workers * 2)  # Buffer some batches
        
        # Flag to signal consumers to exit
        done_producing = threading.Event()
        
        # Start time for performance tracking
        start_time = time.time()
        
        # Consumer function
        def consumer():
            while not (done_producing.is_set() and batch_queue.empty()):
                try:
                    batch = batch_queue.get(timeout=1)
                    insert_batch_to_hbase(batch)
                    batch_queue.task_done()
                except queue.Empty:
                    continue
                except Exception as e:
                    print(f"Consumer error: {e}")
        
        # Start consumer threads
        consumers = []
        for _ in range(num_workers):
            t = threading.Thread(target=consumer)
            t.daemon = True
            t.start()
            consumers.append(t)
        
        # Producer: get documents from MongoDB and put batches in queue
        try:
            cursor = mongo_collection.find({})
            current_batch = []
            
            for doc in cursor:
                # Convert document to HBase record
                record = mongodb_to_hbase_record(doc)
                current_batch.append(record)
                
                # When batch is full, add to queue
                if len(current_batch) >= batch_size:
                    batch_queue.put(current_batch)
                    current_batch = []
            
            # Add any remaining records
            if current_batch:
                batch_queue.put(current_batch)
            
        finally:
            # Signal that we're done producing
            done_producing.set()
        
        # Wait for all tasks to be processed
        batch_queue.join()
        
        # Wait for consumer threads to exit
        for t in consumers:
            t.join(timeout=1)
        
        # Final statistics
        total_time = time.time() - start_time
        print(f"Transfer complete! {processed_count} records transferred to HBase in {total_time:.2f} seconds")
        if total_time > 0:
            print(f"Transfer rate: {processed_count/total_time:.2f} records/second")
        
        return True
    except Exception as e:
        print(f"Error transferring data: {e}")
        return False

# Example usage
if __name__ == "__main__":
    import pymongo
    
    # Connect to MongoDB
    mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
    mongo_db = mongo_client["bigdata_demo"]
    mongo_collection = mongo_db["raw_taxi_data"]
    
    # Connect to HBase
    hbase_connection = connect_to_hbase()
    
    # Create table
    create_hbase_table(hbase_connection)
    
    # Choose the transfer method:
    # 1. Thread pool executor (better for smaller datasets)
    transfer_data_async(mongo_collection, hbase_connection, batch_size=1000)


Successfully connected to HBase
Dropping existing taxi_trips table for demo...
Creating taxi_trips table in HBase...
Table created successfully!
Using 12 worker threads for data transfer
Transferring 100000 documents from MongoDB to HBase...
Converting MongoDB documents to HBase records...
Progress: 1000 records processed
Waiting for 100 batch operations to complete...
Progress: 13000 records processed
Progress: 25000 records processed
Progress: 37000 records processed
Progress: 49000 records processed
Progress: 61000 records processed
Progress: 73000 records processed
Progress: 85000 records processed
Transfer complete! 100000 records transferred to HBase in 62.65 seconds
Transfer rate: 1596.06 records/second
