In [1]:
import numpy as np
import random
import requests
import geopy
import pprint as pp
from typing import List, Tuple
from scipy.stats import beta

In [7]:
API_URL = "http://router.project-osrm.org/route/v1/{profile}/{lon1},{lat1};{lon2},{lat2}"
GEOLOCATOR = geopy.geocoders.Nominatim(user_agent="My Geocoder")

class Location:
    def __init__(self, name: str, lat: float = None, lon: float = None):
        self.name = name
        self.lat = lat
        self.lon = lon

    def __repr__(self):
        return str(self.__dict__)

    @classmethod
    def from_name(cls, name: str):
        coords = GEOLOCATOR.geocode(name, exactly_one=True)
        if coords:
            return Location(name=name, lat=coords.latitude, lon=coords.longitude)
        else:
            return None

    @staticmethod
    def get_distance(profile: str, lat1: float, lon1: float, lat2: float, lon2: float):
        """
        Get the distance between two points using the OSRM API.
        - profile: Either "walking" or "driving"
        - lat1, lon1: Coordinates of the first location
        - lat2, lon2: Coordinates of the second location
        Returns the distance in meters.
        """
        url = API_URL.format(profile=profile, lat1=lat1, lon1=lon1, lat2=lat2, lon2=lon2)
        params = {"overview": "false"}
        response = requests.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            distance = data["routes"][0]["distance"]  # distance in meters
            return distance
        else:
            raise Exception(f"Error: {response.status_code} - {response.text}")

    @staticmethod
    def get_duration_by_distance(distance: float, speed: float):
        """
        Calculate duration by dividing distance by speed.
        - distance: in meters
        - speed: in meters per second
        Returns the duration in minutes.
        """
        duration_seconds = distance / speed
        return duration_seconds / 60.0  # convert to minutes

    def get_walking_duration_to(self, other, speed_mph: float = 3):
        # Get the distance in meters for walking
        distance = self.get_distance("walking", self.lat, self.lon, other.lat, other.lon)
        # Convert MPH to M/S
        speed_mps = speed_mph * 0.44704
        return self.get_duration_by_distance(distance, speed_mps)

    def get_driving_duration_to(self, other, speed_mph: float = 10):
        # Get the distance in meters for driving
        distance = self.get_distance("driving", self.lat, self.lon, other.lat, other.lon)
        # Convert MPH to M/S
        speed_mps = speed_mph * 0.44704
        return self.get_duration_by_distance(distance, speed_mps)


In [8]:
class ParkingStructure:
    def __init__(self, index: int, location: Location, is_full: bool = False):
        self.index = index
        self.location = location  # Location object representing the parking structure

    def __repr__(self):
        return str(self.__dict__)

