In [None]:
pip install --upgrade snowflake-connector-python

Note: you may need to restart the kernel to use updated packages.


Part 2 tast 1

In [2]:

import os, pandas as pd, numpy as np
from pathlib import Path
import snowflake.connector as sf

BASE_DIR = Path(r"/home/jovyan/MGTA SQL/final project/SQL_FINAL_PROJECT/Data-5/Monthly PO Data").resolve()   # file base on your local path
OUT_CSV  = BASE_DIR / "combined_purchases.csv"

# ---- 1) Combine csv file
if not OUT_CSV.exists():
    TARGET_COLS = ["PurchaseOrderID","PurchaseOrderLineID","ReceivedOuters",
                   "ExpectedUnitPricePerOuter","OrderDate","SupplierID"]
    ALIASES = {
        "PurchaseOrderID":           ["purchaseorderid","purchase_order_id","poid","orderid"],
        "PurchaseOrderLineID":       ["purchaseorderlineid","purchase_order_line_id","polineid","orderlineid"],
        "ReceivedOuters":            ["receivedouters","received_outers","receivedoutersqty","receivedoutersquantity","received_qty"],
        "ExpectedUnitPricePerOuter": ["expectedunitpriceperouter","expected_unit_price_per_outer","unitpriceperouter","expectedprice","unitprice"],
        "OrderDate":                 ["orderdate","order_date","date","order_dt"],
        "SupplierID":                ["supplierid","supplier_id","vendorid","vendor_id"],
    }
    def norm(s): return "".join(ch.lower() for ch in s if ch.isalnum())
    def pick_and_rename(df):
        m = {c: norm(c) for c in df.columns}
        rev = {}
        for k,v in m.items():
            if v not in rev: rev[v]=k
        sel = {}
        miss=[]
        for tgt, aliases in ALIASES.items():
            found=None
            for a in aliases:
                if a in rev: found=rev[a]; break
            if not found: miss.append(tgt)
            else: sel[tgt]=found
        if miss: raise ValueError(f"缺少必须列: {miss}; 文件列={list(df.columns)}")
        out = df[[sel[c] for c in TARGET_COLS]].copy()
        out.columns = TARGET_COLS
       
        for c in ["ReceivedOuters","ExpectedUnitPricePerOuter"]:
            out[c] = pd.to_numeric(out[c], errors="coerce")
      
        out["OrderDate"] = pd.to_datetime(out["OrderDate"], errors="coerce", format="%m/%d/%Y")
        out = out.dropna(subset=["PurchaseOrderID","PurchaseOrderLineID","OrderDate"])
        out["ReceivedOuters"] = out["ReceivedOuters"].fillna(0)
        out["ExpectedUnitPricePerOuter"] = out["ExpectedUnitPricePerOuter"].fillna(0)
        return out

    frames=[]
    files = sorted(BASE_DIR.glob("*.csv"))
    if not files: raise FileNotFoundError(f"{BASE_DIR} 下没有 .csv")
    for p in files:
        df = pd.read_csv(p, dtype=str, keep_default_na=False, na_values=["","NULL"])
        frames.append(pick_and_rename(df))
    combined = pd.concat(frames, ignore_index=True)
    combined.to_csv(OUT_CSV, index=False)
    print(f"已生成本地整合：{OUT_CSV}  行数={len(combined):,}")

# ---- 2) To Snowflake ----
conn = sf.connect(
    user=os.getenv("SNOW_USER", "ETHANAN2000"),
    password=os.getenv("SNOW_PASSWORD", "An67087833@123"),
    account=os.getenv("SNOW_ACCOUNT", "svogymj-bxb71103") 
)
cs = conn.cursor()

## change these line under to choose where we save the data to
WH, DB, SC = "ETL_WH","ETL_DB","ETL_SCHEMA"
cs.execute(f"CREATE WAREHOUSE IF NOT EXISTS {WH} WAREHOUSE_SIZE=SMALL AUTO_SUSPEND=60 AUTO_RESUME=TRUE")
cs.execute(f"CREATE DATABASE  IF NOT EXISTS {DB}")
cs.execute(f"CREATE SCHEMA    IF NOT EXISTS {DB}.{SC}")
cs.execute(f"USE WAREHOUSE {WH}"); cs.execute(f"USE DATABASE {DB}"); cs.execute(f"USE SCHEMA {SC}")

cs.execute("CREATE STAGE IF NOT EXISTS purchases_stage")
cs.execute("""
CREATE OR REPLACE FILE FORMAT csv_ff
  TYPE=CSV
  FIELD_OPTIONALLY_ENCLOSED_BY='\"'
  PARSE_HEADER=TRUE
  NULL_IF=('','NULL')
  TRIM_SPACE=TRUE
""")

