### Import thư viện

In [None]:
import os, sys
import pandas as pd
import pymysql
from dotenv import load_dotenv

: 

### Xử lí lỗi UTF-8

In [None]:
# Best-effort: switch stdout to UTF-8 so printing non-ASCII (e.g. Vietnamese)
# text does not fail with an encoding error on consoles using another code page.
# Any failure (stdout is None, has no reconfigure(), rejects the call, ...) is
# deliberately ignored and stdout is left as it was.
_reconfigure_stdout = getattr(sys.stdout, "reconfigure", None)
try:
    _reconfigure_stdout(encoding="utf-8")
except Exception:
    pass

### Kết nối MySQL

In [None]:
# Load the DB settings from a .env file. override=True lets the .env values win
# over variables already present in the process environment: generic names read
# below such as USER (normally the OS login name on Linux/macOS) would otherwise
# silently shadow the .env entries, because load_dotenv() keeps existing
# variables by default.
load_dotenv(override=True)

def get_env(name: str, default=None, required: bool = False):
    """Return environment variable *name*, or *default* when it is unset.

    If *required* is true and the resulting value (after applying *default*)
    is None or the empty string, RuntimeError is raised.
    """
    value = os.getenv(name, default)
    is_missing = value is None or value == ""
    if is_missing and required:
        raise RuntimeError(f"Missing required env: {name}")
    return value

def to_bool(v) -> bool:
    """Interpret *v* as a boolean flag.

    The value is stringified, trimmed and lower-cased; only "1", "true", "yes",
    "y" and "on" count as True, everything else (including None) is False.
    """
    normalized = str(v).strip().lower()
    return normalized in {"1", "true", "yes", "y", "on"}

def to_int(v, default=None) -> "int | None":
    """Convert *v* to an int, returning *default* when conversion fails.

    Accepts anything ``int()`` accepts (ints, integral strings such as "3306"
    with optional surrounding whitespace, floats - truncated toward zero).
    None or unparseable values yield *default*, which itself defaults to None,
    hence the ``int | None`` return annotation.
    """
    try:
        return int(v)
    except (TypeError, ValueError):
        return default

# --- Connection settings (read from the environment / .env) ----------------
# NOTE(review): USER and HOST are generic names that may already exist in the
# OS environment; load_dotenv() does not override existing variables by default.
HOST            = get_env("HOST", required=True)
# Invalid integer settings fall back to the same defaults used when the
# variable is unset, rather than passing None through to pymysql.connect().
PORT            = to_int(get_env("PORT", 3306), 3306)
USER            = get_env("USER", required=True)
PASSWORD        = get_env("PASSWORD", required=True)
DATABASE        = get_env("DATABASE", required=True)
CHARSET         = get_env("CHARSET", "utf8mb4")
CURSORCLASS_STR = get_env("CURSORCLASS", "DictCursor")
CONNECT_TIMEOUT = to_int(get_env("CONNECT_TIMEOUT", 10), 10)
READ_TIMEOUT    = to_int(get_env("READ_TIMEOUT", 10), 10)
WRITE_TIMEOUT   = to_int(get_env("WRITE_TIMEOUT", 10), 10)
SSL_CA_PATH     = get_env("SSL_CA", None)
AUTOCOMMIT      = to_bool(get_env("AUTOCOMMIT", "false"))

# Map the CURSORCLASS setting to a pymysql cursor class; unknown names fall
# back to DictCursor (rows returned as dicts keyed by column name).
cursorclass_map = {
    "DictCursor": pymysql.cursors.DictCursor,
    "Cursor": pymysql.cursors.Cursor,
    "SSCursor": pymysql.cursors.SSCursor,
    "SSDictCursor": pymysql.cursors.SSDictCursor,
}
cursorclass = cursorclass_map.get(CURSORCLASS_STR, pymysql.cursors.DictCursor)

# Only pass SSL options when a CA bundle path is configured.
ssl_args = {"ca": SSL_CA_PATH} if SSL_CA_PATH else None

