<a href="https://colab.research.google.com/github/workingbetter/ITNPBD5_Dissertation/blob/main/Chapter_4_GA_maps.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install googlemaps
!pip install folium
!pip install polyline
!pip install deap
api = "AIzaSyBGeyXJRZoi7BiYFjdW0yehhEPebZua3rM"

In [None]:
# Installing and importing libraries
import googlemaps
import pandas as pd
import folium
from folium.plugins import AntPath
import polyline
import math
import random
import numpy as np
from deap import base, creator, tools, algorithms
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt

# Shared Google Maps client, authenticated with the `api` key defined in the
# first cell; the directions, places_nearby and geocode calls below all go
# through this object.
gmaps = googlemaps.Client(key=api)


def get_route_segments(start, end, segment_length=2):
    """Return points spaced every ``segment_length`` km along the driving route.

    The route is requested from the Directions API via the module-level
    ``gmaps`` client. Each step's encoded polyline is walked and a point is
    emitted every time another ``segment_length`` kilometres of *reported*
    step distance have been covered, so the k-th returned point (1-based)
    lies approximately ``k * segment_length`` km from the start.

    Previously the step's ``end_location`` was appended once per boundary
    crossed, so a single long step produced many identical points that did
    not correspond to the distance their index implied.

    Args:
        start: Origin accepted by ``gmaps.directions`` (address, postcode, ...).
        end: Destination accepted by ``gmaps.directions``.
        segment_length: Spacing between emitted points, in kilometres.
            Must be positive.

    Returns:
        list[dict]: ``{'lat': float, 'lng': float}`` dictionaries in route order.

    Raises:
        ValueError: If ``segment_length`` is not positive or no route is found.
    """
    if segment_length <= 0:
        # A non-positive spacing would make the boundary loop below never end.
        raise ValueError("segment_length must be a positive number of kilometres")
    segment_length_m = segment_length * 1000

    directions = gmaps.directions(start, end, mode="driving")
    if not directions:
        raise ValueError(f"No driving route found from {start!r} to {end!r}")
    route = directions[0]['legs'][0]

    segments = []
    distance_covered = 0  # metres driven since the last emitted boundary
    for step in route['steps']:
        step_m = step['distance']['value']
        encoded = step.get('polyline', {}).get('points')
        path = polyline.decode(encoded) if encoded else []

        # Geometric length (km) of every edge of the step's polyline.
        edges_km = [
            haversine_distance({'lat': a[0], 'lng': a[1]}, {'lat': b[0], 'lng': b[1]})
            for a, b in zip(path, path[1:])
        ]
        path_km = sum(edges_km)

        if path_km <= 0:
            # No usable geometry for this step: fall back to its end point.
            distance_covered += step_m
            while distance_covered >= segment_length_m:
                distance_covered -= segment_length_m
                segments.append(step['end_location'])
            continue

        # Rescale the polyline so its edges add up to the distance the API
        # reports for the step; this keeps the running total consistent with
        # the API's own distances.
        metres_per_km = step_m / path_km

        for (lat_a, lng_a), (lat_b, lng_b), edge_km in zip(path, path[1:], edges_km):
            edge_m = edge_km * metres_per_km
            used = 0.0  # metres of this edge already consumed
            while edge_m > 0 and distance_covered + (edge_m - used) >= segment_length_m:
                used += segment_length_m - distance_covered
                fraction = min(used / edge_m, 1.0)
                # Linear interpolation is adequate on the short edges of a
                # route polyline.
                segments.append({
                    'lat': lat_a + (lat_b - lat_a) * fraction,
                    'lng': lng_a + (lng_b - lng_a) * fraction,
                })
                distance_covered = 0
            distance_covered += max(edge_m - used, 0.0)

    return segments