# Combine file
cs.execute(f"PUT 'file:///{OUT_CSV.as_posix()}' @purchases_stage AUTO_COMPRESS=TRUE OVERWRITE=TRUE")

staged_name = OUT_CSV.name + ".gz" 
copy_sql = f"""
COPY INTO purchases_detail
FROM @purchases_stage/{staged_name}
FILE_FORMAT=(FORMAT_NAME='csv_ff')
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
ON_ERROR='ABORT_STATEMENT'
"""
cs.execute("""
CREATE OR REPLACE TABLE purchases_detail (
  PurchaseOrderID           STRING,
  PurchaseOrderLineID       STRING,
  ReceivedOuters            NUMBER(18,4),
  ExpectedUnitPricePerOuter NUMBER(18,4),
  OrderDate                 DATE,
  SupplierID                STRING
)
""")
cs.execute("ALTER SESSION SET DATE_INPUT_FORMAT='AUTO'")
cs.execute(copy_sql)

cs.execute("SELECT COUNT(*) FROM purchases_detail")
print("Snowflake line:", cs.fetchone()[0])
cs.execute("SELECT * FROM purchases_detail ORDER BY OrderDate, PurchaseOrderID LIMIT 5")
for r in cs.fetchall(): print(r)

cs.close(); conn.close()

Snowflake line: 204575
('1', '1', Decimal('18.0000'), Decimal('5.5000'), datetime.date(2019, 1, 1), '2')
('1', '2', Decimal('21.0000'), Decimal('5.5000'), datetime.date(2019, 1, 1), '2')
('1', '3', Decimal('18.0000'), Decimal('5.5000'), datetime.date(2019, 1, 1), '2')
('1', '2', Decimal('21.0000'), Decimal('5.5000'), datetime.date(2019, 1, 1), '2')
('1', '1', Decimal('18.0000'), Decimal('5.5000'), datetime.date(2019, 1, 1), '2')


part 2 tast 2

In [13]:
# —— 牢靠版：重连 + 上下文一次性设置 + 临时游标 —— 
import os
import snowflake.connector as sf

ROLE = os.getenv("SNOW_ROLE", "ACCOUNTADMIN")
WH   = os.getenv("SNOW_WH",   "ETL_WH")
DB   = os.getenv("SNOW_DB",   "ETL_DB")
SC   = os.getenv("SNOW_SCHEMA","ETL_SCHEMA")

# 1) 强制建立一个新的连接：把上下文直接放在 connect 里；开启 keep_alive
conn = sf.connect(
    user=os.getenv("SNOW_USER", "ETHANAN2000"),
    password=os.getenv("SNOW_PASSWORD", "An67087833@123"),
    account=os.getenv("SNOW_ACCOUNT", "svogymj-bxb71103"), 
    role=ROLE,
    warehouse=WH,
    database=DB,
    schema=SC,
    client_session_keep_alive=True,
)

# 2) 在同一个 with 里完成所有 SQL；退出 with 后游标会自动关闭
with conn.cursor() as cs:
    # 行金额视图
    cs.execute("""
    CREATE OR REPLACE VIEW purchases_line AS
    SELECT
      PurchaseOrderID,
      PurchaseOrderLineID,
      OrderDate,
      SupplierID,
      (ReceivedOuters * ExpectedUnitPricePerOuter) AS line_amount
    FROM purchases_detail
    """)

    # 订单级金额表
    cs.execute("""
    CREATE OR REPLACE TABLE purchase_order_totals AS
    SELECT
      PurchaseOrderID,
      MIN(OrderDate) AS OrderDate,
      ANY_VALUE(SupplierID) AS SupplierID,
      SUM(line_amount) AS POAmount
    FROM purchases_line
    GROUP BY PurchaseOrderID
    """)

    # 校验
    cs.execute("SELECT COUNT(*) FROM purchase_order_totals")
    print("purchase_order_totals rows:", cs.fetchone()[0])

# 可选：保持连接给后续步骤用；如果不需要就 conn.close()
# conn.close()




purchase_order_totals rows: 2025


part 2 tast 3

In [17]:
# ==== Task 3：发票 XML 全流程（PUT → 原始装载 → 解析成表）====

from pathlib import Path
import os
import snowflake.connector as sf

# 0) 连接（如果你上面已有 conn，就会复用）
try:
    conn
except NameError:
    conn = sf.connect(
        user=os.getenv("SNOW_USER","<your_user>"),
        password=os.getenv("SNOW_PASSWORD","<your_password>"),
        account=os.getenv("SNOW_ACCOUNT","svogymj-bxb71103"),
        role=os.getenv("SNOW_ROLE","ACCOUNTADMIN"),
        warehouse=os.getenv("SNOW_WH","ETL_WH"),
        database=os.getenv("SNOW_DB","ETL_DB"),
        schema=os.getenv("SNOW_SCHEMA","ETL_SCHEMA"),
        client_session_keep_alive=True,
    )

