In [1]:
import requests
import pandas as pd
import os
import zipfile
from dotenv import load_dotenv
from io import BytesIO
import json

In [2]:
# load environment variables from .env file
load_dotenv()
LTA_KEY = os.getenv('LTA_API_KEY')
ONEMAP_KEY = os.getenv("ONE_MAP_ACCESS_TOKEN") 

##### 1. LTA Datamall Datasets

In [9]:
### GATHERING DATASETS ###

def fetch_data(url, headers):
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error fetching data from {url}: {response.status_code}")
        return None

def save_to_csv(data, filename):
    os.makedirs("data", exist_ok=True)
    full_path = os.path.join("data", filename)

    df = pd.DataFrame(data)
    df.to_csv(full_path, index=False)
    print(f"Data saved to {filename}")

def download_file(download_url, filename):
    """Downloads ZIP file from URL, extract content and save as csv."""
    response = requests.get(download_url, stream = True)
    
    if response.status_code == 200:
        os.makedirs("data", exist_ok=True)
        full_path = os.path.join("data", filename)

        with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
            file = zip_file.namelist()[0]   # get the name of first file in the zip
            with open(full_path, 'wb') as f:
                f.write(zip_file.read(file))
            print(f"Extracted and saved: {full_path}")
    else:
        print(f"Error downloading the file from {download_url}.")


def get_bus_routes(api_key):
    headers = {
        "AccountKey": api_key,
        "accept": "application/json"
    }
    
    bus_routes_url = "https://datamall2.mytransport.sg/ltaodataservice/BusRoutes"
    all_bus_routes = []
    skip = 0
    limit = 500  # Limit per API call

    while True:
        # Construct the URL with pagination
        paginated_url = f"{bus_routes_url}?$skip={skip}&$top={limit}"
        bus_routes_data = fetch_data(paginated_url, headers)

        if bus_routes_data and 'value' in bus_routes_data:
            all_bus_routes.extend(bus_routes_data['value'])  # Append new data
            
            # If the returned data is less than the limit, we have fetched all data
            if len(bus_routes_data['value']) < limit:
                break
            
            # Increment the skip value for the next API call
            skip += limit
        else:
            break  # Exit if there was an error or no more data

    if all_bus_routes:
        save_to_csv(all_bus_routes, "bus_routes_full.csv")


def get_bus_stops(api_key):
    headers = {
        "AccountKey": api_key,
        "accept": "application/json"
    }
    
    bus_stops_url = "https://datamall2.mytransport.sg/ltaodataservice/BusStops"
    all_bus_stops = []
    skip = 0
    limit = 500  # Limit per API call

    while True:
        # Construct the URL with pagination
        paginated_url = f"{bus_stops_url}?$skip={skip}&$top={limit}"
        bus_stops_data = fetch_data(paginated_url, headers)

        if bus_stops_data and 'value' in bus_stops_data:
            all_bus_stops.extend(bus_stops_data['value'])  # Append new data
            
            # If the returned data is less than the limit, we have fetched all data
            if len(bus_stops_data['value']) < limit:
                break
            
            # Increment the skip value for the next API call
            skip += limit
        else:
            break  # Exit if there was an error or no more data

    if all_bus_stops:
        save_to_csv(all_bus_stops, "bus_stops_full.csv")


def get_bus_services(api_key):
    headers = {
        "AccountKey": api_key, 
        "accept": "application/json"
    }
    
    bus_services_url = "https://datamall2.mytransport.sg/ltaodataservice/BusServices"
    all_bus_services = []
    skip = 0
    limit = 500  # Limit per API call

    while True:
        paginated_url = f"{bus_services_url}?$skip={skip}&$top={limit}"
        bus_services_data = fetch_data(paginated_url, headers)

        if bus_services_data and 'value' in bus_services_data:
            all_bus_services.extend(bus_services_data['value'])

            # If the returned data is less than the limit, we have fetched all data
            if len(bus_services_data['value']) < limit:
                break
            
            skip += limit  # Increment the skip value for the next API call
        else:
            break  # Exit if there was an error or no more data

    if all_bus_services:
        save_to_csv(all_bus_services, "bus_services_full.csv")


def get_passenger_volume_by_bus_stops(api_key):
    headers = {
        "AccountKey": api_key,
        "accept": "application/json"
    }
    
    passenger_volume_bus_stops_url = "https://datamall2.mytransport.sg/ltaodataservice/PV/Bus"
    response = fetch_data(passenger_volume_bus_stops_url, headers)

    if response:
        download_link = response.get('value', [{}])[0].get('Link')
        if download_link:
            download_file(download_link, "passenger_volume_by_bus_stops.csv")
    else:
        print(f"No data received from the API. Link: {download_link}")


