In [90]:
import pandas as pd
import requests
import folium
import polyline
import time
from geopy.distance import geodesic

In [104]:
# ======= CONFIGURATION =======
API_KEY = 'AIzaSyACpYMHnmkd8DWxWS3KTJ70EeKIRYN2xHM' # Replace with your actual API key
# FILE_PATH = 'data/Gps-Collection.csv'  # Path to your Excel file
FILE_PATH = 'data/Workbook.csv'  # Path to your Excel file
ACCURACY_THRESHOLD = 50 # Max acceptable GPS accuracy (in meters)
MIN_MOVE_DISTANCE_M = 15  # Minimum movement to keep consecutive point
MAX_SNAP_DISTANCE_M = 5  # Max snap-to-road distance allowed

In [105]:
# Step 1: Load and filter by GPS accuracy
df = pd.read_csv(FILE_PATH)
df.columns = df.columns.str.strip()
# Convert timestamps
df['DateTime'] = pd.to_datetime(df['DateTime'], format='%d/%m/%Y %I:%M:%S %p')
df_filtered = df[df['Accuracy'] <= ACCURACY_THRESHOLD]
df_filtered = df_filtered[df_filtered['Provider'] == 'gps']  # Optional: Use GPS provider only
df_filtered = df_filtered.sort_values('TimeStamp')

In [106]:
raw_points = list(zip(df_filtered['Latitude'], df_filtered['Longitude']))
print(f"Loaded {len(raw_points)} raw points (accuracy ≤ {ACCURACY_THRESHOLD})")

Loaded 99147 raw points (accuracy ≤ 50)


In [107]:
df.columns

Index(['Id', 'Longitude', 'Latitude', 'Accuracy', 'TimeStamp', 'Date',
       'DateTime', 'IsGPSOn', 'BatLevel', 'Provider', 'Bearing', 'DeviceTime',
       'RepId'],
      dtype='object')

In [108]:
DISTANCE_THRESHOLD_METERS = 50  # Minimum distance to retain a point
# === STEP 2: REMOVE CONCENTRATED POINTS ===
cleaned_points = []
last_point = None

for _, row in df_filtered.iterrows():
    current_point = (row['Latitude'], row['Longitude'])
    if last_point is None:
        cleaned_points.append(row)
        last_point = current_point
    else:
        distance = geodesic(last_point, current_point).meters
        if distance >= DISTANCE_THRESHOLD_METERS:
            cleaned_points.append(row)
            last_point = current_point

# === STEP 3: SAVE CLEANED DATA ===
cleaned_df = pd.DataFrame(cleaned_points)
cleaned_df.shape

(299, 13)

In [109]:
raw_points = list(zip(cleaned_df['Latitude'], cleaned_df['Longitude']))
print(f"Loaded {len(raw_points)} raw points (accuracy ≤ {ACCURACY_THRESHOLD})")

Loaded 299 raw points (accuracy ≤ 50)


In [110]:
def visualize_all_locations(df):
    """
    Visualize all locations from a CSV file on a folium map.
    :param csv_file_path: Path to the CSV file containing location data
    :return: folium.Map object
    """
    # Load the CSV data into a DataFrame
    # try:
    #     df = df
    # except Exception as e:
    #     print(f"Error loading CSV file {csv_file_path}: {e}")
    #     return None
    try:
        # Create a map centered at the mean of all latitudes and longitudes
        map_center = [df['Latitude'].mean(), df['Longitude'].mean()]
        m = folium.Map(location=map_center, zoom_start=10)
    except Exception as e:
        print(f"Error getting gps data: {e}")
        return None

    # Add a marker for each location
    for _, row in df.iterrows():
        folium.Marker(
            location=[row['Latitude'], row['Longitude']],
            # popup=f"CODE: {row['CODE']}<br>LOCATION: {row['LOCATION']}<br>ADDRESS: {row['ADDRESS']}<br>BRAND: {row['BRAND']}",
            popup=f"Accuracy: {row['Accuracy']}<br>Latitude: {row['Latitude']}<br>Longitude: {row['Longitude']}<br>DeviceTime: {row['DeviceTime']}",
            icon=folium.Icon(color='blue', icon='info-sign')
        ).add_to(m)

    # Add title
    title_html = '<h3 align="center" style="font-size:16px">All Locations Map</h3>'
    m.get_root().html.add_child(folium.Element(title_html))

    return m

