<a href="https://colab.research.google.com/github/xinni-lee/projects/blob/main/Rainfall_Datagov.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Rainfall across Singapore
From Data.gov

https://data.gov.sg/datasets?formats=API&sort=relevancy&page=1&resultId=d_6580738cdd7db79374ed3152159fbd69#tag/default/GET/rainfall

**Part 1: Extract**

> fetch function:

Use the following API to populate the fetch function:
https://data.gov.sg/datasets?formats=API&sort=relevancy&page=1&resultId=d_6580738cdd7db79374ed3152159fbd69#tag/default/GET/rainfall

This API includes all rainfall readings every 5 minutes for that day, along with station information, and some other metadata.
The API uses pagination, therefore a single API call would not return all readings for the day. In order to return all readings we need to use the paginationToken in the response, to retrieve subsequent pages of responses. Append the readings from each subsequent response to the original response’s readings key.

> store function

Populate the store function to save the API response as JSON files in the /data/raw/ folder, with the following naming scheme for the files: YYYY-MM-DD.json, where YYYY-MM-DD refers to the date of the data.

As an example, we will be using the period from 2025-01-10 to 2025-01-15 (both days inclusive; 6 days).

In [1]:
import requests
import datetime
import os
import json

In [2]:
# # Check the API response by date using Python
# import requests

# url = "https://api-open.data.gov.sg/v2/real-time/api/rainfall?date=2025-01-10"
# response = requests.get(url)

# # Check HTTP status
# print("Status Code:", response.status_code)

# # Print full JSON response
# data = response.json()
# print(json.dumps(data, indent=2))

# # Check what keys are present
# print("Keys in API Response:", data.keys())

In [4]:
def fetch(date: datetime.date) -> dict:
    """Download every rainfall reading published for ``date``.

    The endpoint is paginated: a response may carry a ``paginationToken`` that
    must be sent back to obtain the next page. Pages are requested until no
    token is returned and their readings are concatenated in arrival order.

    Station metadata (name and location) is collected from the ``stations``
    list of *every* page and then copied onto each per-station reading as
    ``station_name`` / ``station_location``. Stations whose location is not a
    dict with ``latitude`` and ``longitude`` are left un-enriched and reported
    once.

    Args:
        date: Calendar day to request; sent to the API in ISO format.

    Returns:
        ``{"readings": [...]}`` with all pages merged, or an empty dict when a
        page comes back with a non-200 status (the failure is printed).
    """
    url = f"https://api-open.data.gov.sg/v2/real-time/api/rainfall?date={date.isoformat()}"
    full_response = {"readings": []}
    stations_metadata = {}
    pagination_token = None

    while True:
        params = {}
        if pagination_token:
            params["paginationToken"] = pagination_token

        print(f"Requesting page with paginationToken: {pagination_token}")

        # A timeout keeps a stalled connection from hanging the whole run.
        response = requests.get(url, params=params, timeout=30)

        if response.status_code != 200:
            print(f"Error fetching data: {response.status_code}")
            print(response.text)
            return {}

        page = response.json().get("data") or {}
        readings = page.get("readings") or []
        full_response["readings"].extend(readings)

        print(f"Fetched {len(readings)} readings. Total so far: {len(full_response['readings'])}")

        # Gather station metadata from each page rather than only the last
        # one, so a station listed on an earlier page is not lost.
        for station in page.get("stations") or []:
            station_id = station.get("id")
            if station_id is not None:
                stations_metadata[station_id] = {
                    "name": station.get("name"),
                    "location": station.get("location"),
                }

        pagination_token = page.get("paginationToken")

        if pagination_token:
            print(f"Next pagination token: {pagination_token}")
        else:
            print("No more pages, exiting loop.")
            break

    # Merge station name/location into every per-station reading.
    reported_invalid = set()
    for reading in full_response["readings"]:
        for station_reading in reading.get("data") or []:
            station_id = station_reading.get("stationId")
            if station_id not in stations_metadata:
                continue

            station_location = stations_metadata[station_id]["location"]
            location_ok = (
                isinstance(station_location, dict)
                and "latitude" in station_location
                and "longitude" in station_location
            )
            if location_ok:
                station_reading["station_name"] = stations_metadata[station_id]["name"]
                station_reading["station_location"] = station_location
            elif station_id not in reported_invalid:
                # Report each bad station once instead of once per reading.
                reported_invalid.add(station_id)
                print(f"Invalid location data for station {station_id}: {station_location}")

    return full_response