def get_passenger_volume_by_train_stations(api_key):
    headers = {
        "AccountKey": api_key,
        "accept": "application/json"
    }
    
    passenger_volume_train_stations_url = "https://datamall2.mytransport.sg/ltaodataservice/PV/Train"
    response = fetch_data(passenger_volume_train_stations_url, headers)

    if response:
        download_link = response.get('value', [{}])[0].get('Link')
        if download_link:
            download_file(download_link, "passenger_volume_by_train_stations.csv")
    else:
        print(f"No data received from the API. Link: {download_link}")


def get_passenger_origin_dest_bus(api_key):
    headers = {
        "AccountKey": api_key,
        "accept": "application/json"
    }
    
    passenger_volume_bus_stops_url = "https://datamall2.mytransport.sg/ltaodataservice/PV/ODBus"
    response = fetch_data(passenger_volume_bus_stops_url, headers)

    if response:
        download_link = response.get('value', [{}])[0].get('Link')
        if download_link:
            download_file(download_link, "passenger_volume_OD_by_bus_stops.csv")
    else:
        print(f"No data received from the API. Link: {download_link}")


In [None]:
get_bus_routes(LTA_KEY)
get_bus_stops(LTA_KEY)
get_bus_services(LTA_KEY)
get_passenger_volume_by_bus_stops(LTA_KEY)
get_passenger_volume_by_train_stations(LTA_KEY)
get_passenger_origin_dest_bus(LTA_KEY)

##### 2. Onemap Dataset (MRT Stations)

In [None]:
os.makedirs("data", exist_ok=True)

mrt_ranges = {
    "TE": range(1, 30),
    "NS": range(1, 29),    # no NS6
    "EW": range(1, 34),
    "CG": range(1, 3),
    "NE": range(1, 19),    # no NE2
    "CC": range(1, 30),
    "CE": range(1, 3),
    "DT": range(1, 36),
    "SW": range(1, 9),
    "SE": range(1, 6),
    "PW": range(1, 8),
    "PE": range(1, 8)
}

# Generate the MRT lines
list_of_mrt = [f"{line}{i}" for line, r in mrt_ranges.items() for i in r]

mrt_names = []
mrt_lat = []
mrt_long = []
valid_mrt_list = []

for i in range(0, len(list_of_mrt)):
    query_address = list_of_mrt[i]
    query_string = "https://www.onemap.gov.sg/api/common/elastic/search?searchVal="+str(query_address)+"&returnGeom=Y&getAddrDetails=Y"

    response = requests.get(query_string)
    data_mrt= json.loads(response.content)

    if data_mrt['found'] != 0:
        station_name = data_mrt["results"][0]["SEARCHVAL"]

        if "STATION" in station_name:
            clean_name = re.sub(r"\s*\(.*?\)", "", station_name)
            valid_mrt_list.append(query_address)
            mrt_names.append(clean_name)
            mrt_lat.append(data_mrt["results"][0]["LATITUDE"])
            mrt_long.append(data_mrt["results"][0]["LONGITUDE"])
            print(str(query_address)+", Latitude: "+data_mrt["results"][0]["LATITUDE"] + " Longitude: "+data_mrt["results"][0]["LONGITUDE"])

# Manually append missing MRT station data
manual_data = [
    {"STN_NAME": "PASIR RIS MRT STATION", "STN_NO": "EW1", "Latitude": "1.372983774", "Longitude": "103.9492681"},
    {"STN_NAME": "THANGGAM LRT STATION", "STN_NO": "SW4", "Latitude": "1.397318155", "Longitude": "103.8756352"},
    {"STN_NAME": "TONGKANG LRT STATION", "STN_NO": "SW7", "Latitude": "1.389347953", "Longitude": "103.8858441"}
]

for entry in manual_data:
    mrt_names.append(entry["STN_NAME"])
    valid_mrt_list.append(entry["STN_NO"])
    mrt_lat.append(entry["Latitude"])
    mrt_long.append(entry["Longitude"])

mrt_location = pd.DataFrame({
    'STN_NAME': mrt_names,
    'STN_NO': valid_mrt_list, 
    'Latitude': mrt_lat, 
    'Longitude': mrt_long
})

filename = "data/mrt_stations.csv"
mrt_location.to_csv(filename, index=False)