<a href="https://colab.research.google.com/github/rushp7/anago-inspection-router/blob/main/Client_Inspection_Route_Planner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install googlemaps



In [None]:
pip install pandas




In [None]:
pip install ortools

Collecting ortools
  Downloading ortools-9.14.6206-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Collecting absl-py>=2.0.0 (from ortools)
  Downloading absl_py-2.3.1-py3-none-any.whl.metadata (3.3 kB)
Collecting protobuf<6.32,>=6.31.1 (from ortools)
  Downloading protobuf-6.31.1-cp39-abi3-manylinux2014_x86_64.whl.metadata (593 bytes)
Downloading ortools-9.14.6206-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (27.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m27.7/27.7 MB[0m [31m23.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading absl_py-2.3.1-py3-none-any.whl (135 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.8/135.8 kB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading protobuf-6.31.1-cp39-abi3-manylinux2014_x86_64.whl (321 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m321.1/321.1 kB[0m [31m19.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages:

In [None]:

class InspectionRoutePlanner:
    """
    Calculates an optimal daily inspection route for a month
    based on client cleaning schedules and visit frequencies.
    """
    def __init__(self, api_key, depot_address):
        if api_key == "YOUR_GOOGLE_MAPS_API_KEY" or not api_key:
            print("ERROR: Please replace 'YOUR_GOOGLE_MAPS_API_KEY' with your actual Google Maps API key.")
            self.gmaps = None
        else:
            self.gmaps = googlemaps.Client(key=api_key)
        self.depot_address = depot_address
        self.client_df = None
        self.distance_matrix = None
        self.address_map = None

    def load_and_prepare_data(self, csv_path):
        """Loads client data from CSV and prepares it for processing."""
        try:
            df = pd.read_csv(csv_path)
        except FileNotFoundError:
            print(f"ERROR: The file '{csv_path}' was not found. Please make sure it's in the same directory.")
            return False

        # Standardize column names (remove leading/trailing spaces)
        df.columns = df.columns.str.strip()

        # Define the correct column name based on the CSV file (with the typo)
        freq_col = 'Inspection Frequency (montly)'

        # Check if the column exists before using it
        if freq_col not in df.columns:
            print(f"CRITICAL ERROR: The required column '{freq_col}' was not found in the CSV.")
            print(f"Available columns are: {list(df.columns)}")
            return False

        # Filter out clients that don't require physical visits
        df = df[df[freq_col].str.lower() != 'tc'].copy()

        # Convert frequency to numeric, coercing errors and handling '0.5'
        df['visits_required'] = pd.to_numeric(df[freq_col], errors='coerce')
        df.dropna(subset=['visits_required'], inplace=True)
        df['visits_required'] = df['visits_required'].astype(int)

        # Initialize tracking columns
        df['visits_made'] = 0
        df['last_visited'] = pd.NaT

        self.client_df = df
        print(f"Successfully loaded and prepared data for {len(df)} clients requiring visits.")
        return True

    def _get_distance_matrix(self):
        """
        Generates a distance matrix using Google Directions API.
        Caches results to a JSON file to avoid redundant API calls.
        """
        if not self.gmaps:
            print("Cannot get distance matrix without a valid API key.")
            return False

        addresses = [self.depot_address] + self.client_df['Full Address'].tolist()
        self.address_map = {i: addr for i, addr in enumerate(addresses)}
        num_locations = len(addresses)

        # Load cache if it exists
        if os.path.exists(CACHE_FILE_PATH):
            with open(CACHE_FILE_PATH, 'r') as f:
                cache = json.load()
            # Basic validation of cache
            if cache.get("addresses") == addresses:
                self.distance_matrix = cache["matrix"]
                print("Loaded distance matrix from cache.")
                return True

        # --- API Cost Estimation and Progress Bar ---
        api_calls_needed = num_locations * (num_locations - 1)
        estimated_cost = api_calls_needed * COST_PER_API_CALL_USD
        print("\n--- API Usage Notice ---")
        print(f"Generating a new distance matrix requires {api_calls_needed} API calls.")
        print(f"Estimated Cost: ${estimated_cost:.2f} USD")
        input("Press Enter to continue or Ctrl+C to cancel...")

        print("\nGenerating new distance matrix...")
        matrix = [[0] * num_locations for _ in range(num_locations)]

        # Use tqdm for a progress bar
        with tqdm(total=api_calls_needed, desc="Calculating Distances") as pbar:
            for i in range(num_locations):
                for j in range(num_locations):
                    if i == j:
                        continue
                    try:
                        directions_result = self.gmaps.directions(
                            addresses[i],
                            addresses[j],
                            mode="driving",
                            departure_time=datetime.now()
                        )
                        distance = directions_result[0]['legs'][0]['distance']['value']
                        matrix[i][j] = distance
                    except Exception as e:
                        print(f"\nCould not calculate distance between '{addresses[i]}' and '{addresses[j]}': {e}", file=sys.stderr)
                        matrix[i][j] = 999999999
                    pbar.update(1)

        self.distance_matrix = matrix

        with open(CACHE_FILE_PATH, 'w') as f:
            json.dump({"addresses": addresses, "matrix": matrix}, f)

        print("Distance matrix generated and cached.")
        return True

    def _parse_working_days(self, days_str):
        """Parses various day string formats into a list of day indices (Mon=0)."""
        days_str = str(days_str).lower()
        day_map = {'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3, 'fri': 4, 'sat': 5, 'sun': 6}
        days = set()

        if 'mon-fri' in days_str:
            days.update(range(5))
        elif 'mon-sat' in days_str:
            days.update(range(6))
        elif 'mon-sun' in days_str:
            days.update(range(7))
        else:
            parts = [p.strip() for p in days_str.split(',')]
            for part in parts:
                if part in day_map:
                    days.add(day_map[part])
        return list(days)

    def _parse_working_time(self, time_str):
        """Parses the cleaning end time from a string like '9am-5pm' or '7pm'."""
        time_str = str(time_str).lower()
        # Look for the last time mentioned in the string (e.g., the '5pm' in '9am-5pm')
        matches = re.findall(r'(\d{1,2})(?::(\d{2}))?\s*(am|pm)?', time_str)

        if not matches:
            return None

        # Get the last match found
        hour, minute, period = matches[-1]
        hour = int(hour)
        minute = int(minute) if minute else 0

        if period == 'pm' and hour != 12:
            hour += 12
        elif period == 'am' and hour == 12: # Midnight case
            hour = 0

        # Handle cases like '5' being interpreted as 5pm
        if not period and hour >= 1 and hour <= 6:
            hour += 12

        try:
            return datetime.strptime(f"{hour}:{minute}", "%H:%M").time()
        except ValueError:
            return None

    def _is_visitable(self, client_idx, target_date):
        """
        Checks if a client can be visited on a date based on cleaning schedule.
        Visit can be next day, or same day if 5+ hours after cleaning ends.
        """
        client = self.client_df.iloc[client_idx]
        cleaning_days = self._parse_working_days(client['Working Days'])

        # Rule 1: Visit is allowed on the day AFTER a cleaning day.
        yesterday = target_date - timedelta(days=1)
        if yesterday.weekday() in cleaning_days:
            return True

        # # Rule 2: Visit is allowed on the SAME DAY as cleaning, but with a time delay.
        # if target_date.weekday() in cleaning_days:
        #     cleaning_end_time = self._parse_working_time(client['Working Time'])
        #     if not cleaning_end_time:
        #         # If time is un-parseable, default to allowing the visit to not exclude clients.
        #         # print(f"Warning: Could not parse time for {client['Client Name']}. Allowing same-day visit by default.")
        #         return True

        #     # Check if the inspection window (end_time + delay) is still on the same day.
        #     if (cleaning_end_time.hour + INSPECTION_DELAY_HOURS) < 24:
        #         return True

        return False

    def _solve_tsp_for_day(self, client_indices_for_day):
        """
        Solves the Traveling Salesperson Problem for a given set of clients for one day.
        Returns the optimal route order and total distance.
        """
        if not client_indices_for_day:
            return [], 0

        location_indices = [0] + [i + 1 for i in client_indices_for_day]

        manager = pywrapcp.RoutingIndexManager(len(location_indices), 1, 0)
        routing = pywrapcp.RoutingModel(manager)

        def distance_callback(from_index, to_index):
            from_node = manager.IndexToNode(from_index)
            to_node = manager.IndexToNode(to_index)
            mat_from_idx = location_indices[from_node]
            mat_to_idx = location_indices[to_node]
            return self.distance_matrix[mat_from_idx][mat_to_idx]

        transit_callback_index = routing.RegisterTransitCallback(distance_callback)
        routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

        search_parameters = pywrapcp.DefaultRoutingSearchParameters()
        search_parameters.first_solution_strategy = (
            routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC)

        solution = routing.SolveWithParameters(search_parameters)

        if solution:
            index = routing.Start(0)
            route_order = []
            route_distance = 0
            while not routing.IsEnd(index):
                node_index = manager.IndexToNode(index)
                if node_index != 0:
                    original_df_index = node_index - 1
                    route_order.append(original_df_index)

                previous_index = index
                index = solution.Value(routing.NextVar(index))
                route_distance += routing.GetArcCostForVehicle(previous_index, index, 0)

            return route_order, route_distance
        return [], 0


    def generate_monthly_plan(self, start_date, num_days, max_visits_per_day, mandatory_visit):
        """Generates the full routing plan for the specified period."""
        if self.client_df is None or self.distance_matrix is None:
            print("Data or distance matrix not loaded. Cannot generate plan.")
            return None, None

        while start_date.weekday() != 0: # 0 = Monday
            start_date += timedelta(days=1)
        print(f"Planning starts on the next Monday: {start_date.strftime('%Y-%m-%d')}")

        mandatory_visit_date = datetime.strptime(mandatory_visit['date'], '%Y-%m-%d')
        full_plan = {}

        for day_offset in range(num_days):
            current_date = start_date + timedelta(days=day_offset)
            if current_date.weekday() >= 5: # Skip weekends
                continue

            eligible_clients_idx = []

            mandatory_client_idx = None
            if current_date.date() == mandatory_visit_date.date():
                matches = self.client_df.index[self.client_df['Client Name'] == mandatory_visit['client_name']].tolist()
                if matches:
                    mandatory_client_idx = matches[0]

            for i in range(len(self.client_df)):
                client = self.client_df.iloc[i]
                if i == mandatory_client_idx:
                    if self._is_visitable(i, current_date):
                        eligible_clients_idx.append(i)
                    else:
                        print(f"WARNING: Mandatory visit for {client['Client Name']} on {current_date.date()} is not possible due to cleaning schedule.")
                    continue

                if client['visits_made'] < client['visits_required']:
                    if self._is_visitable(i, current_date):
                        eligible_clients_idx.append(i)

            if not eligible_clients_idx:
                continue

            clients_to_route_idx = []
            if mandatory_client_idx is not None and mandatory_client_idx in eligible_clients_idx:
                 clients_to_route_idx.append(mandatory_client_idx)
                 eligible_clients_idx.remove(mandatory_client_idx)

            remaining_slots = max_visits_per_day - len(clients_to_route_idx)
            clients_to_route_idx.extend(eligible_clients_idx[:remaining_slots])

            if clients_to_route_idx:
                best_route_idx, total_dist = self._solve_tsp_for_day(clients_to_route_idx)

                if best_route_idx:
                    daily_plan = []
                    for client_idx in best_route_idx:
                        client_name = self.client_df.iloc[client_idx]['Client Name']
                        daily_plan.append(client_name)

                        self.client_df.loc[client_idx, 'visits_made'] += 1
                        self.client_df.loc[client_idx, 'last_visited'] = pd.to_datetime(current_date)

                    full_plan[current_date.strftime('%Y-%m-%d')] = {
                        "route": daily_plan,
                        "distance_km": total_dist / 1000
                    }

        unvisited_report = self.client_df[self.client_df['visits_made'] < self.client_df['visits_required']]
        return full_plan, unvisited_report

def format_and_print_plan(plan, report):
    """
    Print the daily routes followed by a summary.

    Args:
        plan: Mapping of 'YYYY-MM-DD' to a dict with 'route' (list of client
            names) and 'distance_km'. When empty/None only a notice is printed.
        report: DataFrame of clients still short of their required visits
            (columns 'Client Name', 'visits_required', 'visits_made').
    """
    rule = "=" * 60

    print("\n" + rule)
    print("        OPTIMAL MONTHLY INSPECTION PLAN")
    print(rule)

    if not plan:
        print("No routes could be generated for the given period.")
        return

    total_distance = 0
    for date in plan:
        day_entry = plan[date]
        weekday_name = datetime.strptime(date, '%Y-%m-%d').strftime('%A')
        print(f"\n--- {date} ({weekday_name}) ---")
        print("Route:")
        for stop_number, client in enumerate(day_entry['route'], start=1):
            print(f"  {stop_number}. {client}")
        dist = day_entry['distance_km']
        total_distance += dist
        print(f"Estimated Driving Distance: {dist:.2f} km")

    print("\n" + rule)
    print("                 PLAN SUMMARY")
    print(rule)
    print(f"Total Estimated Driving Distance: {total_distance:.2f} km")

    if report.empty:
        print("\nAll required client visits were successfully scheduled!")
        return

    print("\nWARNING: The following clients could not be fully scheduled:")
    for _, client in report.iterrows():
        print(f"  - {client['Client Name']} (Required: {client['visits_required']}, Scheduled: {client['visits_made']})")


if __name__ == '__main__':
    # --- Main Execution ---
    # Pipeline: load clients -> build/load distance matrix -> plan -> print.
    # Each stage prints its own error and returns a falsy value on failure,
    # and `and` stops at the first stage that fails.
    planner = InspectionRoutePlanner(API_KEY, DEPOT_ADDRESS)

    if planner.load_and_prepare_data(CSV_FILE_PATH) and planner._get_distance_matrix():
        monthly_plan, unvisited_report = planner.generate_monthly_plan(
            start_date=PLAN_START_DATE,
            num_days=PLAN_DAYS,
            max_visits_per_day=MAX_VISITS_PER_DAY,
            mandatory_visit=MANDATORY_VISIT,
        )

        # An empty or missing plan produces no printed report.
        if monthly_plan:
            format_and_print_plan(monthly_plan, unvisited_report)



Successfully loaded and prepared data for 47 clients requiring visits.

--- API Usage Notice ---
Generating a new distance matrix requires 2256 API calls.
Estimated Cost: $11.28 USD
Press Enter to continue or Ctrl+C to cancel...

Generating new distance matrix...


Calculating Distances: 100%|██████████| 2256/2256 [07:50<00:00,  4.79it/s]


Distance matrix generated and cached.
Planning starts on the next Monday: 2025-07-14

        OPTIMAL MONTHLY INSPECTION PLAN

--- 2025-07-14 (Monday) ---
Route:
  1. Oakville Academy for the Arts
  2. Signature Cars
  3. Montana's BBQ & Bar Brampton
Estimated Driving Distance: 93.00 km

--- 2025-07-15 (Tuesday) ---
Route:
  1. Sheridan College Trafalgar Campus
  2. Steinway Piano Gallery Toronto
  3. Montana's BBQ & Bar Brampton
  4. Canal Street Inc.(East Side Mario's)
  5. Oakville Academy for the Arts
  6. Signature Cars
Estimated Driving Distance: 145.00 km

--- 2025-07-16 (Wednesday) ---
Route:
  1. Canal Street Inc.(East Side Mario's)
  2. Signature Cars
  3. Montana's BBQ & Bar Brampton
  4. Sheridan College Trafalgar Campus
  5. Oakville Academy for the Arts
Estimated Driving Distance: 136.07 km

--- 2025-07-17 (Thursday) ---
Route:
  1. Montana's BBQ & Bar Brampton
  2. Canal Street Inc.(East Side Mario's)
  3. Oakville Academy for the Arts
  4. Signature Cars
Estimated Drivi