GROUP S members

| Name | Student Number | Registration Number |
| :--- | :--- | :--- |
| Kala Samuel | 2400725045 | 24/U/25045/PS |
| Chebosis Lynn Kuboi | 2400704567 | 24/U/04567/EVE |
| Mayanja Jessy Elijah | 2400700680 | 24/U/0680 |
| Njayaana Melissa Violet | 2400701171 | 24/U/1171 |
| Kuoi David Manyang | 2400721473 | 24/E/21473/PS |

FOUNDATION

In [None]:
import collections
import math
from typing import Any, DefaultDict, List, Set, Tuple
SparseVector = DefaultDict[Any, float]
Position = Tuple[int, int]

def find_alphabetically_first_word(text: str) -> str:
 # BEGIN_CODE
    words = text.split()
    return min(words) if words else ""
    # END_CODE
    
    
def euclidean_distance(loc1: Position, loc2: Position) -> float:
    # BEGIN_CODE
    return math.sqrt((loc2[0] - loc1[0])**2 + (loc2[1] - loc1[1])**2)
    # END_CODE



def mutate_sentences(sentence: str) -> List[str]:
    # BEGIN_CODE
    words = sentence.split()
    if not words:
        return []

    
    adj = collections.defaultdict(list)
    for i in range(len(words) - 1):
        adj[words[i]].append(words[i + 1])

    results = set()

    def backtrack(current):
        if len(current) == len(words):
            results.add(" ".join(current))
            return
        last_word = current[-1]
        for next_word in adj.get(last_word, []):
            current.append(next_word)
            backtrack(current)
            current.pop()

    for w in words:
        backtrack([w])

    return list(results)

    
        # END_CODE


def sparse_vector_dot_product(v1: SparseVector, v2: SparseVector) -> float:
# BEGIN_CODE
    return sum(v1[k] * v2[k] for k in v1 if k in v2)
     # END_CODE
     
     
     
def increment_sparse_vector(v1: SparseVector, scale: float, v2: SparseVector) -> None:
# BEGIN_CODE
    for k, value in v2.items():
        v1[k] += scale * value
         # END_CODE
         
         
def find_nonsingleton_words(text: str) -> Set[str]:
# BEGIN_CODE
    counts = collections.defaultdict(int)
    for word in text.split():
        counts[word] += 1
    return {word for word, count in counts.items() if count > 1}
     # END_CODE


if __name__ == '__main__':
    print("=== Testing find_alphabetically_first_word ===")
    text = "zebra apple mango"
    print(f"Input: {text} -> First word alphabetically: {find_alphabetically_first_word(text)}\n")

    print("=== Testing euclidean_distance ===")
    p1 = (0, 0)
    p2 = (3, 4)
    print(f"Distance between {p1} and {p2}: {euclidean_distance(p1, p2)}\n")

    print("=== Testing mutate_sentences ===")
    sentence = "I am what I am"
    mutated = mutate_sentences(sentence)
    print(f"Original sentence: {sentence}")
    print(f"Mutated sentences: {mutated}\n")

    print("=== Testing sparse_vector_dot_product and increment_sparse_vector ===")
    v1 = collections.defaultdict(float, {'a': 1.0, 'b': 2.0})
    v2 = collections.defaultdict(float, {'a': 3.0, 'b': 4.0, 'c': 5.0})
    print(f"Dot product: {sparse_vector_dot_product(v1, v2)}")
    increment_sparse_vector(v1, 0.5, v2)
    print(f"v1 after increment by 0.5 * v2: {dict(v1)}\n")

    print("=== Testing find_nonsingleton_words ===")
    text2 = "apple banana apple cherry banana banana"
    print(f"Non-singleton words in '{text2}': {find_nonsingleton_words(text2)}")


SENTIMENT

In [None]:
import random
import sys
from collections import Counter
from typing import Callable, Dict, List, Tuple, TypeVar

from util import *

FeatureVector = Dict[str, int]
WeightVector = Dict[str, float]
Example = Tuple[FeatureVector, int]


def extractWordFeatures(x: str) -> FeatureVector:
    """
    Extract word features for a string x. Words are delimited by
    whitespace characters only.
    @param string x:
    @return dict: feature vector representation of x.
    Example: "I am what I am" --> {'I': 2, 'am': 2, 'what': 1}
    """
    # BEGIN_YOUR_CODE 
    features = {}
    words = x.split()
    for word in words:
        features[word] = features.get(word, 0) + 1
    return features
    # END_YOUR_CODE
    
    
T = TypeVar('T')

def learnPredictor(trainExamples: List[Tuple[T, int]],
                   validationExamples: List[Tuple[T, int]],
                   featureExtractor: Callable[[T], FeatureVector],
                   numEpochs: int, eta: float) -> WeightVector:
    '''
    Given |trainExamples| and |validationExamples| (each one is a list of (x,y)
    pairs), a |featureExtractor| to apply to x, and the number of epochs to
    train |numEpochs|, the step size |eta|, return the weight vector (sparse
    feature vector) learned.

    You should implement stochastic gradient descent.

    Notes:
    - Only use the trainExamples for training!
    - You should call evaluatePredictor() on both trainExamples and
      validationExamples to see how you're doing as you learn after each epoch.
    - The predictor should output +1 if the score is precisely 0.
    '''
    weights = {}  # feature => weight

    # BEGIN_YOUR_CODE (our solution is 13 lines of code, but don't worry if you deviate from this)
    def predictor(x: T) -> int:
        phi = featureExtractor(x)
        score = dotProduct(weights, phi)
        return 1 if score >= 0 else -1


    print(f"Starting training for {numEpochs} epochs with learning rate = {eta}\n")

    for epoch in range(numEpochs):
        # Shuffle training examples
        random.shuffle(trainExamples)
        
        # Perform SGD updates
        for x, y in trainExamples:
            phi = featureExtractor(x)
            score = dotProduct(weights, phi)
            
            # Hinge loss: max(0, 1 - y * score)
            if y * score < 1:
                # Gradient update: w += eta * y * phi
                increment(weights, eta * y, phi)
        
        # Evaluate after each epoch
        trainError = evaluatePredictor(trainExamples, predictor)
        validationError = evaluatePredictor(validationExamples, predictor)
        print(f"Epoch {epoch+1}: train error = {trainError:.4f}, validation error = {validationError:.4f}")
    
    # END_YOUR_CODE
    return weights


def generateDataset(numExamples: int, weights: WeightVector) -> List[Example]:
    '''
    Return a set of examples (phi(x), y) randomly which are classified
      correctly by |weights|.
    '''
    random.seed(42)

    # Return a single example (phi(x), y).
    # phi(x) should be a dict whose keys are a subset of the keys in weights
    # and values can be anything (randomize!) with a score for the given weight vector.
    # note that there is intentionally flexibility in how you define phi.
    # y should be 1 or -1 as classified by the weight vector.
    # IMPORTANT: In the case that the score is 0, y should be set to 1.

    # Note that the weight vector can be arbitrary during testing.
    def generateExample() -> Tuple[Dict[str, int], int]:
        # BEGIN_YOUR_CODE (our solution is 3 lines of code, but don't worry if you deviate from this)
        # Select a random subset of features from weights
        available_features = list(weights.keys())
        num_features = random.randint(1, min(10, len(available_features)))
        selected_features = random.sample(available_features, num_features)
        
        # Create feature vector with random values
        phi = {}
        for feature in selected_features:
            phi[feature] = random.randint(1, 5)  # Random positive integer values
        
        # Calculate score and determine label
        score = dotProduct(weights, phi)
        y = 1 if score >= 0 else -1
        
        # END_YOUR_CODE
        return phi, y

    return [generateExample() for _ in range(numExamples)]


def extractCharacterFeatures(n: int) -> Callable[[str], FeatureVector]:
    '''
    Return a function that takes a string |x| and returns a sparse feature
    vector consisting of all n-grams of |x| without spaces mapped to their n-gram counts.
    EXAMPLE: (n = 3) "I like tacos" --> {'Ili': 1, 'lik': 1, 'ike': 1, ...
    You may assume that 1 <= n <= len(x).
    '''
    def extract(x: str) -> Dict[str, int]:
        # BEGIN_YOUR_CODE (our solution is 6 lines of code, but don't worry if you deviate from this)
        # Remove all whitespace characters
        text = ''.join(x.split())
        
        features = {}
        # Extract all n-grams
        for i in range(len(text) - n + 1):
            ngram = text[i:i+n]
            features[ngram] = features.get(ngram, 0) + 1
        
        return features
        # END_YOUR_CODE

    return extract


def testValuesOfN(n: int):
    '''
    Use this code to test different values of n for extractCharacterFeatures
    This code is exclusively for testing.
    Your full written solution for this problem must be in sentiment.pdf.
    '''
    trainExamples = readExamples('polarity.train')
    validationExamples = readExamples('polarity.dev')
    featureExtractor = extractCharacterFeatures(n)
    weights = learnPredictor(trainExamples,
                             validationExamples,
                             featureExtractor,
                             numEpochs=20,
                             eta=0.01)
    outputWeights(weights, 'weights')
    outputErrorAnalysis(validationExamples, featureExtractor, weights,
                        'error-analysis')  # Use this to debug
    trainError = evaluatePredictor(
        trainExamples, lambda x:
        (1 if dotProduct(featureExtractor(x), weights) >= 0 else -1))
    validationError = evaluatePredictor(
        validationExamples, lambda x:
        (1 if dotProduct(featureExtractor(x), weights) >= 0 else -1))
    print(("Official: train error = %s, validation error = %s" %
           (trainError, validationError)))




