In [None]:
import math
import random
import time
from collections import deque
from datetime import datetime, timedelta

import pandas as pd

In [None]:
from datetime import datetime, timedelta

class TimeSlots:
    """
    Evenly spaced "HH:MM" time labels between a start and an end time.

    The labels are generated once at construction and stored in ``self.slots``.
    Both end points are included when the interval divides the range exactly.
    """

    def __init__(self, start_time="10:00", end_time="23:55", interval=5):
        """
        :param start_time: first slot, as an "HH:MM" string
        :param end_time: last possible slot (inclusive), as an "HH:MM" string
        :param interval: gap between consecutive slots, in minutes (must be > 0)
        :raises ValueError: if a time string is not "HH:MM" or interval is not positive
        """
        self.start_time = datetime.strptime(start_time, "%H:%M")
        self.end_time = datetime.strptime(end_time, "%H:%M")
        self.interval = timedelta(minutes=interval)
        self.slots = self.generate_time_slots()

    def generate_time_slots(self):
        """
        Build the list of "HH:MM" strings from start_time to end_time.

        :return: list of slot labels; empty when end_time is before start_time
        :raises ValueError: if the interval is zero or negative
        """
        # A non-positive step never moves past end_time, so the loop below
        # would not terminate; reject it up front.
        if self.interval <= timedelta(0):
            raise ValueError("interval must be a positive number of minutes")

        slots = []
        current_time = self.start_time
        while current_time <= self.end_time:
            slots.append(current_time.strftime("%H:%M"))
            current_time += self.interval
        return slots

    def display_slots(self):
        """Print every slot label on its own line."""
        for slot in self.slots:
            print(slot)

# Build the default schedule: TimeSlots() uses 10:00-23:55 in 5-minute steps
time_slots = TimeSlots()

# Print every generated slot, one "HH:MM" string per line
time_slots.display_slots()

# TimeSlots.generate_time_slots is the method that builds this list of time strings


In [None]:
class Visitor: # generate unique Visitor
    """A single park guest with a ticket type, an itinerary and a personal clock."""

    def __init__(self, id, itinerary, fast_pass: int, arrival_time):
        """
        :param id: a unique ID
        :param itinerary: a queue of itinerary (where they will go, in sequence);
                          must support popleft(), e.g. collections.deque
        :param fast_pass: 0 for regular ticket, 1 for fast-pass ticket
        :param arrival_time: time at which the visitor enters the simulation
        """
        self.id = id
        self.fast_pass = fast_pass 
        self.itinerary = itinerary 
        # Kept as its own attribute: the queue classes read visitor.arrival_time
        # when admitting a visitor and when computing the waiting time.
        self.arrival_time = arrival_time
        self.current_time = arrival_time # First set to when they are 'spawned'
        self.accumulated_waiting_time = 0 ### LINK TO JON's part
        self.accumulated_satisfactory_score = 0 ### LINK TO JON's part
        self.current_action = None

    def __repr__(self): # representation method 
        # Formats the (id, fast_pass) tuple, e.g. "Visitor (7, 1)"
        return f"Visitor {self.id, self.fast_pass}"
    
    def choose_action(self):
        """
        Pop the next stop off the front of the itinerary and remember it.

        :return: the stop that was removed from the itinerary
        :raises IndexError: if the itinerary (a deque) is already empty
        """
        self.current_action = self.itinerary.popleft()
        return self.current_action


In [None]:
class VisitorList: # Roster of the visitors seen by one attraction/utility over the whole day
    """Named container that collects Visitor objects in insertion order."""

    def __init__(self, name):
        """
        :param name: The name of the attraction/utility this roster belongs to
        """
        self.name = name
        self.name_list = []  # Visitor objects, in the order they were added

    def __repr__(self): # representation method
        return "Visitor List for {}".format(self.name)

    def add_visitor_to_list(self, visitor):
        """
        Append a visitor to the roster; anything that is not a Visitor is ignored.

        :param visitor: a Visitor object
        """
        if not isinstance(visitor, Visitor):
            return
        self.name_list.append(visitor)

