In [0]:
from pyspark.sql import functions as F
import numpy as np
from math import radians, cos, sin, asin, sqrt


In [0]:
# Starting point (decimal degrees): the origin of each technician's first leg
# of the day and the destination of the synthetic "return home" stop.
STARTING_LAT, STARTING_LON = 35.780175, -78.633199

In [0]:
def _read_csv_with_header(path):
    """Read a CSV file with Spark, treating the first line as the header (no schema inference)."""
    return spark.read.option("header", "true").csv(path)


customer_table_df = _read_csv_with_header("/Volumes/datasets/default/data_for_tech_opt/customer_table.csv")
tech_table_df = _read_csv_with_header("/Volumes/datasets/default/data_for_tech_opt/tech_table.csv")
customer_schedule_df = _read_csv_with_header("/Volumes/datasets/default/data_for_tech_opt/customer_schedule.csv")

In [0]:
import pandas as pd
import numpy as np
from collections import defaultdict
 
# Collect the three Spark inputs to the driver as pandas frames for the
# row-by-row assignment logic below.
tech_table_pd, customer_table_pd, customer_schedule_pd = (
    spark_frame.toPandas()
    for spark_frame in (tech_table_df, customer_table_df, customer_schedule_df)
)

# Distinct technician names, in order of first appearance.
technicians = tech_table_pd['Technician'].unique()

# Function to assign technicians to jobs
def assign_technicians(schedule_df, technicians, max_jobs_per_slot=2,
                       time_slots=('Morning', 'Afternoon')):
    """
    Assign technicians to jobs, one (day, time slot) at a time.

    Within each (day, time slot) the job indices are shuffled with
    ``np.random.shuffle`` (so results follow NumPy's global seed) and then
    handed out round-robin over ``technicians``, skipping any technician who
    already holds ``max_jobs_per_slot`` jobs in that slot. With the defaults a
    technician gets *at most* 2 morning and 2 afternoon jobs per day (at most
    4 per day) -- fewer when there are not enough jobs.

    Parameters
    ----------
    schedule_df : pandas.DataFrame
        Needs 'Day_of_Week' and 'Arrival_Time_Slot' columns. Not modified.
    technicians : sequence
        Technician identifiers (e.g. the array returned by ``Series.unique()``).
    max_jobs_per_slot : int, default 2
        Capacity of one technician in one (day, time slot).
    time_slots : iterable of str, default ('Morning', 'Afternoon')
        'Arrival_Time_Slot' labels to process, in this order.

    Returns
    -------
    pandas.DataFrame
        A copy of ``schedule_df`` with an added 'Assigned_Tech' column. Jobs
        that could not be placed (every technician already full, or a slot
        label not in ``time_slots``) keep ``None`` and a warning is printed.
    """
    result_df = schedule_df.copy()
    result_df['Assigned_Tech'] = None

    time_slots = list(time_slots)
    n_techs = len(technicians)

    for day in result_df['Day_of_Week'].unique():
        is_day = result_df['Day_of_Week'] == day

        for time_slot in time_slots:
            slot_indices = result_df.index[
                is_day & (result_df['Arrival_Time_Slot'] == time_slot)
            ].tolist()

            # Randomize which jobs are handed out first.
            np.random.shuffle(slot_indices)

            # Fresh capacity counters for this (day, slot).
            jobs_in_slot = {tech: 0 for tech in technicians}

            # Round-robin pointer: advances on every technician checked,
            # whether or not that technician takes the job.
            tech_idx = 0
            for job_idx in slot_indices:
                for _ in range(n_techs):
                    tech = technicians[tech_idx % n_techs]
                    tech_idx += 1
                    if jobs_in_slot[tech] < max_jobs_per_slot:
                        result_df.loc[job_idx, 'Assigned_Tech'] = tech
                        jobs_in_slot[tech] += 1
                        break
                else:
                    # Every technician checked was already at capacity.
                    print(f"Warning: Could not assign job {job_idx} on {day} {time_slot}")

    # Jobs whose slot label is not in time_slots are never considered above;
    # report them instead of leaving them silently unassigned.
    unknown_slot = ~result_df['Arrival_Time_Slot'].isin(time_slots)
    for job_idx in result_df.index[unknown_slot]:
        slot_label = result_df.loc[job_idx, 'Arrival_Time_Slot']
        print(f"Warning: Could not assign job {job_idx}: Arrival_Time_Slot "
              f"{slot_label!r} is not one of {time_slots}")

    return result_df