# === Connect to MySQL ===
conn = pymysql.connect(
    host=HOST,
    port=PORT,
    user=USER,
    password=PASSWORD,
    database=DATABASE,
    charset=CHARSET,
    cursorclass=cursorclass,
    connect_timeout=CONNECT_TIMEOUT,
    read_timeout=READ_TIMEOUT,
    write_timeout=WRITE_TIMEOUT,
    ssl=ssl_args,
    autocommit=AUTOCOMMIT,
)

# Quick connectivity check. The connection is intentionally left open: the
# "Ghi vào database" cell reuses `conn` (and closes it in its `finally`).
# Closing it here made every later cur.execute() fail on a closed connection.
with conn.cursor() as cur:
    cur.execute("SELECT 1 AS ok;")
    row = cur.fetchone()
    print("Ping:", row)

### Đọc file CSV

In [None]:
# Input CSV paths. Set the CSV_PATH / CSV_PATH_MT environment variables to point
# at your own copies instead of editing the notebook; the defaults below are
# the original author's local Windows paths. "utf-8-sig" also strips a leading
# BOM if the file has one.

# Main indicator table: a Hang_muc label column plus one column per year.
CSV_PATH = os.getenv(
    "CSV_PATH", r"C:\Users\hungn\Downloads\coffee_dabase\Data_coffee.csv"
)
df = pd.read_csv(CSV_PATH, encoding="utf-8-sig")

# Market-share table (Thi_phan_3_thi_truong_chinh = share of 3 main markets).
CSV_PATH_MT = os.getenv(
    "CSV_PATH_MT",
    r"C:\Users\hungn\Downloads\coffee_dabase\Thi_phan_3_thi_truong_chinh.csv",
)
mt = pd.read_csv(CSV_PATH_MT, encoding="utf-8-sig")

### Xử lí file CSV Data_coffee

### a. Chuẩn hóa tên cột, ép kiểu dữ liệu và melt các cột năm (Year) từ dạng cột sang dạng hàng

In [5]:
# Trim stray whitespace from the column headers so the exact-name lookups that
# follow ("Hang_muc", all-digit year columns) match.
df.columns = [str(col).strip() for col in df.columns]

In [6]:
# Validate the wide layout (a Hang_muc label column plus one column per year,
# named with digits only), then melt it to long form: one row per
# (Hang_muc, year) pair, with the cell content in `value`.
if "Hang_muc" not in set(df.columns):
    raise SystemExit("CSV thiếu cột Hang_muc")

year_cols = list(filter(lambda col: str(col).isdigit(), df.columns))
if len(year_cols) == 0:
    raise SystemExit("Không tìm thấy cột năm (tên cột toàn số).")

long_df = pd.melt(
    df,
    id_vars=["Hang_muc"],
    value_vars=year_cols,
    var_name="year",
    value_name="value",
)


In [7]:
# Cast types: add a whitespace-trimmed text copy of the label as `hang_muc`,
# turn the melted year headers into integers and coerce values to numbers
# (cells that cannot be parsed become NaN).
long_df["hang_muc"] = long_df["Hang_muc"].astype(str).str.strip()
long_df["year"] = (
    pd.to_numeric(long_df["year"], errors="coerce")
    .astype("Int64")
    .astype(int)
)
long_df["value"] = pd.to_numeric(long_df["value"], errors="coerce")

### b. Tạo dataframe cho từng bảng

In [8]:
# a) WEATHER - one row per year with temperature / humidity / rain columns.
# Keys are Hang_muc labels from Data_coffee.csv (exact spelling, used as data),
# values are the target SQL column names.
weather_map = {
    "Nhiet_do_trung_binh(Celcius)": "temperature",
    "Do_am_trung_binh(%)":          "humidity",
    "Tong_luong_mua(mm)":           "rain",
}
w = long_df.loc[long_df["hang_muc"].isin(list(weather_map))].assign(
    col=lambda frame: frame["hang_muc"].map(weather_map)
)
# aggfunc="first": if a label appears more than once for a year, the first wins.
weather_df = w.pivot_table(index="year", columns="col", values="value", aggfunc="first")
# Turn `year` back into a column and drop the leftover "col" axis name.
weather_df = weather_df.reset_index().rename_axis(None, axis=1)