In [None]:
class Attraction: # Including shows, rides
    """
    A ride or show served from two lines: a regular queue and a fast-pass queue.

    Every time handled here (current_time, ride_duration and the visitors'
    arrival/start/end times) is a plain number on the same simulation clock,
    so the values can be compared and added directly.
    """

    def __init__(self, name: str, ride_duration: int, ride_capacity: int):
        """
        Initialize an attraction with normal and express queues.
        :param name: Name of the attraction.
        :param ride_duration: Duration of the ride
        :param ride_capacity: Number of visitors that can be processed at once (must be > 0).
        :raises ValueError: if ride_capacity is not positive
        """
        # A zero capacity would only surface later as a ZeroDivisionError in
        # process_queue; fail early with a clear message instead.
        if ride_capacity <= 0:
            raise ValueError("ride_capacity must be a positive number of seats")

        self.name = name
        self.regular_queue = deque()  
        self.fast_pass_queue = deque()
        self.ride_duration = ride_duration  # Time of each ride
        self.ride_capacity = ride_capacity # Max number of ppl a ride can take
        self.current_time = 10  # This attraction's own simulation clock
        self.total_served = 0  # cumulative number of served visitors 
        self.rides_served = 0 # Cumulative number of rides served in the current simulation, Flag for early termination
    
    def get_rides_served(self):
        """Return how many ride cycles process_queue has run so far."""
        return self.rides_served

    @staticmethod
    def _arrival_time(visitor):
        """Return the visitor's arrival time, falling back to their own clock if none is stored."""
        return getattr(visitor, "arrival_time", visitor.current_time)

    def add_visitor(self, visitor):
        """
        Put a visitor into the queue matching their ticket type.

        Visitors whose arrival time is still in the future (relative to this
        attraction's clock) and objects that are not Visitors are rejected
        with a printed message.

        :param visitor: a Visitor object
        """
        if not isinstance(visitor, Visitor):
            print("Only Visitor objects can be added to the queue.") # for debugtest
            return

        if self._arrival_time(visitor) > self.current_time:
            print(f"{visitor} has not yet arrived at time {self.current_time}.")
            return

        if visitor.fast_pass == 1:
            self.fast_pass_queue.append(visitor)
            print(f"{visitor} added to the fast-pass queue at time {self.current_time}.") # for debugtest
        else:
            self.regular_queue.append(visitor)
            print(f"{visitor} added to the regular queue at time {self.current_time}.") # for debugtest

    def _board(self, queue, seats, label, riding_visitors):
        """Move up to `seats` visitors from the front of `queue` onto the current ride."""
        for _ in range(min(seats, len(queue))):
            person = queue.popleft()
            if person not in riding_visitors:
                riding_visitors.append(person)
                print(f"{person} from {label} is being served.") # for debugging
                self.total_served += 1
                print("total serving so far:", self.total_served) # for debugging

    def process_queue(self):
        """
        Run one ride cycle and advance this attraction's clock by ride_duration.

        Seat allocation: the fast-pass queue gets at most half of the seats
        (ride_capacity // 2); the regular queue then fills whatever is left.

        Each rider gets start_time, end_time and waiting_time set, and their
        current_time is moved to the end of the ride.

        :return: dict with 'riding_visitors' (list), 'rides_served',
                 'total_served', 'current_time' (clock after the ride),
                 'crowd_level' and 'curr_wait_time' (dicts keyed by
                 'fast_pass_queue' / 'regular_queue')
        """
        riding_visitors = [] # list of visitors being served in the current ride

        self._board(self.fast_pass_queue, self.ride_capacity // 2,
                    "Fast Pass Queue", riding_visitors)
        fast_pass_filled_ride = len(riding_visitors) # number of fast_pass visitors going to the current ride
        self._board(self.regular_queue, self.ride_capacity - fast_pass_filled_ride,
                    "Regular Queue", riding_visitors)

        if not self.fast_pass_queue and not self.regular_queue:
            print("No one is waiting in the queues at time", self.current_time)

        self.rides_served += 1

        ride_start = self.current_time
        ride_end = ride_start + self.ride_duration
        for visitor in riding_visitors:
            visitor.start_time = ride_start  # When visitor starts the ride
            visitor.end_time = ride_end
            print(f"{visitor} started at {visitor.start_time} and finished at {visitor.end_time}.") # for debugtest
            # Compute the wait before touching the visitor's clock, because the
            # arrival-time fallback reads visitor.current_time.
            self.calculate_waiting_time(visitor)
            # The visitor is busy until the ride ends. A numeric assignment is
            # used because a timedelta cannot be added to this numeric clock.
            visitor.current_time = ride_end

        # Create outputs: curr_wait_time, crowd_level
        crowd_level = {'fast_pass_queue': len(self.fast_pass_queue),
                       'regular_queue': len(self.regular_queue)}
        # Number of full ride loads still waiting, times the length of one ride
        curr_wait_time = {
            queue_name: (waiting // self.ride_capacity) * self.ride_duration
            for queue_name, waiting in crowd_level.items()
        }

        # and lastly, we update the current time ### how to use universal time
        self.current_time = ride_end
        print(f"The time now is {self.current_time}.")

        return {
            'riding_visitors': riding_visitors, # list of visitors being served at the current ride
            'rides_served': self.rides_served, # flag for early termination
            'total_served': self.total_served, # total number of visitors served up until that moment
            'current_time': self.current_time,
            'crowd_level': crowd_level, # dictionary
            'curr_wait_time': curr_wait_time # dictionary
        }

    def calculate_waiting_time(self, visitor):
        """
        Store on the visitor how long they queued before the ride started.

        :param visitor: a visitor whose start_time has already been set
        """
        visitor.waiting_time = visitor.start_time - self._arrival_time(visitor)
        print(f"{visitor} waited for {visitor.waiting_time} seconds.")



In [None]:
class Utility: # Including toilets, dining outlets, souvenir shops
    """
    A single-queue facility that serves up to util_capacity visitors per round.

    Times are plain numbers on the same kind of clock that Attraction uses.
    process_queue returns a dict with the same keys as Attraction.process_queue
    so both can be consumed the same way; a utility has no fast-pass lane, so
    its 'fast_pass_queue' entries are always 0.
    """

    def __init__(self, name: str, service_time: int, util_capacity: int):
        """
        Initialize a utility in the park.
        :param name: Name of the utility.
        :param service_time: Duration spent inside the Utility
        :param util_capacity: Number of visitors that can be processed at once (must be > 0).
        :raises ValueError: if util_capacity is not positive
        """
        # A zero capacity would otherwise surface as a ZeroDivisionError in process_queue
        if util_capacity <= 0:
            raise ValueError("util_capacity must be a positive number of visitors")

        self.name = name
        self.service_time = service_time 
        self.util_capacity = util_capacity 
        self.queue = deque()
        self.current_time = 10  # This utility's own clock; same starting value as Attraction
        self.total_served = 0  # cumulative number of served visitors
        self.rides_served = 0  # cumulative number of service rounds run by process_queue

    @staticmethod
    def _arrival_time(visitor):
        """Return the visitor's arrival time, falling back to their own clock if none is stored."""
        return getattr(visitor, "arrival_time", visitor.current_time)

    def add_visitor(self, visitor):
        """
        Put a visitor at the back of the queue.

        Visitors whose arrival time is still in the future (relative to this
        utility's clock) and objects that are not Visitors are rejected with a
        printed message.

        :param visitor: a Visitor object
        """
        if not isinstance(visitor, Visitor):
            print("Only Visitor objects can be added to the queue.") # for debugtest
            return

        if self._arrival_time(visitor) > self.current_time:
            print(f"{visitor} has not yet arrived at time {self.current_time}.") # for debugtest
            return

        self.queue.append(visitor)
        print(f"{visitor} added to the queue at time {self.current_time}.") # for debugtest

    def process_queue(self):
        """
        Serve one round of visitors and advance this utility's clock by service_time.

        At most util_capacity visitors are taken from the front of the queue.
        Each of them gets start_time, end_time and waiting_time set, and their
        current_time is moved to the end of the service.

        :return: dict with 'riding_visitors' (list of visitors served this
                 round), 'rides_served', 'total_served', 'current_time' (clock
                 after the round), 'crowd_level' and 'curr_wait_time' (dicts
                 keyed by 'fast_pass_queue' / 'regular_queue')
        """
        current_use_visitors = [] # List of visitors being served in the current utility

        # Only util_capacity visitors fit at once; the rest keep waiting
        for _ in range(min(self.util_capacity, len(self.queue))):
            person = self.queue.popleft()
            if person not in current_use_visitors:
                current_use_visitors.append(person) 
                print(f"{person} from queue is being served.") # for debugging
                self.total_served += 1
                print("total serving so far:", self.total_served) # for debugging

        self.rides_served += 1

        service_start = self.current_time
        service_end = service_start + self.service_time
        for visitor in current_use_visitors: 
            visitor.start_time = service_start  # When visitor starts using the utility
            visitor.end_time = service_end
            print(f"{visitor} started at {visitor.start_time} and finished at {visitor.end_time}.") # for debugtest
            # Compute the wait before touching the visitor's clock, because the
            # arrival-time fallback reads visitor.current_time.
            self.calculate_waiting_time(visitor)
            visitor.current_time = service_end

        # Create outputs: curr_wait_time, crowd_level
        still_waiting = len(self.queue)
        crowd_level = {'fast_pass_queue': 0,
                       'regular_queue': still_waiting}
        # Number of full service rounds still waiting, times the length of one round
        curr_wait_time = {'fast_pass_queue': 0,
                          'regular_queue': (still_waiting // self.util_capacity) * self.service_time}

        # and lastly, we update the current time ### how to use universal time
        self.current_time = service_end
        print(f"The time now is {self.current_time}.")

        return {
            'riding_visitors': current_use_visitors, # list of visitors being served in this round
            'rides_served': self.rides_served, # flag for early termination
            'total_served': self.total_served, # total number of visitors served up until that moment
            'current_time': self.current_time,
            'crowd_level': crowd_level, # dictionary
            'curr_wait_time': curr_wait_time # dictionary
        }

    def calculate_waiting_time(self, visitor):
        """
        Store on the visitor how long they queued before being served.

        :param visitor: a visitor whose start_time has already been set
        """
        visitor.waiting_time = visitor.start_time - self._arrival_time(visitor)
        print(f"{visitor} waited for {visitor.waiting_time} seconds.")


    
    

In [None]:
class ThemePark:
    """
    Drives a set of attractions and collects per-visitor waiting times.

    The park only relies on three things from an attraction: a ``name``, an
    ``add_visitor(visitor)`` method and a ``process_queue()`` method returning
    a dict with 'riding_visitors' and 'crowd_level'.
    """

    def __init__(self, num_visitors: int):
        """
        Initialize the theme park and visitor tracking.
        :param num_visitors: Total number of visitors in the simulation.
        """
        self.visitor_count = num_visitors
        self.global_time = 0  # Number of simulation steps completed so far
        self.attractions = []
        # Accumulated waiting time per visitor, indexed by visitor id (0 .. num_visitors - 1)
        self.visitor_wait_times = [0] * num_visitors
        # attraction name -> list of crowd_level dicts, one entry per simulated step
        self.crowd_level_history = {}

    def add_attraction(self, attraction: "Attraction"):
        """
        Add an attraction to the park.
        :param attraction: Attraction object.
        """
        self.attractions.append(attraction)

    def assign_visitors(self, express_pass_ratio=0.2):
        """
        Randomly assign visitors to attractions, with a percentage getting express passes.
        :param express_pass_ratio: Fraction of visitors with express passes.
        :raises ValueError: if no attraction has been added yet
        """
        if not self.attractions:
            raise ValueError("Add at least one attraction before assigning visitors.")

        for visitor_id in range(self.visitor_count):
            attraction = random.choice(self.attractions)
            fast_pass = 1 if random.random() < express_pass_ratio else 0  # Assign express pass based on ratio
            visitor = Visitor(
                id=visitor_id,
                itinerary=deque([attraction.name]),
                fast_pass=fast_pass,
                arrival_time=attraction.current_time,
            )
            # add_visitor compares visitor.arrival_time with the attraction's
            # clock, so make sure the attribute is present on the visitor.
            visitor.arrival_time = attraction.current_time
            attraction.add_visitor(visitor)

    def simulate(self, steps: int):
        """
        Run the simulation for a fixed number of time steps.

        In each step every attraction runs one process_queue() cycle; the
        waiting time of each visitor served in that cycle is added to
        visitor_wait_times and the remaining queue lengths are recorded.
        Each attraction advances its own clock; global_time only counts steps.

        :param steps: Number of simulation steps (time units).
        """
        for _ in range(steps):
            for attraction in self.attractions:
                result = attraction.process_queue()
                for visitor in result['riding_visitors']:
                    self.visitor_wait_times[visitor.id] += visitor.waiting_time
                self.crowd_level_history.setdefault(attraction.name, []).append(result['crowd_level'])
            self.global_time += 1

    def output_waiting_times(self):
        """
        Output the total accumulated waiting times for each visitor.
        """
        for visitor_id, wait_time in enumerate(self.visitor_wait_times):
            print(f"Visitor {visitor_id} - Total Wait Time: {wait_time} minutes")

    def _crowd_level_summary(self, attraction):
        """Average each recorded queue length of one attraction over all simulated steps."""
        history = self.crowd_level_history.get(attraction.name, [])
        if not history:
            return {}
        return {
            queue_name: sum(snapshot[queue_name] for snapshot in history) / len(history)
            for queue_name in history[0]
        }

    def get_crowd_levels(self):
        """
        Output the average crowd levels for each attraction.
        """
        for attraction in self.attractions:
            crowd_level_summary = self._crowd_level_summary(attraction)
            print(f"{attraction.name} Crowd Levels: {crowd_level_summary}")

    def get_waiting_times_summary(self):
        """
        Get the average waiting time and total time spent for analysis.
        :return: Dictionary summarizing average waiting time and total waiting time.
                 The average is 0.0 when the park has no visitors.
        """
        total_wait = sum(self.visitor_wait_times)
        tracked = len(self.visitor_wait_times)
        return {
            "Average Wait Time": total_wait / tracked if tracked else 0.0,
            "Total Wait Time": total_wait
        }


# Example Usage
random.seed(42)  # fixed seed so the random visitor assignment is reproducible on re-run
park = ThemePark(num_visitors=50)

# Create attractions with a ride duration and a ride capacity
# (keyword names must match Attraction.__init__: ride_duration, ride_capacity)
roller_coaster = Attraction("Roller Coaster", ride_duration=45, ride_capacity=20)
ferris_wheel = Attraction("Ferris Wheel", ride_duration=30, ride_capacity=15)
haunted_house = Attraction("Haunted House", ride_duration=25, ride_capacity=10)

# Add attractions to the park
park.add_attraction(roller_coaster)
park.add_attraction(ferris_wheel)
park.add_attraction(haunted_house)

# Assign visitors to queues (20% with express passes)
park.assign_visitors(express_pass_ratio=0.2)

# Run the simulation for a specified number of time steps
park.simulate(steps=60)

# Output total waiting times for each visitor
park.output_waiting_times()

# Output average crowd levels for each attraction
park.get_crowd_levels()

# Summary of average waiting times for analysis
waiting_times_summary = park.get_waiting_times_summary()
print("\nWaiting Times Summary:", waiting_times_summary)

In [None]:
# Create Visitor objects from csv file
# Columns needed: id, itinerary, fast_pass, arrival_time

visitor_generator_df = pd.read_csv('file path here')

# NOTE(review): 'itinerary' is passed through exactly as read from the CSV.
# Visitor.choose_action calls itinerary.popleft(), so the cell value has to be
# converted to a deque once the CSV format is finalised.
visitors = [] # List to store Visitor objects
for index, row in visitor_generator_df.iterrows():
    # Build a Visitor (not an Attraction) from each row; the column keys follow the list above
    new_visitor = Visitor(id=row['id'], itinerary=row['itinerary'], fast_pass=row['fast_pass'], arrival_time=row['arrival_time']) # change according to the finalised csv
    visitors.append(new_visitor)

In [None]:
# Build one Attraction per row of the attraction csv file
# Constructor arguments to fill: name: str, ride_duration: int, ride_capacity: int

attraction_generator_df = pd.read_csv('file path here')

attractions = {} # name -> Attraction object
attractions_name = [] # Attraction names in file order, to link with Visitor action
for _, record in attraction_generator_df.iterrows():
    ride_name = record['name']
    # change the column keys according to the finalised csv
    attractions[ride_name] = Attraction(
        name=ride_name,
        ride_duration=record['duration'],
        ride_capacity=record['capacity'],
    )
    attractions_name.append(ride_name)