def store(date: datetime.date, data: dict) -> None:
    """Write ``data`` as indented JSON to ``data/raw/<YYYY-MM-DD>.json``.

    The target folder is created when missing, an existing file for the same
    day is overwritten, and a confirmation line is printed afterwards.
    """
    target_dir = "data/raw"
    os.makedirs(target_dir, exist_ok=True)

    destination = os.path.join(target_dir, date.isoformat() + ".json")
    with open(destination, "w", encoding="utf-8") as handle:
        json.dump(data, handle, indent=4)

    print(f"Data for {date} saved to {destination}")

# Part 1 driver: fetch and store one raw JSON file per day in the date window.
if __name__ == "__main__":
    """
    Do not edit the code below
    """
    os.makedirs("data/raw", exist_ok=True)
    start_date = datetime.date(2025, 1, 10)
    # end_date is exclusive: range() below yields (end_date - start_date).days
    # offsets starting at 0, so the last day fetched is 2025-01-15.
    end_date = datetime.date(2025, 1, 16)

    dates = [start_date + datetime.timedelta(days=i) for i in range((end_date-start_date).days)]

    for date in dates:
        print(f"Retrieving data for {date}...")
        response = fetch(date=date)
        store(date=date, data=response)

Retrieving data for 2025-01-10...
Requesting page with paginationToken: None
Fetched 100 readings. Total so far: 100
Next pagination token: b2Zmc2V0PTEwMA==
Requesting page with paginationToken: b2Zmc2V0PTEwMA==
Fetched 100 readings. Total so far: 200
Next pagination token: b2Zmc2V0PTIwMA==
Requesting page with paginationToken: b2Zmc2V0PTIwMA==
Fetched 88 readings. Total so far: 288
No more pages, exiting loop.
Data for 2025-01-10 saved to data/raw/2025-01-10.json
Retrieving data for 2025-01-11...
Requesting page with paginationToken: None
Fetched 100 readings. Total so far: 100
Next pagination token: b2Zmc2V0PTEwMA==
Requesting page with paginationToken: b2Zmc2V0PTEwMA==
Fetched 100 readings. Total so far: 200
Next pagination token: b2Zmc2V0PTIwMA==
Requesting page with paginationToken: b2Zmc2V0PTIwMA==
Fetched 88 readings. Total so far: 288
No more pages, exiting loop.
Data for 2025-01-11 saved to data/raw/2025-01-11.json
Retrieving data for 2025-01-12...
Requesting page with paginat

**Part 2: Transform**

We will now process the raw JSON files in the data/raw/ folder.

> transform_stations function

•	Read all the responses in the data/raw/ folder.

•	Construct a stations table by extracting unique station information.

•	Store each station as a separate JSON file in the data/processed/stations/ folder, using the naming scheme: {stationId}.json.


> transform_readings function:

•	Read all responses in the data/raw/ folder.

•	Construct a readings table.

•	Store each daily reading as a separate JSON file in the data/processed/readings/ folder, using the following naming scheme: data/processed/readings/{YYYY-MM-DD}.json.

•	Each file should contain an array of readings for all stations for a single day.


In [5]:
# Date window read by transform_readings() and transform_stations().
# end_date is exclusive: both functions iterate range((end_date - start_date).days).
start_date = datetime.date(2025, 1, 10)
end_date = datetime.date(2025, 1, 16)