# 1) 定位 XML 文件（优先你的工作区路径，找不到则回退 /mnt/data）
candidates = [
    Path("/home/jovyan/MGTA SQL/final project/SQL_FINAL_PROJECT/Data-5/Supplier Transactions XML.xml"),
    Path("/mnt/data/Supplier Transactions XML.xml"),
]
xml_path = next((p for p in candidates if p.exists()), None)
if not xml_path:
    raise FileNotFoundError("找不到 XML 文件。请确认路径是否正确，或把文件上传到 /mnt/data 后重试。")
print("将上传的 XML 文件：", xml_path.as_posix())

with conn.cursor() as cs:
    # 2) Stage & 文件格式（幂等）
    cs.execute("CREATE STAGE IF NOT EXISTS invoices_stage")
    cs.execute("CREATE OR REPLACE FILE FORMAT xml_ff TYPE=XML STRIP_OUTER_ELEMENT=TRUE")

    # 3) PUT（覆盖上传）
    cs.execute(f"PUT 'file:///{xml_path.as_posix()}' @invoices_stage AUTO_COMPRESS=TRUE OVERWRITE=TRUE")
    # 简短确认
    cs.execute("LIST @invoices_stage")
    print("Stage中文件（前几项）：", [r[0] for r in cs.fetchall()[:3]])

    # 4) 原始装载到 VARIANT 表（幂等）
    cs.execute("CREATE OR REPLACE TABLE invoices_raw (v VARIANT)")
    cs.execute("""
    COPY INTO invoices_raw
    FROM @invoices_stage
    FILE_FORMAT=(FORMAT_NAME='xml_ff')
    ON_ERROR='ABORT_STATEMENT'
    """)
    cs.execute("SELECT COUNT(*) FROM invoices_raw")
    print("invoices_raw rows:", cs.fetchone()[0])

    # 5) 解析为结构化 invoices 表
    # - 兼容两种XML形态：单个对象 或 数组/重复元素
    # - 使用 COALESCE + 多种日期格式兜底
    cs.execute("""
    CREATE OR REPLACE TABLE invoices AS
    WITH flat AS (
      SELECT
        r.value                           AS obj
      FROM invoices_raw,
           LATERAL FLATTEN(
             input => CASE WHEN TYPEOF(v)='ARRAY' THEN v ELSE ARRAY_CONSTRUCT(v) END
           ) AS r
    )
    SELECT
      obj:SupplierInvoiceNumber::string                 AS InvoiceNumber,
      obj:PurchaseOrderID::string                       AS PurchaseOrderID,
      COALESCE(
        TRY_TO_DATE(obj:TransactionDate::string,'YYYY-MM-DD'),
        TRY_TO_DATE(obj:TransactionDate::string,'MM/DD/YYYY'),
        TRY_TO_DATE(obj:TransactionDate::string,'M/D/YYYY'),
        TRY_TO_DATE(obj:TransactionDate::string)
      )                                                 AS InvoiceDate,
      obj:AmountExcludingTax::number(18,4)              AS AmountExcludingTax,
      obj:SupplierID::string                            AS SupplierID
    FROM flat
    WHERE obj:PurchaseOrderID IS NOT NULL
    """)
    # 6) 校验
    cs.execute("SELECT COUNT(*) FROM invoices")
    print("invoices rows:", cs.fetchone()[0])

    cs.execute("""
      SELECT * FROM invoices
      ORDER BY InvoiceDate, PurchaseOrderID
      LIMIT 5
    """)
    sample = cs.fetchall()
    print("invoices 样例：")
    for r in sample:
        print(r)



将上传的 XML 文件： /home/jovyan/MGTA SQL/final project/SQL_FINAL_PROJECT/Data-5/Supplier Transactions XML.xml
Stage中文件（前几项）： ['invoices_stage/Supplier Transactions XML.xml.gz']
invoices_raw rows: 2438
invoices rows: 2438
invoices 样例：
('5', '3', datetime.date(1970, 1, 1), Decimal('7.0000'), '1')
('5', '3', datetime.date(1970, 1, 1), Decimal('7.0000'), '1')
('5', '3', datetime.date(1970, 1, 1), Decimal('7.0000'), '1')
('5', '3', datetime.date(1970, 1, 1), Decimal('7.0000'), '1')
('5', '3', datetime.date(1970, 1, 1), Decimal('7.0000'), '1')


part 2 tast 4

In [18]:
# ==== Task 4：采购 × 发票合并，产出差额 ====
import os
import snowflake.connector as sf

