# **Rainfall Annual Report Builder** — Public Release

**© 2025 Ridho Nanda Pratama, Researcher, Geoscience-Environmental-Water-Sustainability**

**Purpose**: This script aggregates and harmonizes daily rainfall datasets from Indonesia’s Meteorological, Climatological, and Geophysical Agency (BMKG) into a single, analysis-ready corpus.
**Sharing**: You may redistribute with attribution. Commercial reuse requires written permission.
**Reproducibility**: The outputs shown below are preserved exactly as generated. Results from re-running the script may vary slightly depending on the environment.


Inputs
------
- One folder per year: rainfall_data_2015, rainfall_data_2016, ..., rainfall_data_2024
- Each yearly folder contains 12 monthly files, e.g. ...JanYYYY.xlsx, ...FebYYYY.xlsx, etc.

Outputs
-------
- All yearly workbooks & plots saved into ./output_all
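- One annual summary workbook: output_all/annual_summary_2015_2024__<station-slug>.xlsx
  * Sheet "Summary" (Year as the first column)
  * Sheet "Monthly Max" (one row per year with the 12 monthly maxima)
  * Plots are saved as PNG files next to the workbooks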
- The script also supports extracting monthly maxima from each yearly workbook.

Assumptions
-----------
- Monthly files have daily data in columns A (Day) and B (Rainfall), with the header at row 8 (skiprows=7).
- Value 8888 is treated as missing.

#### Notes: You can adjust START_YEAR / END_YEAR / INPUT_PREFIX / OUTPUT_DIR in the Configuration section.

In [38]:
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Union, Optional
from functools import reduce
import re
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from openpyxl import Workbook, load_workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import Alignment, Border, Side, PatternFill, Font
from openpyxl.utils import get_column_letter

# =========================
# Configuration
# =========================
# Inclusive range of years to process. Each year is read from the folder
# "./<INPUT_PREFIX><year>", e.g. ./rainfall_data_2015.
START_YEAR: int = 2015
END_YEAR: int = 2024
INPUT_PREFIX: str = "rainfall_data_"

# Every workbook and plot is written here; the folder is created on import.
OUTPUT_DIR: Path = Path("./output_all")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Set to a string (e.g. "Halmahera Rain Gauge Station") to force the station
# name instead of detecting it from the metadata header.
STATION_NAME: Optional[str] = None

PERIOD: str = "{}-{}".format(START_YEAR, END_YEAR)

# English month abbreviations: used in input file names and as column labels.
MONTH_ORDER: List[str] = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split()
MONTH_NUM: Dict[str, int] = dict(zip(MONTH_ORDER, range(1, len(MONTH_ORDER) + 1)))

# Indonesian month names: used as column labels of the "Monthly Max" sheet.
MONTH_NAMES_ID: List[str] = [
    "Januari",
    "Februari",
    "Maret",
    "April",
    "Mei",
    "Juni",
    "Juli",
    "Agustus",
    "September",
    "Oktober",
    "November",
    "Desember",
]

# How many rows at the top of an input file are read as the metadata header.
# Change this if your header is longer/shorter than 7 rows.
HEADER_NROWS: int = 7

# =========================
# Excel styling utilities
# =========================
def style_range(ws, r1: int, r2: int, c1: int, c2: int) -> None:
    """Box every cell in rows r1..r2 / columns c1..c2 with thin borders and center its content."""
    edge = Side(style="thin")
    box = Border(left=edge, right=edge, top=edge, bottom=edge)
    block = ws.iter_rows(min_row=r1, max_row=r2, min_col=c1, max_col=c2)
    for cell in (c for cells in block for c in cells):
        cell.alignment = Alignment(horizontal="center", vertical="center")
        cell.border = box


def style_header_range(ws, r1: int, r2: int, c1: int, c2: int) -> None:
    """
    Style the metadata header area (rows r1..r2, columns c1..c2):
    thin borders, left-aligned, vertically centered, text wrapping off.
    """
    edge = Side(style="thin")
    box = Border(left=edge, right=edge, top=edge, bottom=edge)
    block = ws.iter_rows(min_row=r1, max_row=r2, min_col=c1, max_col=c2)
    for cell in (c for cells in block for c in cells):
        cell.alignment = Alignment(horizontal="left", vertical="center", wrap_text=False)
        cell.border = box

# =========================
# Helpers (I/O & parsing)
# =========================
def strict_find_monthly_files(input_folder: Path, year: int) -> List[tuple[str, str]]:
    """
    Find exactly one workbook for each month (Jan..Dec) of `year` in `input_folder`.

    A file belongs to a month when its name matches ``*<Mon><year>*.xlsx``
    (e.g. ``*Jan2015*.xlsx``). Excel owner/lock files (names starting with
    ``~$``) are ignored.

    Returns:
        A list of 12 tuples ``(file_path, column_label)`` in MONTH_ORDER order,
        where ``column_label`` is ``'MonYYYY'``.

    Raises:
        FileNotFoundError: no file matches a month.
        RuntimeError: more than one file matches a month.
    """
    file_info: List[tuple[str, str]] = []
    for mon in MONTH_ORDER:
        label = f"{mon}{year}"
        # While a workbook is open, Excel keeps a "~$<name>.xlsx" owner file next
        # to it; it matches the same glob and would otherwise be reported as a
        # duplicate. Sorting keeps the error message stable across platforms.
        matches = sorted(
            p for p in input_folder.glob(f"*{label}*.xlsx")
            if not p.name.startswith("~$")
        )
        if not matches:
            raise FileNotFoundError(f"[{year}] Missing file for month {mon}{year} in {input_folder}")
        if len(matches) > 1:
            raise RuntimeError(f"[{year}] Multiple files found for {mon}{year}: {matches}")
        file_info.append((str(matches[0]), label))
    return file_info


def extract_header_table(file_path: str, nrows: int = HEADER_NROWS) -> pd.DataFrame:
    """
    Load the first `nrows` rows of the workbook at `file_path` as raw cells
    (no row is treated as column names) and return them with every
    all-empty column and all-empty row removed.
    """
    raw = pd.read_excel(file_path, header=None, nrows=nrows)
    # Columns first, then rows — same order as the cleaning has always run.
    without_blank_cols = raw.dropna(axis="columns", how="all")
    return without_blank_cols.dropna(axis="index", how="all")


def write_header_to_sheet(ws, header_df: pd.DataFrame) -> int:
    """
    Append the rows of `header_df` to `ws` and style them as a metadata header.

    Styling always covers row 1 down to the sheet's last row, so this is meant
    to be called on a sheet that has nothing above the header.

    Returns the sheet's last row after writing, or 0 when `header_df` is None
    or empty (in which case nothing is written).
    """
    nothing_to_write = header_df is None or header_df.empty
    if nothing_to_write:
        return 0

    for values in dataframe_to_rows(header_df, index=False, header=False):
        ws.append(values)

    last_row = ws.max_row
    last_col = ws.max_column
    style_header_range(ws, 1, last_row, 1, last_col)
    return last_row


def get_representative_header_df() -> pd.DataFrame:
    """
    Return a representative metadata header for the START_YEAR..END_YEAR range.

    Years are tried in ascending order; folders that do not exist are skipped.
    For each year the January file found by strict_find_monthly_files is
    preferred. If that lookup (or reading the file) fails, any other ``.xlsx``
    in the folder is tried, in sorted order and ignoring Excel ``~$`` owner
    files. A candidate that cannot be read is reported and skipped.

    Returns an empty DataFrame when no header could be read from any year.
    """
    for y in range(START_YEAR, END_YEAR + 1):
        folder = Path(f"./{INPUT_PREFIX}{y}")
        if not folder.is_dir():
            continue
        try:
            files = strict_find_monthly_files(folder, y)
            return extract_header_table(files[0][0], nrows=HEADER_NROWS)
        except Exception:
            # Deliberate best effort: an incomplete year can still supply a header.
            # glob() order is unspecified, so sort for a reproducible choice, and
            # drop "~$" owner files, which are not readable workbooks.
            candidates = sorted(
                p for p in folder.glob("*.xlsx") if not p.name.startswith("~$")
            )
        for candidate in candidates:
            try:
                return extract_header_table(str(candidate), nrows=HEADER_NROWS)
            except Exception as exc:
                # One unreadable file must not abort the search in later files/years.
                print(f"[WARN] Could not read header from {candidate}: {exc}")
    return pd.DataFrame()


def extract_station_name_from_header(header_df: pd.DataFrame) -> Optional[str]:
    """
    Try to read the station name out of the metadata header.

    Each row is scanned for a cell containing 'stasiun', 'station' or
    'pos hujan' (case-insensitive). For the first such label cell the name is
    taken from, in order of preference:

    1. the text after the first ':' inside the label cell itself
       ("Nama Stasiun : X" -> "X");
    2. the first following cell on the same row that holds more than a
       separator, with any leading ':' removed
       ("Nama Stasiun" | ":" | "X" -> "X", "Pos Hujan" | ": X" -> "X").

    If neither yields text, the text after the first ':' anywhere on the row
    (cells joined by " | ") is used as a last resort.

    Returns None when `header_df` is None/empty or no label is found.
    """
    if header_df is None or header_df.empty:
        return None

    label_rx = re.compile(r"stasiun|station|pos\s*hujan", flags=re.IGNORECASE)

    for _, row in header_df.iterrows():
        cells = [str(x) for x in row.tolist() if pd.notna(x)]
        # Track positions, not values: looking a value up again with list.index()
        # lands on the first duplicate rather than on the cell being examined.
        label_positions = [i for i, cell in enumerate(cells) if label_rx.search(cell)]
        if not label_positions:
            continue

        for pos in label_positions:
            after = cells[pos].partition(":")[2].strip()
            if after:
                return after
            for follower in cells[pos + 1:]:
                # A cell that is only ":" is a layout separator, not a name.
                cand = follower.strip().lstrip(": ").strip()
                if cand:
                    return cand

        m = re.search(r":\s*(.+)", " | ".join(cells))
        if m:
            return m.group(1).strip()
    return None


def slugify(text: str) -> str:
    """
    Turn `text` into a lowercase, dash-separated token that is safe in a file name.

    Anything other than word characters, dashes and spaces is dropped, runs of
    whitespace become a single dash, and leading/trailing dashes or underscores
    are trimmed. Falls back to "station-unknown" when nothing is left.
    """
    cleaned = re.sub(r"[^\w\- ]+", "", text.lower().strip())
    dashed = re.sub(r"-{2,}", "-", re.sub(r"\s+", "-", cleaned))
    slug = dashed.strip("-_")
    return slug if slug else "station-unknown"

# =========================
# Per-year processing
# =========================
def process_year(year: int) -> Dict[str, Union[float, int]]:
    """
    Build the yearly workbook and the two yearly plots for `year`.

    Reads the 12 monthly files from ./<INPUT_PREFIX><year> and writes, into
    OUTPUT_DIR:

    * rainfall_<year>__<station-slug>.xlsx with a sheet "Rainfall <year>" holding
      the metadata header, the day (1..31) x month table, four monthly statistic
      rows (Average, Maximum, Rainy Days (>0), Total) and the annual statistics;
    * daily_rainfall_<year>__<station-slug>.png (daily line plot);
    * monthly_max_rainfall_<year>__<station-slug>.png (monthly maximum bar plot).

    The value 8888 is treated as missing, as is anything that is not numeric.

    Returns:
        A dict with the keys "Annual Average", "Annual Maximum",
        "Annual Rainy Days", "Annual Rainfall Total" and "Year".

    Raises:
        FileNotFoundError / RuntimeError: propagated from
        strict_find_monthly_files when a month is missing or ambiguous.
    """
    input_folder = Path(f"./{INPUT_PREFIX}{year}")
    file_info = strict_find_monthly_files(input_folder, year)

    # Header & station: the first month's file supplies the metadata for the year.
    header_df = extract_header_table(file_info[0][0], nrows=HEADER_NROWS)
    station_name = STATION_NAME or extract_station_name_from_header(header_df) or "Station Unknown"
    station_slug = slugify(station_name)

    # Containers
    date_df = pd.DataFrame({"Date": range(1, 32)})
    rainfall_dfs: List[pd.DataFrame] = []
    stat_data: Dict[str, list] = {"Date": ["Average", "Maximum", "Rainy Days (>0)", "Total"]}
    yearly_values: List[float] = []

    # Monthly loop
    for file_path, col_name in file_info:
        # Expected layout: columns A/B = Day/Rainfall; metadata header occupies first 7 rows
        df = pd.read_excel(file_path, skiprows=7, usecols="A:B").head(31)
        df.columns = ["Day", "Rainfall"]
        # Coerce to numbers first, then blank the 8888 sentinel. Replacing before
        # coercion only catches a numeric 8888; a cell stored as the text "8888"
        # would survive and be counted as 8888 mm of rain.
        rain = pd.to_numeric(df["Rainfall"], errors="coerce")
        df["Rainfall"] = rain.mask(rain == 8888)

        yearly_values.extend(df["Rainfall"].dropna().tolist())

        # Monthly stats, in the same order as the labels in stat_data["Date"]
        stat_data[col_name] = [
            round(df["Rainfall"].mean(skipna=True), 2),
            df["Rainfall"].max(skipna=True),
            df.loc[df["Rainfall"] > 0, "Rainfall"].count(),
            df["Rainfall"].sum(skipna=True),
        ]

        # Day of month is taken from row position (1..len), then aligned to 1..31
        df["Date"] = range(1, len(df) + 1)
        df = df[["Date", "Rainfall"]].rename(columns={"Rainfall": col_name})
        df = pd.merge(date_df, df, on="Date", how="left")
        rainfall_dfs.append(df)

    # Merge 12 months on Date
    merged_df = reduce(lambda L, R: pd.merge(L, R, on="Date"), rainfall_dfs)
    merged_num = merged_df.copy()                 # numeric copy, used for plotting
    merged_disp = merged_df.copy().fillna("-")    # display copy, missing shown as "-"

    # Write yearly Excel
    wb = Workbook()
    ws = wb.active
    ws.title = f"Rainfall {year}"

    # Metadata header top (no wrap + borders)
    write_header_to_sheet(ws, header_df)
    ws.append([])  # spacer

    # Daily table. The spacer holds no cells, so ws.max_row + 1 is the spacer row
    # and start_row + 1 is the row the table's column header lands on.
    start_row = ws.max_row + 1
    for row in dataframe_to_rows(merged_disp, index=False, header=True):
        ws.append(row)
    data_start, data_end, max_col = start_row + 1, ws.max_row, ws.max_column
    style_range(ws, data_start, data_end, 1, max_col)

    # Monthly stats table (directly below the daily table, no column header)
    stat_df = pd.DataFrame(stat_data)
    for row in dataframe_to_rows(stat_df, index=False, header=False):
        ws.append(row)
    stat_start, stat_end = data_end + 1, ws.max_row
    style_range(ws, stat_start, stat_end, 1, max_col)

    # Annual stats over every non-missing daily value of the year
    yearly_series = pd.Series(yearly_values)
    annual_stats = {
        "Annual Average": round(yearly_series.mean(), 2),
        "Annual Maximum": yearly_series.max(),
        "Annual Rainy Days": int((yearly_series > 0).sum()),
        "Annual Rainfall Total": yearly_series.sum(),
    }
    ws.append([])
    for k, v in annual_stats.items():
        ws.append([k, v])
    ann_start, ann_end = ws.max_row - len(annual_stats) + 1, ws.max_row
    style_range(ws, ann_start, ann_end, 1, 2)

    # Save yearly Excel (with station slug)
    excel_out = OUTPUT_DIR / f"rainfall_{year}__{station_slug}.xlsx"
    wb.save(excel_out)
    print(f"[{year}] Excel saved: {excel_out}")

    # =========================
    # Plotting
    # =========================
    # Build long daily frame (one row per real calendar day with a value)
    long_frames: List[pd.DataFrame] = []
    for _, col_name in file_info:
        mon = col_name[:3]
        mnum = MONTH_NUM[mon]
        tmp = merged_num[["Date", col_name]].copy()
        tmp[col_name] = pd.to_numeric(tmp[col_name], errors="coerce")
        tmp = tmp.dropna(subset=[col_name])
        # errors="coerce" turns impossible dates (e.g. 30 Feb) into NaT, dropped below
        tmp["Datetime"] = pd.to_datetime(
            {"year": year, "month": mnum, "day": tmp["Date"].astype(int)},
            errors="coerce"
        )
        tmp = tmp.dropna(subset=["Datetime"]).rename(columns={col_name: "Rainfall"})
        long_frames.append(tmp[["Datetime", "Rainfall"]])

    daily_df = pd.concat(long_frames, ignore_index=True).sort_values("Datetime")

    # Daily plot
    plt.figure(figsize=(14, 5))
    plt.plot(daily_df["Datetime"], daily_df["Rainfall"])
    plt.title(f"Daily Rainfall in {year} — {station_name}")
    plt.xlabel("Date")
    plt.ylabel("Rainfall (mm)")
    ax = plt.gca()
    ax.xaxis.set_major_locator(mdates.MonthLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%d %b"))
    plt.grid(axis="y", linestyle="--", alpha=0.4)
    plt.gcf().autofmt_xdate()
    plt.tight_layout()
    daily_png = OUTPUT_DIR / f"daily_rainfall_{year}__{station_slug}.png"
    plt.savefig(daily_png, dpi=200)
    plt.close()
    print(f"[{year}] Plot saved: {daily_png}")

    # Monthly max bar. Labels come from the month number, not strftime("%b"):
    # "%b" follows the process locale, and a non-English abbreviation would not
    # match MONTH_ORDER in the reindex below, silently emptying that bar.
    daily_df["Month"] = daily_df["Datetime"].dt.month.map(lambda m: MONTH_ORDER[m - 1])
    monthly_max = (
        daily_df.groupby("Month", sort=False)["Rainfall"]
        .max()
        .reindex(MONTH_ORDER)
    )

    plt.figure(figsize=(10, 5))
    plt.bar(monthly_max.index, monthly_max.values)
    plt.title(f"Monthly Maximum Rainfall in {year} — {station_name}")
    plt.xlabel("Month")
    plt.ylabel("Maximum Daily Rainfall (mm)")
    plt.grid(axis="y", linestyle="--", alpha=0.4)
    plt.tight_layout()
    monthly_png = OUTPUT_DIR / f"monthly_max_rainfall_{year}__{station_slug}.png"
    plt.savefig(monthly_png, dpi=200)
    plt.close()
    print(f"[{year}] Plot saved: {monthly_png}")

    out: Dict[str, Union[float, int]] = dict(annual_stats)
    out["Year"] = year
    return out

# =========================
# Cross-year summary
# =========================
def build_annual_summary(all_rows: List[Dict[str, Union[float, int]]]) -> Path | None:
    """
    Write the cross-year summary workbook and the annual-total plot.

    `all_rows` holds one dict per processed year (as returned by process_year).
    The workbook gets a single sheet "Summary" with the representative metadata
    header on top and the per-year table below it, sorted by year with "Year"
    as the first column.

    Returns the path of the saved workbook, or None when `all_rows` is empty.
    """
    if not all_rows:
        print("[INFO] No yearly outputs generated.")
        return None

    # Fixed leading columns; anything else keeps its original relative order.
    leading = ["Year", "Annual Rainfall Total", "Annual Average", "Annual Maximum", "Annual Rainy Days"]
    table = pd.DataFrame(all_rows)
    extras = [name for name in table.columns if name not in leading]
    table = table[leading + extras].sort_values("Year")

    # Header and station name come from the first readable input file.
    header_block = get_representative_header_df()
    station = STATION_NAME or extract_station_name_from_header(header_block) or "Station Unknown"
    stem = f"{START_YEAR}_{END_YEAR}__{slugify(station)}"

    # --- workbook ---------------------------------------------------------
    workbook_path = OUTPUT_DIR / f"annual_summary_{stem}.xlsx"
    book = Workbook()
    sheet = book.active
    sheet.title = "Summary"

    write_header_to_sheet(sheet, header_block)
    sheet.append([])  # spacer

    # Measured before the table is written: the first row that receives borders.
    first_styled_row = sheet.max_row + 2
    for values in dataframe_to_rows(table, index=False, header=True):
        sheet.append(values)
    last_row, last_col = sheet.max_row, sheet.max_column
    style_range(sheet, first_styled_row, last_row, 1, last_col)

    book.save(workbook_path)
    print(f"[OK] Summary Excel saved: {workbook_path}")

    # --- plot -------------------------------------------------------------
    years = table["Year"]
    plt.figure(figsize=(10, 5))
    plt.plot(years, table["Annual Rainfall Total"], marker="o")
    plt.title(f"Annual Rainfall Total {START_YEAR}–{END_YEAR} — {station}")
    plt.xlabel("Year")
    plt.ylabel("Total Rainfall (mm)")
    axes = plt.gca()
    axes.set_xticks(years)
    axes.set_xticklabels(years)
    plt.grid(axis="y", linestyle="--", alpha=0.4)
    plt.tight_layout()
    plot_path = OUTPUT_DIR / f"annual_total_{stem}.png"
    plt.savefig(plot_path, dpi=200)
    plt.close()
    print(f"[OK] Summary plot saved: {plot_path}")

    return workbook_path

# =========================
# Monthly max extraction from yearly outputs
# =========================
def extract_max_row_from_workbook(xlsx_path: Path) -> pd.Series:
    """
    Read the monthly maxima out of a yearly workbook.

    Scans the active sheet for the first row whose first cell reads 'Maximum'
    (ignoring case and surrounding whitespace) and takes the 12 cells that
    follow the label. Values that are not numeric become NaN.

    Returns a Series indexed by MONTH_NAMES_ID.
    Raises ValueError when no such row exists.
    """
    sheet = load_workbook(xlsx_path, data_only=True).active  # "Rainfall YYYY"

    labelled_rows = (
        cells
        for cells in sheet.iter_rows(values_only=True)
        if cells and str(cells[0]).strip().lower() == "maximum"
    )
    hit = next(labelled_rows, None)
    if hit is None:
        raise ValueError(f"Row labeled 'Maximum' not found in {xlsx_path.name}")

    monthly = pd.to_numeric(pd.Series(list(hit[1:13])), errors="coerce")
    return pd.Series(monthly.values, index=MONTH_NAMES_ID)


def save_dataframe_with_borders(df: pd.DataFrame, out_path: Path) -> None:
    """
    Save `df` to `out_path` as an Excel file, then reopen it and put a thin
    border around every cell of the active sheet before saving again.
    """
    df.to_excel(out_path)

    book = load_workbook(out_path)
    sheet = book.active

    edge = Side(style="thin")
    box = Border(left=edge, right=edge, top=edge, bottom=edge)

    # Sheet extent is read once, before any cell is touched.
    last_row, last_col = sheet.max_row, sheet.max_column
    block = sheet.iter_rows(min_row=1, max_row=last_row, min_col=1, max_col=last_col)
    for cell in (c for cells in block for c in cells):
        cell.border = box

    book.save(out_path)

# =========================
# Add "Monthly Max" sheet to annual summary
# =========================
def append_monthly_max_table_to_summary(summary_xlsx_path: Path,
                                        sheet_name: str = "Monthly Max") -> None:
    """
    Add (or replace) a sheet in the annual summary workbook that lists, for
    every year in START_YEAR..END_YEAR, the 12 monthly maxima and their maximum.

    Columns: Year, Januari..Desember, Maximum Rainfall. The representative
    metadata header is written at the top of the sheet. The monthly values are
    read back from the yearly workbooks in OUTPUT_DIR; a year without a yearly
    workbook gets a row with empty values.

    Raises:
        ValueError: propagated from extract_max_row_from_workbook when a yearly
        workbook has no row labeled 'Maximum'.
    """
    # Collect per-year monthly maxima
    rows = []
    for y in range(START_YEAR, END_YEAR + 1):
        # Use yearly workbook with station suffix if present; otherwise fallback
        pattern = f"rainfall_{y}__*.xlsx"
        candidates = list(OUTPUT_DIR.glob(pattern))
        src = candidates[0] if candidates else OUTPUT_DIR / f"rainfall_{y}.xlsx"
        if not src.exists():
            rows.append([y] + [None] * 12 + [None])
            continue
        series = extract_max_row_from_workbook(src)  # index = MONTH_NAMES_ID
        rrmax = float(pd.to_numeric(series, errors="coerce").max())
        rows.append([y] + [series.get(mon, None) for mon in MONTH_NAMES_ID] + [rrmax])

    # Open summary workbook
    wb = load_workbook(summary_xlsx_path)
    if sheet_name in wb.sheetnames:
        wb.remove(wb[sheet_name])  # overwrite if exists
    ws = wb.create_sheet(title=sheet_name)

    # Metadata header (representative)
    rep_header_df = get_representative_header_df()
    write_header_to_sheet(ws, rep_header_df)
    ws.append([])  # spacer

    # Table header & data
    header = ["Year"] + MONTH_NAMES_ID + ["Maximum Rainfall"]
    ws.append(header)
    # Take the header's row from the sheet itself, right after writing it. Deriving
    # it from the metadata header's shape is off by one when that header is empty:
    # the spacer still occupies row 1, so the table header lands on row 2.
    table_header_row = ws.max_row
    for r in rows:
        ws.append(r)

    # Table styling
    thin = Border(left=Side(style="thin"), right=Side(style="thin"),
                  top=Side(style="thin"), bottom=Side(style="thin"))
    header_fill = PatternFill("solid", fgColor="DDD9C4")
    max_fill    = PatternFill("solid", fgColor="E6B8B7")
    bold_font   = Font(bold=True)

    max_row = ws.max_row
    max_col = ws.max_column

    for r in ws.iter_rows(min_row=table_header_row, max_row=max_row, min_col=1, max_col=max_col):
        for c_idx, cell in enumerate(r, start=1):
            cell.border = thin
            if cell.row == table_header_row:  # table header row
                cell.fill = header_fill
                cell.font = bold_font
                cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=False)
            # Highlight the last column of the data rows
            if c_idx == max_col and cell.row > table_header_row:
                cell.fill = max_fill

    # Column widths
    widths = {
        1: 8,                                  # Year
        **{i + 2: 12 for i in range(12)},      # 12 months
        14: 18                                 # Maximum Rainfall
    }
    for col_idx, width in widths.items():
        ws.column_dimensions[get_column_letter(col_idx)].width = width

    wb.save(summary_xlsx_path)
    print(f"[OK] Sheet '{sheet_name}' added to {summary_xlsx_path.name}")

# =========================
# Main
# =========================
def main() -> None:
    """
    Run the whole pipeline: process every year that has an input folder,
    build the cross-year summary, then add the "Monthly Max" sheet to it.

    A year whose processing fails is reported and skipped; the remaining
    years are still processed.
    """
    # 1) Yearly workbooks and plots
    summary_rows: List[Dict[str, Union[float, int]]] = []
    for year in range(START_YEAR, END_YEAR + 1):
        folder = Path(f"./{INPUT_PREFIX}{year}")
        if folder.is_dir():
            try:
                summary_rows.append(process_year(year))
            except Exception as e:
                print(f"[ERROR] {year}: {e}")
        else:
            print(f"[WARN] Skip {year}: folder not found -> {folder}")

    # 2) Cross-year summary workbook and plot (None when no year succeeded)
    summary_path = build_annual_summary(summary_rows)

    # 3) "Monthly Max" sheet, only when a summary workbook exists
    if summary_path is not None:
        append_monthly_max_table_to_summary(summary_path, sheet_name="Monthly Max")


if __name__ == "__main__":
    main()


[2015] Excel saved: output_all\rainfall_2015__stasiun-meteorologi-gamar-malamo.xlsx
[2015] Plot saved: output_all\daily_rainfall_2015__stasiun-meteorologi-gamar-malamo.png
[2015] Plot saved: output_all\monthly_max_rainfall_2015__stasiun-meteorologi-gamar-malamo.png
[2016] Excel saved: output_all\rainfall_2016__stasiun-meteorologi-gamar-malamo.xlsx
[2016] Plot saved: output_all\daily_rainfall_2016__stasiun-meteorologi-gamar-malamo.png
[2016] Plot saved: output_all\monthly_max_rainfall_2016__stasiun-meteorologi-gamar-malamo.png
[2017] Excel saved: output_all\rainfall_2017__stasiun-meteorologi-gamar-malamo.xlsx
[2017] Plot saved: output_all\daily_rainfall_2017__stasiun-meteorologi-gamar-malamo.png
[2017] Plot saved: output_all\monthly_max_rainfall_2017__stasiun-meteorologi-gamar-malamo.png
[2018] Excel saved: output_all\rainfall_2018__stasiun-meteorologi-gamar-malamo.xlsx
[2018] Plot saved: output_all\daily_rainfall_2018__stasiun-meteorologi-gamar-malamo.png
[2018] Plot saved: output_all\