def transform_readings() -> None:
    """Build one processed readings file per day from the raw API dumps.

    For every day in the module-level ``start_date``/``end_date`` window (end
    exclusive), ``data/raw/<day>.json`` is loaded, each timestamp entry is
    reduced to its ``stationId``/``value`` pairs, and the resulting list is
    written to ``data/processed/readings/<day>.json``. Days without a raw file
    are reported and skipped; days with no usable readings still produce a
    file containing an empty list.
    """
    source_dir = "data/raw"
    target_dir = "data/processed/readings"
    os.makedirs(target_dir, exist_ok=True)

    day_count = (end_date - start_date).days
    for offset in range(day_count):
        day_label = (start_date + datetime.timedelta(days=offset)).isoformat()
        source_path = os.path.join(source_dir, f"{day_label}.json")

        if not os.path.exists(source_path):
            print(f"Raw data file for {day_label} not found.")
            continue

        with open(source_path, "r", encoding="utf-8") as source:
            payload = json.load(source)
        print(f"Processing {day_label} - Keys in raw data:", payload.keys())

        slim_entries = []
        entries = payload["readings"] if "readings" in payload else []
        for entry in entries:  # one entry per timestamp
            stamp = entry.get("timestamp")

            if "data" in entry:
                # Keep only the station id and measured value of each reading
                pairs = []
                for item in entry["data"]:
                    if "stationId" in item and "value" in item:
                        pairs.append({"stationId": item["stationId"], "value": item["value"]})
                slim_entries.append({"timestamp": stamp, "data": pairs})
            else:
                print(f"No 'data' key in item for {stamp}")

        if len(slim_entries) == 0:
            print(f"No valid readings found for {day_label}")

        target_path = os.path.join(target_dir, f"{day_label}.json")
        with open(target_path, "w", encoding="utf-8") as target:
            json.dump(slim_entries, target, indent=4)

        print(f"Processed data saved for {day_label} to {target_path}")

def transform_stations() -> None:
    """Extract the unique stations found in the raw dumps, one file each.

    Every raw file in the module-level ``start_date``/``end_date`` window (end
    exclusive) is scanned. The first reading that carries a non-empty
    ``stationId``, ``station_name`` and ``station_location`` defines that
    station's record, which is written to
    ``data/processed/stations/<stationId>.json``.
    """
    source_dir = "data/raw"
    target_dir = "data/processed/stations"
    os.makedirs(target_dir, exist_ok=True)

    catalogue = {}

    for offset in range((end_date - start_date).days):
        day_label = (start_date + datetime.timedelta(days=offset)).isoformat()
        source_path = os.path.join(source_dir, f"{day_label}.json")

        if not os.path.exists(source_path):
            print(f"Raw data file for {day_label} not found.")
            continue

        with open(source_path, "r", encoding="utf-8") as source:
            payload = json.load(source)

        print(f"Processing stations for {day_label} - Keys in raw data:", payload.keys())

        if "readings" not in payload:
            continue

        for entry in payload["readings"]:  # one entry per timestamp
            for item in entry.get("data", []):
                sid = item.get("stationId")
                label = item.get("station_name")
                position = item.get("station_location")

                # Skip incomplete records and stations already catalogued
                if not (sid and label and position):
                    continue
                if sid in catalogue:
                    continue

                catalogue[sid] = {
                    "station_id": sid,
                    "name": label,
                    "location": position,
                }

    # One JSON file per station
    for sid, record in catalogue.items():
        target_path = os.path.join(target_dir, f"{sid}.json")

        with open(target_path, "w", encoding="utf-8") as target:
            json.dump(record, target, indent=4)

        print(f"Processed station saved for {sid} to {target_path}")

# Part 2 driver: make sure both output folders exist, then build the stations
# files followed by the per-day readings files.
if __name__ == "__main__":
    """
    Do not edit the code below
    """
    os.makedirs("data/processed/stations", exist_ok=True)
    os.makedirs("data/processed/readings", exist_ok=True)
    transform_stations()
    transform_readings()

Processing stations for 2025-01-10 - Keys in raw data: dict_keys(['readings'])
Processing stations for 2025-01-11 - Keys in raw data: dict_keys(['readings'])
Processing stations for 2025-01-12 - Keys in raw data: dict_keys(['readings'])
Processing stations for 2025-01-13 - Keys in raw data: dict_keys(['readings'])
Processing stations for 2025-01-14 - Keys in raw data: dict_keys(['readings'])
Processing stations for 2025-01-15 - Keys in raw data: dict_keys(['readings'])
Processed station saved for S218 to data/processed/stations/S218.json
Processed station saved for S219 to data/processed/stations/S219.json
Processed station saved for S216 to data/processed/stations/S216.json
Processed station saved for S217 to data/processed/stations/S217.json
Processed station saved for S214 to data/processed/stations/S214.json
Processed station saved for S215 to data/processed/stations/S215.json
Processed station saved for S213 to data/processed/stations/S213.json
Processed station saved for S81 to d

