In [1]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Set up Cassandra connection
# Connects to a node on 'localhost' and binds the session to the
# 'my_keyspace' keyspace. No auth_provider is passed to Cluster here, so the
# PlainTextAuthProvider import above is not used by this connection.
# `cluster` and `session` are module-level names used by the cells below.
cluster = Cluster(['localhost'])
session = cluster.connect('my_keyspace')

In [2]:
import random
import time
import requests
import threading
import random

# Schema for the reservations themselves, keyed by the reservation id.
create_table_query = """
    CREATE TABLE IF NOT EXISTS reservations (
        id INT PRIMARY KEY,
        guest_name TEXT,
        check_in_date DATE,
        check_out_date DATE
    )
"""

# Schema for the lookup table of already-taken check-in dates.
create_occupied_dates_table_query = """
    CREATE TABLE IF NOT EXISTS occupied_dates (
        check_in_date DATE PRIMARY KEY
    )
"""

# Both statements use IF NOT EXISTS, so re-running this cell is harmless.
for ddl_statement in (create_table_query, create_occupied_dates_table_query):
    session.execute(ddl_statement)

def check_availability(check_in_date):
    """Tell whether ``check_in_date`` is still free.

    Looks the date up in ``occupied_dates`` through the module-level
    ``session``.

    Returns:
        True when the lookup yields no rows, False when the date is
        already recorded as occupied.
    """
    select_query = """
        SELECT * FROM occupied_dates WHERE check_in_date = %s
    """
    occupied_rows = session.execute(select_query, (check_in_date,))
    # A truthy result means a matching row exists, i.e. the date is taken.
    return not occupied_rows

# Function to create a reservation
def create_reservation(reservation_id, guest_name, check_in_date, check_out_date):
    """Create a reservation if its check-in date can be claimed.

    The check-in date is claimed with a conditional insert
    (``INSERT ... IF NOT EXISTS``), so the availability check and the claim
    are one atomic step. A separate read followed by a plain insert lets two
    concurrent clients both see the date as free and both book it.

    Args:
        reservation_id: Integer primary key of the reservation row.
        guest_name: Name stored with the reservation.
        check_in_date: Check-in date; this is the value that gets claimed in
            ``occupied_dates``.
        check_out_date: Check-out date stored with the reservation.

    Returns:
        None. The outcome is reported with ``print``; database errors are
        caught and printed rather than raised.
    """
    # NOTE(review): only the check-in date is claimed, not every night of the
    # stay, so overlapping stays with different check-in dates are not
    # detected. Confirm whether that is the intended model.
    claim_date_query = """
        INSERT INTO occupied_dates (check_in_date)
        VALUES (%s)
        IF NOT EXISTS
    """
    insert_query = """
        INSERT INTO reservations (id, guest_name, check_in_date, check_out_date)
        VALUES (%s, %s, %s, %s)
    """
    release_date_query = """
        DELETE FROM occupied_dates WHERE check_in_date = %s IF EXISTS
    """
    try:
        claim = session.execute(claim_date_query, (check_in_date,))
        if not claim.was_applied:
            # Another reservation already holds this date.
            print("Date not available!")
            return

        try:
            session.execute(insert_query, (reservation_id, guest_name, check_in_date, check_out_date))
        except Exception:
            # Undo the claim so a failed insert does not block the date.
            session.execute(release_date_query, (check_in_date,))
            raise
        print("Reservation created successfully.")
    except Exception as e:
        print("Error creating reservation:", str(e))

# Function to update a reservation
def update_reservation(reservation_id, guest_name):
    """Change the guest name of an existing reservation.

    A plain CQL ``UPDATE`` is an upsert: for an unknown id it would create a
    row holding only ``guest_name`` and still report success. The statement is
    therefore conditional (``IF EXISTS``) and the applied flag is checked.

    Args:
        reservation_id: Primary key of the reservation to change.
        guest_name: New guest name.

    Returns:
        None. The outcome is reported with ``print``; database errors are
        caught and printed rather than raised.
    """
    update_query = """
        UPDATE reservations SET guest_name = %s WHERE id = %s IF EXISTS
    """
    try:
        result = session.execute(update_query, (guest_name, reservation_id))
        if result.was_applied:
            print("Reservation updated successfully.")
        else:
            print("Reservation not found.")
    except Exception as e:
        print("Error updating reservation:", str(e))

# Function to cancel a reservation
def cancel_reservation(reservation_id):
    """Delete a reservation and free the check-in date it was holding.

    Deleting only the ``reservations`` row would leave the date in
    ``occupied_dates``, so it could never be booked again. The check-in date
    is read before the delete so it can be released afterwards.

    Args:
        reservation_id: Primary key of the reservation to cancel.

    Returns:
        None. The outcome is reported with ``print``; database errors are
        caught and printed rather than raised.
    """
    lookup_query = """
        SELECT check_in_date FROM reservations WHERE id = %s
    """
    delete_query = """
        DELETE FROM reservations WHERE id = %s
    """
    release_date_query = """
        DELETE FROM occupied_dates WHERE check_in_date = %s IF EXISTS
    """
    try:
        existing = session.execute(lookup_query, (reservation_id,)).one()
        session.execute(delete_query, (reservation_id,))
        # Nothing to release when the id is unknown or has no check-in date.
        if existing is not None and existing.check_in_date is not None:
            session.execute(release_date_query, (existing.check_in_date,))
        print("Reservation canceled successfully.")
    except Exception as e:
        print("Error canceling reservation:", str(e))

# Look up one reservation by its id and print it
def get_reservation(reservation_id):
    """Print the stored fields of the reservation with ``reservation_id``.

    Prints "Reservation not found." when the query returns no row.
    Nothing is returned.
    """
    select_query = """
        SELECT * FROM reservations WHERE id = %s
    """
    row = session.execute(select_query, (reservation_id,)).one()
    if not row:
        print("Reservation not found.")
        return

    # Label/column pairs, printed in this order.
    fields = (
        ("Reservation ID:", "id"),
        ("Guest Name:", "guest_name"),
        ("Check-in Date:", "check_in_date"),
        ("Check-out Date:", "check_out_date"),
    )
    for label, column in fields:
        print(label, getattr(row, column))

# Print every stored reservation
def get_all_reservations():
    """Print the fields of each row in the ``reservations`` table.

    Rows are printed in the order the query returns them. Nothing is
    returned, and nothing is printed when the table is empty.
    """
    select_query = """
        SELECT * FROM reservations
    """
    # Label/column pairs, printed in this order for every row.
    fields = (
        ("Reservation ID:", "id"),
        ("Guest Name:", "guest_name"),
        ("Check-in Date:", "check_in_date"),
        ("Check-out Date:", "check_out_date"),
    )
    for row in session.execute(select_query):
        for label, column in fields:
            print(label, getattr(row, column))






In [3]:
import random
from datetime import datetime, timedelta
# Pick a random calendar day between two dates (both ends included)
def generate_random_date(start_date, end_date):
    """Return a random date between ``start_date`` and ``end_date``.

    Args:
        start_date: First possible day, as a 'YYYY-MM-DD' string.
        end_date: Last possible day, as a 'YYYY-MM-DD' string.

    Returns:
        The chosen day as a 'YYYY-MM-DD' string.

    Raises:
        ValueError: If either string does not match the format, or if
            ``end_date`` is earlier than ``start_date``.
    """
    date_format = '%Y-%m-%d'
    first_day = datetime.strptime(start_date, date_format)
    last_day = datetime.strptime(end_date, date_format)
    span_in_days = (last_day - first_day).days
    # randint includes both bounds, so either end date can be chosen.
    offset = timedelta(days=random.randint(0, span_in_days))
    return (first_day + offset).strftime(date_format)

# Book a reservation whose stay is drawn at random from a date window
def create_random_reservation(reservation_id, guest_name, start_date, end_date):
    """Create a reservation with random dates inside ``start_date``..``end_date``.

    The check-in date is drawn from the whole window. The check-out date is
    then drawn from the check-in date up to ``end_date``, so it is never
    earlier than check-in (it may be the same day).
    """
    check_in = generate_random_date(start_date, end_date)
    check_out = generate_random_date(check_in, end_date)
    create_reservation(reservation_id, guest_name, check_in, check_out)

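We implemented the stress tests in two different ways: first as plain functions that call the reservation helpers defined above, and then as a `unittest.TestCase` class. Both versions are shown below.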

In [16]:
# Stress Test 1: one client repeats an identical request back-to-back.
def stress_test_1():
    """Send the same reservation request ten times in a row."""
    repeated_request = (1, 'John Doe', '2023-06-15', '2023-06-20')
    attempts_left = 10
    while attempts_left > 0:
        create_reservation(*repeated_request)
        attempts_left -= 1

# Stress Test 2: several clients send requests at the same time.
def stress_test_2():
    """Start ten threads that each try to book the same dates, then wait for all of them.

    Thread ``n`` (1..10) uses reservation id ``n`` and guest name "Guest n".
    """
    def create_reservation_concurrent(reservation_id):
        create_reservation(reservation_id, f"Guest {reservation_id}", '2023-06-15', '2023-06-20')

    workers = []
    for client_number in range(1, 11):
        worker = threading.Thread(target=create_reservation_concurrent, args=(client_number,))
        workers.append(worker)
        worker.start()

    # Block until every client thread has finished.
    for worker in workers:
        worker.join()

# Stress Test 3: Generate a sequential load of reservation requests
def stress_test_3(num_requests=2, start_date='2023-06-15', end_date='2023-06-20', delay_seconds=0.01):
    """Issue ``num_requests`` random-date reservation requests one after another.

    The request count, date window and delay used to be hard-coded. The
    defaults keep the previous behaviour (2 requests); pass
    ``num_requests=1000`` for the large load this test is meant to generate.

    Args:
        num_requests: How many reservations to attempt. Ids run from 1 to
            ``num_requests``.
        start_date: Start of the window the random stay is drawn from
            ('YYYY-MM-DD').
        end_date: End of that window ('YYYY-MM-DD').
        delay_seconds: Pause after each request.
    """
    for reservation_id in range(1, num_requests + 1):
        guest_name = f"Guest {reservation_id}"
        create_random_reservation(reservation_id, guest_name, start_date, end_date)
        time.sleep(delay_seconds)  # Add a small delay to simulate realistic load

# Example usage
# Only the create call is active; the update, cancel and lookup calls below
# are left commented out.
create_reservation(1, 'John Doe', '2023-06-15', '2023-06-20')
#update_reservation(1, 'Jane Smith')
#cancel_reservation(1)
#get_reservation(1)
#get_all_reservations()

# Run stress tests
# Only stress_test_3 is enabled; stress_test_1 and stress_test_2 are
# commented out.
#stress_test_1()
#stress_test_2()
stress_test_3()


Date not available!
Date not available!
Date not available!


In [5]:
import unittest

class HotelReservationStressTest(unittest.TestCase):
        
    @classmethod
    def tearDownClass(cls):
        # Clean up after all tests are executed
        cls.session.shutdown()
        cls.cluster.shutdown()
    
    # Stress Test 1: The client makes the same request very quickly
    def test_same_request_quickly(self):
        def make_same_request():
            # Simulate a client making the same request quickly
            for _ in range(10):
                self.session.execute("INSERT INTO reservations (guest_name, check_in_date) VALUES ('John Doe', '2023-06-15')")
        
        # Create a client thread that makes the same request repeatedly in a short interval
        client_thread = threading.Thread(target=make_same_request)
        client_thread.start()
        client_thread.join()
        
        # Add assertions to validate the expected results
        
        pass
    
    # Stress Test 2: Two or more clients make the possible requests randomly
    def test_multiple_clients_random_requests(self):
        def make_random_request():
            # Simulate a client making random requests
            check_in_dates = ["2023-12-06", "2024-05-08", "2023-06-06", "2023-09-11"]
            names = ["John Doe", "Jane Smith", "Alice Johnson", "Bob Brown"]
            for _ in range(10):
                name = random.choice(names)
                check_in = random.choice(check_in_dates)
                self.session.execute(f"INSERT INTO reservations (guest_name, check_in_date) VALUES ('{name}', '{check_in}')")
        
        # Create multiple client threads that make random requests simultaneously
        client_threads = []
        for _ in range(2):
            client_thread = threading.Thread(target=make_random_request)
            client_threads.append(client_thread)
            client_thread.start()
        
        for client_thread in client_threads:
            client_thread.join()
        
        pass
    
    # Stress Test 3: Immediate occupancy of all reservations on 2 clients
    def test_immediate_occupancy(self):
        def occupy_all_seats():
            # Simulate clients trying to occupy all seats immediately
            while not self.is_full():
                room = self.get_next_available_room()
                self.session.execute(f"INSERT INTO reservations (guest_name, check_in_date) VALUES ('John Doe', '{room}')")
        
        # Create two client threads that try to occupy all the reservations immediately
        client_threads = []
        for _ in range(2):
            client_thread = threading.Thread(target=occupy_all_seats)
            client_threads.append(client_thread)
            client_thread.start()
        
        for client_thread in client_threads:
            client_thread.join()
        
        
        pass
    

In [None]:
# Close Cassandra connection
# Shuts down the module-level session first, then the cluster object.
# After this cell runs, `session` can no longer execute queries until the
# connection cell at the top is run again.
session.shutdown()
cluster.shutdown()