# 复用现有连接；如果不存在就连一次
try:
    conn
except NameError:
    conn = sf.connect(
        user=os.getenv("SNOW_USER","<your_user>"),
        password=os.getenv("SNOW_PASSWORD","<your_password>"),
        account=os.getenv("SNOW_ACCOUNT","svogymj-bxb71103"),
        role=os.getenv("SNOW_ROLE","ACCOUNTADMIN"),
        warehouse=os.getenv("SNOW_WH","ETL_WH"),
        database=os.getenv("SNOW_DB","ETL_DB"),
        schema=os.getenv("SNOW_SCHEMA","ETL_SCHEMA"),
        client_session_keep_alive=True,
    )

DB = os.getenv("SNOW_DB","ETL_DB")
SC = os.getenv("SNOW_SCHEMA","ETL_SCHEMA")

core_sql = f"""
SELECT
  p.PurchaseOrderID,
  p.OrderDate,                 -- 交易日期（若想用发票日期，改成 i.InvoiceDate）
  p.SupplierID,
  p.POAmount,
  i.InvoiceNumber,
  i.AmountExcludingTax,
  i.InvoiceDate,
  (i.AmountExcludingTax - p.POAmount) AS invoiced_vs_quoted
FROM {DB}.{SC}.purchase_order_totals p
JOIN {DB}.{SC}.invoices i
  ON i.PurchaseOrderID = p.PurchaseOrderID
WHERE i.InvoiceDate IS NOT NULL       -- 若想保留 1970-01-01 或空日期，请删除该行
"""

with conn.cursor() as cs:
    # 先清理，幂等
    cs.execute(f"DROP MATERIALIZED VIEW IF EXISTS {DB}.{SC}.purchase_orders_and_invoices")
    cs.execute(f"DROP TABLE IF EXISTS {DB}.{SC}.purchase_orders_and_invoices")

    # 优先建物化视图；不支持就退回表
    try:
        cs.execute(f"CREATE MATERIALIZED VIEW {DB}.{SC}.purchase_orders_and_invoices AS {core_sql}")
        obj = "MVIEW"
    except Exception as e:
        # 某些权限/版本限制下 MVIEW 不可用
        cs.execute(f"CREATE OR REPLACE TABLE {DB}.{SC}.purchase_orders_and_invoices AS {core_sql}")
        obj = "TABLE"

    # 校验
    cs.execute(f"SELECT COUNT(*) FROM {DB}.{SC}.purchase_orders_and_invoices")
    total = cs.fetchone()[0]
    print(f"purchase_orders_and_invoices ({obj}) rows:", total)

    cs.execute(f"""
        SELECT *
        FROM {DB}.{SC}.purchase_orders_and_invoices
        ORDER BY OrderDate, PurchaseOrderID
        LIMIT 5
    """)
    for r in cs.fetchall():
        print(r)



purchase_orders_and_invoices (TABLE) rows: 2438
('3', datetime.date(2019, 1, 1), '5', Decimal('76734.00000000'), '5', Decimal('7.0000'), datetime.date(1970, 1, 1), Decimal('-76727.00000000'))
('3', datetime.date(2019, 1, 1), '5', Decimal('76734.00000000'), '5', Decimal('7.0000'), datetime.date(1970, 1, 1), Decimal('-76727.00000000'))
('3', datetime.date(2019, 1, 1), '5', Decimal('76734.00000000'), '5', Decimal('7.0000'), datetime.date(1970, 1, 1), Decimal('-76727.00000000'))
('3', datetime.date(2019, 1, 1), '5', Decimal('76734.00000000'), '5', Decimal('7.0000'), datetime.date(1970, 1, 1), Decimal('-76727.00000000'))
('3', datetime.date(2019, 1, 1), '5', Decimal('76734.00000000'), '5', Decimal('7.0000'), datetime.date(1970, 1, 1), Decimal('-76727.00000000'))


part 2 tast 5

In [24]:
PG = dict(
    host="localhost",
    port=55432,           # 一定要写 55432
    dbname="postgres",
    user="postgres",
    password="pgpwd123"   # 就是你 docker run 里设置的 POSTGRES_PASSWORD
)

In [26]:
import psycopg2, time

PG = dict(
    host="host.docker.internal",  # 👈 关键：改这里
    port=55432,                   # 你映射的端口
    dbname="postgres",
    user="postgres",
    password="pgpwd123",
)

deadline = time.time() + 15
err = None
while time.time() < deadline:
    try:
        with psycopg2.connect(**PG) as conn, conn.cursor() as cur:
            cur.execute("select version()")
            print("✅ connected:", cur.fetchone()[0])
            err = None
            break
    except Exception as e:
        err = e
        time.sleep(1)