def haversine_distance(coord1, coord2):
    """Great-circle distance in kilometres between two lat/lng points.

    Args:
        coord1: Mapping with ``'lat'`` and ``'lng'`` keys, in degrees.
        coord2: Mapping with ``'lat'`` and ``'lng'`` keys, in degrees.

    Returns:
        float: Haversine distance using a spherical Earth of radius 6371 km.
    """
    earth_radius_km = 6371.0
    phi1, lam1 = (math.radians(coord1[key]) for key in ('lat', 'lng'))
    phi2, lam2 = (math.radians(coord2[key]) for key in ('lat', 'lng'))

    half_dphi = (phi2 - phi1) / 2
    half_dlam = (lam2 - lam1) / 2

    # Haversine of the central angle between the two points.
    hav = math.sin(half_dphi)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlam)**2
    central_angle = 2 * math.atan2(math.sqrt(hav), math.sqrt(1 - hav))
    return earth_radius_km * central_angle

def get_ev_charging_stations(start, end, segment_length=2, radius=1000, keyword='charging station'):
    """Find the closest charging station near each sampled point of a route.

    The route is sampled with ``get_route_segments``; for every sampled point
    a Places "nearby" search is issued and the result closest to that point
    (by haversine distance) is kept. A station is skipped when it has the same
    name as the station kept immediately before it.

    Args:
        start: Route origin, passed to the Directions API.
        end: Route destination, passed to the Directions API.
        segment_length: Spacing in km between sampled route points.
        radius: Places search radius passed to ``gmaps.places_nearby``.
        keyword: Places search keyword.

    Returns:
        tuple: ``(stations, overview_polyline, trip_distance_km)`` where
        ``stations`` is a DataFrame with columns ``name``, ``latitude``,
        ``longitude`` and ``location(km)`` (always present, even when no
        station was found), ``overview_polyline`` is the encoded route
        polyline, and ``trip_distance_km`` is the summed leg distance in km.

    Raises:
        ValueError: If the Directions API returns no route.
    """
    station_columns = ['name', 'latitude', 'longitude', 'location(km)']

    segment_endpoints = get_route_segments(start, end, segment_length)
    charging_stations = []
    last_station = None

    for i, endpoint in enumerate(segment_endpoints):
        places_result = gmaps.places_nearby(location=endpoint, radius=radius, keyword=keyword)
        candidates = places_result.get('results', [])
        if not candidates:
            continue

        closest_station = min(
            candidates,
            key=lambda place: haversine_distance(place['geometry']['location'], endpoint),
        )

        # Consecutive sample points often resolve to the same station; keep
        # only its first occurrence.
        if last_station and last_station['name'] == closest_station['name']:
            continue
        last_station = closest_station

        charging_stations.append({
            'name': closest_station['name'],
            'latitude': closest_station['geometry']['location']['lat'],
            'longitude': closest_station['geometry']['location']['lng'],
            # Distance along the route implied by the sample point's index.
            'location(km)': (i + 1) * segment_length,
        })

    directions = gmaps.directions(start, end)
    if not directions:
        raise ValueError(f"No route found from {start!r} to {end!r}")
    route = directions[0]
    trip_distance = sum(leg['distance']['value'] for leg in route['legs']) / 1000  # metres -> km

    # Explicit columns keep the frame's schema stable when no station is found.
    df_charging_stations = pd.DataFrame(charging_stations, columns=station_columns)
    return df_charging_stations, route['overview_polyline']['points'], trip_distance


# Example journey: look up the route and the charging stations along it
start, end = 'FK9 5AL', 'B4 7XG'
(df_charging_stations,
 route_polyline,
 trip_distance) = get_ev_charging_stations(start, end)

# The trip distance returned by the lookup is used as the total journey length
total_distance = trip_distance