def kmeans(examples: List[Dict[str, float]], K: int,
           maxEpochs: int) -> Tuple[List, List, float]:
    '''
    Perform K-means clustering on |examples|, where each example is a sparse feature vector.

    examples: list of examples, each example is a string-to-float dict representing a sparse vector.
    K: number of desired clusters. Assume that 0 < K <= |examples|.
    maxEpochs: maximum number of epochs to run (you should terminate early if the algorithm converges).
    Return: (length K list of cluster centroids,
            list of assignments (i.e. if examples[i] belongs to centers[j], then assignments[i] = j),
            final reconstruction loss)
    '''
    # BEGIN_YOUR_CODE (our solution is 28 lines of code, but don't worry if you deviate from this)
    # Initialize centroids randomly
    centroids = random.sample(examples, K)
    
    # Precompute ||x||^2 for all examples for efficient distance computation
    example_norms = []
    for example in examples:
        norm_sq = 0.0
        for value in example.values():
            norm_sq += value * value
        example_norms.append(norm_sq)
    
    assignments = [-1] * len(examples)
    prev_assignments = None
    
    for epoch in range(maxEpochs):
        # Precompute ||c||^2 and c for all centroids
        centroid_norms = []
        for centroid in centroids:
            norm_sq = 0.0
            for value in centroid.values():
                norm_sq += value * value
            centroid_norms.append(norm_sq)
        
        # Assignment step: assign each example to closest centroid
        assignments_changed = False
        reconstruction_loss = 0.0
        
        for i, example in enumerate(examples):
            min_distance = float('inf')
            best_cluster = -1
            
            # Find closest centroid using: ||x - c||^2 = ||x||^2 + ||c||^2 - 2*xÂ·c
            x_norm = example_norms[i]
            
            for j in range(K):
                # Compute squared Euclidean distance efficiently
                # distance = ||x||^2 + ||c_j||^2 - 2 * dot(x, c_j)
                dot_product = dotProduct(example, centroids[j])
                distance = x_norm + centroid_norms[j] - 2 * dot_product
                
                if distance < min_distance:
                    min_distance = distance
                    best_cluster = j
            
            if assignments[i] != best_cluster:
                assignments_changed = True
                assignments[i] = best_cluster
            
            reconstruction_loss += min_distance
        print(f"Epoch {epoch+1}/{maxEpochs}: Reconstruction loss = {reconstruction_loss:.4f}")
        # Check for convergence
        if not assignments_changed:
            break
            
        # Update step: recompute centroids
        # Reset cluster accumulators
        cluster_sums = [{} for _ in range(K)]
        cluster_counts = [0] * K
        
        # Accumulate examples for each cluster
        for i, example in enumerate(examples):
            cluster_idx = assignments[i]
            cluster_counts[cluster_idx] += 1
            
            for feature, value in example.items():
                cluster_sums[cluster_idx][feature] = cluster_sums[cluster_idx].get(feature, 0.0) + value
        
        # Compute new centroids by averaging
        for j in range(K):
            if cluster_counts[j] > 0:
                new_centroid = {}
                for feature, total in cluster_sums[j].items():
                    new_centroid[feature] = total / cluster_counts[j]
                centroids[j] = new_centroid
            # If cluster becomes empty, reinitialize with a random example
            else:
                centroids[j] = random.choice(examples)
    print("\nK-means complete.")
    print(f"Final reconstruction loss = {reconstruction_loss:.4f}")
    # END_YOUR_CODE
    return centroids, assignments, reconstruction_loss



if __name__ == '__main__':
    print("Running a quick smoke test for implemented functions...")

    # Word features
    s = "not good good"
    print("Word features for:\n", s, "->", extractWordFeatures(s))

    # Define a sample weight vector for artificial data generation
    weights = {'a': 1, 'b': -1, 'c': 2, 'd': 1, 'e': -1, 'f': 2, 'g': -1, 'h': 1}

    # Generate dataset
    dataset = generateDataset(numExamples=200, weights=weights)

    # Split dataset into train, validation, and gen sets
    n = len(dataset)
    train = dataset[:int(0.7 * n)]
    val = dataset[int(0.7 * n):int(0.85 * n)]
    gen = dataset[int(0.85 * n):]

    print("Generated dataset sizes -> train:", len(train), "val:", len(val), "gen:", len(gen))

    # Train predictor
    w = learnPredictor(train, val, lambda x: x, numEpochs=5, eta=0.1)
    print("Learned weight vector (top 10 entries):", dict(list(w.items())[:10]))

    # Character features
    f3 = extractCharacterFeatures(3)
    print("char3 features of 'hello world' ->", f3('hello world'))

    # K-means quick example
    pts = [{'0': 0.0}, {'0': 4.0}, {'0': 6.0}, {'0': 11.0}]
    centers, assigns, loss = kmeans(pts, K=2, maxEpochs=10)
    print("kmeans centers:", centers)
    print("assignments:", assigns)
    print("final reconstruction loss:", loss)



ROUTE

In [None]:
from typing import List, Tuple

from mapUtil import (
    CityMap,
    computeDistance,
    createStanfordMap,
    locationFromTag,
    makeTag,
)
from util import Heuristic, SearchProblem, State, UniformCostSearch


# *IMPORTANT* :: A key part of this assignment is figuring out how to model states
# effectively. We've defined a class `State` to help you think through this, with a
# field called `memory`.
#
# As you implement the different types of search problems below, think about what
# `memory` should contain to enable efficient search!
#   > Please read the docstring for `State` in `util.py` for more details and code.
#   > Please read the docstrings for in `mapUtil.py`, especially for the CityMap class

########################################################################################
# Problem 2a: Modeling the Shortest Path Problem.


class ShortestPathProblem(SearchProblem):
    """
    Defines a search problem that corresponds to finding the shortest path
    from `startLocation` to any location with the specified `endTag`.
    """

    def __init__(self, startLocation: str, endTag: str, cityMap: CityMap):
        self.startLocation = startLocation
        self.endTag = endTag
        self.cityMap = cityMap

    def startState(self) -> State:
        # BEGIN_YOUR_CODE (our solution is 1 line of code, but don't worry if you deviate from this)
        return State(self.startLocation, memory=None)
        # END_YOUR_CODE

    def isEnd(self, state: State) -> bool:
        # BEGIN_YOUR_CODE (our solution is 1 line of code, but don't worry if you deviate from this)
        return self.endTag in self.cityMap.tags[state.location]
        # END_YOUR_CODE

    def actionSuccessorsAndCosts(self, state: State) -> List[Tuple[str, State, float]]:
        """
        Note we want to return a list of *3-tuples* of the form:
            (actionToReachSuccessor: str, successorState: State, cost: float)
        Our action space is the set of all named locations, where a named location 
        string represents a transition from the current location to that new location.
        """
        # BEGIN_YOUR_CODE (our solution is 7 lines of code, but don't worry if you deviate from this)
        results = []
        currentLocation = state.location
        
        # Get all neighbors of current location
        for neighborLocation in self.cityMap.distances[currentLocation]:
            cost = self.cityMap.distances[currentLocation][neighborLocation]
            successorState = State(neighborLocation, memory=None)
            results.append((neighborLocation, successorState, cost))
        
        return results
        # END_YOUR_CODE


########################################################################################
# Problem 2b: Custom -- Plan a Route through Stanford


def getStanfordShortestPathProblem() -> ShortestPathProblem:
    """
    Create your own search problem using the map of Stanford, specifying your own
    `startLocation`/`endTag`. 

    Run `python mapUtil.py > readableStanfordMap.txt` to dump a file with a list of
    locations and associated tags; you might find it useful to search for the following
    tag keys (amongst others):
        - `landmark=` - Hand-defined landmarks (from `data/stanford-landmarks.json`)
        - `amenity=`  - Various amenity types (e.g., "parking_entrance", "food")
        - `parking=`  - Assorted parking options (e.g., "underground")
    """
    cityMap = createStanfordMap()

    # BEGIN_YOUR_CODE (our solution is 2 lines of code, but don't worry if you deviate from this)
    startLocation = locationFromTag("landmark=gates", cityMap)
    endTag = "amenity=food"
    # END_YOUR_CODE
    return ShortestPathProblem(startLocation, endTag, cityMap)


########################################################################################
# Problem 3a: Modeling the Waypoints Shortest Path Problem.


class WaypointsShortestPathProblem(SearchProblem):
    """
    Defines a search problem that corresponds to finding the shortest path from
    `startLocation` to any location with the specified `endTag` such that the path also
    traverses locations that cover the set of tags in `waypointTags`. Note that tags 
    from the `startLocation` count towards covering the set of tags.

    Hint: naively, your `memory` representation could be a list of all locations visited.
    However, that would be too large of a state space to search over! Think 
    carefully about what `memory` should represent.
    """
    def __init__(
        self, startLocation: str, waypointTags: List[str], endTag: str, cityMap: CityMap
    ):
        self.startLocation = startLocation
        self.endTag = endTag
        self.cityMap = cityMap

        # We want waypointTags to be consistent/canonical (sorted) and hashable (tuple)
        self.waypointTags = tuple(sorted(waypointTags))

    def startState(self) -> State:
        # BEGIN_YOUR_CODE (our solution is 6 lines of code, but don't worry if you deviate from this)
        collectedTags = set()
        for tag in self.cityMap.tags[self.startLocation]:
            if tag in self.waypointTags:
                collectedTags.add(tag)
        
        return State(self.startLocation, memory=frozenset(collectedTags))
        # END_YOUR_CODE

    def isEnd(self, state: State) -> bool:
        # BEGIN_YOUR_CODE (our solution is 5 lines of code, but don't worry if you deviate from this)
        hasEndTag = self.endTag in self.cityMap.tags[state.location]
        hasAllWaypoints = len(state.memory) == len(self.waypointTags)
        
        return hasEndTag and hasAllWaypoints
        # END_YOUR_CODE

    def actionSuccessorsAndCosts(self, state: State) -> List[Tuple[str, State, float]]:
        # BEGIN_YOUR_CODE (our solution is 17 lines of code, but don't worry if you deviate from this)
        results = []
        currentLocation = state.location
        currentCollectedTags = set(state.memory)
        
        # Get all neighbors
        for neighborLocation in self.cityMap.distances[currentLocation]:
            cost = self.cityMap.distances[currentLocation][neighborLocation]
            
            # Check if neighbor has any waypoint tags we haven't collected
            newCollectedTags = currentCollectedTags.copy()
            for tag in self.cityMap.tags[neighborLocation]:
                if tag in self.waypointTags:
                    newCollectedTags.add(tag)
            
            successorState = State(neighborLocation, memory=frozenset(newCollectedTags))
            results.append((neighborLocation, successorState, cost))
        
        return results

        # END_YOUR_CODE