if err:
    raise err

✅ connected: PostgreSQL 15.14 (Debian 15.14-1.pgdg13+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 14.2.0-19) 14.2.0, 64-bit


In [44]:
with sf_conn.cursor() as cs:
    cs.execute(f"DESC TABLE {DB}.{SC}.supplier_case")
    print([row[0] for row in cs.fetchall()])  # 第一列是列名

['supplierid', 'suppliername', 'suppliercategoryid', 'primarycontactpersonid', 'alternatecontactpersonid', 'deliverymethodid', 'postalcityid', 'supplierreference', 'bankaccountname', 'bankaccountbranch', 'bankaccountcode', 'bankaccountnumber', 'bankinternationalcode', 'paymentdays', 'internalcomments', 'phonenumber', 'faxnumber', 'websiteurl', 'deliveryaddressline1', 'deliveryaddressline2', 'deliverypostalcode', 'deliverylocation', 'postaladdressline1', 'postaladdressline2', 'postalpostalcode', 'lasteditedby', 'validfrom', 'validto']


In [46]:
# ==== Task 5 最终版：PG → CSV → Snowflake → supplier_basic ====
import os, time
from pathlib import Path
import pandas as pd
import psycopg2
import snowflake.connector as sf

# -----------------------------
# 1) 连接 Docker 里的 Postgres
# -----------------------------
PG = dict(
    host="host.docker.internal",
    port=55432,
    dbname="postgres",
    user="postgres",
    password="pgpwd123",
)

# -----------------------------
# 2) 找到 .pgsql 脚本
# -----------------------------
pgsql_candidates = [
    Path("/home/jovyan/MGTA SQL/final project/SQL_FINAL_PROJECT/Data-5/supplier_case.pgsql"),
    Path("/mnt/data/supplier_case.pgsql"),
]
pgsql_file = next((p for p in pgsql_candidates if p.exists()), None)
if not pgsql_file:
    raise FileNotFoundError("找不到 supplier_case.pgsql，请确认路径。")
print("将执行脚本：", pgsql_file.as_posix())

# -----------------------------
# 3) 等待 PG 就绪（最多 30 秒）
# -----------------------------
deadline = time.time() + 30
while time.time() < deadline:
    try:
        with psycopg2.connect(**PG) as _:
            break
    except Exception:
        time.sleep(1)
else:
    raise RuntimeError("等待 Postgres 超时，请确认容器已启动并监听 55432 端口。")
print("✅ Postgres 就绪")

# ---------------------------------------------------
# 4) 在 PG 执行 .pgsql（创建并填充 supplier_case）
# ---------------------------------------------------
with psycopg2.connect(**PG) as pgconn, pgconn.cursor() as cur, open(pgsql_file, "r", encoding="utf-8") as f:
    cur.execute(f.read())
    pgconn.commit()
print("✅ 已在 Postgres 执行 supplier_case.pgsql")

# ---------------------------------------------------
# 5) 从 PG 导出 CSV 到项目目录（自动 mkdir）
# ---------------------------------------------------
proj_dir = Path("/home/jovyan/MGTA SQL/final project/SQL_FINAL_PROJECT/Data-5")
proj_dir.mkdir(parents=True, exist_ok=True)
csv_out = proj_dir / "supplier_case.csv"

with psycopg2.connect(**PG) as pgconn, pgconn.cursor() as cur, open(csv_out, "w", encoding="utf-8") as f:
    cur.copy_expert("COPY (SELECT * FROM supplier_case) TO STDOUT WITH CSV HEADER", f)
print("✅ 已导出 CSV：", csv_out.as_posix())

# 读 CSV（用于建表与视图）
df = pd.read_csv(csv_out)
print("CSV 行数：", len(df))
print("CSV 列：", list(df.columns))

# ---------------------------------------------------
# 6) 连接 Snowflake（新建独立连接，避免与 PG 混淆）
# ---------------------------------------------------
sf_conn = sf.connect(
    user=os.getenv("SNOW_USER","Ethanan2000"),
    password=os.getenv("SNOW_PASSWORD","An67087833@123"),
    account=os.getenv("SNOW_ACCOUNT","svogymj-bxb71103"),
    role=os.getenv("SNOW_ROLE","ACCOUNTADMIN"),
    warehouse=os.getenv("SNOW_WH","ETL_WH"),
    database=os.getenv("SNOW_DB","ETL_DB"),
    schema=os.getenv("SNOW_SCHEMA","ETL_SCHEMA"),
    client_session_keep_alive=True,
)
DB = os.getenv("SNOW_DB","ETL_DB")
SC = os.getenv("SNOW_SCHEMA","ETL_SCHEMA")

