In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from pyspark.sql.functions import concat, to_timestamp, col, lit

In [2]:

# Import necessary libraries
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum, col, hour, concat_ws, to_date, date_format
# Step 1: Obtain a Spark session for this notebook.
# getOrCreate() hands back an already-running session when one exists, so
# re-running this cell does not stop or replace it.
spark = (
    SparkSession.builder
    .appName("BigDataProcessing")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)
# Step 2: Read the ticket CSV into a Spark DataFrame. The first row supplies the
# column names and Spark infers each column's type from the data.
data = spark.read.options(header=True, inferSchema=True).csv("June2024.csv")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/26 22:30:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/26 22:31:01 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [3]:
from pyspark.sql.functions import to_timestamp, concat, col, lit, date_format, expr
# Filter rows where ROUTE_ID is 'acwXkRFM'
# data = data.filter(col("ROUTE_ID") == 'acwXkRFM')
# Step 2: Render TICKET_ISSUE_TIME as an "HH:mm:ss" string and join it to the
# issue date with a single space, giving one combined date-time string.
data = data.withColumn("TICKET_ISSUE_TIME_STR", date_format(col("TICKET_ISSUE_TIME"), "HH:mm:ss"))
data = data.withColumn("TICKET_DATETIME_STR", concat(col("TICKET_ISSUE_DATE"), lit(" "), col("TICKET_ISSUE_TIME_STR")))

# Step 3: Parse the combined string ("dd/MM/yyyy HH:mm:ss") into a timestamp
data = data.withColumn("TICKET_DATETIME", to_timestamp("TICKET_DATETIME_STR", "dd/MM/yyyy HH:mm:ss"))

# Step 4: Floor TICKET_DATETIME to the start of its 15-minute bucket.
# The base must be truncated to the HOUR: truncating to the minute keeps the
# original minutes, so adding 0/15/30/45 on top would push e.g. 10:37 to 11:07
# instead of 10:30 and produce buckets that are not aligned to quarter hours.
data = data.withColumn(
    "TICKET_DATETIME_15MIN",
    expr("date_trunc('hour', TICKET_DATETIME) + INTERVAL 15 MINUTE * floor(minute(TICKET_DATETIME) / 15)"),
)


In [4]:
# Sum TOTAL_PASSENGER for every (TICKET_DATETIME_15MIN, FROM_STOP_NAME) pair,
# rename the generated "sum(TOTAL_PASSENGER)" column back to TOTAL_PASSENGER,
# and order the result by the 15-minute bucket.
aggregated_data_from = (
    data
    .groupBy("TICKET_DATETIME_15MIN", "FROM_STOP_NAME")
    .sum("TOTAL_PASSENGER")
    .withColumnRenamed("sum(TOTAL_PASSENGER)", "TOTAL_PASSENGER")
    .orderBy("TICKET_DATETIME_15MIN")
)
# aggregated_data_from.show(50)


In [5]:
# !pip install folium geopy pyspark


In [6]:
from pyspark.sql import functions as F
import folium
import pandas as pd
from geopy.geocoders import Nominatim
from folium.plugins import MarkerCluster
import time

# Time window of interest.
# NOTE(review): start_time / end_time are only defined here; the aggregation
# below does not filter on them, so the totals cover every bucket in
# aggregated_data_from. Confirm whether a filter was intended.
start_time, end_time = "2024-06-01 04:45:00", "2024-06-01 12:45:00"

# Total passengers per origin stop, largest first, capped at the 1000 busiest
# stops and collected to the driver as a list of Rows.
top_bus_stops = (
    aggregated_data_from
    .groupBy("FROM_STOP_NAME")
    .agg(F.sum("TOTAL_PASSENGER").alias("TOTAL_PASSENGER"))
    .orderBy(F.col("TOTAL_PASSENGER").desc())
    .limit(1000)
    .collect()
)


                                                                                

In [11]:
import json
import time
from geopy.geocoders import Nominatim

