In [15]:
import os
from pyspark.sql import SparkSession

# Local Spark session used only to load the query-parameter sheet.
# NOTE(review): the PostgreSQL JDBC package is configured here but nothing in the
# visible cells reads or writes over JDBC — confirm it is still needed.
spark = (
    SparkSession.builder
    .appName("WriteToSupabase")
    .master("local[*]")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.4")
    .getOrCreate()
)

# Location of the parameter sheet (already a plain str after expanduser).
csv_path = os.path.expanduser("~/Documents/populate_voyage/query_params.csv")

# The sheet is a free-form layout (marker rows such as 'Header' / 'StartDate' /
# 'Voyage ID' plus values), not a typed table, so it is read without a header
# row and WITHOUT type inference: every cell stays the exact text from the
# file. With inference enabled, a column holding only numbers or dates could be
# converted to a numeric/timestamp type and its values would be reformatted
# before the downstream text parsing sees them.
df = spark.read.csv(csv_path, header=False, inferSchema=False)


In [16]:
# Preview the raw sheet; columns are auto-named _c0.._cN because the CSV was read with header=False.
df.show()

+------+----+--------------------+--------------------+--------------------+--------------------+-------------+--------------+----+--------------+--------------+---------------+----------------+-------------------+------------------+-------------------+--------------------+
|   _c0| _c1|                 _c2|                 _c3|                 _c4|                 _c5|          _c6|           _c7| _c8|           _c9|          _c10|           _c11|            _c12|               _c13|              _c14|               _c15|                _c16|
+------+----+--------------------+--------------------+--------------------+--------------------+-------------+--------------+----+--------------+--------------+---------------+----------------+-------------------+------------------+-------------------+--------------------+
|Header|NULL|voyage_date_voyag...|           voyage_id|voyage_duration_days|voyage_rate_usd_2...|voyage_oh_usd|voyage_hac_usd|NULL|voyage_bcr_usd|voyage_cve_usd|voyage_bnkr_us

In [17]:
from dotenv import load_dotenv
# override=True: values from the .env file replace variables already set in the process.
load_dotenv(override=True)

from supabase import create_client

# --- 1️⃣ Connect to Supabase ---
url = os.getenv("SUPABASE_URL")
key = os.getenv("SUPABASE_SERVICE_ROLE_KEY")  # must be service_role key
# NOTE(review): a service_role key presumably bypasses row-level security —
# never print it or leave it in saved cell output.

# Fail early with a readable message instead of handing None to create_client.
_missing_env = [
    name
    for name, value in (("SUPABASE_URL", url), ("SUPABASE_SERVICE_ROLE_KEY", key))
    if not value
]
if _missing_env:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_env)
        + ". Check your .env file."
    )

client = create_client(url, key)

SCHEMA = "voyage"
TABLE = "summary"

print("🔗 Connected to Supabase schema:", SCHEMA, "table:", TABLE)

🔗 Connected to Supabase schema: voyage table: summary


In [18]:
from pyspark.sql.types import (
    StructType, StructField,
    StringType, TimestampType, IntegerType, DecimalType
)

# Full voyage.summary schema (from your DDL).
# Each spec below is (column name, Spark type, nullable); the order of the
# specs is the field order of the resulting StructType.
FULL_SCHEMA = StructType([
    StructField(col_name, col_type, is_nullable)
    for col_name, col_type, is_nullable in (
        ("updated_at",                TimestampType(),    True),
        ("id",                        IntegerType(),      False),
        ("voyage_bcr_usd",            DecimalType(18,2),  True),
        ("voyage_date_voyagestart",   TimestampType(),    True),
        ("voyage_id",                 StringType(),       True),
        ("voyage_duration_days",      DecimalType(10,4),  True),
        ("voyage_rate_usd_24hrgross", DecimalType(18,2),  True),
        ("voyage_oh_usd",             DecimalType(18,2),  True),
        ("voyage_hac_usd",            DecimalType(18,2),  True),
        ("voyage_cve_usd",            DecimalType(18,2),  True),
        ("voyage_bnkr_usd",           DecimalType(18,2),  True),
        ("voyage_bnkr_desc",          StringType(),       True),
        ("voyage_chtr_usd_bal",       DecimalType(18,2),  True),
        ("voyage_expense_usd",        DecimalType(18,2),  True),
        ("voyage_expense_desc",       StringType(),       True),
        ("voyage_bnkr_usd_deduc",     DecimalType(18,2),  True),
        ("created_by",                StringType(),       True),  # uuid → keep as text
        ("created_at",                TimestampType(),    True),
        ("updated_by",                StringType(),       True),  # uuid → keep as text
    )
])