**Part 3: Queries** (sqls/*.sql)

Use DuckDB to query the processed JSON files. We should use the stations table to retrieve station information.
Write your SQL queries in separate .sql files in the sqls/ folder (e.g., q1.sql, q2.sql, q3.sql).

In [6]:
pip install duckdb



In [7]:
import duckdb
import datetime
import os
import json

Create sql directory

In [8]:
# Make sure the folder that holds the query files exists
os.makedirs("sqls", exist_ok=True)

# Placeholder queries, one per question; later cells overwrite each file
# with the real answer.
queries = dict([
    ("q1.sql", "SELECT COUNT(*) FROM readings;"),
    ("q2.sql", "SELECT COUNT(*) FROM stations;"),
    ("q3.sql", "SELECT COUNT(*) FROM readings;"),
    ("q4.sql", "SELECT COUNT(*) FROM stations;"),
])

for file_name in queries:
    query = queries[file_name]
    with open("sqls/" + file_name, "w") as f:
        f.write(query)

print("SQL files created in 'sqls/' directory.")

SQL files created in 'sqls/' directory.


Create tables

In [9]:
# Load the processed readings into DuckDB. CREATE OR REPLACE keeps this cell
# re-runnable: a plain CREATE TABLE raises once the table already exists.
duckdb.sql("""
    CREATE OR REPLACE TABLE readings AS
    SELECT * FROM read_json_auto('data/processed/readings/*.json', union_by_name=True);
""")

# Load the per-station JSON files into DuckDB (one row per station file)
duckdb.sql("""
    CREATE OR REPLACE TABLE stations AS
    SELECT * FROM read_json_auto('data/processed/stations/*.json', union_by_name=True);
""")

print("Tables successfully created!")

Tables successfully created!


In [10]:
# Sanity check: list the tables DuckDB knows about (readings and stations are expected)
duckdb.sql("SHOW TABLES").show()

┌──────────┐
│   name   │
│ varchar  │
├──────────┤
│ readings │
│ stations │
└──────────┘



In [11]:
# Sanity check: preview the first five rows of each table to confirm they hold data
duckdb.sql("SELECT * FROM readings LIMIT 5").show()
duckdb.sql("SELECT * FROM stations LIMIT 5").show()

┌───────────────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────

Download into csv files (optional)


In [12]:
# Export both tables to comma-delimited CSV files (with a header row) in the
# current working directory.
duckdb.sql("COPY readings TO 'readings.csv' (HEADER, DELIMITER ',');")
duckdb.sql("COPY stations TO 'stations.csv' (HEADER, DELIMITER ',');")

In [13]:
# NOTE(review): google.colab is presumably only importable inside the Colab
# runtime, so this cell is expected to fail elsewhere — confirm before running
# locally. The imported `files` helper is reused by a later download cell.
from google.colab import files

# Hand the exported CSV files to the browser as downloads
files.download("readings.csv")
files.download("stations.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Query directly here in Colab (doing it two ways here: 1. query directly here and 2. update query in the sqls/*.sql)

1.	How many stations have names starting with “Pasir”?

In [14]:
# Count the stations whose name starts with "Pasir".
# The tables were created through duckdb.sql(), i.e. on DuckDB's default
# connection, so the query is issued the same way; a separately opened
# in-memory connection would not contain them and is therefore not created.
query = "SELECT COUNT(*) as count FROM stations WHERE name LIKE 'Pasir%';"
result = duckdb.sql(query).fetchall()

# Display result in Colab cell
print(f"Number of stations starting with 'Pasir': {result[0][0]}")

Number of stations starting with 'Pasir': 3


Read and update the SQL files in the sqls/ directory

In [15]:
# Read the placeholder query currently stored in q1.sql.
# file_path is kept at module level because the next cell reuses it to
# overwrite the same file.
file_path = "sqls/q1.sql"

with open(file_path, "r") as f:
    sql_query = f.read()

print("Current Query:\n", sql_query)  # Display the current SQL query

Current Query:
 SELECT COUNT(*) FROM readings;


In [16]:
# Final answer for question 1: number of stations whose name starts with "Pasir"
new_query = "SELECT COUNT(*) as count FROM stations WHERE name LIKE 'Pasir%';"  # Modify this

# Overwrite the SQL file with the new query (file_path was set by the previous cell)
with open(file_path, "w") as f:
    f.write(new_query)

print("Updated q1.sql successfully!")

Updated q1.sql successfully!


In [17]:
def execute_sql_from_file(file_path):
    """Run the SQL text stored in ``file_path`` through ``duckdb.sql``.

    Args:
        file_path: Path of a file containing the SQL to execute.

    Returns:
        The query result converted with ``.df()``.
    """
    with open(file_path, 'r') as sql_file:
        statement = sql_file.read()
    return duckdb.sql(statement).df()

# Run the query from 'sqls/q1.sql'; the call is left as the cell's last
# expression so the returned DataFrame is displayed.
execute_sql_from_file("sqls/q1.sql")

Unnamed: 0,count
0,3


2.	What is the total rainfall measured for the whole period?

In [18]:
# Inspect the column names and types of the readings table
duckdb.sql("DESCRIBE readings").show()

┌─────────────┬─────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │                 column_type                 │  null   │   key   │ default │  extra  │
│   varchar   │                   varchar                   │ varchar │ varchar │ varchar │ varchar │
├─────────────┼─────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ timestamp   │ VARCHAR                                     │ YES     │ NULL    │ NULL    │ NULL    │
│ data        │ STRUCT(stationId VARCHAR, "value" DOUBLE)[] │ YES     │ NULL    │ NULL    │ NULL    │
└─────────────┴─────────────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘



In [19]:
# Peek at the nested `data` column (per DESCRIBE: a list of stationId/value structs)
duckdb.sql("SELECT data FROM readings LIMIT 5").show()

┌───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────

In [20]:
# Flatten the nested `data` list into a readings_flat table with one row per
# (timestamp, station) pair. CREATE OR REPLACE makes the cell safe to re-run.
# NOTE(review): the two UNNEST(data) calls are presumably expanded in lockstep
# (one output row per list element); the preview printed below is consistent
# with that — confirm against the DuckDB UNNEST documentation.
query = """
CREATE OR REPLACE TABLE readings_flat AS
SELECT
  timestamp,
  UNNEST(data).stationId AS station_id,
  UNNEST(data).value AS value
FROM readings;
"""
# Execute the query, then preview the first ten flattened rows
duckdb.sql(query)
duckdb.sql("SELECT * FROM readings_flat LIMIT 10").show()

┌───────────────────────────┬────────────┬────────┐
│         timestamp         │ station_id │ value  │
│          varchar          │  varchar   │ double │
├───────────────────────────┼────────────┼────────┤
│ 2025-01-10T23:55:00+08:00 │ S218       │  0.204 │
│ 2025-01-10T23:55:00+08:00 │ S219       │  0.402 │
│ 2025-01-10T23:55:00+08:00 │ S216       │  0.204 │
│ 2025-01-10T23:55:00+08:00 │ S217       │  0.197 │
│ 2025-01-10T23:55:00+08:00 │ S214       │  0.394 │
│ 2025-01-10T23:55:00+08:00 │ S215       │  0.203 │
│ 2025-01-10T23:55:00+08:00 │ S213       │    0.4 │
│ 2025-01-10T23:55:00+08:00 │ S81        │    0.4 │
│ 2025-01-10T23:55:00+08:00 │ S40        │    0.2 │
│ 2025-01-10T23:55:00+08:00 │ S84        │    0.2 │
├───────────────────────────┴────────────┴────────┤
│ 10 rows                               3 columns │
└─────────────────────────────────────────────────┘



Download into csv files (optional)

In [21]:
# Export the flattened table to CSV and offer it as a browser download.
# `files` is the google.colab helper imported in an earlier cell.
duckdb.sql("COPY readings_flat TO 'readings_flat.csv' (HEADER, DELIMITER ',');")
files.download("readings_flat.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [22]:
# Question 2: sum of every reading in readings_flat (all stations, all
# timestamps), rounded to a whole number.
query = """
SELECT CAST(ROUND(SUM(value), 0) AS INTEGER) AS total_rainfall
FROM readings_flat;
"""
# The query is evaluated twice: once to render the table, once to fetch the value
duckdb.sql(query).show()
result = duckdb.sql(query).fetchall()

# Display result in Colab cell
print(f"Total rainfall measured: {result[0][0]}")

┌────────────────┐
│ total_rainfall │
│     int32      │
├────────────────┤
│          20718 │
└────────────────┘

Total rainfall measured: 20718


In [23]:
# Read the placeholder query currently stored in q2.sql.
# file_path is reused by the next cell to overwrite the same file.
file_path = "sqls/q2.sql"

with open(file_path, "r") as f:
    sql_query = f.read()

print("Current Query:\n", sql_query)  # Display the current SQL query

Current Query:
 SELECT COUNT(*) FROM stations;


In [24]:
# Final answer for question 2: total of all values in readings_flat, rounded
# to a whole number. The query depends on the readings_flat table existing.
new_query = """
SELECT CAST(ROUND(SUM(value), 0) AS INTEGER) AS total_rainfall
FROM readings_flat;
"""

# Overwrite the SQL file with the new query (file_path was set by the previous cell)
with open(file_path, "w") as f:
    f.write(new_query)

print("Updated q2.sql successfully!")

Updated q2.sql successfully!


In [25]:
# Run the query from 'sqls/q2.sql'; the call is left as the cell's last
# expression so the returned DataFrame is displayed.
execute_sql_from_file("sqls/q2.sql")

Unnamed: 0,total_rainfall
0,20718


3.	What is the station_name, day, and value for the station with the highest total rainfall for a day?

In [26]:
# Question 3: the station/day pair with the highest total rainfall.
# Rows are ranked by the unrounded daily sum; ordering by the rounded
# `rainfall` alias would rank totals that round to the same integer
# arbitrarily. Rounding is applied to the displayed value only.
query = """
SELECT stations.name, CAST(readings_flat.timestamp AS DATE) AS date, CAST(ROUND(SUM(readings_flat.value), 0) AS INTEGER) AS rainfall
FROM stations
JOIN readings_flat
ON stations.station_id = readings_flat.station_id
GROUP BY stations.name, date
ORDER BY SUM(readings_flat.value) DESC
LIMIT 1;
"""
duckdb.sql(query).show()
result = duckdb.sql(query).fetchall()

# Display result in Colab cell
print(f"Highest rainfall measured: {result[0][2]}mm at {result[0][0]} on {result[0][1]}")

┌────────────────────┬────────────┬──────────┐
│        name        │    date    │ rainfall │
│      varchar       │    date    │  int32   │
├────────────────────┼────────────┼──────────┤
│ Pasir Ris Drive 12 │ 2025-01-10 │      145 │
└────────────────────┴────────────┴──────────┘

Highest rainfall measured: 145mm at Pasir Ris Drive 12 on 2025-01-10


In [27]:
# Read the placeholder query currently stored in q3.sql.
# file_path is reused by the next cell to overwrite the same file.
file_path = "sqls/q3.sql"

with open(file_path, "r") as f:
    sql_query = f.read()

print("Current Query:\n", sql_query)  # Display the current SQL query

Current Query:
 SELECT COUNT(*) FROM readings;


In [28]:
# Final answer for question 3: the station/day pair with the highest total
# rainfall. Rows are ranked by the unrounded daily sum; ordering by the rounded
# `rainfall` alias would rank totals that round to the same integer
# arbitrarily. Rounding is applied to the displayed value only.
new_query = """
SELECT stations.name, CAST(readings_flat.timestamp AS DATE) AS date, CAST(ROUND(SUM(readings_flat.value), 0) AS INTEGER) AS rainfall
FROM stations
JOIN readings_flat
ON stations.station_id = readings_flat.station_id
GROUP BY stations.name, date
ORDER BY SUM(readings_flat.value) DESC
LIMIT 1;
"""

# Overwrite the SQL file with the new query (file_path was set by the previous cell)
with open(file_path, "w") as f:
    f.write(new_query)

print("Updated q3.sql successfully!")

Updated q3.sql successfully!


In [29]:
# Run the query from 'sqls/q3.sql'; the call is left as the cell's last
# expression so the returned DataFrame is displayed.
execute_sql_from_file("sqls/q3.sql")

Unnamed: 0,name,date,rainfall
0,Pasir Ris Drive 12,2025-01-10,145


4.	What is the minimum, maximum, and average daily rainfall for Pasir Ris Drive 12?

In [30]:
# Question 4: min / max / average of the daily rainfall totals at
# Pasir Ris Drive 12.
# Every reading is included in the daily totals. Filtering on value > 0 inside
# the CTE would not change any sum, but it would drop a completely dry day from
# the result and so overstate the minimum and the average.
query = """
WITH daily_rainfall AS (
    SELECT
        stations.name,
        CAST(readings_flat.timestamp AS DATE) AS day,
        SUM(readings_flat.value) AS daily_rainfall
    FROM readings_flat
    JOIN stations
        ON stations.station_id = readings_flat.station_id
    WHERE stations.name = 'Pasir Ris Drive 12'
    GROUP BY stations.name, day
)

SELECT
    CAST(ROUND(MIN(daily_rainfall), 0) AS INTEGER) AS min_rainfall,
    CAST(ROUND(MAX(daily_rainfall), 0) AS INTEGER) AS max_rainfall,
    CAST(ROUND(AVG(daily_rainfall), 0) AS INTEGER) AS avg_rainfall
FROM daily_rainfall
"""
duckdb.sql(query).show()
result = duckdb.sql(query).fetchall()

# Display result in Colab cell
print(f"Daily rainfall measured has average of {result[0][2]}mm")

┌──────────────┬──────────────┬──────────────┐
│ min_rainfall │ max_rainfall │ avg_rainfall │
│    int32     │    int32     │    int32     │
├──────────────┼──────────────┼──────────────┤
│            9 │          145 │           68 │
└──────────────┴──────────────┴──────────────┘

Daily rainfall measured has average of 68mm


In [31]:
# Read the placeholder query currently stored in q4.sql.
# file_path is reused by the next cell to overwrite the same file.
file_path = "sqls/q4.sql"

with open(file_path, "r") as f:
    sql_query = f.read()

print("Current Query:\n", sql_query)  # Display the current SQL query

Current Query:
 SELECT COUNT(*) FROM stations;


In [32]:
# Final answer for question 4: min / max / average of the daily rainfall
# totals at Pasir Ris Drive 12.
# Every reading is included in the daily totals. Filtering on value > 0 inside
# the CTE would not change any sum, but it would drop a completely dry day from
# the result and so overstate the minimum and the average.
new_query = """
WITH daily_rainfall AS (
    SELECT
        stations.name,
        CAST(readings_flat.timestamp AS DATE) AS day,
        SUM(readings_flat.value) AS daily_rainfall
    FROM readings_flat
    JOIN stations
        ON stations.station_id = readings_flat.station_id
    WHERE stations.name = 'Pasir Ris Drive 12'
    GROUP BY stations.name, day
)

SELECT
    CAST(ROUND(MIN(daily_rainfall), 0) AS INTEGER) AS min_rainfall,
    CAST(ROUND(MAX(daily_rainfall), 0) AS INTEGER) AS max_rainfall,
    CAST(ROUND(AVG(daily_rainfall), 0) AS INTEGER) AS avg_rainfall
FROM daily_rainfall;
"""

# Overwrite the SQL file with the new query (file_path was set by the previous cell)
with open(file_path, "w") as f:
    f.write(new_query)

print("Updated q4.sql successfully!")

Updated q4.sql successfully!


In [33]:
# Run the query from 'sqls/q4.sql'; the call is left as the cell's last
# expression so the returned DataFrame is displayed.
execute_sql_from_file("sqls/q4.sql")

Unnamed: 0,min_rainfall,max_rainfall,avg_rainfall
0,9,145,68