########################################################################################
# Problem 3c: Custom -- Plan a Route with Unordered Waypoints through Stanford


def getStanfordWaypointsShortestPathProblem() -> WaypointsShortestPathProblem:
    """
    Create your own search problem with waypoints using the map of Stanford, 
    specifying your own `startLocation`/`waypointTags`/`endTag`.

    Similar to Problem 2b, use `readableStanfordMap.txt` to identify potential
    locations and tags.
    """
    cityMap = createStanfordMap()
    # BEGIN_YOUR_CODE (our solution is 4 lines of code, but don't worry if you deviate from this)
    # Use locationFromTag to get valid starting location
    startLocation = locationFromTag("landmark=gates", cityMap)
    waypointTags = ["amenity=food", "amenity=parking_entrance"]
    endTag = "landmark=bookstore"
    # END_YOUR_CODE
    return WaypointsShortestPathProblem(startLocation, waypointTags, endTag, cityMap)


########################################################################################
# Problem 4a: A* to UCS reduction

# Turn an existing SearchProblem (`problem`) you are trying to solve with a
# Heuristic (`heuristic`) into a new SearchProblem (`newSearchProblem`), such
# that running uniform cost search on `newSearchProblem` is equivalent to
# running A* on `problem` subject to `heuristic`.
#
# This process of translating a model of a problem + extra constraints into a
# new instance of the same problem is called a reduction; it's a powerful tool
# for writing down "new" models in a language we're already familiar with.
# See util.py for the class definitions and methods of Heuristic and SearchProblem.


def aStarReduction(problem: SearchProblem, heuristic: Heuristic) -> SearchProblem:
    class NewSearchProblem(SearchProblem):
        def startState(self) -> State:
            # BEGIN_YOUR_CODE (our solution is 1 line of code, but don't worry if you deviate from this)
             return problem.startState()
            # END_YOUR_CODE

        def isEnd(self, state: State) -> bool:
            # BEGIN_YOUR_CODE (our solution is 1 line of code, but don't worry if you deviate from this)
            return problem.isEnd(state)
            # END_YOUR_CODE

        def actionSuccessorsAndCosts(self, state: State) -> List[Tuple[str, State, float]]:
            # BEGIN_YOUR_CODE (our solution is 8 lines of code, but don't worry if you deviate from this)
            results = []
            
            for action, successor, cost in problem.actionSuccessorsAndCosts(state):
                # Modify cost: newCost = cost + h(successor) - h(state)
                hState = heuristic.evaluate(state)
                hSuccessor = heuristic.evaluate(successor)
                newCost = cost + hSuccessor - hState
                results.append((action, successor, newCost))
            
            return results
            # END_YOUR_CODE

    return NewSearchProblem()


########################################################################################
# Problem 4b: "straight-line" heuristic for A*


class StraightLineHeuristic(Heuristic):
    """
    Estimate the cost between locations as the straight-line distance.
        > Hint: you might consider using `computeDistance` defined in `mapUtil.py`
    """
    def __init__(self, endTag: str, cityMap: CityMap):
        self.endTag = endTag
        self.cityMap = cityMap

        # Precompute
        # BEGIN_YOUR_CODE (our solution is 5 lines of code, but don't worry if you deviate from this)
        self.endLocations = []
        for location in cityMap.tags:
            if endTag in cityMap.tags[location]:
                self.endLocations.append(location)

        # END_YOUR_CODE

    def evaluate(self, state: State) -> float:
        # BEGIN_YOUR_CODE (our solution is 6 lines of code, but don't worry if you deviate from this)
        if not self.endLocations:
            return 0.0
        
        minDistance = float('inf')
        currentGeo = self.cityMap.geoLocations[state.location]
        for endLocation in self.endLocations:
            endGeo = self.cityMap.geoLocations[endLocation]
            distance = computeDistance(currentGeo, endGeo)
            minDistance = min(minDistance, distance)
        
        return minDistance
        # END_YOUR_CODE


########################################################################################
# Problem 4c: "no waypoints" heuristic for A*


class NoWaypointsHeuristic(Heuristic):
    """
    Returns the minimum distance from `startLocation` to any location with `endTag`,
    ignoring all waypoints.
    """
    def __init__(self, endTag: str, cityMap: CityMap):
        """
        Precompute cost of shortest path from each location to a location with the desired endTag
        """
        # Define a reversed shortest path problem from a special END state
        # (which connects via 0 cost to all end locations) to `startLocation`.
        # Solving this reversed shortest path problem will give us our heuristic,
        # as it estimates the minimal cost of reaching an end state from each state
        class ReverseShortestPathProblem(SearchProblem):
            def startState(self) -> State:
                """
                Return special "END" state
                """
                # BEGIN_YOUR_CODE (our solution is 1 line of code, but don't worry if you deviate from this)
                return State("END", memory=None)
                # END_YOUR_CODE

            def isEnd(self, state: State) -> bool:
                """
                Return False for each state.
                Because there is *not* a valid end state (`isEnd` always returns False), 
                UCS will exhaustively compute costs to *all* other states.
                """
                # BEGIN_YOUR_CODE (our solution is 1 line of code, but don't worry if you deviate from this)
                return False
                # END_YOUR_CODE

            def actionSuccessorsAndCosts(
                self, state: State
            ) -> List[Tuple[str, State, float]]:
                # If current location is the special "END" state, 
                # return all the locations with the desired endTag and cost 0 
                # (i.e, we connect the special location "END" with cost 0 to all locations with endTag)
                # Else, return all the successors of current location and their corresponding distances according to the cityMap
                # BEGIN_YOUR_CODE (our solution is 14 lines of code, but don't worry if you deviate from this)
                results = []
                
                if state.location == "END":
                    # Connect END to all locations with endTag with cost 0
                    for location in cityMap.tags:
                        if endTag in cityMap.tags[location]:
                            successorState = State(location, memory=None)
                            results.append((location, successorState, 0.0))
                else:
                    # Return all neighbors with their distances
                    for neighborLocation in cityMap.distances[state.location]:
                        cost = cityMap.distances[state.location][neighborLocation]
                        successorState = State(neighborLocation, memory=None)
                        results.append((neighborLocation, successorState, cost))
                
                return results
                # END_YOUR_CODE

        # Call UCS.solve on our `ReverseShortestPathProblem` instance. Because there is
        # *not* a valid end state (`isEnd` always returns False), will exhaustively
        # compute costs to *all* other states.

        # BEGIN_YOUR_CODE (our solution is 2 lines of code, but don't worry if you deviate from this)
        ucs = UniformCostSearch(verbose=0)
        ucs.solve(ReverseShortestPathProblem())
        # END_YOUR_CODE

        # Now that we've exhaustively computed costs from any valid "end" location
        # (any location with `endTag`), we can retrieve `ucs.pastCosts`; this stores
        # the minimum cost path to each state in our state space.
        #   > Note that we're making a critical assumption here: costs are symmetric!

        # BEGIN_YOUR_CODE (our solution is 1 line of code, but don't worry if you deviate from this)
        self.costs = ucs.pastCosts
        # END_YOUR_CODE

    def evaluate(self, state: State) -> float:
        # BEGIN_YOUR_CODE (our solution is 1 line of code, but don't worry if you deviate from this)
        return self.costs.get(state.location, 0.0)
        # END_YOUR_CODE


MOUNTAIN CAR

In [None]:
import math, random
from collections import defaultdict
from typing import List, Callable, Tuple, Dict, Any, Optional, Iterable, Set
import gymnasium as gym
import numpy as np

import util
from util import ContinuousGymMDP, StateT, ActionT
from custom_mountain_car import CustomMountainCarEnv

############################################################
# Problem 3a
# Implementing Value Iteration on Number Line (from Problem 1)
def valueIteration(succAndRewardProb: Dict[Tuple[StateT, ActionT], List[Tuple[StateT, float, float]]], discount: float, epsilon: float = 0.001):
    '''
    Given transition probabilities and rewards, computes and returns V and
    the optimal policy pi for each state.
    - succAndRewardProb: Dictionary mapping tuples of (state, action) to a list of (nextState, prob, reward) Tuples.
    - Returns: Dictionary mapping each state to an action.
    '''
    # Define a mapping from states to Set[Actions] so we can determine all the actions that can be taken from s.
    # You may find this useful in your approach.
    stateActions = defaultdict(set)
    for state, action in succAndRewardProb.keys():
        stateActions[state].add(action)

    def computeQ(V: Dict[StateT, float], state: StateT, action: ActionT) -> float:
        # Return Q(state, action) based on V(state)
        # BEGIN_YOUR_CODE (our solution is 2 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        q=0.0
        for (nextState, prob, reward) in succAndRewardProb.get((state, action), []):
            q = q + prob * (reward + discount * V[nextState])
        return q
        # END_YOUR_CODE

    def computePolicy(V: Dict[StateT, float]) -> Dict[StateT, ActionT]:
        # Return the policy given V.
        # Remember the policy for a state is the action that gives the greatest Q-value.
        # IMPORTANT: if multiple actions give the same Q-value, choose the largest action number for the policy. 
        # HINT: We only compute policies for states in stateActions.
        # BEGIN_YOUR_CODE (our solution is 4 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        policy = {}
        for state in stateActions.keys():
            best_action = None
            best_q = -float('inf')
            for action in stateActions[state]:
                q= computeQ(V, state, action)
                if q > best_q or (abs(q- best_q) < 1e-12 and (best_action is None or action > best_action)):
                    best_q = q
                    best_action = action
            if best_action is not None:
                policy[state] = best_action
        return policy
        # END_YOUR_CODE

    print('Running valueIteration...')
    V = defaultdict(float) # This will return 0 for states not seen (handles terminal states)
    numIters = 0
    while True:
        newV = defaultdict(float) # This will return 0 for states not seen (handles terminal states)
        # update V values using the computeQ function above.
        # repeat until the V values for all states do not change by more than epsilon.
        # BEGIN_YOUR_CODE (our solution is 4 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        max_diff = 0.0
        for state in stateActions.keys():
            newV[state] = max(computeQ(V, state, action) for action in stateActions[state])
            max_diff = max(max_diff, abs(newV[state] - V[state]))
        if max_diff <= epsilon: break
        # END_YOUR_CODE
        V = newV
        numIters += 1
    V_opt = V
    print(("valueIteration: %d iterations" % numIters))
    return computePolicy(V_opt)