def schema_for_select_cols(select_cols):
    """Build a StructType holding one field per requested column, in request order.

    Columns present in FULL_SCHEMA reuse that schema's field definition; any
    other name gets a nullable string field.
    """
    known_fields = {field.name: field for field in FULL_SCHEMA.fields}
    return StructType([
        known_fields[name] if name in known_fields else StructField(name, StringType(), True)
        for name in select_cols
    ])


In [None]:
# ========= CONFIG =========
# Target Supabase schema/table, and the timestamp column used for the
# date-range filters and result ordering further down.
# NOTE(review): SCHEMA and TABLE repeat the values assigned in the connection cell.
SCHEMA = "voyage"
TABLE  = "summary"
DATE_COL = "voyage_date_voyagestart"

# Column names accepted from the CSV 'Header' row; cells on that row that are
# not in this set are ignored when building the output column list.
# NOTE(review): these names mirror the fields of FULL_SCHEMA — keep both in sync.
SCHEMA_COLS = {
    "updated_at","id","voyage_bcr_usd","voyage_date_voyagestart","voyage_id",
    "voyage_duration_days","voyage_rate_usd_24hrgross","voyage_oh_usd","voyage_hac_usd",
    "voyage_cve_usd","voyage_bnkr_usd","voyage_bnkr_desc","voyage_chtr_usd_bal",
    "voyage_expense_usd","voyage_expense_desc","voyage_bnkr_usd_deduc","created_by",
    "created_at","updated_by"
}

import re
from datetime import datetime
import pandas as pd
import os

# Pull the Spark frame into pandas and replace missing cells with empty
# strings; the helpers below stringify each cell before inspecting it.
pdf = df.toPandas().fillna("")

# ========= HELPERS =========
def _norm(s: str) -> str:
    s = str(s).strip().casefold()
    s = re.sub(r"\s+", "_", s)
    return re.sub(r"[^0-9a-z_]+", "", s)

def find_marker_row(pdf: pd.DataFrame, marker: str) -> int | None:
    """
    Find the first row whose marker column holds the given marker text.

    - 'Header' is looked up in column A (first column).
    - Any other marker ('StartDate', 'EndDate', 'Voyage ID') is looked up in
      column C (third column).

    Cell values and the marker are compared after normalisation with _norm(),
    so case, surrounding whitespace and punctuation are ignored.

    Returns the positional row index of the first match, or None when the
    marker is absent or the sheet is too narrow to contain the marker column.
    """
    # decide which column to search
    marker_lower = marker.strip().casefold()
    col_pos = 0 if marker_lower == "header" else 2  # column A vs column C

    # A sheet with fewer columns than expected cannot contain the marker;
    # report it as "not found" instead of failing with a bare IndexError.
    if col_pos >= len(pdf.columns):
        print(
            f"⚠️ Marker '{marker}' not found: sheet has only "
            f"{len(pdf.columns)} column(s), expected at least {col_pos + 1}."
        )
        return None

    col_to_check = pdf.columns[col_pos]

    target = _norm(marker)
    for i, val in enumerate(pdf[col_to_check]):
        if _norm(val) == target:
            print(f"✅ Found marker '{marker}' in column {col_to_check} at row index {i}")
            return i

    print(f"⚠️ Marker '{marker}' not found in column {col_to_check}.")
    return None