# Charging curve: state of charge (percent) at each minute from 0 to 60
data = {
    'minutes': list(range(61)),
    'state_of_charge': [
        9, 12, 14, 16, 19, 21, 23, 26, 28, 30, 33, 36,
        38, 41, 43, 46, 48, 50, 53, 55, 58, 61, 63, 65,
        68, 70, 72, 74, 76, 77, 79, 80, 81, 83, 84, 85,
        86, 87, 88, 89, 90, 90, 91, 92, 93, 93, 94, 94,
        95, 96, 96, 97, 97, 97, 98, 98, 98, 99, 99, 99,
        100,
    ],
}

# Tabular form of the curve, plus the alias the GA cells use for the stations
charging_rate_df = pd.DataFrame(data)
station_df = df_charging_stations

# NumPy views of the curve used by the interpolation function;
# percentages are rescaled to fractions in [0, 1]
minutes = charging_rate_df['minutes'].to_numpy()
soc = charging_rate_df['state_of_charge'].to_numpy() / 100


In [None]:
# Persist the station table to stations.csv (without the index column).
station_df.to_csv('stations.csv', index=False)

In [None]:

print(f"Total distance from the start is:  {total_distance}")
display(station_df)

# Decode the encoded route polyline into a list of (lat, lng) coordinates
polyline_coords = polyline.decode(route_polyline)

# Geocode both ends of the trip so they can be marked on the map
start_location = gmaps.geocode(start)[0]['geometry']['location']
end_location = gmaps.geocode(end)[0]['geometry']['location']

start_lat, start_lng = start_location['lat'], start_location['lng']
end_lat, end_lng = end_location['lat'], end_location['lng']

# Centre the map on the midpoint between the start and the end.
# The map object is named `route_map` so it does not shadow the builtin `map`.
map_center = [(start_lat + end_lat) / 2, (start_lng + end_lng) / 2]
route_map = folium.Map(location=map_center, zoom_start=6)

# Markers for the start (green) and end (red) points
folium.Marker(location=[start_lat, start_lng], popup='Start', icon=folium.Icon(color='green')).add_to(route_map)
folium.Marker(location=[end_lat, end_lng], popup='End', icon=folium.Icon(color='red')).add_to(route_map)

# Animated path following the route
AntPath(locations=polyline_coords, color="blue", weight=2.5, opacity=1).add_to(route_map)

# One marker per charging station found along the route
for _, row in df_charging_stations.iterrows():
    folium.Marker(location=[row['latitude'], row['longitude']], popup=row['name']).add_to(route_map)

# Save the map
route_map.save('map_with_stations.html')

Total distance from the start is:  494.342
494.342


Unnamed: 0,name,latitude,longitude,location(km)
0,"Charge Your Car,ChargePlace Scotland Charging ...",56.130497,-3.964284,2
1,Chargeplace Scotland Charging Station,56.083414,-3.927975,4
2,Monta Charging Station,55.927712,-4.050797,10
3,ChargePlace Scotland Charging Station,55.8327,-4.11426,32
4,Applegreen Charging Station,55.507599,-3.695619,44
5,ChargePlace Scotland Charging Station,54.993439,-3.067321,90
6,EB Charging Station,53.730537,-2.65131,168
7,Shell Recharge Charging Station,53.069547,-2.336625,328
8,GRIDSERVE Charging Station,52.872672,-2.160472,412
9,Virta Charging Station,52.510081,-1.858773,466


# Genetic Algorithm (GA)

In [None]:
# Vehicle and trip assumptions used by the fitness function
initial_soc = 0.4  # state of charge at departure, as a fraction (0-1)
average_speed = 70  # average driving speed in km/h
discharge_rate = 0.004/1.61  # SOC fraction consumed per km; presumably 0.004 per mile converted to km - TODO confirm
min_soc = 0.09  # a plan is rejected as soon as the SOC drops below this fraction

