In [None]:
import os
import io
import time
import requests
import zipfile

import pandas as pd
import geopandas as gpd

In [None]:
def read_hdb_price(zip_path):
    '''
    Read every CSV file of an HDB resale-price archive into one table.

    Each archive member whose name ends with ``.csv`` is decoded as UTF-8
    and parsed with ``pandas.read_csv``; the resulting frames are stacked
    row-wise. Members with any other extension are ignored.

    Parameters
    ----------
    zip_path : str
        Path of the .zip archive to read.

    Returns
    -------
    data : pandas.DataFrame
        All rows of all CSV members, sorted by month, town, street_name,
        block, storey_range and flat_type, with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If the archive contains no ``.csv`` member.

    '''
    # The context manager closes the archive even when a member fails to
    # decode or parse.
    with zipfile.ZipFile(zip_path) as zf:
        data_li = [pd.read_csv(io.StringIO(zf.read(name).decode('utf-8')))
                   for name in zf.namelist()
                   if name.endswith('.csv')]

    # pd.concat would raise a ValueError here as well; raise it ourselves
    # so the message names the offending archive.
    if not data_li:
        raise ValueError('No .csv file found in {}'.format(zip_path))

    data = pd.concat(data_li, axis=0)
    return data.sort_values(by=['month', 'town', 'street_name', 'block',
                                'storey_range', 'flat_type'],
                            ignore_index=True)
# =============================================================================


def req_addr_loc(text, max_retries=None, timeout=30):
    '''
    Search the OneMap API for an address and return the first result.

    The request is repeated after a 5 second pause whenever it fails
    (network error, timeout, non-JSON body, or a body without a usable
    ``results`` entry).

    Parameters
    ----------
    text : str
        Search string sent as ``searchVal``.
    max_retries : int or None, optional
        Number of retries allowed after the first failed attempt. The
        default, None, keeps retrying indefinitely.
    timeout : float, optional
        Seconds passed to ``requests.get`` as its timeout, so a stalled
        connection counts as a failed attempt instead of hanging.

    Returns
    -------
    res : dict
        The first element of the response's ``results`` list, or, when
        that list is empty, a placeholder dict whose values are all ''.

    Raises
    ------
    requests.RequestException, ValueError, KeyError, TypeError
        The error of the last attempt, only when ``max_retries`` is set
        and exhausted.

    '''
    url = 'https://developers.onemap.sg/commonapi/search'
    params = {'searchVal'      : text,
              'returnGeom'     : 'Y',
              'getAddrDetails' : 'Y',
              'pageNum'        : 1 }

    # Record returned when the search yields no result.
    empty_res = {'SEARCHVAL' : '',
                 'BLK_NO'    : '',
                 'ROAD_NAME' : '',
                 'BUILDING'  : '',
                 'ADDRESS'   : '',
                 'POSTAL'    : '',
                 'X'         : '',
                 'Y'         : '',
                 'LATITUDE'  : '',
                 'LONGITUDE' : '',
                 'LONGTITUDE': '' }

    failures = 0
    while True:
        try:
            # short pause before every request
            time.sleep(0.01)
            response = requests.get(url, params=params, timeout=timeout)
            res = response.json()['results']
            return res[0] if len(res) >= 1 else empty_res
        # Only request/parsing errors are retried; KeyboardInterrupt and
        # SystemExit propagate so the loop can be stopped by the user.
        except (requests.RequestException, ValueError, KeyError, TypeError):
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            print('Fetching {} failed. Retrying in 5 sec'.format(text))
            time.sleep(5)
# =============================================================================


def req_hdb_addr_loc(hdb_addr, save_path):
    '''
    Look up every distinct HDB address and save the results as CSV.

    The distinct (block, street_name, town) rows of ``hdb_addr`` are each
    given an ``addr_id`` of the form "<block> <street_name>", which is
    passed to ``req_addr_loc``. The fields of the returned record are
    appended to the address row.

    Parameters
    ----------
    hdb_addr : pandas.DataFrame
        Table with at least the string columns 'block', 'street_name'
        and 'town'. The passed frame itself is not modified.
    save_path : str
        Path of the CSV file to write. The file is written before the
        'blk_flag' column is added, so it does not contain that column.

    Returns
    -------
    hdb_addr_req : pandas.DataFrame
        One row per distinct address: block, street_name, town, addr_id,
        the fields returned by ``req_addr_loc`` and a boolean 'blk_flag'
        that is True where 'block' equals the returned 'BLK_NO'.
    blk_flag : pandas.DataFrame
        The rows of ``hdb_addr_req`` whose 'blk_flag' is False.

    Raises
    ------
    AssertionError
        If two distinct rows share the same addr_id.

    '''

    # extract address information
    hdb_addr = hdb_addr[['block', 'street_name', 'town']].drop_duplicates(ignore_index=True)
    hdb_addr['addr_id'] = hdb_addr[['block', 'street_name']].agg(' '.join, axis=1)
    # check whether addr_id is unique
    assert hdb_addr['addr_id'].is_unique, 'addr_id is not unique'

    hdb_addr_req = []

    # drop_duplicates(ignore_index=True) left a 0..n-1 index, so the
    # enumerate counter equals the row label used for progress output.
    for ix, res in enumerate(hdb_addr.to_dict('records')):
        if ix % 200 == 0:
            print(ix)
        res.update(req_addr_loc(res['addr_id']))
        hdb_addr_req.append(res)

    # save file
    hdb_addr_req = pd.DataFrame(hdb_addr_req)
    hdb_addr_req.to_csv(save_path, index=False)

    # column-wise comparison instead of a Python-level apply over rows
    hdb_addr_req['blk_flag'] = hdb_addr_req['block'] == hdb_addr_req['BLK_NO']
    # obtain the blk No of HDB which mismatch with the request result
    blk_flag = hdb_addr_req[~hdb_addr_req['blk_flag']]
    return hdb_addr_req, blk_flag
