# Data Pipeline

Notebook version of `tools/data_pipeline.py` that explicitly demonstrates every step of the pipeline: loading the published resale (and rent) data from CSV files, processing it, and saving the results as ZIP files.

1. Load and process the resale CSV files published on https://data.gov.sg/collections/189/view, and the rent CSV file.
2. Load any existing geocoded addresses.
3. Update geocoded addresses.
4. Make H3 geometries.
5. Output data to disk for further downstream analytics.

In [None]:
import sys
sys.dont_write_bytecode = True

from pathlib import Path
import pandas as pd


# Local imports.
from resale_flat_prices.csv_data.resale_csv_data import ResaleCsvData
from resale_flat_prices.geocode.geocoded_addresses import GeocodedAddresses
from resale_flat_prices.csv_data.rent_csv_data import RentCsvData


# Data locations, all derived from one base directory.
# The paths are relative, so they resolve against the current working directory.
data_dir = Path("../data")
csv_data_dir = data_dir / "ResaleFlatPrices"
processed_data_dir = data_dir / "processed_data"
rent_csv_data_file = data_dir / "RentingOutofFlats" / "RentingOutofFlats2024CSV.csv"

In [None]:
# Build the resale dataset from the raw CSV files published on
# https://data.gov.sg/collections/189/view.
# The three ResaleCsvData steps run in a fixed order: load -> compile -> process.
csv_data = ResaleCsvData(csv_data_dir, wanted_columns="default")
for _resale_step in (
    csv_data.load_csv_files,
    csv_data.compile_csv_data,
    csv_data.process_csv_data,
):
    _resale_step()
print(f"Loaded and compiled resale prices CSV data into shape {csv_data.df.shape}.")

In [None]:
# Build the rent dataset from the single raw CSV file published on
# https://data.gov.sg/datasets/d_c9f57187485a850908655db0e8cfe651/view
rent_csv_data = RentCsvData(rent_csv_data_file)
rent_csv_data.load_csv_file()     # read the raw file
rent_csv_data.process_csv_data()  # run RentCsvData's processing step
rent_shape = rent_csv_data.df.shape
print(f"Loaded and compiled rent CSV data into shape {rent_shape}.")

In [None]:
# Load previously geocoded addresses, so that only addresses missing from them
# need to be geocoded below.
geocoded_addresses_file = processed_data_dir / "geocoded_addresses.json"
geocoded_addresses = GeocodedAddresses()
geocoded_addresses.read_json(geocoded_addresses_file)
print("Loaded {} existing geocoded addresses.".format(len(geocoded_addresses.address_dict)))

# Collect every address referenced by EITHER dataset.
# Both the resale and the rent data are later left-merged with the same geocode table,
# so rent-only addresses must be geocoded too. Otherwise those rent rows would silently
# end up with empty geocode columns.
# NaN addresses are dropped so they are never sent to the geocoder.
resale_addresses = set(csv_data.df["address"].dropna().unique())
rent_addresses = set(rent_csv_data.df["address"].dropna().unique())
all_unique_addresses = resale_addresses | rent_addresses
all_unique_geocoded_addresses = geocoded_addresses.get_all_geocoded_addresses()

# Geocode any addresses not seen before and write the updated set back to the JSON file.
missing_addresses = all_unique_addresses.difference(all_unique_geocoded_addresses)
print("Found {} new addresses to be geocoded in loaded CSV data.".format(len(missing_addresses)))
if missing_addresses:
    print("Updating {} new geocoded addresses.".format(len(missing_addresses)))
    geocoded_addresses.update_geocoded_addresses(missing_addresses)
    geocoded_addresses.to_json(geocoded_addresses_file)

# Check for problematic geocodes (coordinates that do not look like they are in Singapore).
problem_addresses = geocoded_addresses.verify_geocoded_latitudes_and_longitudes(country = "SINGAPORE")
# len() is kept here because the return type of the verifier is not visible here.
if len(problem_addresses) > 0:
    print("Warning - the following {} addresses do not seem to have been geocoded correctly.".format(
        len(problem_addresses))
    )
    for i, p in enumerate(problem_addresses):
        print("{:05d}: {}.".format(i, p))

In [None]:
# Left-join the geocode table onto the resale prices data on the shared "address" column.
# Every resale row is kept; geocode columns are empty where an address has no geocode.
geocode_df = geocoded_addresses.address_dict_to_df()
csv_df = csv_data.get_df()
processed_data_df = csv_df.merge(geocode_df, how="left", on="address")

print(f"Merged resale prices data shape: {processed_data_df.shape}.")

In [None]:
# Left-join the same geocode table onto the rent data.
# Every rent row is kept; geocode columns are empty where an address has no geocode.
rent_csv_df = rent_csv_data.get_df()
processed_rent_data_df = rent_csv_df.merge(geocode_df, how="left", on="address")
print(f"Merged rent data shape: {processed_rent_data_df.shape}.")

In [None]:
output_csv_file = "resale-flat-prices.csv.zip"
output_csv_path = processed_data_dir / output_csv_file

# Output the merged processed data to disk, zip-compressing it when the file name ends in ".zip".
# The full ".zip" suffix is checked case-insensitively. A bare "zip" suffix check would
# wrongly accept names such as "*.gzip" and would miss "*.ZIP".
compression = "zip" if output_csv_file.lower().endswith(".zip") else None
print("Saving processed resale prices data to {}.".format(output_csv_path))
processed_data_df.to_csv(output_csv_path, index=False, compression=compression)

In [None]:
output_rent_csv_file = "rent-prices.csv.zip"
output_rent_csv_path = processed_data_dir / output_rent_csv_file

# Output the merged processed rent data to disk, zip-compressing it when the file name ends in ".zip".
# The full ".zip" suffix is checked case-insensitively. A bare "zip" suffix check would
# wrongly accept names such as "*.gzip" and would miss "*.ZIP".
compression = "zip" if output_rent_csv_file.lower().endswith(".zip") else None
print("Saving processed rent data to {}.".format(output_rent_csv_path))
processed_rent_data_df.to_csv(output_rent_csv_path, index=False, compression=compression)