<a href="https://colab.research.google.com/github/shivanshgarg22/Sketch-Ai/blob/main/Untitled19.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
import math
import csv
import os
import time
import datetime
from heapq import heappush, heappop
from collections import defaultdict

# ======================
# DATA LAYER
# ======================
class TransportDataManager:
    """Hold the static city/road data and the persisted trip history.

    Attributes:
        db_path: Path of the CSV file used to persist trip records.
        cities: Mapping of city name -> (latitude, longitude) in degrees.
        road_network: Mapping of city name -> list of directly reachable
            neighbour names. The lists are used as given (directed); some
            links are listed in one direction only, e.g. Nashik lists Mumbai
            but Mumbai does not list Nashik.
        historical_data: Dict with 'headers' (CSV column names) and
            'records' (list of per-trip dicts).
    """

    # Column order of the persisted CSV file.
    _HEADERS = ('timestamp', 'source', 'dest', 'path', 'distance', 'time')

    # Class-level default so the file-handling helpers also work on an
    # instance whose __init__ was bypassed.
    db_path = 'trips.db'

    def __init__(self, db_path='trips.db'):
        """Load the static data and any existing trip history.

        Args:
            db_path: CSV file used for the trip history. Defaults to
                'trips.db' in the current working directory.
        """
        self.db_path = db_path
        self.cities = self._load_city_data()
        self.road_network = self._load_road_network()
        self.historical_data = self._init_historical_data()

    def _load_city_data(self):
        """Return the city name -> (latitude, longitude) table."""
        return {
            "Mumbai": (19.08, 72.88), "Thane": (19.22, 72.98),
            "Pune": (18.52, 73.86), "Nagpur": (21.15, 79.09),
            "Nashik": (20.00, 73.79), "Aurangabad": (19.88, 75.34),
            "Solapur": (17.66, 75.91), "Amravati": (20.94, 77.78),
            "Kolhapur": (16.71, 74.24), "Sangli": (16.85, 74.58),
            "Latur": (18.40, 76.60), "Dhule": (20.90, 74.77),
            "Ahmednagar": (19.08, 74.80), "Chandrapur": (19.96, 79.30),
            "Jalgaon": (21.08, 75.67), "Nanded": (19.14, 77.32),
            # Previously a copy of Solapur's coordinates, which made every
            # Satara distance wrong.
            "Satara": (17.69, 74.01), "Wardha": (20.75, 78.65),
            "Yavatmal": (20.38, 78.18), "Parbhani": (19.13, 76.78),
            "Osmanabad": (18.18, 76.04), "Malegaon": (20.55, 74.53),
            "Bhusawal": (21.03, 75.78), "Ratnagiri": (16.99, 73.30),
            "Alibag": (18.64, 72.87), "Sindhudurg": (16.15, 73.68),
            "Pimpri-Chinchwad": (18.63, 73.80), "Ulhasnagar": (19.22, 73.15),
            "Bhiwandi": (19.30, 73.07), "Mira-Bhayandar": (19.30, 72.85),
            "Panvel": (18.99, 73.11), "Jalna": (19.85, 75.89),
            "Barshi": (18.23, 75.68), "Akola": (20.70, 77.03),
            "Gondia": (21.46, 80.19),
        }

    def _load_road_network(self):
        """Return the simulated highway network as adjacency lists."""
        return {
            "Mumbai": ["Thane", "Panvel", "Pune"],
            "Thane": ["Mumbai", "Ulhasnagar", "Bhiwandi"],
            # A second "Panvel" entry used to appear further down and
            # silently replaced this one, dropping the Pune/Alibag links.
            "Panvel": ["Mumbai", "Pune", "Alibag"],
            "Pune": ["Mumbai", "Panvel", "Satara", "Ahmednagar"],
            "Nagpur": ["Amravati", "Wardha", "Chandrapur"],
            "Nashik": ["Mumbai", "Dhule", "Ahmednagar"],
            "Aurangabad": ["Ahmednagar", "Jalna", "Parbhani"],
            "Solapur": ["Pune", "Osmanabad", "Barshi"],
            "Amravati": ["Akola", "Yavatmal", "Wardha"],
            "Kolhapur": ["Satara", "Sangli", "Ratnagiri"],
            "Sangli": ["Kolhapur", "Satara", "Solapur"],
            "Latur": ["Osmanabad", "Nanded", "Barshi"],
            "Dhule": ["Nashik", "Jalgaon", "Bhusawal"],
            "Ahmednagar": ["Pune", "Nashik", "Aurangabad"],
            "Chandrapur": ["Nagpur", "Gondia", "Yavatmal"],
            "Jalgaon": ["Dhule", "Bhusawal", "Akola"],
            "Nanded": ["Parbhani", "Latur", "Yavatmal"],
            "Satara": ["Pune", "Kolhapur", "Sangli"],
            "Wardha": ["Nagpur", "Amravati", "Yavatmal"],
            "Yavatmal": ["Amravati", "Wardha", "Nanded"],
            "Parbhani": ["Aurangabad", "Nanded", "Jalna"],
            "Osmanabad": ["Solapur", "Aurangabad", "Latur"],
            "Malegaon": ["Nashik", "Dhule"],
            "Bhusawal": ["Dhule", "Jalgaon"],
            "Ratnagiri": ["Kolhapur", "Sindhudurg"],
            "Alibag": ["Panvel", "Ratnagiri"],
            "Sindhudurg": ["Ratnagiri"],
            "Pimpri-Chinchwad": ["Pune"],
            "Ulhasnagar": ["Thane"],
            "Bhiwandi": ["Thane", "Mumbai"],
            "Mira-Bhayandar": ["Mumbai"],
            "Jalna": ["Aurangabad"],
            "Barshi": ["Solapur", "Latur"],
            "Akola": ["Jalgaon", "Amravati"],
            "Gondia": ["Chandrapur"],
        }

    def _empty_history(self):
        """Return a fresh history structure with no records."""
        return {'headers': list(self._HEADERS), 'records': []}

    def _init_historical_data(self):
        """Return the stored trip history, or an empty one if no file exists."""
        if os.path.exists(self.db_path):
            return self._load_historical_data()
        return self._empty_history()

    def _load_historical_data(self):
        """Read trip records from the CSV file.

        Loading is best-effort: if the file cannot be read or parsed, the
        error is printed and an empty record list is returned. Values read
        back from the file are strings.
        """
        data = self._empty_history()
        try:
            # newline='' is what the csv module expects for its file objects.
            with open(self.db_path, 'r', newline='', encoding='utf-8') as f:
                data['records'] = list(csv.DictReader(f))
        except (OSError, csv.Error, UnicodeError) as e:
            print(f"Error loading historical data: {e}")
        return data

    def add_trip_record(self, trip_data):
        """Append one trip record and rewrite the CSV file.

        Args:
            trip_data: Dict keyed by the column names in
                ``historical_data['headers']``.
        """
        self.historical_data['records'].append(trip_data)
        self._persist_data()

    def _persist_data(self):
        """Write all records (with a header row) to the CSV file."""
        # Without newline='' the csv writer emits blank rows on Windows.
        with open(self.db_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=self.historical_data['headers'])
            writer.writeheader()
            writer.writerows(self.historical_data['records'])