In [111]:
visualize_all_locations(cleaned_df)

In [112]:
# Step 3: Snap to nearest road, reject if too far from road
def snap_to_nearest_road_filtered(points, max_snap_distance_m=15):
    snapped = []
    for i in range(0, len(points), 100):
        batch = points[i:i+100]
        path = "|".join([f"{lat},{lng}" for lat, lng in batch])
        url = f"https://roads.googleapis.com/v1/snapToRoads?path={path}&interpolate=false&key={API_KEY}"
        r = requests.get(url)
        data = r.json()

        if 'snappedPoints' in data:
            for p in data['snappedPoints']:
                if 'originalIndex' not in p:
                    continue
                idx = p['originalIndex']
                original = batch[idx]
                snapped_point = (p['location']['latitude'], p['location']['longitude'])

                distance = geodesic(original, snapped_point).meters
                if distance <= max_snap_distance_m:
                    snapped.append(snapped_point)
        else:
            print("Snap error:", data.get("status"), data.get("error_message"))
        time.sleep(0.1)
    return snapped

In [113]:
snapped_path =snap_to_nearest_road_filtered(raw_points)

In [114]:
"""View snapped path on map"""
def visualize_snapped_path(snapped_path):
    """
    Visualize the snapped path on a folium map.
    :param snapped_path: List of snapped points
    :return: folium.Map object
    """
    try:
        # Create a map centered at the mean of all latitudes and longitudes
        map_center = [sum(p[0] for p in snapped_path) / len(snapped_path),
                      sum(p[1] for p in snapped_path) / len(snapped_path)]
        m = folium.Map(location=map_center, zoom_start=10)
    except Exception as e:
        print(f"Error getting gps data: {e}")
        return None

    # Add a marker for each snapped point
    for point in snapped_path:
        folium.Marker(
            location=point,
            popup=f"Latitude: {point[0]}<br>Longitude: {point[1]}",
            icon=folium.Icon(color='green', icon='info-sign')
        ).add_to(m)

    # Add title
    title_html = '<h3 align="center" style="font-size:16px">Snapped Path Map</h3>'
    m.get_root().html.add_child(folium.Element(title_html))

    return m
visualize_snapped_path(snapped_path)


In [54]:
def calculate_road_distance(snapped_points):
    total_distance = 0.0

    for i in range(len(snapped_points) - 1):
        origin = f"{snapped_points[i][0]},{snapped_points[i][1]}"
        destination = f"{snapped_points[i+1][0]},{snapped_points[i+1][1]}"
        url = f"https://maps.googleapis.com/maps/api/directions/json?origin={origin}&destination={destination}&key={API_KEY}"

        response = requests.get(url).json()
        if response['status'] == 'OK':
            legs = response['routes'][0]['legs']
            for leg in legs:
                total_distance += leg['distance']['value']  # meters
        else:
            print("Error getting directions:", response.get("status"), response.get("error_message"))
        time.sleep(0.1)  # avoid rate limiting

    return total_distance  # in meters

road_distance_m = calculate_road_distance(snapped_path)

In [55]:
road_distance_m

16154.0

In [27]:
# # Step 5A: Get total road distance and path (point-by-point)
# def get_precise_road_path(points):
#     total_km = 0
#     route_coords = []

#     for i in range(len(points) - 1):
#         origin = points[i]
#         destination = points[i + 1]

#         url = (
#             f"https://maps.googleapis.com/maps/api/directions/json?"
#             f"origin={origin[0]},{origin[1]}&destination={destination[0]},{destination[1]}"
#             f"&key={API_KEY}"
#         )

