In [2]:
import pandas as pd
import os
import asyncio
import aiohttp
from tqdm.asyncio import tqdm

In [5]:
# Read every .csv file in the raw directory and stack them into a single dataframe.
# File names are sorted so the resulting row order is reproducible (os.listdir order
# is arbitrary). The path constant is not called `dir`, which would shadow the builtin.
RAW_DIR = "../datasets/private/raw"

raw_csv_paths = [
    os.path.join(RAW_DIR, name)
    for name in sorted(os.listdir(RAW_DIR))
    if name.endswith(".csv")
]
if not raw_csv_paths:
    # pd.concat([]) would only say "No objects to concatenate"; name the real problem.
    raise FileNotFoundError(f"No .csv files found in {RAW_DIR!r}")

private = pd.concat(
    [pd.read_csv(path) for path in raw_csv_paths],
    ignore_index=True,
)

In [6]:
# Normalise headers (trim, lower-case, spaces -> underscores), parse the "Mon-YY"
# sale date into a monthly Period, and replace project/street name with a single
# "address" column formed as "<project_name> <street_name>".
private = (
    private.rename(columns=lambda col: col.strip().lower().replace(" ", "_"))
    .assign(
        sale_date=lambda frame: pd.to_datetime(
            frame["sale_date"], format="%b-%y"
        ).dt.to_period("M"),
        address=lambda frame: frame["project_name"] + " " + frame["street_name"],
    )
    .drop(columns=["project_name", "street_name"])
)

In [7]:
# Persist the cleaned dataframe; the positional row index is omitted from the file.
private.to_csv("../datasets/private/private.csv", index=False)

In [8]:
# Distinct addresses in order of first appearance, so each is geocoded only once.
addresses = pd.unique(private["address"]).tolist()

In [9]:
# Geocode addresses using the OneMap search API.
# NOTE(review): OneMap documents its search endpoint as .../api/common/elastic/search;
# confirm this base URL (without "search") is the intended one.
url = "https://www.onemap.gov.sg/api/common/elastic/"


async def get_hdb_data(address, session, progress_bar=None):
    """Look up a single address with the OneMap search endpoint.

    Args:
        address: free-text query, sent as the ``searchVal`` parameter.
        session: an open ``aiohttp.ClientSession`` (or any object whose ``get``
            returns an async context manager yielding a response).
        progress_bar: optional tqdm-style object. ``update(1)`` is called exactly
            once, and only when the lookup reaches a definitive answer (match or
            no match). Failed requests do not advance it, because the caller
            retries them and would otherwise count the same address twice.

    Returns:
        ``{"QUERY": address, **first_result}`` when the API reports a match;
        ``{"QUERY": address, "found": 0}`` when it reports none;
        ``{"QUERY": address, "ERROR": "REQUEST FAILED", "DETAIL": <reason>}`` when
        the request, the HTTP status, or the response parsing fails. Callers treat
        the presence of "ERROR" as "retry this address".
    """
    params = {"searchVal": address, "returnGeom": "Y", "getAddrDetails": "Y"}
    try:
        async with session.get(url, params=params) as response:
            # Treat HTTP errors (e.g. rate limiting) as failures rather than
            # trying to interpret whatever body came back.
            response.raise_for_status()
            data = await response.json()
        if data["found"] != 0:
            outcome = {"QUERY": address, **data["results"][0]}
        else:
            outcome = {"QUERY": address, "found": 0}
    except Exception as exc:
        # Deliberately broad: any failure (network, HTTP status, non-JSON body,
        # unexpected payload shape) becomes a retryable error record, and the
        # reason is kept for diagnosis instead of being discarded.
        return {
            "QUERY": address,
            "ERROR": "REQUEST FAILED",
            "DETAIL": f"{type(exc).__name__}: {exc}",
        }

    # Use an identity check: bool() of a tqdm bar is False when total == 0 and can
    # raise when it has neither a total nor an iterable.
    if progress_bar is not None:
        progress_bar.update(1)
    return outcome


async def main(addresses, max_retries=5, retry_delay=1.0):
    """Geocode all addresses concurrently, retrying failed lookups a bounded number of times.

    Args:
        addresses: iterable of address strings; each is looked up with ``get_hdb_data``.
        max_retries: number of extra rounds for addresses whose request failed.
            Previously the loop retried forever, which never terminates when an
            address fails persistently.
        retry_delay: base back-off in seconds; retry round ``k`` first sleeps
            ``retry_delay * k`` so a rate-limiting API is not hammered immediately.

    Returns:
        A DataFrame with one row per address that got a definitive answer (match or
        "not found"), in input order, with the helper's ``found`` marker column
        removed when present. Returns None (after printing "No results found") when
        nothing was resolved.
    """
    addresses = list(addresses)
    resolved = [None] * len(addresses)  # slot i holds the final answer for addresses[i]
    pending = list(range(len(addresses)))

    async with aiohttp.ClientSession() as session:
        # Context manager so the bar is closed even if a round raises.
        with tqdm(total=len(addresses), desc="Fetching data") as progress_bar:

            async def fetch(index):
                # Progress is counted here, once per definitive answer. get_hdb_data
                # is called without a bar so retried failures are never double counted.
                res = await get_hdb_data(addresses[index], session)
                if res is not None and "ERROR" not in res:
                    progress_bar.update(1)
                return res

            for attempt in range(max(max_retries, 0) + 1):
                if not pending:
                    break
                if attempt:
                    await asyncio.sleep(retry_delay * attempt)
                batch_results = await asyncio.gather(*(fetch(i) for i in pending))

                still_failing = []
                for index, res in zip(pending, batch_results):
                    if res is None or "ERROR" in res:
                        still_failing.append(index)
                    else:
                        # Both matches and "found": 0 records are final answers.
                        resolved[index] = res
                pending = still_failing

    if pending:
        print(f"Giving up on {len(pending)} address(es) after {max_retries} retries")

    results = [res for res in resolved if res is not None]
    if not results:
        print("No results found")
        return None

    results_df = pd.DataFrame(results)
    # "found" only exists when at least one address had no match; if every address
    # matched the column is absent and a plain drop would raise KeyError.
    return results_df.drop(columns="found", errors="ignore")


# Top-level `await` works here because Jupyter/IPython runs cells inside an event loop;
# in a plain Python script use `df = asyncio.run(main(addresses))` instead.
# `df` is None when nothing was resolved (main prints "No results found" in that case).
df = await main(addresses)

Fetching data: 4573it [00:41, 110.00it/s]                          


In [10]:
# Write the geocoding results to addresses.csv alongside private.csv, without the row index.
df.to_csv(path_or_buf="../datasets/private/addresses.csv", index=False)