with sf_conn.cursor() as cs:
    cs.execute("SELECT CURRENT_VERSION()")
    print("✅ Snowflake version:", cs.fetchone()[0])

# ---------------------------------------------------
# 7) 推断列类型 + 识别 SupplierID/邮编列（使用你的真实列名）
# ---------------------------------------------------
tmap = {
    "int64": "NUMBER",
    "float64": "FLOAT",
    "object": "STRING",
    "bool": "BOOLEAN",
    "datetime64[ms]": "TIMESTAMP_NTZ",
    "datetime64[ns]": "TIMESTAMP_NTZ",
}
cols_def = ",\n  ".join([f'"{c}" {tmap.get(str(t), "STRING")}' for c,t in df.dtypes.items()])

# 你的 CSV 确认有这两列：
sup_id_col = next((c for c in df.columns if c.lower()=="supplierid"), None)
zip_col     = next((c for c in df.columns if c.lower() in {
    "postalpostalcode","postal_code","postalcode","zip","zipcode","zip_code",
    "supplierpostalcode","postalcityid"
}), None)

if not sup_id_col:
    raise RuntimeError(f"CSV 中缺少 SupplierID 列。当前列={list(df.columns)}")
if not zip_col:
    raise RuntimeError(f"CSV 中无法识别邮编列。当前列={list(df.columns)}")

print(f"识别列映射：SupplierID -> {sup_id_col} ，PostalPostalCode -> {zip_col}")

# ---------------------------------------------------
# 8) STAGE / FILE FORMAT / PUT / COPY / VIEW
# ---------------------------------------------------
full_tbl = f"{DB}.{SC}.supplier_case"

with sf_conn.cursor() as cs:
    cs.execute("CREATE STAGE IF NOT EXISTS supplier_stage")
    cs.execute("""
      CREATE OR REPLACE FILE FORMAT csv_ff_sc
      TYPE=CSV
      FIELD_OPTIONALLY_ENCLOSED_BY='\"'
      PARSE_HEADER=TRUE
      NULL_IF=('','NULL')
      TRIM_SPACE=TRUE
    """)

    cs.execute(f"CREATE OR REPLACE TABLE {full_tbl} (\n  {cols_def}\n)")

    # PUT：路径包含空格，使用 file:/// + 引号
    cs.execute(f"PUT 'file:///{csv_out.as_posix()}' @supplier_stage AUTO_COMPRESS=TRUE OVERWRITE=TRUE")

    # COPY：PARSE_HEADER=TRUE 配套 MATCH_BY_COLUMN_NAME
    cs.execute(f"""
      COPY INTO {full_tbl}
      FROM @supplier_stage
      FILE_FORMAT = (FORMAT_NAME = 'csv_ff_sc')
      MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
      ON_ERROR = 'ABORT_STATEMENT'
    """)

    cs.execute(f"SELECT COUNT(*) FROM {full_tbl}")
    print("✅ Snowflake.supplier_case 行数：", cs.fetchone()[0])

    # 视图里对实际列名加双引号，避免大小写问题
    cs.execute(f"""
      CREATE OR REPLACE VIEW {DB}.{SC}.supplier_basic AS
      SELECT
        "{sup_id_col}" AS SupplierID,
        "{zip_col}"    AS PostalPostalCode
      FROM {DB}.{SC}.supplier_case
    """)
    cs.execute(f"SELECT COUNT(*) FROM {DB}.{SC}.supplier_basic")
    print("✅ supplier_basic 行数：", cs.fetchone()[0])

    cs.execute(f"SELECT * FROM {DB}.{SC}.supplier_basic LIMIT 5")
    print("样例：")
    for r in cs.fetchall():
        print(r)

将执行脚本： /home/jovyan/MGTA SQL/final project/SQL_FINAL_PROJECT/Data-5/supplier_case.pgsql
✅ Postgres 就绪
✅ 已在 Postgres 执行 supplier_case.pgsql
✅ 已导出 CSV： /home/jovyan/MGTA SQL/final project/SQL_FINAL_PROJECT/Data-5/supplier_case.csv
CSV 行数： 13
CSV 列： ['supplierid', 'suppliername', 'suppliercategoryid', 'primarycontactpersonid', 'alternatecontactpersonid', 'deliverymethodid', 'postalcityid', 'supplierreference', 'bankaccountname', 'bankaccountbranch', 'bankaccountcode', 'bankaccountnumber', 'bankinternationalcode', 'paymentdays', 'internalcomments', 'phonenumber', 'faxnumber', 'websiteurl', 'deliveryaddressline1', 'deliveryaddressline2', 'deliverypostalcode', 'deliverylocation', 'postaladdressline1', 'postaladdressline2', 'postalpostalcode', 'lasteditedby', 'validfrom', 'validto']
✅ Snowflake version: 9.27.0
识别列映射：SupplierID -> supplierid ，PostalPostalCode -> postalcityid
✅ Snowflake.supplier_case 行数： 13
✅ supplier_basic 行数： 13
样例：
(1, 22202)
(2, 80125)
(3, 60523)
(4, 95642)
(5, 80125)