In [9]:
class QLearningParking:
    def __init__(self,
                 parking_structures: List[ParkingStructure],
                 start: Location,
                 end: Location,
                 prior: List[List[int]] = None,
                 alpha: float = 0.1,
                 gamma: float = 0.9,
                 epsilon: float= 0.1,
                 precomputed_duration: List[List[float]] = None):
        self.parking_structures = parking_structures
        self.start = start
        self.end = end
        self.alpha = alpha # Learning rate
        self.gamma = gamma # Discount factor
        self.epsilon = epsilon # Exploration rate

        # Initialize prior belief about the probability of finding parking at each structure using the Beta distribution.
        # prior_alpha represents success counts, prior_beta represents fail counts
        if prior:
            self.prior = prior
        else:
            self.prior = [[1.0, 1.0]] * len(parking_structures) # Uniform prior

        if precomputed_duration:
            self.duration_matrix = precomputed_duration
        else:
            self.duration_matrix = self.precompute_duration()

        self.start_duration = [self.start.get_driving_duration_to(parking.location) for parking in self.parking_structures]
        self.end_duration = [parking.location.get_walking_duration_to(self.end) for parking in self.parking_structures]

        self.table = dict()


    @classmethod
    def success_prob_to_prior(cls, prob: float):
        return [int(round(prob * 100.0)), int(round((1 - prob) * 100.0))]

    def precompute_duration(self):
        """
        Precompute the duration of time (driving) between all pairs of parking structures
        and store them in a matrix.
        """
        num_structures = len(self.parking_structures)
        duration_matrix = np.zeros((num_structures, num_structures))
        print("Precomputing duration of time between all pairs of parking structures.")
        for i in range(num_structures):
            for j in range(i + 1, num_structures):
                # Calculate driving durations between parking structures i and j

                duration = self.parking_structures[i].location.get_driving_duration_to(self.parking_structures[j].location)
                print(self.parking_structures[i].location.name, "->", self.parking_structures[j].location.name, ":", duration, "mins")
                duration_matrix[i][j] = duration
                duration_matrix[j][i] = duration # Mirror the values for j->i
        return duration_matrix

    def choose_action(self, state: Tuple[int], valid_actions: List[int]):
        """
        Using Epsilon-greedy, choose an action (parking structure).
        - valid_actions: List of indices of parking structures that are not full.
        - state: Current state as a list.
        """

        # Ensure the current state is in the Q-table
        if state not in self.table:
            self.table[state] = np.zeros(len(self.parking_structures))

        if random.uniform(0, 1) < self.epsilon:
            # Choose a random valid action
            action = random.choice(valid_actions)
        else:
            # Choose the action with the highest Q-value among valid actions
            mask = np.ones_like(self.table[state], dtype=bool)
            for visited in state:
                mask[visited] = False

            # mask off indices of parking structures we already visited
            masked_q_values = np.where(mask, self.table[state], -np.inf)
            action = int(np.argmax(masked_q_values))

        return action

    def get_reward(self, current: ParkingStructure, next_: ParkingStructure, found_parking: bool):
        """
        Determine reward
        """
        reward = 0.0
        if current == self.start:
            # driving duration from start to first parking tried
            reward -= self.start_duration[next_.index]
        else:
          if found_parking:
              # success, found parking
              self.prior[current.index][0] += 1
              # walking duration to final destination from successful parking
              reward -= self.end_duration[current.index]
          else:
              # failure, did not find parking
              self.prior[current.index][1] += 1

              # have to spend time driving to the next parking structure
              reward -= self.duration_matrix[current.index][next_.index]

        return reward

    def update(self, state: List[int], action: int, reward: float, next_state: List[int]):
        """
        Update Q-value table
        - state: Current state as a list (binary vector indicating visited parking structures)
        - action: The index of the action (parking structure) taken
        - reward: The reward received
        - next_state: Next state as a list
        """

        # Ensure the current and next states are in the Q-table
        if state not in self.table:
            self.table[state] = np.zeros(len(self.parking_structures))

        if next_state not in self.table:
            self.table[next_state] = np.zeros(len(self.parking_structures))

        # Get the current Q-value
        current_q = self.table[state][action]

        # Calculate the maximum Q-value for the next state
        valid_future_q_indices = list(set(range(len(self.parking_structures))) - set(next_state))

        if valid_future_q_indices:
            max_future_q = max(self.table[next_state][valid_future_q_indices])
        else:
            max_future_q = 0.0

        # Update Q-value using the Bellman equation
        self.table[state][action] = current_q + self.alpha * (reward + self.gamma * max_future_q - current_q)

    def train(self, episodes: int = 1000):
        """
        Train the model
        """
        for episode in range(episodes):
            state = tuple()
            current = self.start
            while True:
                # Determine valid actions based on state
                valid_actions = list(set(range(len(self.parking_structures))) - set(state))
                if not valid_actions:
                    # No valid actions left, didn't find parking, apply large negative reward.
                    reward = -9999
                    break

                action = self.choose_action(state, valid_actions)

                next_state = state + tuple((action,))
                next_ = self.parking_structures[action]
                sample = beta.rvs(*self.prior[action])
                found_parking = sample < random.random()

                reward = self.get_reward(current, next_, found_parking)
                #print(action, self.table[state], reward)
                self.update(state, action, reward, next_state)
                #print("updated", action, self.table[state])
                state = next_state
                current = next_