In [9]:
# b) PRODUCTION - one row per year with planted area, output and export volume.
# Keys are Hang_muc labels from Data_coffee.csv, values are SQL column names.
# NOTE(review): the export label says "Thousand tons" but the column is named
# export_tons - confirm which unit is actually stored.
prod_map = {
    "Area (Thousand ha)":                 "area_thousand_ha",
    "San luong ca phe san xuat":          "output_tons",
    "San luong ca phe xuat khau (Thousand tons)": "export_tons",
}
p = long_df.loc[long_df["hang_muc"].isin(list(prod_map))].assign(
    col=lambda frame: frame["hang_muc"].map(prod_map)
)
# aggfunc="first": if a label appears more than once for a year, the first wins.
production_df = p.pivot_table(index="year", columns="col", values="value", aggfunc="first")
# Turn `year` back into a column and drop the leftover "col" axis name.
production_df = production_df.reset_index().rename_axis(None, axis=1)

In [24]:
# c) COFFEE_EXPORT - one row per year with export value and world/VN prices.
# Keys are Hang_muc labels from Data_coffee.csv, values are SQL column names.
export_map = {
    "Kim_Ngach(millionUSD)":                        "export_value_million_usd",
    "coffee_price_usd_per_ton(world)(USD/ton)":     "price_world_usd_per_ton",
    "coffee_price_usd_per_ton(vietnam)(USD/ton)":   "price_vn_usd_per_ton",
}
e = long_df.loc[long_df["hang_muc"].isin(list(export_map))].assign(
    col=lambda frame: frame["hang_muc"].map(export_map)
)
# aggfunc="first": if a label appears more than once for a year, the first wins.
export_df = e.pivot_table(index="year", columns="col", values="value", aggfunc="first")
# Turn `year` back into a column and drop the leftover "col" axis name.
export_df = export_df.reset_index().rename_axis(None, axis=1)

### Xử lí file CSV thị trường (Thi_phan_3_thi_truong_chinh → bảng market_trade)

In [11]:
# Trim stray whitespace from the market-table headers so the required-column
# check below compares exact names.
mt.columns = [str(col).strip() for col in mt.columns]

In [12]:
# Validate and normalise the market-share table so it matches the
# market_trade SQL table: columns Importer, Year, trade_value_million_usd,
# quantity_tons.
need = {"Year", "Importer", "Trade Value(million_USD)", "Quantity(tons)"}
if not need.issubset(set(mt.columns)):
    miss = need - set(mt.columns)
    raise SystemExit(f"Thi_phan_3_thi_truong_chinh.csv thiếu cột: {miss}")

mt["Year"] = pd.to_numeric(mt["Year"], errors="coerce").astype("Int64")
mt["Trade Value(million_USD)"] = pd.to_numeric(mt["Trade Value(million_USD)"], errors="coerce")
mt["Quantity(tons)"]           = pd.to_numeric(mt["Quantity(tons)"], errors="coerce")

# (importer, year) is the NOT NULL unique key of market_trade, so rows lacking
# either cannot be stored: astype(int) raises on <NA> years, and astype(str)
# would turn a missing Importer into the literal string "nan". Drop such rows
# (e.g. blank trailing lines in the CSV) and report how many were removed.
rows_before = len(mt)
mt = mt[mt["Year"].notna() & mt["Importer"].notna()].copy()
mt["Importer"] = mt["Importer"].astype(str).str.strip()
mt = mt[mt["Importer"] != ""].copy()
if len(mt) < rows_before:
    print(f"market_trade: dropped {rows_before - len(mt)} row(s) with missing Year/Importer")
mt["Year"] = mt["Year"].astype(int)

mt = mt.rename(columns={
    "Trade Value(million_USD)": "trade_value_million_usd",
    "Quantity(tons)":           "quantity_tons"
})[["Importer", "Year", "trade_value_million_usd", "quantity_tons"]]


### Tạo DDL (các bảng SQL)

In [13]:
# DDL for the `weather` table: at most one row per year (uq_weather_year),
# holding the yearly weather metrics built in weather_df. Units follow the
# source labels: temperature in Celsius, humidity in %, rain in mm. Metric
# columns are nullable so years with missing data can still be stored.
ddl_weather = """
CREATE TABLE IF NOT EXISTS weather (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  year INT NOT NULL,
  temperature DECIMAL(5,2) NULL,
  humidity    DECIMAL(5,2) NULL,
  rain        DECIMAL(10,1) NULL,
  UNIQUE KEY uq_weather_year (year)
) CHARACTER SET utf8mb4;
"""