def parse_to_iso(val, *, is_end: bool=False) -> str|None:
    """Return ISO 'YYYY-MM-DDTHH:MM:SS'. Returns None if cannot parse.
       Date-only → 00:00:00 (start) or 23:59:59 (end) for inclusive range."""
    if val is None: return None
    s = str(val).strip()
    if not s or s.upper() == "NULL": return None
    if s.upper().startswith("UTC "): s = s[4:].strip()
    if s.endswith("Z"): s = s[:-1]
    s_space = s.replace("T", " ")
    for fmt in ("%Y-%m-%d %H:%M:%S","%Y-%m-%d %H:%M",
                "%m/%d/%Y %H:%M:%S","%m/%d/%Y %H:%M"):
        try:
            dt = datetime.strptime(s_space, fmt)
            return dt.strftime("%Y-%m-%dT%H:%M:%S")
        except ValueError:
            pass
    for fmt in ("%Y-%m-%d","%m/%d/%Y"):
        try:
            d = datetime.strptime(s_space, fmt)
            return d.strftime("%Y-%m-%dT23:59:59" if is_end else "%Y-%m-%dT00:00:00")
        except ValueError:
            pass
    return None

def header_cols_from_header_row_noheaderpdf(pdf: pd.DataFrame) -> tuple[list[str], int]:
    """Extract the requested DB columns from the sheet's 'Header' row.

    With header=False, the 'Header' marker is a DATA row. Every non-empty cell
    on that row is considered, except the literal 'Header' cell and 'NULL'
    placeholders; only names present in SCHEMA_COLS are kept, in sheet order.

    Returns:
        (cols, r_hdr): the kept column names and the positional index of the
        'Header' row.

    Raises:
        ValueError: if no 'Header' row exists, or the row names no known column.
    """
    r_hdr = find_marker_row(pdf, "Header")
    if r_hdr is None:
        raise ValueError("Could not find a 'Header' row in the CSV (read with header=False).")

    header_marker = _norm("Header")  # invariant, so normalise it once

    # Keep all non-empty cells in that row, except the marker cell itself
    row_vals = [str(v).strip() for v in pdf.iloc[r_hdr].values if v is not None]
    row_vals = [v for v in row_vals if v and _norm(v) != header_marker and v.upper() != "NULL"]

    # Keep only real DB columns, preserve order
    cols = [c for c in row_vals if c in SCHEMA_COLS]
    if not cols:
        raise ValueError("Header row found but no valid DB columns on that row.")
    return cols, r_hdr

# ========= 1) Locate rows =========
r_hdr = None
header_cols, r_hdr = header_cols_from_header_row_noheaderpdf(pdf)
# The three remaining markers are searched in this fixed order.
r_start, r_end, r_id = (
    find_marker_row(pdf, label) for label in ("StartDate", "EndDate", "Voyage ID")
)


print("Header columns (output order):", header_cols)
print("Row indices -> Header:", r_hdr, "StartDate:", r_start, "EndDate:", r_end, "Voyage ID:", r_id)

# Without a 'Voyage ID' row there is nothing to query, so stop here.
if r_id is None:
    raise ValueError("Could not find 'Voyage ID' marker row (read with header=False).")

# ========= 2) Build query 'sets' by COLUMN =========
# Every sheet column is inspected independently:
#   - its Start/End come from the cells on the r_start / r_end marker rows
#   - its voyage_ids (1..N) come from the rows BELOW r_id, up to the next marker row
col_names = [*pdf.columns]
n_rows = len(pdf.index)