In [10]:
# Prior for likelihood of finding parking at each parking structure
parking_probability_map = {
    "Roble Field Garage" : 0.80,
    "Via Ortega Garage": 0.50,
    "Stock Farm Garage": 0.50,
    "Tressider Student Union": 0.10,
    "Wilbur Field Garage": 0.50,
}

parking_structures = []
prior = []
for i, (name, prob) in enumerate(parking_probability_map.items()):
    prior.append(QLearningParking.success_prob_to_prior(prob))
    location = Location.from_name(name)
    parking_structures.append(ParkingStructure(i, location))

start_point_name = "Galvez Street & El Camino Real"
start_point = Location.from_name(start_point_name)
end_point_name = "NVIDIA Auditorium"
end_point = Location.from_name(end_point_name)
print(start_point)
print(end_point)
print(prior)

{'name': 'Galvez Street & El Camino Real', 'lat': 37.4369847, 'lon': -122.1611878}
{'name': 'NVIDIA Auditorium', 'lat': 37.4281301, 'lon': -122.1742004}
[[80, 20], [50, 50], [50, 50], [10, 90], [50, 50]]


In [11]:
q_learn = QLearningParking(parking_structures=parking_structures,
                           start=start_point,
                           end=end_point,
                           prior=prior,
                           alpha=0.1,
                           gamma=0.9,
                           epsilon=0.1)

Precomputing duration of time between all pairs of parking structures.
Roble Field Garage -> Via Ortega Garage : 0.8384782868050585 mins
Roble Field Garage -> Stock Farm Garage : 4.388496182295395 mins
Roble Field Garage -> Tressider Student Union : 3.1410313767597233 mins
Roble Field Garage -> Wilbur Field Garage : 6.230613218802196 mins
Via Ortega Garage -> Stock Farm Garage : 4.219607492245287 mins
Via Ortega Garage -> Tressider Student Union : 3.9795096635647824 mins
Via Ortega Garage -> Wilbur Field Garage : 7.069464328322597 mins
Stock Farm Garage -> Tressider Student Union : 7.519088523025531 mins
Stock Farm Garage -> Wilbur Field Garage : 12.04478346456693 mins
Tressider Student Union -> Wilbur Field Garage : 3.3095472440944884 mins


In [12]:
pp.pprint(q_learn.duration_matrix)

array([[ 0.        ,  0.83847829,  4.38849618,  3.14103138,  6.23061322],
       [ 0.83847829,  0.        ,  4.21960749,  3.97950966,  7.06946433],
       [ 4.38849618,  4.21960749,  0.        ,  7.51908852, 12.04478346],
       [ 3.14103138,  3.97950966,  7.51908852,  0.        ,  3.30954724],
       [ 6.23061322,  7.06946433, 12.04478346,  3.30954724,  0.        ]])


In [13]:
q_learn.train(episodes=10000)

In [14]:
print(q_learn.start_duration)
print(q_learn.end_duration)

[10.573625029825818, 9.770937723693631, 10.082617513719876, 11.137705798138871, 9.116633858267717]
[4.717450091465839, 3.3665891195418753, 16.569484212200745, 15.187554680664913, 25.43396564065855]


In [15]:
print(q_learn.table)