# Bounding box (decimal degrees) used to accept or reject geocoding hits
# as lying in South India: latitude range first, then longitude range.
SOUTH_INDIA_LAT_MIN, SOUTH_INDIA_LAT_MAX = 8.0, 14.5
SOUTH_INDIA_LON_MIN, SOUTH_INDIA_LON_MAX = 76.0, 85.0

# JSON file holding coordinates of stops that were geocoded successfully
GEO_CACHE_FILE = 'geocoded_stops.json'
# JSON file holding the stops that could not be geocoded
FAILURE_CACHE_FILE = 'geocoding_failures.json'
# Function to load previously cached geocoded data from a JSON file
def load_geocoded_data(path=None):
    """Load the cache of already-geocoded stops from a JSON file.

    Args:
        path: JSON file to read. Defaults to GEO_CACHE_FILE when omitted.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist or cannot be parsed as JSON (for example an empty file left
        behind by an interrupted write).
    """
    if path is None:
        path = GEO_CACHE_FILE
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as exc:
        # An unreadable cache should not abort the run; start from scratch.
        print(f"Ignoring unreadable geocode cache {path}: {exc}")
        return {}

# Function to save geocoded data to a JSON file
def save_geocoded_data(data, path=None):
    """Write the geocoding cache to a JSON file without risking the old copy.

    The content is first written to a sibling temporary file and then moved
    over the target, so a failure while serialising (or an interrupted write)
    leaves the previous cache file untouched.

    Args:
        data: JSON-serialisable cache content.
        path: Destination file. Defaults to GEO_CACHE_FILE when omitted.

    Raises:
        TypeError: if ``data`` contains values json cannot serialise.
        OSError: if the file cannot be written or replaced.
    """
    import os

    if path is None:
        path = GEO_CACHE_FILE
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file no longer exists; this only
        # cleans up the partial file left by a failed write.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

# Function to check if a location is in South India
def is_in_south_india(latitude, longitude):
    """Tell whether a coordinate pair lies inside the South India bounding box.

    Both bounds are inclusive and come from the SOUTH_INDIA_* constants.
    The longitude is only examined when the latitude is already in range.
    """
    latitude_ok = SOUTH_INDIA_LAT_MIN <= latitude <= SOUTH_INDIA_LAT_MAX
    if not latitude_ok:
        return latitude_ok
    return SOUTH_INDIA_LON_MIN <= longitude <= SOUTH_INDIA_LON_MAX

def load_geocoded_failures(path=None):
    """Load the record of stops that previously failed to geocode.

    Args:
        path: JSON file to read. Defaults to FAILURE_CACHE_FILE when omitted.

    Returns:
        The decoded JSON content (this notebook writes the file as a JSON
        list of names), or an empty dict when the file does not exist or
        cannot be parsed. Either result supports ``name in result`` checks.
    """
    if path is None:
        path = FAILURE_CACHE_FILE
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as exc:
        # Treat an unreadable failure list as "nothing known to fail".
        print(f"Ignoring unreadable geocoding failure cache {path}: {exc}")
        return {}

# Function to save geocoding failures to a JSON file
def save_geocoding_failures(failures):
    """Write the given list of failed stop names to FAILURE_CACHE_FILE as JSON."""
    with open(FAILURE_CACHE_FILE, 'w') as f:
        json.dump(failures, f, indent=4)


# Suffix added to report entries whose hit fell outside the bounding box.
# Earlier runs stored it as part of the cached name, so it is stripped when the
# failure cache is read back; otherwise those stops would never match.
OUTSIDE_SUFFIX = " (outside South India)"

# Prepare data for geocoding. Coordinates start as None so every record has
# "latitude"/"longitude" keys whichever path it takes below (cached failure,
# request error, interrupted run); later cells index those keys directly.
bus_stops_data = [
    {
        "stop_name": row["FROM_STOP_NAME"],
        "passenger_count": row["TOTAL_PASSENGER"],
        "latitude": None,
        "longitude": None,
    }
    for row in top_bus_stops
]

# Initialize the geocoder
geolocator = Nominatim(user_agent="bus_stop_locator")

# Load previously geocoded data and previously failed names from their caches
cached_data = load_geocoded_data()
cached_failure_data = load_geocoded_failures()