#         r = requests.get(url).json()
#         if r["status"] == "OK":
#             poly = r["routes"][0]["overview_polyline"]["points"]
#             route_coords.extend(polyline.decode(poly))
#             for leg in r["routes"][0]["legs"]:
#                 total_km += leg["distance"]["value"] / 1000  # meters to km
#         else:
#             print(f"❗ Segment error [{i}]:", r.get("status"), r.get("error_message"))
#         time.sleep(0.1)

#     return total_km, route_coords
# total_km, route_coords = get_precise_road_path(snapped_path)

from geopy.distance import geodesic

def get_precise_road_path(points, max_segment_m=2000):
    total_km = 0
    route_coords = []

    for i in range(len(points) - 1):
        origin = points[i]
        destination = points[i + 1]
        distance_m = geodesic(origin, destination).meters

        # if distance_m > max_segment_m:
        #     print(f"⚠️ Skipping segment {i} (too far: {distance_m:.1f}m)")
        #     continue

        url = (
            f"https://maps.googleapis.com/maps/api/directions/json?"
            f"origin={origin[0]},{origin[1]}&destination={destination[0]},{destination[1]}"
            f"&key={API_KEY}"
        )

        r = requests.get(url).json()
        if r.get("status") == "OK":
            try:
                poly = r["routes"][0]["overview_polyline"]["points"]
                route_coords.extend(polyline.decode(poly))
                for leg in r["routes"][0]["legs"]:
                    total_km += leg["distance"]["value"] / 1000
            except Exception as e:
                print(f"⚠️ Polyline decode error at segment {i}: {e}")
        else:
            print(f"❌ Directions error [{i}]:", r.get("status"), r.get("error_message"))

        time.sleep(0.1)

    return total_km, route_coords


In [74]:
from geopy.distance import geodesic

def filter_dense_points(points, min_distance_m=10):
    filtered = [points[0]]
    for pt in points[1:]:
        if geodesic(filtered[-1], pt).meters >= min_distance_m:
            filtered.append(pt)
    return filtered

In [75]:
def get_road_distance(points, min_distance_m=20):
    points = filter_dense_points(points, min_distance_m)
    total_km = 0
    route_coords = []

    for i in range(0, len(points) - 1, 9):
        segment = points[i:i+10]
        if len(segment) < 2:
            continue

        origin = segment[0]
        destination = segment[-1]
        waypoints = "|".join([f"via:{lat},{lng}" for lat, lng in segment[1:-1]])

        url = (
            f"https://maps.googleapis.com/maps/api/directions/json?"
            f"origin={origin[0]},{origin[1]}&destination={destination[0]},{destination[1]}"
            f"&waypoints={waypoints}&key={API_KEY}"
        )

        r = requests.get(url).json()
        if r["status"] == "OK":
            for leg in r["routes"][0]["legs"]:
                total_km += leg["distance"]["value"] / 1000
            poly = r["routes"][0]["overview_polyline"]["points"]
            route_coords.extend(polyline.decode(poly))
        else:
            print("Directions error:", r.get("status"), r.get("error_message"))
            # Fallback to straight line
            total_km += geodesic(origin, destination).km
        time.sleep(0.1)
    
    return total_km, route_coords

In [76]:
from geopy.distance import geodesic
import requests
import polyline
import time