# GA hyperparameters
POPULATION_SIZE = 500 # Number of individuals in each generation
MUTATION_PROB = 0.2 # Passed to eaSimple as mutpb
CROSSOVER_PROB = 0.5 # Passed to eaSimple as cxpb
TOURNAMENT_SIZE = 3 # Size of the tournament selection group
NUMBER_OF_GENERATIONS = 150  # Total number of generations for the evolution
MUTATION_CHANCE = 0.05  # Per-gene probability of flipping a stop decision inside mutate()
STOP_PENALTY = 5  # Fixed time overhead, in minutes, added for every charging stop

# Interpolates the charging time given a starting and ending state of charge.
def interpolate_charge_time(start_soc, end_soc, soc_curve=None, minutes_curve=None):
    """Return the minutes needed to charge from ``start_soc`` to ``end_soc``.

    The charging curve is a pair of equally long, non-decreasing sequences:
    the state of charge reached (as a fraction) and the minute at which it is
    reached. Times are obtained by linear interpolation between neighbouring
    curve points.

    Plateaus (repeated SOC values) are handled explicitly: the start is
    placed at the *last* minute showing ``start_soc`` and the end at the
    *first* minute showing ``end_soc``. The earlier implementation divided by
    a zero SOC difference in that case and returned NaN.

    Args:
        start_soc: State of charge before charging (fraction).
        end_soc: Desired state of charge after charging (fraction).
        soc_curve: Optional SOC values of the curve; defaults to the
            module-level ``soc`` array.
        minutes_curve: Optional minute values of the curve; defaults to the
            module-level ``minutes`` array.

    Returns:
        Charging time in minutes; ``0`` when both SOC values are equal; the
        penalty value ``1e6`` when the request lies outside the curve or
        ``start_soc`` exceeds ``end_soc``.
    """
    soc_pts = soc if soc_curve is None else np.asarray(soc_curve, dtype=float)
    min_pts = minutes if minutes_curve is None else np.asarray(minutes_curve, dtype=float)

    # The curve is sorted, so its first/last entries are its min/max.
    if start_soc > end_soc or start_soc < soc_pts[0] or end_soc > soc_pts[-1]:
        return 1e6  # Assign a high penalty if charging to the desired SOC is impossible

    if start_soc == end_soc:
        return 0  # Start and end SOC are the same

    # Linear interpolation formula: y = y1 + ((y2 - y1) / (x2 - x1)) * (x - x1)

    # Start: last curve point whose SOC is <= start_soc, interpolated forwards.
    # The next point is strictly above start_soc, so the slope is well defined.
    start_idx = int(np.searchsorted(soc_pts, start_soc, side='right')) - 1
    start_min = min_pts[start_idx]
    if start_idx >= len(soc_pts) - 1:
        start_time = start_min
    else:
        start_soc_val = soc_pts[start_idx]
        start_time = start_min + ((min_pts[start_idx + 1] - start_min) / (soc_pts[start_idx + 1] - start_soc_val)) * (start_soc - start_soc_val)

    # End: first curve point whose SOC is >= end_soc, interpolated backwards.
    # The previous point is strictly below end_soc, so the slope is well defined.
    end_idx = int(np.searchsorted(soc_pts, end_soc, side='left'))
    end_min = min_pts[end_idx]
    if end_idx == 0:
        end_time = end_min
    else:
        end_soc_val = soc_pts[end_idx]
        end_time = end_min + ((min_pts[end_idx - 1] - end_min) / (soc_pts[end_idx - 1] - end_soc_val)) * (end_soc - end_soc_val)

    return (end_time - start_time)



# Define the fitness and individual classes.
# Any definition left over from an earlier run of this cell is removed first
# (presumably to avoid DEAP's warning about re-creating an existing class).
if "FitnessMin" in creator.__dict__:
    del creator.__dict__["FitnessMin"]

# Single-objective fitness; the negative weight makes DEAP minimise it
creator.create('FitnessMin', base.Fitness, weights=(-1.0,))

if "Individual" in creator.__dict__:
    del creator.__dict__["Individual"]