def is_marker_row(i: int) -> bool:
    """Return True when row ``i`` of ``pdf`` carries a marker label.

    A row is a marker row when its column A or column C cell, normalised with
    _norm(), equals one of the known marker labels. ``None`` is never a
    marker row.
    """
    if i is None:
        return False

    # markers to look for (normalized). _norm() replaces whitespace with "_",
    # so the sheet label 'Voyage ID' normalises to "voyage_id"; the previous
    # hard-coded "voyageid" alone could never match that label. The compact
    # spelling is kept as well so a cell written 'VoyageID' still counts.
    markers = {"header", "startdate", "enddate", "voyageid", _norm("Voyage ID")}

    # column A = pdf.columns[0], column C = pdf.columns[2]
    row = pdf.iloc[i]
    cell_a = row[pdf.columns[0]]
    cell_c = row[pdf.columns[2]]
    val_a = str(cell_a).strip() if not pd.isna(cell_a) else ""
    val_c = str(cell_c).strip() if not pd.isna(cell_c) else ""

    # normalize and compare
    return _norm(val_a) in markers or _norm(val_c) in markers


# Rows already identified as markers; reaching one ends the voyage-id scan.
stop_rows = {row for row in (r_hdr, r_start, r_end, r_id) if row is not None}

# Voyage ids are read from the rows directly below the 'Voyage ID' row.
# The scan is capped at 10 rows as a guard (exclusive upper bound below).
first_id_row = r_id + 1
probe_limit = min(n_rows, r_id + 11)
voyage_id_label = _norm("Voyage ID")

sets = []
for col in col_names:
    # date bounds for this column, taken from the marker rows when present
    s_start = None if r_start is None else parse_to_iso(pdf.iloc[r_start][col], is_end=False)
    s_end   = None if r_end   is None else parse_to_iso(pdf.iloc[r_end][col],   is_end=True)

    for r in range(first_id_row, probe_limit):
        # another marker row closes the id list for this column
        if r in stop_rows or is_marker_row(r):
            break
        val = pdf.iloc[r][col]
        s_val = "" if val is None else str(val).strip()
        # keep real ids only: skip blanks, 'NULL' placeholders and cells that
        # still say 'Voyage ID'; each kept id becomes its own query set
        if s_val and s_val.upper() != "NULL" and _norm(s_val) != voyage_id_label:
            sets.append({"start": s_start, "end": s_end, "id": s_val})

# Summary: list at most the first 12 sets, then a count of the remainder.
print(f"Discovered {len(sets)} set(s):")
preview = sets[:12]
for i, s in enumerate(preview, 1):
    print(f"  {i:02d}. voyage_id={s['id']}, start={s['start']}, end={s['end']}")
if len(sets) > 12:
    print(f"  ... (+{len(sets)-12} more)")

def _write_csv_with_lock_fallback(frame: pd.DataFrame, dest: str) -> str:
    """Write ``frame`` to ``dest`` through a temp file in the same directory.

    The frame is first written to a temporary CSV next to ``dest`` and then
    moved into place. If the move raises PermissionError (e.g. the target is
    open in Excel), the file is saved as '<name>_new<ext>' instead.

    Returns the path that was actually written. The temporary file is removed
    whenever it could not be moved, so failed runs leave no stray temp files.
    """
    import shutil
    import tempfile

    tmp_fd, tmp_path = tempfile.mkstemp(suffix=".csv", dir=os.path.dirname(dest))
    os.close(tmp_fd)
    try:
        frame.to_csv(tmp_path, index=False, encoding="utf-8-sig")
        try:
            shutil.move(tmp_path, dest)
            return dest
        except PermissionError:
            # Only touch the extension: a plain str.replace(".csv", ...) would
            # also rewrite any ".csv" occurring earlier in the path.
            stem, ext = os.path.splitext(dest)
            alt = f"{stem}_new{ext}"
            shutil.move(tmp_path, alt)
            return alt
    finally:
        # After a successful move the temp path no longer exists; this only
        # fires when writing or moving failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


# Single output location shared by the "no sets" and the "results" paths.
out_csv = os.path.expanduser("~/Documents/populate_voyage/db_results.csv")