part 2 tast 6

In [48]:
# ==== Task 6（Marketplace 版）：ZIP → 最近站点 → 日最高温 TMAX ====
import os
from pathlib import Path
import snowflake.connector as sf

# ---------- 0) Snowflake 连接 ----------
try:
    sf_conn
except NameError:
    sf_conn = sf.connect(
        user=os.getenv("SNOW_USER","<your_user>"),
        password=os.getenv("SNOW_PASSWORD","<your_password>"),
        account=os.getenv("SNOW_ACCOUNT","svogymj-bxb71103"),
        role=os.getenv("SNOW_ROLE","ACCOUNTADMIN"),
        warehouse=os.getenv("SNOW_WH","ETL_WH"),
        database=os.getenv("SNOW_DB","ETL_DB"),
        schema=os.getenv("SNOW_SCHEMA","ETL_SCHEMA"),
        client_session_keep_alive=True,
    )

DB = os.getenv("SNOW_DB","ETL_DB")
SC = os.getenv("SNOW_SCHEMA","ETL_SCHEMA")

IMPORTED_DB = "WEATHER__ENVIRONMENT"     # 你订阅到的 Imported DB
PROVIDER_SCH = "CYBERSYN"                 # Provider 的 schema

STATION_TBL = f"{IMPORTED_DB}.{PROVIDER_SCH}.NOAA_WEATHER_STATION_INDEX"
TS_TBL      = f"{IMPORTED_DB}.{PROVIDER_SCH}.NOAA_WEATHER_METRICS_TIMESERIES"

DATA_DIR = Path("/home/jovyan/MGTA SQL/final project/SQL_FINAL_PROJECT/Data-5")
gaz_file = DATA_DIR / "2021_Gaz_zcta_national.txt"
if not gaz_file.exists():
    raise FileNotFoundError("未找到 2021_Gaz_zcta_national.txt，请放到 Data-5 目录。")

with sf_conn.cursor() as cs:
    cs.execute("SELECT CURRENT_VERSION()")
    print("✅ Snowflake:", cs.fetchone()[0])

# ---------- 1) Stage + File Format + PUT（Gazetteer TSV） ----------
with sf_conn.cursor() as cs:
    cs.execute("CREATE STAGE IF NOT EXISTS weather_stage")
    cs.execute("""
      CREATE OR REPLACE FILE FORMAT tsv_ff
      TYPE=CSV FIELD_DELIMITER='\t' SKIP_HEADER=1 NULL_IF=('','NULL') TRIM_SPACE=TRUE
    """)
    cs.execute(f"PUT 'file:///{gaz_file.as_posix()}' @weather_stage OVERWRITE=TRUE AUTO_COMPRESS=TRUE")
    print("✅ 已 PUT Gazetteer 到 @weather_stage")

# ---------- 2) ZIP → 经纬度（zip_locations） ----------
with sf_conn.cursor() as cs:
    # 原始三列：GEOID INTPTLAT INTPTLONG
    cs.execute(f"""
      CREATE OR REPLACE TEMP TABLE {DB}.{SC}.zip_gaz_raw AS
      SELECT $1::string AS GEOID, $2::string AS INTPTLAT, $3::string AS INTPTLONG
      FROM @weather_stage/{gaz_file.name} (FILE_FORMAT => 'tsv_ff')
    """)
    cs.execute(f"""
      CREATE OR REPLACE TABLE {DB}.{SC}.zip_locations AS
      SELECT
        GEOID::string AS zip,
        TRY_TO_DOUBLE(INTPTLAT)  AS lat,
        TRY_TO_DOUBLE(INTPTLONG) AS lon,
        TO_GEOGRAPHY(ST_MAKEPOINT(TRY_TO_DOUBLE(INTPTLONG), TRY_TO_DOUBLE(INTPTLAT))) AS geog
      FROM {DB}.{SC}.zip_gaz_raw
      WHERE LENGTH(GEOID)=5
        AND TRY_TO_DOUBLE(INTPTLAT)  IS NOT NULL
        AND TRY_TO_DOUBLE(INTPTLONG) IS NOT NULL
    """)
    cs.execute(f"SELECT COUNT(*) FROM {DB}.{SC}.zip_locations")
    print("✅ zip_locations 行数：", cs.fetchone()[0])