{(): array([-32.5628041 , -31.43693081, -32.04373191, -33.64849791,
       -31.71941803]), (0,): array([  0.        , -24.5010232 , -25.0181322 , -25.03324763,
       -25.807126  ]), (0, 1): array([  0.        ,   0.        , -24.12310332, -24.48269255,
       -25.24106748]), (0, 1, 2): array([  0.        ,   0.        ,   0.        , -21.8174906 ,
       -29.54443622]), (0, 1, 2, 3): array([ 0.        ,  0.        ,  0.        ,  0.        , -8.23972666]), (0, 1, 2, 3, 4): array([0., 0., 0., 0., 0.]), (1,): array([-24.71576747,   0.        , -23.36772531, -25.74974581,
       -29.07538754]), (1, 0): array([  0.        ,   0.        , -25.45514461, -25.67398288,
       -29.75049254]), (1, 0, 2): array([  0.        ,   0.        ,   0.        , -23.35580313,
       -28.82309118]), (1, 0, 2, 3): array([  0.        ,   0.        ,   0.        ,   0.        ,
       -10.29941797]), (1, 0, 2, 3, 4): array([0., 0., 0., 0., 0.]), (2,): array([-26.71618586, -22.01500579,   0.        , -26.4981

In [16]:

policy = {}
for state, q_values in q_learn.table.items():
    mask = np.ones_like(q_values, dtype=bool)
    for visited in state:
        mask[visited] = False

    # mask off indices of parking structures we already visited
    masked_q_values = np.where(mask, q_values, -np.inf)
    best_action = np.argmax(masked_q_values)
    if best_action in state:
        policy[state] = None
    else:
        policy[state] = np.argmax(masked_q_values)


In [21]:
print("Learned Policy:")
names = list(parking_probability_map.keys())
results = dict()
for state, action in policy.items():
    # Pair each number in the order with the corresponding name
    name_state = ' -> '.join([names[visited] for visited in state])
    optimal_action_name = names[action] if action else None
    print("Visited:", name_state)
    print("Optimal Action:", optimal_action_name)
    print()
    results[name_state] = optimal_action_name




Learned Policy:
Visited: 
Optimal Action: Via Ortega Garage

Visited: Roble Field Garage
Optimal Action: Via Ortega Garage

Visited: Roble Field Garage -> Via Ortega Garage
Optimal Action: Stock Farm Garage

Visited: Roble Field Garage -> Via Ortega Garage -> Stock Farm Garage
Optimal Action: Tressider Student Union

Visited: Roble Field Garage -> Via Ortega Garage -> Stock Farm Garage -> Tressider Student Union
Optimal Action: Wilbur Field Garage

Visited: Roble Field Garage -> Via Ortega Garage -> Stock Farm Garage -> Tressider Student Union -> Wilbur Field Garage
Optimal Action: None

Visited: Via Ortega Garage
Optimal Action: Stock Farm Garage

Visited: Via Ortega Garage -> Roble Field Garage
Optimal Action: Stock Farm Garage

Visited: Via Ortega Garage -> Roble Field Garage -> Stock Farm Garage
Optimal Action: Tressider Student Union

Visited: Via Ortega Garage -> Roble Field Garage -> Stock Farm Garage -> Tressider Student Union
Optimal Action: Wilbur Field Garage

Visited: Via O

In [None]:
!pip install folium

In [19]:
import folium

coords = [parking_structures[i].location.lat for i in range(len(parking_structures))], [parking_structures[i].location.lon for i in range(len(parking_structures))]
coordinates = list(zip(coords[0], coords[1]))
# Create a map centered at the first coordinate
map_center = parking_structures[0].location.lat, parking_structures[0].location.lon

mymap = folium.Map(location=map_center, zoom_start=15)

# Add markers for each coordinate
for idx, (lat, lon) in enumerate(coordinates):
    folium.Marker(location=[lat, lon], popup=f"Point {idx+1}").add_to(mymap)

folium.Marker(location=[start_point.lat, start_point.lon], popup="Start", icon=folium.Icon(color="green", icon="info-sign")).add_to(mymap)
folium.Marker(location=[end_point.lat, end_point.lon], popup="End", icon=folium.Icon(color="red", icon="info-sign")).add_to(mymap)
# Save the map to an HTML file
mymap.save("map.html")

# Display the map directly in a Jupyter Notebook (if you're using one)
mymap