# ======================
# PATHFINDING ENGINE
# ======================
class PathfindingEngine:
    """Compute routes over the road network with A* and a traffic penalty.

    Attributes:
        dm: Data manager exposing ``cities`` (name -> (lat, lon) in degrees)
            and ``road_network`` (name -> list of neighbour names).
        traffic_cache: Empty dict created at construction; not read or
            written by any method of this class.
    """

    def __init__(self, data_manager):
        self.dm = data_manager
        self.traffic_cache = {}

    def haversine(self, coord1, coord2):
        """Return the great-circle distance in kilometres between two points.

        Args:
            coord1: (latitude, longitude) in degrees.
            coord2: (latitude, longitude) in degrees.
        """
        lat1, lon1 = math.radians(coord1[0]), math.radians(coord1[1])
        lat2, lon2 = math.radians(coord2[0]), math.radians(coord2[1])
        dlat, dlon = lat2 - lat1, lon2 - lon1
        a = math.sin(dlat/2)**2 + math.cos(lat1)*math.cos(lat2)*math.sin(dlon/2)**2
        # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
        # points, which would make sqrt(1 - a) raise ValueError.
        a = min(1.0, max(0.0, a))
        return 6371 * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    def get_traffic_factor(self, city):
        """Return a simulated congestion factor for the current local hour.

        The ``city`` argument is accepted but not used: the factor depends
        only on the wall-clock hour. Between 07:00-10:59 and 17:00-20:59 the
        value is ``0.4 + |12 - hour| / 12`` capped at 0.8; otherwise it is 0.2.
        """
        now = datetime.datetime.now().hour
        if 7 <= now < 11 or 17 <= now < 21:
            return min(0.8, 0.4 + (abs(12 - now)/12))
        return 0.2

    def a_star_search(self, start, end):
        """Return the cheapest path from ``start`` to ``end`` as a list of names.

        Edge cost is the haversine distance scaled by ``1 + traffic factor``
        of the neighbour; the heuristic is the straight-line haversine
        distance to ``end``.

        Args:
            start: Name of the origin city.
            end: Name of the destination city.

        Returns:
            List of city names from ``start`` to ``end`` inclusive, or None
            when ``end`` is unreachable.

        Raises:
            KeyError: If ``start`` or ``end`` is not in ``dm.cities``.
        """
        cities = self.dm.cities
        network = self.dm.road_network
        end_coord = cities[end]

        came_from = {}
        g_scores = defaultdict(lambda: math.inf)
        g_scores[start] = 0

        # Heap entries are (f, name, g). The trailing g lets stale entries be
        # recognised without changing the (f, name) pop order.
        open_heap = [(self.haversine(cities[start], end_coord), start, 0)]

        while open_heap:
            _, current, g_at_push = heappop(open_heap)

            if current == end:
                return self._reconstruct_path(came_from, current)

            # A cheaper route to `current` was found after this entry was
            # pushed; expanding it again cannot improve any neighbour.
            if g_at_push > g_scores[current]:
                continue

            current_coord = cities[current]
            for neighbor in network.get(current, []):
                if neighbor not in cities:
                    continue

                # Calculate edge weight with traffic
                neighbor_coord = cities[neighbor]
                base_dist = self.haversine(current_coord, neighbor_coord)
                traffic = self.get_traffic_factor(neighbor)
                tentative_g = g_scores[current] + base_dist * (1 + traffic)

                if tentative_g < g_scores[neighbor]:
                    came_from[neighbor] = current
                    g_scores[neighbor] = tentative_g
                    f_score = tentative_g + self.haversine(neighbor_coord, end_coord)
                    heappush(open_heap, (f_score, neighbor, tentative_g))

        return None

    def _reconstruct_path(self, came_from, current):
        """Walk the predecessor map back from ``current`` and return the path in travel order."""
        path = [current]
        while current in came_from:
            current = came_from[current]
            path.append(current)
        path.reverse()
        return path