############################################################
# Problem 3b
# Model-Based Monte Carlo

# Runs value iteration algorithm on the number line MDP
# and prints out optimal policy for each state.
def run_VI_over_numberLine(mdp: util.NumberLineMDP):
    succAndRewardProb = {
        (-mdp.n + 1, 1): [(-mdp.n + 2, 0.2, mdp.penalty), (-mdp.n, 0.8, mdp.leftReward)],
        (-mdp.n + 1, 2): [(-mdp.n + 2, 0.3, mdp.penalty), (-mdp.n, 0.7, mdp.leftReward)],
        (mdp.n - 1, 1): [(mdp.n - 2, 0.8, mdp.penalty), (mdp.n, 0.2, mdp.rightReward)],
        (mdp.n - 1, 2): [(mdp.n - 2, 0.7, mdp.penalty), (mdp.n, 0.3, mdp.rightReward)]
    }

    for s in range(-mdp.n + 2, mdp.n - 1):
        succAndRewardProb[(s, 1)] = [(s+1, 0.2, mdp.penalty), (s - 1, 0.8, mdp.penalty)]
        succAndRewardProb[(s, 2)] = [(s+1, 0.3, mdp.penalty), (s - 1, 0.7, mdp.penalty)]

    pi = valueIteration(succAndRewardProb, mdp.discount)
    return pi


class ModelBasedMonteCarlo(util.RLAlgorithm):
    def __init__(self, actions: List[ActionT], discount: float, calcValIterEvery: int = 10000,
                 explorationProb: float = 0.2,) -> None:
        self.actions = actions
        self.discount = discount
        self.calcValIterEvery = calcValIterEvery
        self.explorationProb = explorationProb
        self.numIters = 0

        # (state, action) -> {nextState -> ct} for all nextState
        self.tCounts = defaultdict(lambda: defaultdict(int))
        # (state, action) -> {nextState -> totalReward} for all nextState
        self.rTotal = defaultdict(lambda: defaultdict(float))

        self.pi = {} # Optimal policy for each state. state -> action

    # This algorithm will produce an action given a state.
    # Here we use the epsilon-greedy algorithm: with probability |explorationProb|, take a random action.
    # Should return random action if the given state is not in self.pi.
    # The input boolean |explore| indicates whether the RL algorithm is in train or test time. If it is false (test), we
    # should always follow the policy if available.
    # HINT: Use random.random() (not np.random()) to sample from the uniform distribution [0, 1]
    def getAction(self, state: StateT, explore: bool = True) -> ActionT:
        if explore:
            self.numIters += 1
        explorationProb = self.explorationProb
        if self.numIters < 2e4: # Always explore
            explorationProb = 1.0
        elif self.numIters > 1e6: # Lower the exploration probability by a logarithmic factor.
            explorationProb = explorationProb / math.log(self.numIters - 100000 + 1)
        # BEGIN_YOUR_CODE (our solution is 5 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        if explore and random.random() < explorationProb:
            return random.choice(self.actions)
        return self.pi.get(state, random.choice(self.actions))
        # END_YOUR_CODE

    # We will call this function with (s, a, r, s'), which is used to update tCounts and rTotal.
    # For every self.calcValIterEvery steps, runs value iteration after estimating succAndRewardProb.
    def incorporateFeedback(self, state: StateT, action: ActionT, reward: int, nextState: StateT, terminal: bool):

        self.tCounts[(state, action)][nextState] += 1
        self.rTotal[(state, action)][nextState] += reward

        if self.numIters % self.calcValIterEvery == 0:
            # Estimate succAndRewardProb based on self.tCounts and self.rTotal.
            # Hint 1: prob(s, a, s') = (counts of transition (s,a) -> s') / (total transtions from (s,a))
            # Hint 2: Reward(s, a, s') = (total reward of (s,a) -> s') / (counts of transition (s,a) -> s')
            # Then run valueIteration and update self.pi.
            succAndRewardProb = defaultdict(list)
            # BEGIN_YOUR_CODE (our solution is 7 lines of code, but don't worry if you deviate from this)
            # raise Exception("Not implemented yet")
            for (s_a), nexts in list(self.tCounts.items()):
                total = sum(nexts.values())
                if total == 0:
                    continue
                s,a = s_a
                for s_prime, ct in nexts.items():
                    prob = ct /total
                    avg_reward = self.rTotal[s_a][s_prime] / ct if ct >0 else 0.0
                    succAndRewardProb[(s,a)].append((s_prime, prob, avg_reward))
            if  len(succAndRewardProb) > 0:
                self.pi = valueIteration(succAndRewardProb, self.discount)
            else:
                self.pi ={}
            # END_YOUR_CODE

############################################################
# Problem 4a
# Performs Tabular Q-learning. Read util.RLAlgorithm for more information.
class TabularQLearning(util.RLAlgorithm):
    def __init__(self, actions: List[ActionT], discount: float,
                 explorationProb: float = 0.2, initialQ: float = 0):
        '''
        - actions: the list of valid actions
        - discount: a number between 0 and 1, which determines the discount factor
        - explorationProb: the epsilon value indicating how frequently the policy returns a random action
        - intialQ: the value for intializing Q values.
        '''
        self.actions = actions
        self.discount = discount
        self.explorationProb = explorationProb
        self.Q = defaultdict(lambda: initialQ)
        self.numIters = 0

    # This algorithm will produce an action given a state.
    # Here we use the epsilon-greedy algorithm: with probability |explorationProb|, take a random action.
    # The input boolean |explore| indicates whether the RL algorithm is in train or test time. If it is false (test), we
    # should always choose the maximum Q-value action.
    # HINT 1: You can access Q-value with self.Q[state, action]
    # HINT 2: Use random.random() to sample from the uniform distribution [0, 1]
    def getAction(self, state: StateT, explore: bool = True) -> ActionT:
        if explore:
            self.numIters += 1
        explorationProb = self.explorationProb
        if self.numIters < 2e4: # explore
            explorationProb = 1.0
        elif self.numIters > 1e5: # Lower the exploration probability by a logarithmic factor.
            explorationProb = explorationProb / math.log(self.numIters - 100000 + 1)
        # BEGIN_YOUR_CODE (our solution is 5 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        if explore and random.random() < explorationProb:
            return random.choice(self.actions)
        best_action =None
        best_q = -float('inf')
        for a in self.actions:
            q=self.Q[(state, a)]
            if q > best_q or (abs(q-best_q) <  1e-12 and (best_action is None or a > best_action)):
                best_q = q
                best_action = a
        return best_action if best_action is not None  else random.choice(self.actions)
    
        # END_YOUR_CODE

    # Call this function to get the step size to update the weights.
    def getStepSize(self) -> float:
        return 0.1

    # We will call this function with (s, a, r, s'), which you should use to update |Q|.
    # Note that if s' is a terminal state, then terminal will be True.  Remember to check for this.
    # You should update the Q values using self.getStepSize() 
    # HINT 1: The target V for the current state is a combination of the immediate reward
    # and the discounted future value.
    # HINT 2: V for terminal states is 0
    def incorporateFeedback(self, state: StateT, action: ActionT, reward: float, nextState: StateT, terminal: bool) -> None:
        # BEGIN_YOUR_CODE (our solution is 8 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        alpha = self.getStepSize()
        cur = self.Q[(state, action)]
        if terminal:
            target =reward
        else:
            best_q_next = -float('inf')
            for a in self.actions:
                qn = self.Q[(nextState, a)]
                if qn > best_q_next:
                    best_q_next=qn
            if best_q_next == -float('inf'):
                best_q_next=0.0
            target=reward + self.discount * best_q_next
        self.Q[(state, action)] = cur + alpha * (target - cur)
        # END_YOUR_CODE

############################################################
# Problem 4b: Fourier feature extractor

def fourierFeatureExtractor(
        state: StateT,
        maxCoeff: int = 5,
        scale: Optional[Iterable] = None
    ) -> np.ndarray:
    '''
    For state (x, y, z), maxCoeff 2, and scale [2, 1, 1], this should output (in any order):
    [1, cos(pi * 2x), cos(pi * y), cos(pi * z),
     cos(pi * (2x + y)), cos(pi * (2x + z)), cos(pi * (y + z)),
     cos(pi * (4x)), cos(pi * (2y)), cos(pi * (2z)),
     cos(pi*(4x + y)), cos(pi * (4x + z)), ..., cos(pi * (4x + 2y + 2z))]
    '''
    if scale is None:
        scale = np.ones_like(state)
    features = None

    # Below, implement the fourier feature extractor as similar to the doc string provided.
    # The return shape should be 1 dimensional ((maxCoeff+1)^(len(state)),).
    #
    # HINT: refer to util.polynomialFeatureExtractor as a guide for
    # doing efficient arithmetic broadcasting in numpy.

    # BEGIN_YOUR_CODE (our solution is 7 lines of code, but don't worry if you deviate from this)
    # raise Exception("Not implemented yet")
    s = np.array(state, dtype=float)
    scale_arr = np.array(scale, dtype=float)
    if scale_arr.shape != s.shape:
        scale_arr = np.ones_like(s) * scale_arr
    dims = s.shape[0]
    ranges = [np.arange(maxCoeff + 1, dtype=int)] * dims
    mash = np.stack(np.meshgrid(*ranges, indexing='ij'), axis=-1)
    coeffs = mash.reshape(-1, dims).astype(float)
    dot = np.dot(coeffs, (s * scale_arr))
    features = np.cos(np.pi * dot)
    # END_YOUR_CODE

    return features