# If no sets -> nothing to query
if not sets:
    print("No (voyage_id, date) sets detected under the 'Voyage ID' row. Check your CSV content.")
    pd_df = pd.DataFrame(columns=header_cols)
    written_path = _write_csv_with_lock_fallback(pd_df, out_csv)
    if written_path == out_csv:
        print("✅ Wrote empty file with headers only:", out_csv)
    else:
        print(f"⚠️ Excel lock detected — saved as '{written_path}' instead.")
else:
    # ========= 3) Build SELECT list for query (add keys for dedupe, but drop them for output) =========
    query_select_cols = header_cols.copy()
    for must in ("voyage_id", DATE_COL):
        if must not in query_select_cols:
            query_select_cols.append(must)

    print("Query will SELECT:", query_select_cols)

    # ========= 4) Run Supabase queries (one per set), UNION results =========
    all_rows = []
    page_size = 2000  # rows requested per page via .range()

    # Uses the `client` object created in the Supabase connection cell.
    for s in sets:
        q = client.schema(SCHEMA).table(TABLE).select(",".join(query_select_cols))
        if s["start"] is not None:
            q = q.gte(DATE_COL, s["start"])   # inclusive lower
        if s["end"] is not None:
            q = q.lte(DATE_COL, s["end"])     # inclusive upper
        q = q.eq("voyage_id", s["id"])

        # optional ordering
        q = q.order(DATE_COL, desc=True, nullsfirst=False)

        # Page through the result; a short (or empty) page means we are done.
        page = 0
        while True:
            resp = q.range(page * page_size, page * page_size + page_size - 1).execute()
            batch = resp.data or []
            all_rows.extend(batch)
            if len(batch) < page_size:
                break
            page += 1

    print(f"Fetched raw rows (pre-dedup): {len(all_rows)}")

    # ========= 5) Pandas de-dup + OUTPUT in exact header order =========
    if all_rows:
        pd_df = pd.DataFrame(all_rows)
    else:
        pd_df = pd.DataFrame(columns=query_select_cols)

    # De-dup on unique pair (overlapping sets can return the same row twice)
    if {"voyage_id", DATE_COL}.issubset(pd_df.columns):
        pd_df = pd_df.drop_duplicates(subset=["voyage_id", DATE_COL])
    elif "voyage_id" in pd_df.columns:
        pd_df = pd_df.drop_duplicates(subset=["voyage_id"])
    else:
        pd_df = pd_df.drop_duplicates()

    # Keep ONLY header_cols in EXACT order (even if we fetched keys);
    # a requested column missing from the response is emitted as blank.
    for c in header_cols:
        if c not in pd_df.columns:
            pd_df[c] = ""
    pd_df = pd_df[header_cols]

    # Write (with temp-rename to avoid Excel locks)
    written_path = _write_csv_with_lock_fallback(pd_df, out_csv)
    if written_path == out_csv:
        print("✅ Wrote:", out_csv, "| rows:", len(pd_df), "| columns:", list(pd_df.columns))
    else:
        print(f"⚠️ Excel lock detected — saved as '{written_path}' instead.")


✅ Found marker 'Header' in column _c0 at row index 0
✅ Found marker 'StartDate' in column _c2 at row index 1
✅ Found marker 'EndDate' in column _c2 at row index 2
✅ Found marker 'Voyage ID' in column _c2 at row index 3
1 2 3
Header columns (output order): ['voyage_date_voyagestart', 'voyage_id', 'voyage_duration_days', 'voyage_rate_usd_24hrgross', 'voyage_oh_usd', 'voyage_hac_usd', 'voyage_bcr_usd', 'voyage_cve_usd', 'voyage_bnkr_usd', 'voyage_bnkr_desc', 'voyage_chtr_usd_bal', 'voyage_expense_usd', 'voyage_expense_desc', 'voyage_bnkr_usd_deduc']
Row indices -> Header: 0 StartDate: 1 EndDate: 2 Voyage ID: 3
Discovered 0 set(s):
No (voyage_id, date) sets detected under the 'Voyage ID' row. Check your CSV content.
✅ Wrote empty file with headers only: C:\Users\youngw417/Documents/populate_voyage/db_results.csv