# ======================
# TRAVEL TIME ESTIMATION (constant average speed; no learned model)
# ======================
class TravelTimePredictor:
    """Estimate travel time from path length and a constant average speed."""

    def __init__(self, data_manager):
        # Data manager exposing ``cities`` (name -> coordinate pair).
        self.dm = data_manager

    def predict(self, path, pathfinder, avg_speed=80.0):
        """Return the estimated travel time in hours for ``path``.

        Args:
            path: Ordered list of city names.
            pathfinder: Object providing ``haversine(coord1, coord2)``,
                used to measure each leg of the path.
            avg_speed: Assumed average speed, in the pathfinder's distance
                unit per hour. Defaults to 80.0 (the value the original
                code used; its comment incorrectly said 45 km/h).

        Returns:
            Total leg distance divided by ``avg_speed``; 0 when
            ``avg_speed`` is not positive. A path with fewer than two
            cities has zero distance and therefore zero time.
        """
        cities = self.dm.cities
        total_dist = sum(
            pathfinder.haversine(cities[src], cities[dst])
            for src, dst in zip(path, path[1:])
        )
        return total_dist / avg_speed if avg_speed > 0 else 0


# ======================
# SYSTEM INTEGRATION
# ======================
class TransportSystem:
    """Application facade tying together data, routing and time prediction."""

    def __init__(self):
        manager = TransportDataManager()
        self.data_manager = manager
        self.pathfinder = PathfindingEngine(manager)
        self.predictor = TravelTimePredictor(manager)

    def find_route(self, start, end):
        """Plan a trip from ``start`` to ``end`` and log it to the trip history.

        Returns:
            On success, a dict with 'route', 'distance_km',
            'predicted_hours' and 'traffic_advisories'. On failure, a dict
            with a single 'error' message (unknown city or no route); no
            trip record is stored in that case.
        """
        known = self.data_manager.cities

        # Validate the origin first, then the destination.
        for name in (start, end):
            if name not in known:
                return {"error": f"Unknown city: {name}"}

        route = self.pathfinder.a_star_search(start, end)
        if not route:
            return {"error": "No route found"}

        # Sum the straight-line length of every consecutive leg.
        total_km = 0
        for here, there in zip(route, route[1:]):
            total_km += self.pathfinder.haversine(known[here], known[there])

        hours = self.predictor.predict(route, self.pathfinder)

        record = {
            'timestamp': time.time(),
            'source': start,
            'dest': end,
            'path': '->'.join(route),
            'distance': round(total_km, 2),
            'time': round(hours, 2)
        }
        self.data_manager.add_trip_record(record)

        advisories = self._generate_traffic_advisories(route)
        return {
            'route': route,
            'distance_km': round(total_km, 2),
            'predicted_hours': round(hours, 2),
            'traffic_advisories': advisories
        }

    def _generate_traffic_advisories(self, path):
        """Return one human-readable traffic message per city on ``path``."""

        def describe(city):
            level = self.pathfinder.get_traffic_factor(city)
            if level > 0.5:
                return f"Heavy traffic in {city} (Level {int(level*100)}%)"
            if level > 0.2:
                return f"Moderate traffic in {city}"
            return f"Light traffic in {city}"

        return [describe(city) for city in path]



# ======================
# EXECUTION
# ======================
if __name__ == "__main__":
    system = TransportSystem()

    # Demo query: Nagpur to Amravati
    result = system.find_route("Nagpur", "Amravati")

    if "error" in result:
        # find_route reports failures as {"error": ...}; indexing 'route'
        # on such a result would raise KeyError.
        print(f"Route lookup failed: {result['error']}")
    else:
        print("\n=== Route Recommendation ===")
        print(f"Optimal Path: {' → '.join(result['route'])}")
        print(f"Total Distance: {result['distance_km']} km")
        print(f"Predicted Time: {result['predicted_hours']} hours")
        print("\nTraffic Advisories:")
        for advisory in result['traffic_advisories']:
            print(f" - {advisory}")



=== Route Recommendation ===
Optimal Path: Nagpur → Amravati
Total Distance: 137.94 km
Predicted Time: 1.72 hours

Traffic Advisories:
 - Light traffic in Nagpur
 - Light traffic in Amravati