############################################################
# Problem 4c: Q-learning with Function Approximation
# Performs Function Approximation Q-learning. Read util.RLAlgorithm for more information.
class FunctionApproxQLearning(util.RLAlgorithm):
    def __init__(self, featureDim: int, featureExtractor: Callable, actions: List[int],
                 discount: float, explorationProb=0.2):
        '''
        - featureDim: the dimensionality of the output of the feature extractor
        - featureExtractor: a function that takes a state and returns a numpy array representing the feature.
        - actions: the list of valid actions
        - discount: a number between 0 and 1, which determines the discount factor
        - explorationProb: the epsilon value indicating how frequently the policy returns a random action
        '''
        self.featureDim = featureDim
        self.featureExtractor = featureExtractor
        self.actions = actions
        self.discount = discount
        self.explorationProb = explorationProb
        self.W = np.random.standard_normal(size=(featureDim, len(actions)))
        self.numIters = 0

    def getQ(self, state: np.ndarray, action: int) -> float:
        # BEGIN_YOUR_CODE (our solution is 3 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        phi = np.array(self.featureExtractor(state)).reshape(-1)
        action_index = self.actions.index(action)
        return float(np.dot(phi, self.W[:, action_index]))
        # END_YOUR_CODE

    # This algorithm will produce an action given a state.
    # Here we use the epsilon-greedy algorithm: with probability |explorationProb|, take a random action.
    # The input boolean |explore| indicates whether the RL algorithm is in train or test time. If it is false (test), we
    # should always choose the maximum Q-value action.
    # HINT: This function should be the same as your implementation for 4a.
    def getAction(self, state: np.ndarray, explore: bool = True) -> int:
        if explore:
            self.numIters += 1
        explorationProb = self.explorationProb
        if self.numIters < 2e4: # Always explore
            explorationProb = 1.0
        elif self.numIters > 1e5: # Lower the exploration probability by a logarithmic factor.
            explorationProb = explorationProb / math.log(self.numIters - 100000 + 1)

        # BEGIN_YOUR_CODE (our solution is 5 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        if explore and random.random() < explorationProb:
            return random.choice(self.actions)
        best_action = None
        best_q = -float('inf')
        for a in self.actions:
            q = self.getQ(state, a)
            if q > best_q or (abs(q - best_q) < 1e-12 and (best_action is None or a > best_action)):
                best_q = q
                best_action = a
        return best_action if best_action is not None else random.choice(self.actions)
        # END_YOUR_CODE

    # Call this function to get the step size to update the weights.
    def getStepSize(self) -> float:
        return 0.005 * (0.99)**(self.numIters / 500)

    # We will call this function with (s, a, r, s'), which you should use to update |weights|.
    # Note that if s' is a terminal state, then terminal will be True.  Remember to check for this.
    # You should update W using self.getStepSize()
    # HINT 1: this part will look similar to 4a, but you are updating self.W
    # HINT 2: review the function approximation module for the update rule
    def incorporateFeedback(self, state: np.ndarray, action: int, reward: float, nextState: np.ndarray, terminal: bool) -> None:
        # BEGIN_YOUR_CODE (our solution is 8 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        alpha = self.getStepSize()
        phi = np.array(self.featureExtractor(state)).reshape(-1)
        action_index = self.actions.index(action)
        q_cur = self.getQ(state, action)
        if terminal:
            target = reward
        else:
            best_q_next = -float('inf')
            for a in self.actions:
                qn = self.getQ(nextState, a)
                if qn > best_q_next:
                    best_q_next = qn
            if best_q_next == -float('inf'):
                best_q_next = 0.0
            target = reward + self.discount * best_q_next
        td_error = target - q_cur
        self.W[:, action_index] = self.W[:, action_index] + alpha * td_error * phi
        # END_YOUR_CODE

############################################################
# Problem 5c: Constrained Q-learning

class ConstrainedQLearning(FunctionApproxQLearning):
    def __init__(self, featureDim: int, featureExtractor: Callable, actions: List[int],
                 discount: float, force: float, gravity: float,
                 max_speed: Optional[float] = None,
                 explorationProb=0.2):
        super().__init__(featureDim, featureExtractor, actions,
                         discount, explorationProb)
        self.force = force
        self.gravity = gravity
        self.max_speed = max_speed

    # This algorithm will produce an action given a state.
    # Here we use the epsilon-greedy algorithm: with probability |explorationProb|, take a random action.
    # The input boolean |explore| indicates whether the RL algorithm is in train or test time. If it is false (test), we
    # should always choose the maximum Q-value action that is valid.
    def getAction(self, state: np.ndarray, explore: bool = True) -> int:
        if explore:
            self.numIters += 1
        explorationProb = self.explorationProb
        if self.numIters < 2e4: # Always explore
            explorationProb = 1.0
        elif self.numIters > 1e5: # Lower the exploration probability by a logarithmic factor.
            explorationProb = explorationProb / math.log(self.numIters - 100000 + 1)

        # BEGIN_YOUR_CODE (our solution is 15 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        actions_valid = []
        try:
            vel=float(state[1])
        except Exception:
            vel= None
        if self.max_speed is None or vel is None:
            actions_valid = list(self.actions)
        else:
            for a in self.actions:
                if a==0 and vel <= -self.max_speed:
                    continue
                if a==2 and vel >= self.max_speed:
                    continue
                actions_valid.append(a)
            if len(actions_valid) ==0:
                actions_valid = list(self.actions)
        if explore and random.random() < explorationProb:
            return random.choice(actions_valid)
        best_action = None
        best_q = -float('inf')
        for a in actions_valid:
            q = self.getQ(state, a)
            if q > best_q or (abs(q - best_q) < 1e-12 and (best_action is None or a > best_action)):
                best_q = q
                best_action = a
        return best_action if best_action is not None else random.choice(actions_valid)
        # END_YOUR_CODE

############################################################
# This is helper code for comparing the predicted optimal
# actions for 2 MDPs with varying max speed constraints
gym.register(
    id="CustomMountainCar-v0",
    entry_point="custom_mountain_car:CustomMountainCarEnv",
    max_episode_steps=1000,
    reward_threshold=-110.0,
)

mdp1 = ContinuousGymMDP("CustomMountainCar-v0", discount=0.999, timeLimit=1000)
mdp2 = ContinuousGymMDP("CustomMountainCar-v0", discount=0.999, timeLimit=1000)

# This is a helper function for 5c. This function runs
# ConstrainedQLearning, then simulates various trajectories through the MDP
# and compares the frequency of various optimal actions.
def compare_MDP_Strategies(mdp1: ContinuousGymMDP, mdp2: ContinuousGymMDP):
    rl1 = ConstrainedQLearning(
        36,
        lambda s: fourierFeatureExtractor(s, maxCoeff=5, scale=[1, 15]),
        mdp1.actions,
        mdp1.discount,
        mdp1.env.force,
        mdp1.env.gravity,
        10000,
        explorationProb=0.2,
    )
    rl2 = ConstrainedQLearning(
        36,
        lambda s: fourierFeatureExtractor(s, maxCoeff=5, scale=[1, 15]),
        mdp2.actions,
        mdp2.discount,
        mdp2.env.force,
        mdp2.env.gravity,
        0.065,
        explorationProb=0.2,
    )
    sampleKRLTrajectories(mdp1, rl1)
    sampleKRLTrajectories(mdp2, rl2)

def sampleKRLTrajectories(mdp: ContinuousGymMDP, rl: ConstrainedQLearning):
    accelerate_left, no_accelerate, accelerate_right = 0, 0, 0
    for n in range(100):
        traj = util.sample_RL_trajectory(mdp, rl)
        accelerate_left = traj.count(0)
        no_accelerate = traj.count(1)
        accelerate_right = traj.count(2)

    print(f"\nRL with MDP -> start state:{mdp.startState()}, max_speed:{rl.max_speed}")
    print(f"  *  total accelerate left actions: {accelerate_left}, total no acceleration actions: {no_accelerate}, total accelerate right actions: {accelerate_right}")


PACMAN

SCHEDULING

In [None]:
import copy
import util  # expose as submission.util for external callers
from util import CSP, get_or_variable, CourseBulletin, Profile
from typing import Dict, List

############################################################
# Problem 1

# Hint: Take a look at the CSP class and the CSP examples in util.py for
# reference; they should be helpful.
# Hint: The following general examples should be helpful references.
# Add a variable to the CSP:
#   csp = CSP()
#   csp.add_variable(VAR_NAME, VAR_DOMAIN)
# Add a unary factor:
#   csp.add_unary_factor(VAR_NAME, factor_function)
# Add a binary factor:
#   csp.add_binary_factor(VAR1_NAME, VAR2_NAME, factor_function)
#
# Notice that the third input to add_binary_factor is a function!
# factor_function should return 0 if the constraints are unsatisfied, or
# 1 or weight (for problem 3) otherwise.
# You can define factor_function with lambdas or a nested function in Python.
# The following are example functions corresponding to binary factors:
# Using lambdas:
#   csp.add_binary_factor(VAR1_NAME, VAR2_NAME, lambda x, y: x == y)
# Using nested functions:
#   def are_equal(x,y):
#     return x == y
#   csp.add_binary_factor(VAR1_NAME, VAR2_NAME, are_equal)
# See util.py for more examples.


def create_chain_csp(n: int) -> CSP:
    # name variables as x1, x2, ..., xn
    variables = [f'x{i}' for i in range(1, n+1)]
    csp = CSP()
    # Problem 1c
    # BEGIN_YOUR_CODE (our solution is 4 lines of code, but don't worry if you deviate from this)
    for i in range(1, n+1):
        csp.add_variable(f'x{i}', [0, 1])
    for i in range(1, n):
        csp.add_binary_factor(f'x{i}', f'x{i+1}', lambda x, y: 1 if x != y else 0)
    # END_YOUR_CODE
    return csp


############################################################
# Problem 2