# ---------- 3) 站点索引（Marketplace） ----------
with sf_conn.cursor() as cs:
    # 站点索引通常含：STATION_ID / LATITUDE / LONGITUDE
    cs.execute(f"""
      CREATE OR REPLACE VIEW {DB}.{SC}.weather_stations AS
      SELECT
        station_id::string AS station_id,
        latitude::float    AS latitude,
        longitude::float   AS longitude,
        TO_GEOGRAPHY(ST_MAKEPOINT(longitude::float, latitude::float)) AS geog
      FROM {STATION_TBL}
      WHERE latitude IS NOT NULL AND longitude IS NOT NULL
    """)
    cs.execute(f"SELECT COUNT(*) FROM {DB}.{SC}.weather_stations")
    print("✅ weather_stations 行数：", cs.fetchone()[0])

# ---------- 4) 逐日 TMAX（Marketplace） ----------
with sf_conn.cursor() as cs:
    # Timeseries 通常含：STATION_ID / DATE / METRIC / VALUE / UNIT
    # 这里筛选 METRIC='TMAX'，并将可能的 0.1℃ 标度标准化为 ℃（>200 则 /10）
    cs.execute(f"""
      CREATE OR REPLACE VIEW {DB}.{SC}.weather_tmax AS
      SELECT
        station_id::string                                     AS station_id,
        TO_DATE(date)                                          AS weather_date,
        CASE
          WHEN value IS NULL THEN NULL
          WHEN ABS(value::float) > 200 THEN (value::float)/10  -- NOAA 常见刻度：0.1℃ → ℃
          ELSE value::float
        END                                                    AS tmax_value
      FROM {TS_TBL}
      WHERE UPPER(metric) = 'TMAX'
    """)
    cs.execute(f"SELECT COUNT(*) FROM {DB}.{SC}.weather_tmax")
    print("✅ weather_tmax 行数：", cs.fetchone()[0])

# ---------- 5) ZIP → 最近站点 ----------
with sf_conn.cursor() as cs:
    # supplier_basic 的 ZIP 映射到坐标
    cs.execute(f"""
      CREATE OR REPLACE VIEW {DB}.{SC}.supplier_zip_points AS
      SELECT
        s.PostalPostalCode AS zip,
        z.lat, z.lon, z.geog
      FROM {DB}.{SC}.supplier_basic s
      JOIN {DB}.{SC}.zip_locations z
        ON z.zip = s.PostalPostalCode
    """)

    # 最近站点（建议加 50km 限制；如数据稀疏可移除 WHERE）
    cs.execute(f"""
      CREATE OR REPLACE VIEW {DB}.{SC}.zip_nearest_station AS
      SELECT
        p.zip,
        w.station_id,
        ST_DISTANCE(p.geog, w.geog) AS distance_m
      FROM {DB}.{SC}.supplier_zip_points p
      JOIN {DB}.{SC}.weather_stations w
      WHERE ST_DISTANCE(p.geog, w.geog) < 50000
      QUALIFY ROW_NUMBER() OVER (PARTITION BY p.zip ORDER BY ST_DISTANCE(p.geog, w.geog)) = 1
    """)
    cs.execute(f"SELECT COUNT(*) FROM {DB}.{SC}.zip_nearest_station")
    print("✅ zip_nearest_station 行数：", cs.fetchone()[0])

# ---------- 6) 产出最终结果 ----------
with sf_conn.cursor() as cs:
    cs.execute(f"""
      CREATE OR REPLACE VIEW {DB}.{SC}.supplier_zip_code_weather AS
      SELECT
        s.PostalPostalCode              AS PostalPostalCode,
        t.weather_date                  AS weather_date,
        t.tmax_value                    AS tmax_value
      FROM {DB}.{SC}.supplier_basic s
      JOIN {DB}.{SC}.zip_nearest_station z
        ON z.zip = s.PostalPostalCode
      JOIN {DB}.{SC}.weather_tmax t
        ON t.station_id = z.station_id
      WHERE t.weather_date IS NOT NULL
    """)
    cs.execute(f"SELECT COUNT(*) FROM {DB}.{SC}.supplier_zip_code_weather")
    print("✅ supplier_zip_code_weather 行数：", cs.fetchone()[0])

    cs.execute(f"""
      SELECT * FROM {DB}.{SC}.supplier_zip_code_weather
      ORDER BY PostalPostalCode, weather_date
      LIMIT 5
    """)
    print("样例：")
    for r in cs.fetchall():
        print(r)

✅ Snowflake: 9.27.0
✅ 已 PUT Gazetteer 到 @weather_stage


ProgrammingError: 100205 (P0000): DML operation to table PYB59698.ETL_DB.ETL_SCHEMA.ZIP_LOCATIONS failed on column GEOG with error: GeoJSON::Point: Invalid Lng/Lat pair: '799292,1.66848e+08'.