In [14]:
# DDL for the `production` table: at most one row per year (uq_prod_year) with
# planted area (thousand ha), production output and export volume taken from
# production_df. Metric columns are nullable.
ddl_production = """
CREATE TABLE IF NOT EXISTS production (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  year INT NOT NULL,
  area_thousand_ha DECIMAL(10,1) NULL,
  output_tons      DECIMAL(14,2) NULL,
  export_tons      DECIMAL(14,2) NULL,
  UNIQUE KEY uq_prod_year (year)
) CHARACTER SET utf8mb4;
"""

In [15]:
# DDL for the `coffee_export` table: at most one row per year (unique key
# uq_trade_year) with export value in million USD and world / Vietnam coffee
# prices in USD per ton, taken from export_df. Metric columns are nullable.
ddl_export = """
CREATE TABLE IF NOT EXISTS coffee_export (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  year INT NOT NULL,
  export_value_million_usd DECIMAL(16,2) NULL,
  price_world_usd_per_ton  DECIMAL(12,2) NULL,
  price_vn_usd_per_ton     DECIMAL(12,2) NULL,
  UNIQUE KEY uq_trade_year (year)
) CHARACTER SET utf8mb4;
"""

In [16]:
# DDL for the `market_trade` table: at most one row per (importer, year)
# (uq_importer_year) with trade value in million USD and quantity in tons,
# taken from the cleaned `mt` DataFrame. importer and year are NOT NULL.
ddl_market_trade = """
CREATE TABLE IF NOT EXISTS market_trade (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  importer VARCHAR(100) NOT NULL,
  year INT NOT NULL,
  trade_value_million_usd DECIMAL(16,2) NULL,
  quantity_tons           DECIMAL(16,2) NULL,
  UNIQUE KEY uq_importer_year (importer, year)
) CHARACTER SET utf8mb4;
"""

### Upsert lên MySQL

In [17]:
def to_none(x):
    """Convert a pandas cell value into a DB-API query parameter.

    Missing values (NaN, None, pd.NA, NaT) become None so MySQL stores NULL.
    Python ints and floats - including bool and numpy.float64, which subclass
    them - are cast to a plain float. Anything else is returned unchanged.
    """
    if pd.isna(x):
        return None
    if isinstance(x, (int, float)):
        return float(x)
    return x

In [18]:
# Upsert one weather row; parameters: (year, temperature, humidity, rain).
# On a duplicate year (uq_weather_year) the metric columns are overwritten with
# the incoming values; the existing id is kept.
# NOTE: VALUES(col) inside ON DUPLICATE KEY UPDATE is deprecated since MySQL
# 8.0.20 (row-alias syntax replaces it) but is still accepted.
upsert_weather = """
INSERT INTO weather (year, temperature, humidity, rain)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
  temperature = VALUES(temperature),
  humidity    = VALUES(humidity),
  rain        = VALUES(rain);
"""

In [19]:
# Upsert one production row; parameters:
# (year, area_thousand_ha, output_tons, export_tons).
# On a duplicate year (uq_prod_year) the metric columns are overwritten.
# NOTE: VALUES(col) here is deprecated since MySQL 8.0.20 but still accepted.
upsert_production = """
INSERT INTO production (year, area_thousand_ha, output_tons, export_tons)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
  area_thousand_ha = VALUES(area_thousand_ha),
  output_tons      = VALUES(output_tons),
  export_tons      = VALUES(export_tons);
"""

In [20]:
# Upsert one coffee_export row; parameters: (year, export_value_million_usd,
# price_world_usd_per_ton, price_vn_usd_per_ton).
# On a duplicate year (uq_trade_year) the metric columns are overwritten.
# NOTE: VALUES(col) here is deprecated since MySQL 8.0.20 but still accepted.
upsert_export = """
INSERT INTO coffee_export (year, export_value_million_usd, price_world_usd_per_ton, price_vn_usd_per_ton)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
  export_value_million_usd = VALUES(export_value_million_usd),
  price_world_usd_per_ton  = VALUES(price_world_usd_per_ton),
  price_vn_usd_per_ton     = VALUES(price_vn_usd_per_ton);
"""