def create_nqueens_csp(n: int = 8) -> CSP:
    """
    Return an N-Queen problem on the board of size |n| * |n|.
    You should call csp.add_variable() and csp.add_binary_factor().

    Hint: When defining the factor function (the last argument to
          csp.add_binary_factor()), you can use any variable that
          exists during the moment in which you create the factor
          function. For example, you can define a lambda function
          such as
            lambda a, b: (a + x) > (b + y)
          as long as x and y are defined at the instant in which
          you create this lambda function.

    @param n: number of queens, or the size of one dimension of the board.

    @return csp: A CSP problem with correctly configured factor tables
        such that it can be solved by a weighted CSP solver.
    """
    csp = CSP()
    # Problem 2a
    # BEGIN_YOUR_CODE (our solution is 7 lines of code, but don't worry if you deviate from this)
    for i in range(n):
        csp.add_variable(f'row_{i}', list(range(n)))
    for i in range(n):
        for j in range(i + 1, n):
            def make_constraint(row_i, row_j):
                def constraint(ri, rj):
                    if ri == rj or abs(ri - rj) == abs(row_i - row_j):
                        return 0
                    return 1
                return constraint
            csp.add_binary_factor(f'row_{i}', f'row_{j}', make_constraint(i, j))
    # END_YOUR_CODE
    return csp

# A backtracking algorithm that solves weighted CSP.
# Usage:
#   search = BacktrackingSearch()
#   search.solve(csp)


class BacktrackingSearch:
    def __init__(self, csp=None):
        """Initialize the backtracking search solver."""
        self.csp = csp
        self.mcv = False
        self.ac3 = False
        self.reset_results()
    
    def reset_results(self) -> None:
        """
        This function resets the statistics of the different aspects of the
        CSP solver. We will be using the values here for grading, so please
        do not make any modification to these variables.
        """
        # Keep track of the best assignment and weight found.
        self.optimalAssignment = {}
        self.optimalWeight = 0

        # Keep track of the number of optimal assignments and assignments. These
        # two values should be identical when the CSP is unweighted or only has binary
        # weights.
        self.numOptimalAssignments = 0
        self.numAssignments = 0

        # Keep track of the number of times backtrack() gets called.
        self.numOperations = 0

        # Keep track of the number of operations to get to the very first successful
        # assignment (doesn't have to be optimal).
        self.firstAssignmentNumOperations = 0

        # List of all solutions found.
        self.allAssignments = []
        self.allOptimalAssignments = []

    @property
    def num_operations(self):
        """Alias for numOperations for compatibility."""
        return self.numOperations
    
    @property
    def num_optimal(self):
        """Alias for numOptimalAssignments for compatibility."""
        return self.numOptimalAssignments

    def print_stats(self) -> None:
        """
        Prints a message summarizing the outcome of the solver.
        """
        if self.optimalAssignment:
            print(f'Found {self.numOptimalAssignments} optimal assignments \
                    with weight {self.optimalWeight} in {self.numOperations} operations')
            print(
                f'First assignment took {self.firstAssignmentNumOperations} operations')
        else:
            print(
                "No consistent assignment to the CSP was found. The CSP is not solvable.")

    def get_delta_weight(self, assignment: Dict, var, val) -> float:
        """
        Given a CSP, a partial assignment, and a proposed new value for a variable,
        return the change of weights after assigning the variable with the proposed
        value.

        @param assignment: A dictionary of current assignment. Unassigned variables
            do not have entries, while an assigned variable has the assigned value
            as value in dictionary. e.g. if the domain of the variable A is [5,6],
            and 6 was assigned to it, then assignment[A] == 6.
        @param var: name of an unassigned variable.
        @param val: the proposed value.

        @return w: Change in weights as a result of the proposed assignment. This
            will be used as a multiplier on the current weight.
        """
        assert var not in assignment
        w = 1.0
        if var in self.csp.unaryFactors:
            w *= self.csp.unaryFactors[var](val)
            if w == 0:
                return w
        for var2, factor in list(self.csp.binaryFactors[var].items()):
            if var2 not in assignment:
                continue  # Not assigned yet
            w *= factor(val, assignment[var2])
            if w == 0:
                return w
        return w

    def satisfies_constraints(self, assignment: Dict, var, val) -> bool:
        """
        Given a CSP, a partial assignment, and a proposed new value for a variable,
        return whether or not assigning the variable with the proposed new value
        still statisfies all of the constraints.

        @param assignment: A dictionary of current assignment. Unassigned variables
            do not have entries, while an assigned variable has the assigned value
            as value in dictionary. e.g. if the domain of the variable A is [5,6],
            and 6 was assigned to it, then assignment[A] == 6.
        @param var: name of an unassigned variable.
        @param val: the proposed value.

        @return w: Change in weights as a result of the proposed assignment. This
            will be used as a multiplier on the current weight.
        """
        return self.get_delta_weight(assignment, var, val) != 0

    def solve(self, csp: CSP = None, mcv: bool = False, ac3: bool = False) -> None:
        """
        Solves the given weighted CSP using heuristics as specified in the
        parameter. Note that unlike a typical unweighted CSP where the search
        terminates when one solution is found, we want this function to find
        all possible assignments. The results are stored in the variables
        described in reset_result().

        @param csp: A weighted CSP. If None, uses the CSP from initialization.
        @param mcv: When enabled, Most Constrained Variable heuristics is used.
        @param ac3: When enabled, AC-3 will be used after each assignment of an
            variable is made.
        """
        # CSP to be solved.
        if csp is not None:
            self.csp = csp
        elif self.csp is None:
            raise ValueError("No CSP provided to solve")

        # Set the search heuristics requested asked.
        self.mcv = mcv
        self.ac3 = ac3

        # Reset solutions from previous search.
        self.reset_results()

        # The dictionary of domains of every variable in the CSP.
        self.domains = {
            var: list(self.csp.values[var]) for var in self.csp.variables}

        # Perform backtracking search.
        self.backtrack({}, 0, 1)
        # Print summary of solutions.
        self.print_stats()

        # Return all optimal assignments (or all assignments if no weights)
        return self.allOptimalAssignments if self.allOptimalAssignments else self.allAssignments

    def backtrack(self, assignment: Dict, numAssigned: int, weight: float) -> None:
        """
        Perform the back-tracking algorithms to find all possible solutions to
        the CSP.

        @param assignment: A dictionary of current assignment. Unassigned variables
            do not have entries, while an assigned variable has the assigned value
            as value in dictionary. e.g. if the domain of the variable A is [5,6],
            and 6 was assigned to it, then assignment[A] == 6.
        @param numAssigned: Number of currently assigned variables
        @param weight: The weight of the current partial assignment.
        """

        self.numOperations += 1
        assert weight > 0
        if numAssigned == self.csp.numVars:
            # A satisfiable solution have been found. Update the statistics.
            self.numAssignments += 1
            newAssignment = {}
            for var in self.csp.variables:
                newAssignment[var] = assignment[var]
            self.allAssignments.append(newAssignment)

            if len(self.optimalAssignment) == 0 or weight >= self.optimalWeight:
                if weight == self.optimalWeight:
                    self.numOptimalAssignments += 1
                    self.allOptimalAssignments.append(newAssignment)
                else:
                    self.numOptimalAssignments = 1
                    self.allOptimalAssignments = [newAssignment]
                self.optimalWeight = weight

                self.optimalAssignment = newAssignment
                if self.firstAssignmentNumOperations == 0:
                    self.firstAssignmentNumOperations = self.numOperations
            return

        # Select the next variable to be assigned.
        var = self.get_unassigned_variable(assignment)
        # Get an ordering of the values.
        ordered_values = self.domains[var]

        # Continue the backtracking recursion using |var| and |ordered_values|.
        if not self.ac3:
            # When arc consistency check is not enabled.
            for val in ordered_values:
                deltaWeight = self.get_delta_weight(assignment, var, val)
                if deltaWeight > 0:
                    assignment[var] = val
                    self.backtrack(assignment, numAssigned +
                                   1, weight * deltaWeight)
                    del assignment[var]
        else:
            # Arc consistency check is enabled. This is helpful to speed up 3c.
            for val in ordered_values:
                deltaWeight = self.get_delta_weight(assignment, var, val)
                if deltaWeight > 0:
                    assignment[var] = val
                    # create a deep copy of domains as we are going to look
                    # ahead and change domain values
                    localCopy = copy.deepcopy(self.domains)
                    # fix value for the selected variable so that hopefully we
                    # can eliminate values for other variables
                    self.domains[var] = [val]

                    # enforce arc consistency
                    self.apply_arc_consistency(var)

                    self.backtrack(assignment, numAssigned +
                                   1, weight * deltaWeight)
                    # restore the previous domains
                    self.domains = localCopy
                    del assignment[var]

    def get_unassigned_variable(self, assignment: Dict):
        """
        Given a partial assignment, return a currently unassigned variable.

        @param assignment: A dictionary of current assignment. This is the same as
            what you've seen so far.

        @return var: a currently unassigned variable. The type of the variable
            depends on what was added with csp.add_variable
        """
            
        if not self.mcv:
            # Select a variable without any heuristics.
            for var in self.csp.variables:
                if var not in assignment:
                    return var
        else:
            # Problem 2b
            # Heuristic: most constrained variable (MCV)
            # Select a variable with the least number of remaining domain values.
            # Hint: given var, self.domains[var] gives you all the possible values.
            #       Make sure you're finding the domain of the right variable!
            # Hint: satisfies_constraints determines whether or not assigning a
            #       variable to some value given a partial assignment continues
            #       to satisfy all constraints.
            # Hint: for ties, choose the variable with lowest index in self.csp.variables
            # BEGIN_YOUR_CODE (our solution is 13 lines of code, but don't worry if you deviate from this)
            min_domain_size = float('inf')
            best_var = None
            
            for var in self.csp.variables:
                if var not in assignment:
                    consistent_values = 0
                    for val in self.domains[var]:
                        if self.satisfies_constraints(assignment, var, val):
                            consistent_values += 1
                    
                    if consistent_values < min_domain_size:
                        min_domain_size = consistent_values
                    best_var = var
            
            return best_var
            # END_YOUR_CODE

    def apply_arc_consistency(self, var) -> None:
        """
        Perform the AC-3 algorithm. The goal is to reduce the size of the
        domain values for the unassigned variables based on arc consistency.

        @param var: The variable whose value has just been set.
        """

        def remove_inconsistent_values(var1, var2):
            removed = False
            # the binary factor must exist because we add var1 from var2's neighbor
            factor = self.csp.binaryFactors[var1][var2]
            for val1 in list(self.domains[var1]):
                # Note: in our implementation, it's actually unnecessary to check unary factors,
                #       because in get_delta_weight() unary factors are always checked.
                if (self.csp.unaryFactors[var1] and self.csp.unaryFactors[var1][val1] == 0) or \
                        all(factor[val1][val2] == 0 for val2 in self.domains[var2]):
                    self.domains[var1].remove(val1)
                    removed = True
            return removed

        queue = [var]
        while len(queue) > 0:
            curr = queue.pop(0)
            for neighbor in self.csp.get_neighbor_vars(curr):
                if remove_inconsistent_values(neighbor, curr):
                    queue.append(neighbor)