def get_hybrid_road_path(points, max_direct_distance_m=50):
    total_km = 0
    route_coords = []

    for i in range(len(points) - 1):
        origin = points[i]
        destination = points[i + 1]
        distance_m = geodesic(origin, destination).meters

        if distance_m <= max_direct_distance_m:
            total_km += distance_m / 1000
            if not route_coords or route_coords[-1] != origin:
                route_coords.append(origin)
            route_coords.append(destination)
        else:
            # 🌐 Use Google Directions API for road-based path
            url = (
                f"https://maps.googleapis.com/maps/api/directions/json?"
                f"origin={origin[0]},{origin[1]}&destination={destination[0]},{destination[1]}"
                f"&key={API_KEY}"
            )

            r = requests.get(url).json()
            if r.get("status") == "OK":
                try:
                    poly = r["routes"][0]["overview_polyline"]["points"]
                    route_coords.extend(polyline.decode(poly))
                    for leg in r["routes"][0]["legs"]:
                        total_km += leg["distance"]["value"] / 1000
                except Exception as e:
                    print(f"⚠️ Polyline decode error at segment {i}: {e}")
            else:
                print(f"❌ Directions error [{i}]:", r.get("status"), r.get("error_message"))
                # fallback to geodesic
                total_km += distance_m / 1000
                route_coords.append(origin)
                route_coords.append(destination)

            time.sleep(0.1)  # API rate limit safety

    return total_km, route_coords

In [85]:
total_km, route_coords = get_precise_road_path(snapped_path)
total_km

72.779

In [88]:
total_km, route_coords = get_road_distance(snapped_path)
total_km

80.695

In [102]:
total_km, route_coords = get_hybrid_road_path(snapped_path)
total_km

72.8505654090216

In [103]:
"""Visualize route_coords on map"""
def visualize_route_coords(route_coords):
    """
    Visualize the route coordinates on a folium map.
    :param route_coords: List of route coordinates
    :return: folium.Map object
    """
    try:
        # Create a map centered at the mean of all latitudes and longitudes 
        map_center = [sum(p[0] for p in route_coords) / len(route_coords),
                      sum(p[1] for p in route_coords) / len(route_coords)]
        m = folium.Map(location=map_center, zoom_start=10)
    except Exception as e:
        print(f"Error getting gps data: {e}")
        return None

    # Add a marker for each route coordinate
    for point in route_coords:
            folium.Marker(
                location=point,
                popup=f"Latitude: {point[0]}<br>Longitude: {point[1]}",
                icon=folium.Icon(color='red', icon='info-sign')
            ).add_to(m)
            
    # Add title
    title_html = '<h3 align="center" style="font-size:16px">Route Coordinates Map</h3>'
    m.get_root().html.add_child(folium.Element(title_html))

    return m
visualize_route_coords(route_coords)

In [31]:
def snap_to_roads_full_path(points):
    snapped = []
    for i in range(0, len(points), 100):
        batch = points[i:i+100]
        path = "|".join([f"{lat},{lng}" for lat, lng in batch])
        url = f"https://roads.googleapis.com/v1/snapToRoads?path={path}&interpolate=true&key={API_KEY}"
        r = requests.get(url)
        data = r.json()
        if 'snappedPoints' in data:
            snapped.extend([
                (p['location']['latitude'], p['location']['longitude'])
                for p in data['snappedPoints']
            ])
        else:
            print("Snap error:", data.get("status"), data.get("error_message"))
        time.sleep(0.1)
    return snapped

snapped_path =snap_to_roads_full_path(raw_points)

KeyboardInterrupt: 

In [32]:
visualize_snapped_path(snapped_path)

In [32]:
# Step 5: Visualize accurate road-following path
def get_directions_geometry(points):
    route_coords = []
    for i in range(0, len(points) - 1, 20):
        segment = points[i:i+20]
        origin = segment[0]
        destination = segment[-1]
        waypoints = "|".join([f"via:{lat},{lng}" for lat, lng in segment[1:-1]])
        url = (
            f"https://maps.googleapis.com/maps/api/directions/json?"
            f"origin={origin[0]},{origin[1]}&destination={destination[0]},{destination[1]}"
            f"&waypoints={waypoints}&key={API_KEY}"
        )
        r = requests.get(url).json()
        if r["status"] == "OK":
            poly = r["routes"][0]["overview_polyline"]["points"]
            route_coords.extend(polyline.decode(poly))
        else:
            print("Polyline error:", r.get("status"), r.get("error_message"))
        time.sleep(0.1)
    return route_coords

route_coords = get_directions_geometry(snapped_path)

In [33]:
visualize_route_coords(route_coords)