In [0]:
# Seed NumPy's global RNG so the shuffled assignment is reproducible
# (drop this call to get a different assignment on every run).
np.random.seed(42)

assigned_schedule = assign_technicians(customer_schedule_pd, technicians)

# 1-based position of each job inside its tech / day / time-slot group,
# with jobs ordered by Job_Type within the group.
_slot_groups = assigned_schedule.sort_values('Job_Type').groupby(
    ['Assigned_Tech', 'Day_of_Week', 'Arrival_Time_Slot']
)
assigned_schedule['job_number'] = _slot_groups.cumcount() + 1

# Numeric slot order: 'Afternoon' -> 2, any other label -> 1.
assigned_schedule['Arrival_Time_Slot_int'] = np.where(
    assigned_schedule['Arrival_Time_Slot'] == 'Afternoon', 2, 1
)

# 0-based position of each job across a tech's whole day:
# earlier slot first, then by job_number. Written back by index alignment.
sorted_df = assigned_schedule.sort_values(
    ['Day_of_Week', 'Assigned_Tech', 'Arrival_Time_Slot_int', 'job_number'],
    ascending=True,
)
assigned_schedule['cumulative_job_number'] = (
    sorted_df.groupby(['Assigned_Tech', 'Day_of_Week'])['job_number'].cumcount()
)

In [0]:
# Back to Spark, and attach customer attributes (inner join: only jobs whose
# Customer_ID is present in the customer table survive).
assigned_schedule_spark_df = spark.createDataFrame(assigned_schedule)
joined_df = assigned_schedule_spark_df.join(customer_table_df, on='Customer_ID', how='inner')

# Weekday name -> sortable integer; any other value maps to 10 (sorts last).
_weekday_numbers = [('Tuesday', 2), ('Wednesday', 3), ('Thursday', 4), ('Friday', 5)]
_dow_expr = F.when(F.col('Day_of_Week') == 'Monday', 1)
for _weekday, _number in _weekday_numbers:
    _dow_expr = _dow_expr.when(F.col('Day_of_Week') == _weekday, _number)
joined_df = joined_df.withColumn("dow_int", _dow_expr.otherwise(10))

# Order rows by technician, weekday, then position within the day.
sorted_data_df = joined_df.orderBy('Assigned_Tech', 'dow_int', 'cumulative_job_number')