# Plain stop names known to be unresolvable (legacy report suffix removed)
known_failures = set()
for cached_name in cached_failure_data:
    if isinstance(cached_name, str) and cached_name.endswith(OUTSIDE_SUFFIX):
        cached_name = cached_name[:-len(OUTSIDE_SUFFIX)]
    known_failures.add(cached_name)

# Counters and reports for this run
success_count = 0
failure_count = 0
failures = []          # human-readable report, printed below
new_failed_names = []  # plain names to add to the persistent failure cache

try:
    for stop in bus_stops_data:
        stop_name = stop["stop_name"]

        # Already geocoded in an earlier run: reuse the cached coordinates
        if stop_name in cached_data:
            stop["latitude"] = cached_data[stop_name]["latitude"]
            stop["longitude"] = cached_data[stop_name]["longitude"]
            success_count += 1
            continue
        # Known to be unresolvable: skip the request, coordinates stay None
        if stop_name in known_failures:
            failure_count += 1
            continue

        # Not cached either way: ask the geocoder. The query is the bare stop
        # name; the South India restriction is applied to the result below.
        try:
            location = geolocator.geocode(f"{stop_name}", timeout=20)

            if location:
                latitude = location.latitude
                longitude = location.longitude

                if is_in_south_india(latitude, longitude):
                    stop["latitude"] = latitude
                    stop["longitude"] = longitude
                    cached_data[stop_name] = {"latitude": latitude, "longitude": longitude}
                    success_count += 1
                else:
                    # A hit outside the bounding box is treated as a failure
                    failure_count += 1
                    failures.append(f"{stop_name}{OUTSIDE_SUFFIX}")
                    new_failed_names.append(stop_name)
            else:
                # The geocoder returned no result for this name
                failure_count += 1
                failures.append(stop_name)
                new_failed_names.append(stop_name)
        except Exception as e:
            # Request errors (timeouts, service errors) are reported but not
            # persisted, so the stop is retried on the next run instead of
            # being blacklisted for a transient problem.
            print(f"Error geocoding {stop_name}: {e}")
            failures.append(stop_name)
            failure_count += 1

        # Pause for 1 second between requests to avoid hitting API rate limits
        time.sleep(1)
finally:
    # Persist progress even when the loop is interrupted part-way through.
    save_geocoded_data(cached_data)
    if new_failed_names:
        # Merge with earlier failures instead of overwriting them.
        save_geocoding_failures(sorted(known_failures.union(new_failed_names), key=str))

# Output the number of successes and failures
print(f"Geocoding Successes: {success_count}")
print(f"Geocoding Failures: {failure_count}")
if failures:
    print("Failed to geocode the following bus stops:")
    print(failures)