In [21]:
# Upsert one market_trade row; parameters:
# (importer, year, trade_value_million_usd, quantity_tons).
# On a duplicate (importer, year) (uq_importer_year) the two metric columns are
# overwritten.
# NOTE: VALUES(col) here is deprecated since MySQL 8.0.20 but still accepted.
upsert_market_trade = """
INSERT INTO market_trade (importer, year, trade_value_million_usd, quantity_tons)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
  trade_value_million_usd = VALUES(trade_value_million_usd),
  quantity_tons           = VALUES(quantity_tons);
"""

### Ghi vào database (thực thi các dòng lệnh)

In [22]:
# Set to True to DROP and recreate all four tables on every run (existing rows
# are lost); False keeps the tables and upserts into them.
DROP_EXISTING = False  # set to True to recreate the tables on every run

In [23]:
def _year_rows(frame, columns):
    """Build (year, *metrics) parameter tuples from a per-year DataFrame.

    Metric columns absent from *frame* (e.g. a label missing from the CSV)
    become NULL, because Series.get() returns None for a missing key.
    """
    return [
        (int(r["year"]), *(to_none(r.get(c)) for c in columns))
        for _, r in frame.iterrows()
    ]


# Create the tables (optionally dropping them first), upsert the four
# DataFrames, print row counts, then commit the data writes together.
# NOTE: DROP/CREATE TABLE are DDL, which MySQL commits implicitly, so the
# rollback below only undoes row changes, never table drops/creates.
try:
    # Re-open the connection if it was closed or dropped (e.g. by an earlier
    # cell or an idle timeout); pymysql's ping(reconnect=True) reconnects with
    # the original connect() arguments.
    conn.ping(reconnect=True)
    with conn.cursor() as cur:
        if DROP_EXISTING:
            for table in ("weather", "production", "coffee_export", "market_trade"):
                cur.execute(f"DROP TABLE IF EXISTS {table}")

        for ddl in (ddl_weather, ddl_production, ddl_export, ddl_market_trade):
            cur.execute(ddl)

        # a) weather rows
        w_rows = _year_rows(weather_df, ("temperature", "humidity", "rain"))
        if w_rows:
            cur.executemany(upsert_weather, w_rows)

        # b) production rows
        p_rows = _year_rows(
            production_df, ("area_thousand_ha", "output_tons", "export_tons")
        )
        if p_rows:
            cur.executemany(upsert_production, p_rows)

        # c) export rows
        e_rows = _year_rows(
            export_df,
            ("export_value_million_usd", "price_world_usd_per_ton", "price_vn_usd_per_ton"),
        )
        if e_rows:
            cur.executemany(upsert_export, e_rows)

        # d) market_trade rows, keyed by (importer, year)
        mt_rows = [
            (
                str(r["Importer"]),
                int(r["Year"]),
                to_none(r["trade_value_million_usd"]),
                to_none(r["quantity_tons"]),
            )
            for _, r in mt.iterrows()
        ]
        if mt_rows:
            cur.executemany(upsert_market_trade, mt_rows)

        # Quick report
        def count_table(name):
            """Return COUNT(*) for *name* (only the fixed table names below)."""
            cur.execute(f"SELECT COUNT(*) AS c FROM {name}")
            row = cur.fetchone()
            # Dict cursors return {"c": n}; Cursor/SSCursor return a tuple (n,).
            return row["c"] if isinstance(row, dict) else row[0]

        print("weather rows:",     count_table("weather"))
        print("production rows:",  count_table("production"))
        print("coffee_export rows:", count_table("coffee_export"))
        print("market_trade rows:",  count_table("market_trade"))

    conn.commit()
except Exception as exc:
    # A failing rollback (e.g. on a dead connection) must not hide the
    # original error, so it is reported separately.
    try:
        conn.rollback()
    except Exception as rollback_exc:
        print("ROLLBACK FAILED:", repr(rollback_exc))
    print("ERROR:", repr(exc))
finally:
    # close() raises on an already-closed pymysql connection; only close a live one.
    if conn.open:
        conn.close()

weather rows: 20
production rows: 20
coffee_export rows: 20
market_trade rows: 57