# An individual is a list of 0/1 stop decisions carrying its fitness plus the
# charging plan details recorded during evaluation.
# The attributes are passed as the `list` CLASS, not as `list()` instances:
# DEAP instantiates class arguments once per individual, whereas an instance
# would become a single class-level list shared by every individual.
creator.create(
    'Individual', list,
    fitness=creator.FitnessMin,
    charging_times=list,
    charging_stations=list,
    socs_before_charging=list,
    socs_after_charging=list,
)

# Create a toolbox to hold functions and parameters
toolbox = base.Toolbox()

# Factory for a single random charging plan
def initialize_individual():
    """Build an individual holding one random stop decision per station.

    Returns:
        creator.Individual: A list with ``len(station_df)`` entries, each
        produced by ``toolbox.charging_decision()``.
    """
    station_count = len(station_df)
    return creator.Individual(
        toolbox.charging_decision() for _ in range(station_count)
    )

# Register functions for generating individuals and populations:
# `toolbox.individual()` builds one plan, `toolbox.population(n=...)` repeats
# it n times into a plain list.
toolbox.register('individual', initialize_individual)
toolbox.register('population', tools.initRepeat, list, toolbox.individual)

# Register the charging decision generation function (a random 0 or 1).
# It is registered after initialize_individual is defined, but before any
# individual is actually created, so the lookup inside that function succeeds.
toolbox.register('charging_decision', random.randint, 0, 1)

# Fitness evaluation function.
def evaluate(individual):
    """Return the total trip time (minutes) of a charging plan as a 1-tuple.

    The plan is simulated station by station: driving time and energy use are
    accumulated for each leg, and at every station whose decision is truthy
    the vehicle charges to a target drawn uniformly between its current SOC
    and 1. Because that target is drawn on every call, evaluating the same
    individual twice can give different fitness values.

    The leg from the last station to the destination (``total_distance``) is
    now simulated as well. It used to be ignored, so trip times were too short
    and plans arriving below ``min_soc`` were accepted as feasible.

    On success the recorded plan is stored on the individual
    (``charging_times``, ``charging_stations``, ``socs_before_charging``,
    ``socs_after_charging``). Charging times include ``STOP_PENALTY``.

    Args:
        individual: Sequence of 0/1 stop decisions, one per row of
            ``station_df``.

    Returns:
        tuple: ``(total_time_minutes,)``, or ``(inf,)`` when the SOC falls
        below ``min_soc`` anywhere on the trip.
    """
    total_time = 0
    current_soc = initial_soc
    previous_location = 0
    charging_times = []
    charging_stations = []
    socs_before_charging = []
    socs_after_charging = []

    # Loop through each decision in the individual's charging plan
    for i, stop in enumerate(individual):
        # Drive to this station
        station_location = station_df.loc[i, 'location(km)']
        distance = station_location - previous_location
        total_time += (distance / average_speed) * 60  # Convert hours to minutes

        # Update SOC and location
        current_soc -= distance * discharge_rate
        previous_location = station_location

        # The station cannot be reached without dropping below the minimum SOC
        if current_soc < min_soc:
            return float('inf'),

        if stop:
            # Draw a charging target between the current SOC and 1
            new_charging_target = random.uniform(current_soc, 1)

            # A target that does not raise the SOC is not a valid charge
            if new_charging_target <= current_soc:
                return float('inf'),

            socs_before_charging.append(current_soc)

            # Charging time plus the fixed overhead of making a stop
            charging_time = interpolate_charge_time(current_soc, new_charging_target) + STOP_PENALTY
            total_time += charging_time

            charging_times.append(charging_time)
            charging_stations.append(i)

            current_soc = new_charging_target
            socs_after_charging.append(current_soc)

    # Final leg: from the last station considered to the destination.
    # Clamped at zero in case a station's estimated position lies past the end.
    final_leg = max(total_distance - previous_location, 0)
    total_time += (final_leg / average_speed) * 60
    if current_soc - final_leg * discharge_rate < min_soc:
        return float('inf'),

    # Assign charging-related data to the individual
    individual.charging_times = charging_times
    individual.charging_stations = charging_stations
    individual.socs_before_charging = socs_before_charging
    individual.socs_after_charging = socs_after_charging

    return total_time,