In [0]:
def haversine_distance(lat1, lon1, lat2, lon2):
    """
    Great-circle distance between two points using the Haversine formula.

    Parameters
    -----------
    lat1, lon1 : float or numeric string
        Latitude and longitude of the first point (in decimal degrees)
    lat2, lon2 : float or numeric string
        Latitude and longitude of the second point (in decimal degrees)
        Every value is passed through ``float()``, so numeric strings (such
        as columns read from a CSV without schema inference) are accepted.

    Returns
    --------
    float or None
        Distance in miles (Earth radius taken as 3956 mi), or ``None`` when
        any coordinate is ``None``. Returning ``None`` lets this run as a
        Spark UDF over nullable columns: a null input yields a null distance
        instead of failing the whole job.

    Example
    --------
    >>> # Distance from Raleigh to Durham, NC
    >>> round(haversine_distance(35.7796, -78.6382, 35.9940, -78.8986), 1)
    20.8
    """
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    # Convert to radians
    lat1, lon1, lat2, lon2 = (radians(float(v)) for v in (lat1, lon1, lat2, lon2))

    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` a hair above 1.0 for (near-)
    # antipodal points, which would make asin() raise "math domain error".
    c = 2 * asin(min(1.0, sqrt(a)))

    # Earth radius in miles (use 6371 for kilometers)
    return 3956 * c

In [0]:
from pyspark.sql.functions import when, col, lit, lag
from pyspark.sql.window import Window
# Register the UDF (User Defined Function) for Spark
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType


In [0]:
# Add one synthetic "return to home" stop per technician and day, so the leg
# from the last job back to the starting point is included in the mileage.
#
# Build from real job rows only. They always carry a Customer_ID (they come
# from the inner join on Customer_ID); the synthetic home rows leave it null.
# Filtering first makes this cell safe to re-run: without it, every re-run
# would stack another home row onto sorted_data_df.
jobs_only_df = sorted_data_df.filter(F.col("Customer_ID").isNotNull())

max_job_df = jobs_only_df.groupBy("Assigned_Tech", "Day_of_Week") \
    .agg(F.max("cumulative_job_number").alias("max_cum_job"))

# The home stop comes right after the last job, in the afternoon slot, at the
# starting coordinates.
return_home_df = max_job_df \
    .withColumn("cumulative_job_number", F.col("max_cum_job") + 1) \
    .withColumn("Latitude", lit(STARTING_LAT)) \
    .withColumn("Longitude", lit(STARTING_LON)) \
    .withColumn("Arrival_Time_Slot", lit("Afternoon"))

# Match the job rows' schema: same columns, same order, and null for every
# column the home row has no value for.
_home_cols = set(return_home_df.columns)
return_home_df = return_home_df.select([
    F.col(c) if c in _home_cols else lit(None).alias(c)
    for c in jobs_only_df.columns
])

# Append the home stops to the job rows.
sorted_data_df = jobs_only_df.unionByName(return_home_df)

In [0]:
# Within each technician/day, walk the stops in cumulative_job_number order
# and copy the previous stop's coordinates onto the current row
# (null for the first stop of each group).
window_spec = (
    Window
    .partitionBy("Assigned_Tech", "Day_of_Week")
    .orderBy("cumulative_job_number")
)

df_with_prev = sorted_data_df
for _source_col, _prev_col in (("Latitude", "prev_latitude"), ("Longitude", "prev_longitude")):
    df_with_prev = df_with_prev.withColumn(_prev_col, lag(_source_col).over(window_spec))

In [0]:
from pyspark.sql import functions as F

# Origin of each leg: the previous stop's coordinates, falling back to the
# starting point when there is no previous stop (first stop of the day).
df_with_distances = (
    df_with_prev
    .withColumn("from_latitude", F.coalesce(col("prev_latitude"), lit(STARTING_LAT)))
    .withColumn("from_longitude", F.coalesce(col("prev_longitude"), lit(STARTING_LON)))
)

In [0]:
haversine_udf = udf(f=haversine_distance, returnType=DoubleType())  # row-wise Python UDF returning miles

In [0]:
# The CSVs were read without schema inference, so these columns can arrive
# as strings. Cast everything the distance UDF and the revenue math consume
# to double. That includes the leg origin columns from_latitude /
# from_longitude: they were built from the lagged Latitude/Longitude *before*
# this cast, so without it the UDF may receive strings (or whatever type
# Spark's coercion picked for the mixed literal/column expression).
# Historic_* presumably come from the customer table -- confirm schema.
_DOUBLE_COLUMNS = [
    "Latitude",
    "Longitude",
    "from_latitude",
    "from_longitude",
    "Historic_Avg_Ticket",
    "Historic_Close_Rate",
]
for _column in _DOUBLE_COLUMNS:
    df_with_distances = df_with_distances.withColumn(_column, col(_column).cast("double"))

In [0]:
# Length in miles of the leg that ends at each stop
# (previous stop, or starting point, -> this stop).
_leg_columns = [col(name) for name in ("from_latitude", "from_longitude", "Latitude", "Longitude")]
df_final = df_with_distances.withColumn("distance_miles", haversine_udf(*_leg_columns))

## Expected revenue per day / technician, and per day / technician / period

In [0]:
# Expected revenue of each stop = historic average ticket x historic close rate
# (null for the synthetic return-home rows, which carry no customer data).
df_final = df_final.withColumn(
    "expected_rev",
    F.col("Historic_Avg_Ticket") * F.col("Historic_Close_Rate"),
)

In [0]:
# Per weekday and technician: total expected revenue and total miles driven
# as actually assigned (the return-home leg is included in the miles).
_daily_totals = [
    F.sum("expected_rev").alias("total_expected_rev"),
    F.sum("distance_miles").alias("total_miles"),
]
result_dow = df_final.groupBy("Day_of_Week", "Assigned_Tech").agg(*_daily_totals)

## Optimization Part 1: Without moving any job to a different technician, find the order in which each technician should have done their jobs on a given day

In [0]:
def create_distance_matrix(latitudes, longitudes):
    """
    Build a pairwise haversine distance matrix (in miles) from two parallel
    coordinate sequences.

    Parameters
    -----------
    latitudes : list or array
        Latitude of every location, in decimal degrees.
    longitudes : list or array
        Longitude of every location; must be the same length as ``latitudes``.

    Returns
    --------
    numpy.ndarray
        Array of shape ``(n, n)`` whose entry ``[i][j]`` is the distance from
        location ``i`` to location ``j``; the diagonal is 0.

    Raises
    -------
    ValueError
        If ``latitudes`` and ``longitudes`` differ in length.

    Example
    --------
    >>> lats = [35.7796, 35.9940, 36.0999]  # Raleigh, Durham, Chapel Hill
    >>> lons = [-78.6382, -78.8986, -79.0654]
    >>> create_distance_matrix(lats, lons).shape
    (3, 3)
    """
    size = len(latitudes)
    if size != len(longitudes):
        raise ValueError("latitudes and longitudes must have same length")

    # Start from zeros: the diagonal (self-distance) stays 0.
    matrix = np.zeros((size, size))
    for origin in range(size):
        for destination in range(size):
            if origin == destination:
                continue
            matrix[origin][destination] = haversine_distance(
                latitudes[origin], longitudes[origin],
                latitudes[destination], longitudes[destination],
            )
    return matrix

In [0]:
def tsp_nearest_neighbor(distances, start=0):
    """
    Greedy nearest-neighbour tour: from ``start``, repeatedly go to the closest
    unvisited location, then return to ``start``. Fast (O(n^2)) but not
    guaranteed optimal.

    Parameters
    ----------
    distances : square matrix (list of lists or 2-D numpy array)
        ``distances[i][j]`` is the cost of travelling from ``i`` to ``j``.
    start : int, default 0
        Index of the location where the tour begins and ends.

    Returns
    -------
    (list of int, number)
        The visiting order, beginning and ending with ``start`` (``n + 1``
        entries), and the total cost including the closing leg back to
        ``start``. An empty matrix gives ``([], 0.0)``.

    Raises
    ------
    ValueError
        If ``start`` is not a valid index of a non-empty ``distances``.
    """
    n = len(distances)
    if n == 0:
        return [], 0.0
    if not 0 <= start < n:
        raise ValueError(f"start must be in [0, {n}), got {start}")

    unvisited = set(range(n))
    unvisited.remove(start)
    current = start
    route = [current]
    total_distance = 0

    while unvisited:
        # Ties go to the lowest index, so the result never depends on set
        # iteration order.
        nearest = min(unvisited, key=lambda city: (distances[current][city], city))
        total_distance += distances[current][nearest]
        current = nearest
        route.append(current)
        unvisited.remove(current)

    # Return to start
    total_distance += distances[current][start]
    route.append(start)

    return route, total_distance

## Optimize the route of a given technician for a given day

In [0]:
import pandas as pd

In [0]:
# Columns the TSP step needs: technician, day, location and original stop order.
data_for_tsp = df_final[['Assigned_Tech', 'Day_of_Week', 'Latitude', 'Longitude',
                         'cumulative_job_number']].toPandas()

# The synthetic "return home" stop is the last stop of every tech/day group:
# its cumulative_job_number is the group's last job number + 1. Find it per
# group rather than hard-coding a job count (the number of jobs per tech/day
# varies). Relabel it -1 so that, after sorting by cumulative_job_number, it
# is strictly first. 0 would tie with the real first job, and the tie order
# would depend on the row order returned by toPandas().
_group_last = data_for_tsp.groupby(['Assigned_Tech', 'Day_of_Week'])['cumulative_job_number'] \
    .transform('max')
_is_home_row = data_for_tsp['cumulative_job_number'] == _group_last
data_for_tsp.loc[_is_home_row, 'cumulative_job_number'] = -1

In [0]:
def create_distance_matrix(coordinates, longitudes=None):
    """
    Create a pairwise haversine distance matrix (in miles).

    This notebook defines ``create_distance_matrix`` twice. This later
    definition replaces the earlier ``create_distance_matrix(latitudes,
    longitudes)``. To keep both call styles working it accepts either:

    * ``create_distance_matrix(coordinates)`` -- a sequence of
      ``(latitude, longitude)`` pairs (the form ``optimize_routes`` uses), or
    * ``create_distance_matrix(latitudes, longitudes)`` -- two parallel
      sequences, as in the earlier definition.

    Args:
        coordinates: List of (lat, lon) tuples, or a sequence of latitudes
            when ``longitudes`` is given.
        longitudes: Optional sequence of longitudes, parallel to ``coordinates``.

    Returns:
        2D numpy array of shape (n, n); entry [i][j] is the distance from
        point i to point j, and the diagonal is 0.

    Raises:
        ValueError: If ``longitudes`` is given with a different length than
            ``coordinates``.
    """
    if longitudes is not None:
        if len(coordinates) != len(longitudes):
            raise ValueError("latitudes and longitudes must have same length")
        coordinates = list(zip(coordinates, longitudes))

    n = len(coordinates)
    matrix = np.zeros((n, n))  # diagonal stays 0

    for i in range(n):
        for j in range(n):
            if i != j:
                matrix[i][j] = haversine_distance(
                    coordinates[i][0], coordinates[i][1],
                    coordinates[j][0], coordinates[j][1]
                )

    return matrix

In [0]:
def optimize_routes(df):
    """
    Re-order every technician's stops for every day with the nearest-neighbour
    TSP heuristic.

    Args:
        df: pandas DataFrame with columns
            Assigned_Tech, Day_of_Week, Latitude, Longitude, cumulative_job_number

    Returns:
        Dictionary keyed by "<tech>_<day>". Each value holds 'technician', 'day',
        'total_jobs' (rows in the group), 'total_optimized_distance_mi' (tour
        length rounded to 2 decimals) and 'route': the stops in visiting order.
        Each stop is a dict with 'optimized_job_number' (the stop's row position
        in the group after sorting by cumulative_job_number -- not its position
        in the tour), 'original_job_number', 'latitude' and 'longitude'.
        Every tour starts and ends at the group's first row after that sort.
    """
    routes_by_key = {}

    for (tech, day), stops in df.groupby(['Assigned_Tech', 'Day_of_Week']):
        # Row 0 after this sort is the tour's start / end point.
        stops = stops.sort_values('cumulative_job_number').reset_index(drop=True)
        points = list(zip(stops['Latitude'], stops['Longitude']))

        tour, tour_length = tsp_nearest_neighbor(create_distance_matrix(points), start=0)

        visits = []
        for position in tour:
            stop = stops.iloc[position]
            visits.append({
                'optimized_job_number': position,
                'original_job_number': int(stop['cumulative_job_number']),
                'latitude': stop['Latitude'],
                'longitude': stop['Longitude'],
            })

        routes_by_key[f"{tech}_{day}"] = {
            'technician': tech,
            'day': day,
            'total_jobs': len(points),
            'total_optimized_distance_mi': round(tour_length, 2),
            'route': visits,
        }

    return routes_by_key

In [0]:
optimized_data = optimize_routes(df=data_for_tsp)  # nearest-neighbour tour per tech/day

In [0]:
import pandas as pd

# One row per tech/day; the 'route' cell still holds the list of stop dicts.
_route_rows = []
for _entry in optimized_data.values():
    _route_rows.append({
        'technician': _entry['technician'],
        'day': _entry['day'],
        'total_jobs': _entry['total_jobs'],
        'total_optimized_distance_mi': _entry.get('total_optimized_distance_mi', None),
        'route': _entry['route'],
    })
optimized_route = pd.DataFrame(_route_rows)

In [0]:
# Flatten to one row per visited stop, with the stop dict's keys as columns.
_stops_long = optimized_route.explode('route').reset_index(drop=True)
_stop_fields = _stops_long['route'].apply(pd.Series)
df_explode = pd.concat([_stops_long.drop(['route'], axis=1), _stop_fields], axis=1)

# 0-based position of each stop within its tech/day tour.
df_explode['new_job_order'] = df_explode.groupby(['technician', 'day']).cumcount()

In [0]:
optimized_df = df_explode.loc[:, ['day', 'technician', 'total_optimized_distance_mi']].drop_duplicates()  # one row per tech/day

In [0]:

from pyspark.sql.functions import col

# Put the optimized tour length next to the as-assigned daily totals.
# Left join: tech/day rows without an optimized route keep nulls.
df_spark = spark.createDataFrame(optimized_df)

_same_day = result_dow['Day_of_Week'] == df_spark['day']
_same_tech = result_dow['Assigned_Tech'] == df_spark['technician']
joined_df = result_dow.join(df_spark, _same_day & _same_tech, how='left')

In [0]:
optimized_route_df = joined_df.select('Day_of_Week', 'Assigned_Tech', 'total_expected_rev', 'total_miles', 'total_optimized_distance_mi')