def create_sum_variable(csp: CSP, name: str, variables: List, maxSum: int) -> tuple:
    """
    Given a list of |variables| each with non-negative integer domains,
    returns the name of a new variable with domain range(0, maxSum+1), such that
    it's consistent with the value |n| iff the assignments for |variables|
    sums to |n|.

    @param name: Prefix of all the variables that are going to be added.
        Can be any hashable objects. For every variable |var| added in this
        function, it's recommended to use a naming strategy such as
        ('sum', |name|, |var|) to avoid conflicts with other variable names.
    @param variables: A list of variables that are already in the CSP that
        have non-negative integer values as its domain.
    @param maxSum: An integer indicating the maximum sum value allowed. You
        can use it to get the auxiliary variables' domain

    @return result: The name of a newly created variable with domain range
        [0, maxSum] such that it's consistent with an assignment of |n|
        iff the assignment of |variables| sums to |n|.
    """

    result = ('sum', name, 'aggregated')
    csp.add_variable(result, list(range(maxSum + 1)))

    if len(variables) == 0:
        csp.add_unary_factor(result, lambda x: x == 0)
        return result

    domain = []
    for i in range(maxSum + 1):
        for j in range(i, maxSum + 1):
            domain.append((i, j))

    for i in range(len(variables)):
        csp.add_variable(('sum', name, str(i)), domain)

    csp.add_unary_factor(('sum', name, '0'), lambda x: x[0] == 0)

    for i in range(len(variables)):
        f = ('sum', name, str(i))
        csp.add_binary_factor(f, variables[i], lambda x, y: x[1] == x[0] + y)

    for i in range(len(variables) - 1):
        f0 = ('sum', name, str(i))
        f1 = ('sum', name, str(i + 1))
        csp.add_binary_factor(f0, f1, lambda x, y: x[1] == y[0])

    csp.add_binary_factor(
        ('sum', name, str(len(variables) - 1)), result, lambda x, y: x[1] == y)

    return result

############################################################
# Problem 3

# A class providing methods to generate CSP that can solve the course scheduling
# problem.


class SchedulingCSPConstructor:
    def __init__(self, bulletin: CourseBulletin, profile: Profile):
        """
        Saves the necessary data.

        @param bulletin: Stanford Bulletin that provides a list of courses
        @param profile: A student's profile and requests
        """

        self.bulletin = bulletin
        self.profile = profile
        # Internal CSP used by zero-argument helpers expected by grader
        self.csp = CSP()

    def add_variables(self, csp: CSP = None) -> None:
        """
        Adding the variables into the CSP. Each variable, (request, quarter),
        can take on the value of one of the courses requested in request or None.
        For instance, for quarter='Aut2013', and a request object, request, generated
        from 'CS221 or CS246', (request, quarter) should have the domain values
        ['CS221', 'CS246', None]. Conceptually, if var is assigned 'CS221'
        then it means we are taking 'CS221' in 'Aut2013'. If it's None, then
        we are not taking either of them in 'Aut2013'.

        @param csp: Optional CSP to operate on. Defaults to self.csp.
        """
        csp = csp if csp is not None else self.csp
        for request in self.profile.requests:
            for quarter in self.profile.quarters:
                csp.add_variable((request, quarter), request.cids + [None])

    def add_bulletin_constraints(self, csp: CSP = None) -> None:
        """
        Add the constraints that a course can only be taken if it's offered in
        that quarter.

        @param csp: Optional CSP to operate on. Defaults to self.csp.
        """
        csp = csp if csp is not None else self.csp
        for request in self.profile.requests:
            for quarter in self.profile.quarters:
                csp.add_unary_factor((request, quarter),
                                     lambda cid: cid is None or
                                     self.bulletin.courses[cid].is_offered_in(quarter))

    def add_norepeating_constraints(self, csp: CSP = None) -> None:
        """
        No course can be repeated. Coupling with our problem's constraint that
        only one of a group of requested course can be taken, this implies that
        every request can only be satisfied in at most one quarter.

        @param csp: Optional CSP to operate on. Defaults to self.csp.
        """
        csp = csp if csp is not None else self.csp
        for request in self.profile.requests:
            for quarter1 in self.profile.quarters:
                for quarter2 in self.profile.quarters:
                    if quarter1 == quarter2:
                        continue
                    csp.add_binary_factor((request, quarter1), (request, quarter2),
                                          lambda cid1, cid2: cid1 is None or cid2 is None)

    def get_basic_csp(self) -> CSP:
        """
        Return a CSP that only enforces the basic constraints that a course can
        only be taken when it's offered and that a request can only be satisfied
        in at most one quarter.

        @return csp: A CSP where basic variables and constraints are added.
        """

        # Reset internal CSP and build basics onto it
        self.csp = CSP()
        self.add_variables(self.csp)
        self.add_bulletin_constraints(self.csp)
        self.add_norepeating_constraints(self.csp)
        return self.csp

    def add_quarter_constraints(self, csp: CSP) -> None:
        """
        If the profile explicitly wants a request to be satisfied in some given
        quarters, e.g. Aut2013, then add constraints to not allow that request to
        be satisfied in any other quarter. If a request doesn't specify the
        quarter(s), do nothing.

        @param csp: The CSP where the additional constraints will be added to.
        """
        # Problem 3a
        # Hint: If a request doesn't specify the quarter(s), do nothing.
        # Hint: To check which quarters are specified by a request variable
        #       named `request`, use request.quarters (NOT self.profile.quarters).
        # BEGIN_YOUR_CODE (our solution is 5 lines of code, but don't worry if you deviate from this)
        for request in self.profile.requests:
            if request.quarters:
                for quarter in self.profile.quarters:
                    if quarter not in request.quarters:
                        def make_quarter_factor():
                            def quarter_factor(cid):
                                return 1 if cid is None else 0
                            return quarter_factor
                        csp.add_unary_factor((request, quarter), make_quarter_factor())
        # END_YOUR_CODE

    def add_request_weights(self, csp: CSP) -> None:
        """
        Incorporate weights into the CSP. By default, a request has a weight
        value of 1 (already configured in Request). You should only use the
        weight when one of the requested course is in the solution. A
        unsatisfied request should also have a weight value of 1.

        @param csp: The CSP where the additional constraints will be added to.
        """

        for request in self.profile.requests:
            for quarter in self.profile.quarters:
                def make_weight_factor(req_weight):
                    def weight_factor(cid):
                        return req_weight if cid != None else 1.0
                    return weight_factor
                csp.add_unary_factor((request, quarter), make_weight_factor(request.weight))

    def add_prereq_constraints(self, csp: CSP) -> None:
        """
        Adding constraints to enforce prerequisite. A course can have multiple
        prerequisites. You can assume that *all courses in req.prereqs are
        being requested*. Note that if our parser inferred that one of your
        requested course has additional prerequisites that are also being
        requested, these courses will be added to req.prereqs. You will be notified
        with a message when this happens. Also note that req.prereqs apply to every
        single course in req.cids. If a course C has prerequisite A that is requested
        together with another course B (i.e. a request of 'A or B'), then taking B does
        not count as satisfying the prerequisite of C. You cannot take a course
        in a quarter unless all of its prerequisites have been taken *before* that
        quarter. You should take advantage of get_or_variable().

        @param csp: The CSP where the additional constraints will be added to.
        """

        # Iterate over all request courses
        for req in self.profile.requests:
            if len(req.prereqs) == 0:
                continue
            # Iterate over all possible quarters
            for quarter_i, quarter in enumerate(self.profile.quarters):
                # Iterate over all prerequisites of this request
                for pre_cid in req.prereqs:
                    # Find the request with this prerequisite
                    for pre_req in self.profile.requests:
                        if pre_cid not in pre_req.cids:
                            continue
                        # Make sure this prerequisite is taken before the requested course(s)
                        prereq_vars = [(pre_req, q)
                                       for i, q in enumerate(self.profile.quarters) if i < quarter_i]
                        v = (req, quarter)
                        orVar = get_or_variable(
                            csp, (v, pre_cid), prereq_vars, pre_cid)
                        # Note this constraint is enforced only when the course is taken
                        # in `quarter` (that's why we test `not val`)
                        csp.add_binary_factor(
                            orVar, v, lambda o, val: not val or o)

    def add_unit_constraints(self, csp: CSP) -> None:
        """
        Add constraints to the CSP to ensure that course units are correctly assigned.
        This means meeting two conditions:

        1- If a course is taken in a given quarter, it should be taken for
           a number of units that is within bulletin.courses[cid].minUnits/maxUnits.
           If not taken, it should be 0.
        2- In each quarter, the total number of units of courses taken should be between
           profile.minUnits/maxUnits, inclusively.
           You should take advantage of create_sum_variable() to implement this.

        For every requested course, you must create a variable named (courseId, quarter)
        (e.g. ('CS221', 'Aut2013')) and its assigned value is the number of units.
        This variable is how our solution extractor will obtain the number of units,
        so be sure to add it.

        For a request 'A or B', if you choose to take A, then you must use a unit
        number that's within the range of A.

        @param csp: The CSP where the additional constraints will be added to.
        """
        # Problem 3b
        # Hint 1: read the documentation above carefully
        # Hint 2: the domain for each (courseId, quarter) variable should contain 0
        #         because the course might not be taken
        # Hint 3: add appropriate binary factor between (request, quarter) and
        #         (courseId, quarter) variables. Remember that a course can only
        #         be requested at most once in a profile and that if you have a
        #         request such as 'request A or B', then A being taken must
        #         mean that B is NOT taken in the schedule.
        # Hint 4: you must ensure that the sum of units per quarter for your schedule
        #         are within the min and max threshold inclusive
        # Hint 5: use nested functions and lambdas like what get_or_variable and
        #         add_prereq_constraints do
        # Hint 6: don't worry about quarter constraints in each Request as they'll
        #         be enforced by the constraints added by add_quarter_constraints
        # Hint 7: if 'a' and 'b' are the names of two variables constrained to have
        #         sum 7, you can create a variable for their sum
        #         by calling var = create_sum_variable(csp, SOME_NAME, ['a', 'b'], 7).
        #         Note: 7 in the call just helps create the domain; it does
        #         not define a constraint. You can add a constraint to the
        #         sum variable (e.g., to make sure that the number of units
        #         taken in a quarter are within a specific range) just as
        #         you've done for other variables.
        # Hint 8: if having trouble with starting, here is some general structure
        #         that you could use to get started:
        #           loop through self.profile.quarters
        #               ...
        #               loop through self.profile.requests
        #                   loop through request.cids
        #                       ...
        #               ...
        # BEGIN_YOUR_CODE (our solution is 21 lines of code, but don't worry if you deviate from this)
        for quarter in self.profile.quarters:
            unit_vars = []
            for request in self.profile.requests:
                for cid in request.cids:
                    course = self.bulletin.courses[cid]
                    unit_var = (cid, quarter)
                    csp.add_variable(unit_var, list(range(course.maxUnits + 1)))
                    
                    def make_unit_factor(course_id, min_units, max_units):
                        def unit_factor(request_course, units):
                            if request_course is None:
                                return 1 if units == 0 else 0
                            elif request_course == course_id:
                                return 1 if min_units <= units <= max_units else 0
                            else:
                                return 1 if units == 0 else 0
                        return unit_factor
                    
                    csp.add_binary_factor((request, quarter), unit_var, 
                                         make_unit_factor(cid, course.minUnits, course.maxUnits))
                    unit_vars.append(unit_var)
            
            if unit_vars:
                max_total_units = sum(self.bulletin.courses[cid].maxUnits for cid in [var[0] for var in unit_vars])
                sum_var = create_sum_variable(csp, f'quarter_{quarter}', unit_vars, max_total_units)
                def make_unit_limit_factor(min_units, max_units):
                    def unit_limit_factor(total):
                        return 1 if min_units <= total <= max_units else 0
                    return unit_limit_factor
                csp.add_unary_factor(sum_var, make_unit_limit_factor(self.profile.minUnits, self.profile.maxUnits))
        # END_YOUR_CODE

    def add_all_additional_constraints(self, csp: CSP) -> None:
        """
        Add all additional constraints to the CSP.

        @param csp: The CSP where the additional constraints will be added to.
        """
        self.add_quarter_constraints(csp)
        self.add_request_weights(csp)
        self.add_prereq_constraints(csp)
        self.add_unit_constraints(csp)

    def construct_csp(self) -> CSP:
        """
        Construct and return the full scheduling CSP by adding basic and
        additional constraints.
        """
        csp = self.get_basic_csp()
        self.add_all_additional_constraints(csp)
        return csp