# Bit-flip mutation
def mutate(individual):
    """Flip each stop decision independently with probability MUTATION_CHANCE.

    The individual is modified in place.

    Returns:
        tuple: ``(individual,)``, the one-element tuple DEAP expects from a
        mutation operator.
    """
    for position, gene in enumerate(individual):
        if random.random() < MUTATION_CHANCE:
            individual[position] = 1 - gene  # 0 becomes 1, 1 becomes 0
    return individual,

# Register the mutation function in the toolbox under the name 'mutate'
toolbox.register('mutate', mutate)

# Uniform crossover
def mate(ind1, ind2):
    """Swap each position between two individuals with probability 0.5.

    Both individuals are modified in place.

    Returns:
        tuple: ``(ind1, ind2)``, the same two objects after the exchange.
    """
    for position in range(len(ind1)):
        if random.random() >= 0.5:
            continue  # keep this position as it is
        ind1[position], ind2[position] = ind2[position], ind1[position]
    return ind1, ind2

# Register the mate function in the toolbox under the name 'mate'
toolbox.register('mate', mate)

# Register the remaining genetic operators: the fitness function and
# tournament selection with TOURNAMENT_SIZE competitors per tournament
toolbox.register('evaluate', evaluate)
toolbox.register('select', tools.selTournament, tournsize = TOURNAMENT_SIZE)

# Mean fitness over feasible plans only
def _feasible_mean(fitness_values):
    """Average the finite fitness values; return ``inf`` when there are none."""
    values = np.asarray(fitness_values, dtype=float)
    finite = values[np.isfinite(values)]
    return finite.mean() if finite.size else float('inf')


# Define the main function to run the genetic algorithm
def main():
    """Run the GA and report the best charging plan found.

    Returns:
        tuple: ``(pop, log, hof)`` - the final population, the logbook
        produced by ``eaSimple`` and the hall of fame holding the single best
        individual.
    """
    # Initialize a population of individuals with specified size
    pop = toolbox.population(n=POPULATION_SIZE)

    # Create a Hall of Fame to store the best individual
    hof = tools.HallOfFame(1)

    # Track average and minimum fitness per generation. Infeasible plans have
    # an infinite fitness, which would make a plain mean infinite as well, so
    # the average is taken over feasible plans only.
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register('Avg', _feasible_mean)
    stats.register('Min', np.min)

    # Run the evolution process using eaSimple algorithm
    pop, log = algorithms.eaSimple(
        pop, toolbox,
        cxpb=CROSSOVER_PROB, mutpb=MUTATION_PROB, ngen=NUMBER_OF_GENERATIONS,
        stats=stats, halloffame=hof, verbose=True,
    )

    # Get the best individual
    best_individual = hof[0]
    total_travel_time_mins = best_individual.fitness.values[0]

    # An infinite best fitness means no plan completed the trip; formatting it
    # as hours/minutes would raise, so report it explicitly instead.
    if not np.isfinite(total_travel_time_mins):
        print('No feasible charging plan was found.')
        return pop, log, hof

    total_travel_time = f'{int(total_travel_time_mins // 60)} hours {int(total_travel_time_mins % 60)} minutes'

    # Print the total travel time
    print(f'Total travel time: {total_travel_time}')

    # Print charging times and stations
    print('Charging times: ', best_individual.charging_times)
    print('Charging stations: ', best_individual.charging_stations)

    return pop, log, hof


# Execute the genetic algorithm, keeping its population, logbook and hall of fame
pop, log, hof = main()

# The hall of fame holds a single entry: the best plan found
best_individual = hof[0]