# =============================================================================



def read_hdb_location(zip_path):
    '''
    Read the first member of a zip archive as a UTF-8 encoded CSV table.

    Parameters
    ----------
    zip_path : str
        Path of the .zip archive to read.

    Returns
    -------
    data : pandas.DataFrame
        The parsed content of the archive's first member; any further
        members are ignored.

    Raises
    ------
    IndexError
        If the archive has no members.

    '''
    # The context manager closes the archive even when reading or
    # decoding the member fails.
    with zipfile.ZipFile(zip_path) as zf:
        file_path = zf.namelist()[0]
        data = zf.read(file_path).decode('utf-8')
    return pd.read_csv(io.StringIO(data))

# 1. Acquire all flat addresses

In [None]:
# Acquire all flat addresses: look up every distinct address found in the
# resale price archive and write the lookup table to `save_path`.

zip_path = 'resale-flat-prices.zip'
hdb_price = read_hdb_price(zip_path)

save_path = 'hdb_address_{0}.csv'.format("2022-06")
# Bind the result instead of leaving the call as the cell's last expression,
# which would make the notebook echo a tuple of two full DataFrames.
hdb_addr_req, blk_mismatch = req_hdb_addr_loc(hdb_addr=hdb_price, save_path=save_path)

# Addresses whose returned BLK_NO differs from the requested block.
blk_mismatch.head()

# 2. Extract flat price

In [None]:
# Load the saved address lookup table.
# NOTE(review): step 1 writes 'hdb_address_2022-06.csv'; this archive is
# presumably that file zipped by hand — confirm.
zip_path = 'hdb_address_2022-06.zip'
hdb_addr = read_hdb_location(zip_path)

# True where the requested block equals the BLK_NO of the lookup result.
# A column-wise comparison also works on an empty table, where a row-wise
# apply(axis=1) returns a DataFrame instead of a Series.
hdb_addr['blk_flag'] = hdb_addr['block'] == hdb_addr['BLK_NO']
# obtain the blk No of HDB which mismatch with the request result
blk_flag = hdb_addr[~hdb_addr['blk_flag']]

In [None]:
# Resale transactions of a single month, with a join key and a unit price.
month = '2022-06'
zip_path = 'resale-flat-prices.zip'

hdb_price = (
    read_hdb_price(zip_path)
    # keep only the transactions of the selected month
    .loc[lambda df: df['month'] == month]
    .reset_index(drop=True)
    .assign(
        # "<block> <street_name>", the key shared with the address table
        addr_id=lambda df: df[['block', 'street_name']].agg(' '.join, axis=1),
        # resale price divided by floor area in square metres
        price_per_area=lambda df: df['resale_price'] / df['floor_area_sqm'],
    )
)

In [None]:
# Attach the address lookup columns to every transaction (left join, so
# transactions without a matching addr_id are kept).
# Note: this rebinds `hdb_price`; a second run of this cell alone would merge
# the address columns again, so re-run the previous cell first.
hdb_price = pd.merge(hdb_price, hdb_addr, how='left', on='addr_id')

# One point per transaction from the looked-up longitude / latitude.
points = gpd.points_from_xy(hdb_price['LONGITUDE'], hdb_price['LATITUDE'])
hdb_price_gdf = gpd.GeoDataFrame(hdb_price, geometry=points, crs="EPSG:4326")

# Columns duplicated by the merge, plus the raw coordinate fields.
redundant_cols = ['block_y', 'street_name_y', 'town_y', 'SEARCHVAL', 'BLK_NO',
                  'X', 'Y', 'LATITUDE', 'LONGITUDE', 'LONGTITUDE']
hdb_price_gdf.drop(columns=redundant_cols, inplace=True)

hdb_price_gdf.plot(column='price_per_area', legend=True, s=5, cmap="copper")