Geocoding Successes: 549
Geocoding Failures: 451
Failed to geocode the following bus stops:
['East Fort North Bus Stand', 'Thampanoor Main Bus Stand', 'East South Fort Bus Stand', 'Pattom Sut Office', 'East Fort South Bus Stand', 'Statue Sbi (outside South India)', 'Statue Sbi Or Secretariat', 'Venjarammoodu Depot', 'Karamana Junction', 'Chakai (outside South India)', 'World Market (outside South India)', 'Njadoorkonam', 'Shangumukham Beach', 'Vellayambalam Elankim Devi', 'Sree Karyam', 'Kseb Chakkai', 'Rotary (outside South India)', 'Easwara Vilasam Cotton Hill School', 'Radio Station Monvila', '16th Mile (outside South India)', 'Aiyroopara', 'Chellamagalam Panjayath', 'Beema Pally', 'Shangumukham', 'Mangalapuram East', 'Valiyathura Shangumugam', 'Korani Junction', 'Kachani Junction', 'Crpf Gate', 'Panangodu Junction', 'Agricultural College Poonkulam', 'Punchakkari', 'Kalliyoor Grama Panchayath Office', 'Thekkada Junction', 'Vizhinjam Bus Stand', 'Pullampara Palam', 'Kazhakkoottam Rai

In [26]:
from folium.plugins import HeatMap, MarkerCluster
import folium
import pandas as pd
from folium import Icon

# Keep only the stops that have both coordinates
stops_with_coords = [
    stop
    for stop in bus_stops_data
    if not (stop["latitude"] is None or stop["longitude"] is None)
]

# Tabular form of the located stops, iterated row by row below
stops_df = pd.DataFrame(stops_with_coords)

# Base map on light CartoDB tiles, centred on a fixed coordinate
map_center = [8.4869, 76.9529]
m = folium.Map(
    location=map_center,
    tiles="CartoDB Positron",
    zoom_start=13,
    min_zoom=5,
    max_zoom=15,
)

# One [latitude, longitude, weight] triple per stop; the weight is the
# stop's passenger count
heat_data = [
    [row["latitude"], row["longitude"], row["passenger_count"]]
    for _, row in stops_df.iterrows()
]

# Heat layer: blue at the low end of the scale through to red at the top
heat_gradient = {
    0.2: 'blue',
    0.4: 'cyan',
    0.6: 'lime',
    0.8: 'orange',
    1.0: 'red',
}
heat_layer = HeatMap(
    heat_data,
    min_opacity=0.3,
    max_opacity=0.9,
    radius=25,
    blur=20,
    gradient=heat_gradient,
)
heat_layer.add_to(m)

# Markers go into a cluster layer rather than directly onto the map
marker_cluster = MarkerCluster().add_to(m)

# One marker per stop: popup with name and passenger count, tooltip with name
for _, row in stops_df.iterrows():
    popup_html = f"<b>{row['stop_name']}</b><br>Passenger count: {row['passenger_count']}"
    stop_marker = folium.Marker(
        location=[row["latitude"], row["longitude"]],
        popup=popup_html,
        tooltip=row["stop_name"],
        icon=Icon(icon_size=(10, 10)),
    )
    stop_marker.add_to(marker_cluster)

# Write the finished map to a standalone HTML file
m.save("passenger_density_map_with_clusters.html")


In [12]:

from folium.plugins import HeatMap
# Keep only the stops that have both coordinates
stops_with_coords = [
    stop
    for stop in bus_stops_data
    if not (stop["latitude"] is None or stop["longitude"] is None)
]

# Tabular form of the located stops, iterated row by row below
stops_df = pd.DataFrame(stops_with_coords)

# Base map on OpenStreetMap tiles, centred on a fixed coordinate
map_center = [8.4869, 76.9529]
m = folium.Map(
    location=map_center,
    tiles="OpenStreetMap",
    zoom_start=13,
    min_zoom=5,
    max_zoom=15,
)

# One [latitude, longitude, weight] triple per stop; the weight is the
# stop's passenger count
heat_data = [
    [row["latitude"], row["longitude"], row["passenger_count"]]
    for _, row in stops_df.iterrows()
]

# Heat layer: blue at the low end of the scale through to red at the top
heat_gradient = {
    0.2: 'blue',
    0.4: 'green',
    0.6: 'yellow',
    0.8: 'red',
}
heat_layer = HeatMap(
    heat_data,
    min_opacity=0.2,
    max_opacity=0.8,
    radius=20,
    blur=15,
    gradient=heat_gradient,
)
heat_layer.add_to(m)

# One marker per stop, added directly to the map (no clustering here)
for _, row in stops_df.iterrows():
    popup_html = f"<b>{row['stop_name']}</b><br>Passenger count: {row['passenger_count']}"
    marker = folium.Marker(
        location=[row["latitude"], row["longitude"]],
        popup=popup_html,
        tooltip=row["stop_name"],
    )
    marker.add_to(m)

    # NOTE(review): after add_to(m) the marker's _parent is presumably the map
    # itself, so these writes appear to set map-level maxZoom/minZoom options
    # (repeated identically for every stop) rather than controlling when an
    # individual marker is shown. It also relies on a private attribute.
    # TODO confirm the intended effect.
    marker._parent.options['maxZoom'] = 15
    marker._parent.options['minZoom'] = 10

# Write the finished map to a standalone HTML file
m.save("passenger_density_map.html")