# The fitness is the whole trip time, so driving time is what is left once
# the time spent at charging stops is removed
total_charging_time_mins = sum(best_individual.charging_times)
total_driving_time_mins = best_individual.fitness.values[0] - total_charging_time_mins

print(f"Total driving time: {int(total_driving_time_mins // 60)} hours {int(total_driving_time_mins % 60)} minutes")
print(f"Total charging time: {int(total_charging_time_mins // 60)} hours {int(total_charging_time_mins % 60)} minutes")

# Rows of station_df for the stations the plan stops at, in visiting order
used_station_ids = best_individual.charging_stations
used_stations = station_df.loc[used_station_ids]
station_names = used_stations['name'].tolist()
distances = used_stations['location(km)'].tolist()

# Summary table: one row per charging stop
data = {
    'Charging Stations': station_names,
    'Charging Time(minute)': best_individual.charging_times,
    'SOCs Before Charging': best_individual.socs_before_charging,
    'SOCs After Charging': best_individual.socs_after_charging
}
output_df = pd.DataFrame(data).assign(**{'Distance (km)': distances})

# Position of the last stop (0 when the plan never stops)
last_charging_location = (
    station_df.loc[used_station_ids[-1], 'location(km)'] if used_station_ids else 0
)

# Energy used between the last stop and the destination
remaining_distance = total_distance - last_charging_location
soc_consumed = remaining_distance * discharge_rate

# SOC when leaving the last stop, or the departure SOC if there was no stop
socs_after = best_individual.socs_after_charging
soc_at_last_departure = socs_after[-1] if socs_after else initial_soc
final_soc = soc_at_last_departure - soc_consumed

print(f"State of Charge upon reaching final destination: {final_soc * 100:.2f}%")

display(output_df)


gen	nevals	Avg	Min    
0  	500   	inf	475.176
1  	302   	inf	475.176
2  	308   	inf	475.176
3  	324   	inf	475.176
4  	309   	inf	478.81 
5  	304   	inf	477.977
6  	315   	inf	477.048
7  	302   	inf	473.929
8  	328   	inf	473.929
9  	294   	inf	474.577
10 	306   	inf	476.397
11 	288   	inf	472.098
12 	306   	inf	472.098
13 	311   	inf	472.098
14 	300   	inf	472.098
15 	320   	inf	472.35 
16 	298   	inf	471.353
17 	295   	inf	472.999
18 	308   	inf	472.899
19 	303   	inf	472.899
20 	312   	inf	471.715
21 	287   	inf	471.715
22 	297   	inf	471.715
23 	292   	inf	471.715
24 	290   	inf	471.715
25 	293   	inf	471.715
26 	297   	inf	471.715
27 	298   	inf	471.715
28 	307   	inf	471.715
29 	301   	inf	471.715
30 	286   	inf	471.715
31 	302   	inf	471.715
32 	264   	inf	471.715
33 	303   	inf	471.433
34 	317   	inf	471.433
35 	282   	inf	471.715
36 	312   	inf	471.715
37 	270   	inf	471.715
38 	299   	inf	471.715
39 	308   	inf	471.715
40 	311   	inf	471.715
41 	295   	inf	471.715
42 	312   	

Unnamed: 0,Charging Stations,Charging Time(minute),SOCs Before Charging,SOCs After Charging,Distance (km)
0,Applegreen Charging Station,29.284411,0.290683,0.826371,44
1,Shell Recharge Charging Station,20.691388,0.120781,0.494609,328


In [None]:
# Map showing only the stations chosen by the best plan;
# the initial view is centred on the UK
m = folium.Map(location=[52.3555, -1.1743], zoom_start=6)

# One marker per selected station, with its name as the tooltip
for station_idx in best_individual.charging_stations:
    station = station_df.loc[station_idx]
    folium.Marker(
        [station['latitude'], station['longitude']],
        tooltip=station['name'],
    ).add_to(m)

m.save("map.html")
print("Map saved as map.html")


Map saved as map.html