CAR

In [None]:
'''
Licensing Information: Please do not distribute or publish solutions to this
project. You are free to use and extend Driverless Car for educational
purposes. The Driverless Car project was developed at Stanford, primarily by
Chris Piech (piech@cs.stanford.edu). It was inspired by the Pacman projects.
'''
import collections
import math
import random
import util
from engine.const import Const
from util import Belief


class ExactInference:
    """ 
    Maintain and update a belief distribution over the probability of a car
    being in a tile using exact updates (correct, but slow times).
    """

    def __init__(self, numRows: int, numCols: int):
        """
        Constructor that initializes an ExactInference object which has
        numRows x numCols number of tiles.
        """
        self.skipElapse = False  ### ONLY USED BY GRADER.PY in case problem 2 has not been completed
        # util.Belief is a class (constructor) that represents the belief for a single
        # inference state of a single car (see util.py).
        self.belief = util.Belief(numRows, numCols)
        self.transProb = util.loadTransProb()

    ##################################################################################
    # Problem 1:
    # Function: Observe (update the probabilities based on an observation)
    # -----------------
    # Takes |self.belief| -- an object of class Belief, defined in util.py --
    # and updates it in place based on the distance observation $d_t$ and
    # your position $a_t$.
    #
    # - agentX: x location of your car (not the one you are tracking)
    # - agentY: y location of your car (not the one you are tracking)
    # - observedDist: true distance plus a mean-zero Gaussian with standard
    #                 deviation Const.SONAR_STD
    #
    # Notes:
    # - Convert row and col indices into locations using util.rowToY and util.colToX.
    # - util.pdf: computes the probability density function for a Gaussian
    # - Although the gaussian pdf is symmetric with respect to the mean and value,
    #   you should pass arguments to util.pdf in the correct order
    # - Don't forget to normalize self.belief after you update its probabilities!
    ##################################################################################

    def observe(self, agentX: int, agentY: int, observedDist: float) -> None:
        # BEGIN_YOUR_CODE (our solution is 7 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        for row in range(self.belief.numRows):
            for col in range(self.belief.numCols):
                euclu_dist=math.sqrt((util.colToX(col) - agentX) **2 + (util.rowToY(row) - agentY) ** 2)
                prob_distrib =  util.pdf(euclu_dist, Const.SONAR_STD, observedDist)
                self.belief.setProb(row, col, self.belief.getProb(row,col) * prob_distrib)
        self.belief.normalize()
        # END_YOUR_CODE

    ##################################################################################
    # Problem 2:
    # Function: Elapse Time (propose a new belief distribution based on a learned transition model)
    # ---------------------
    # Takes |self.belief| and updates it based on the passing of one time step.
    # Notes:
    # - Tile coordinates are represented as (row, col) tuples
    # - Use the transition probabilities in self.transProb, which is a dictionary
    #   containing all the ((oldTile, newTile), transProb) key-val pairs that you
    #   must consider
    # - If there are ((oldTile, newTile), transProb) pairs not in self.transProb,
    #   they are assumed to have zero probability, and you can safely ignore them.
    # - Use the addProb (or setProb) and getProb methods of the Belief class to modify
    #   and access the probabilities associated with a belief.  (See util.py.)
    # - Be careful that you are using only the CURRENT self.belief distribution to compute
    #   updated beliefs.  Don't incrementally update self.belief and use the updated value
    #   for one grid square to compute the update for another square.
    # - Don't forget to normalize self.belief after all probabilities have been updated!
    #   (so that the sum of probabilities is exactly 1 as otherwise adding/multiplying
    #    small floating point numbers can lead to sum being close to but not equal to 1)
    ##################################################################################
    def elapseTime(self) -> None:
        if self.skipElapse: ### ONLY FOR THE GRADER TO USE IN Problem 1
            return
        # BEGIN_YOUR_CODE (our solution is 7 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        new_belief =util.Belief(self.belief.numRows, self.belief.numCols, value=0)
        for old_tile, new_tile in self.transProb:
            new_belief.addProb(new_tile[0], new_tile[1], self.belief.getProb(*old_tile)* self.transProb[(old_tile, new_tile)])
        new_belief.normalize()
        self.belief = new_belief
        # END_YOUR_CODE

    def getBelief(self) -> Belief:
        """
        Returns your belief of the probability that the car is in each tile. Your
        belief probabilities should sum to 1.
        """
        return self.belief


class ExactInferenceWithSensorDeception(ExactInference):
    """
    Same as ExactInference except with sensor deception attack represented in the
    observation function.
    """

    def __init__(self, numRows: int, numCols: int, skewness: float = 0.5):
        """
        Constructor that initializes an ExactInference object which has
        numRows x numCols number of tiles, as well as a skewness factor
        used to calculate the skewed observed distance distribution.
        """
        super().__init__(numRows, numCols)
        self.skewness = skewness

    ##################################################################################
    # Problem 4:
    # Function: Observe with sensor deception (update the probabilities based on an observation)
    # -----------------
    # Apply the adjustment to observed distance based on the transformation
    # D_t_' = 1/(1+skewness**2) * D_t + sqrt(2 * (1/(1+skewness**2))) then copy
    # your previous observe() implementation from ExactInference() to update the probabilities.
    # You could also call the parent class' observe(x, y, dist) method in place of copying
    # the implementation, but either approach is acceptable.
    # Note that the skewness parameter is set in the constructor.
    #
    # - agentX: x location of your car (not the one you are tracking)
    # - agentY: y location of your car (not the one you are tracking)
    # - observedDist: true distance plus a mean-zero Gaussian with standard
    #                 deviation Const.SONAR_STD
    #
    # Notes:
    # - Convert row and col indices into locations using util.rowToY and util.colToX.
    # - util.pdf: computes the probability density function for a Gaussian
    # - Although the gaussian pdf is symmetric with respect to the mean and value,
    #   you should pass arguments to util.pdf in the correct order
    # - Don't forget to normalize self.belief after you update its probabilities!
    ##################################################################################

    def observe(self, agentX: int, agentY: int, observedDist: float) -> None:
        # BEGIN_YOUR_CODE (our solution is 5 lines of code, but don't worry if you deviate from this)
        # raise Exception("Not implemented yet")
        adjusted_dist = (1/(1 + self.skewness ** 2)) * observedDist + math.sqrt(2 * (1/(1 + self.skewness ** 2)))
        
        for row in range(self.belief.numRows):
            for col in range(self.belief.numCols):
                true_dist  = math.sqrt((util.colToX(col)-agentX) ** 2 + (util.rowToY(row)  -  agentY)**2)
                prob = util.pdf(true_dist, Const.SONAR_STD, adjusted_dist)
                self.belief.setProb(row, col, self.belief.getProb(row,col) * prob)
        self.belief.normalize()
        
        # END_YOUR_CODE

    def elapseTime(self) -> None:
        super().elapseTime()

    def getBelief(self) -> Belief:
        return super().getBelief()
