In [1]:
import pandas as pd
from datetime import datetime, date, timedelta
import pymysql
import os
from dotenv import load_dotenv

In [2]:
def T_calculate_P75(df_store: pd.DataFrame, df_stats: pd.DataFrame) -> pd.DataFrame:
    """
    Attach 75th-percentile review counts to the per-district statistics.

    Args:
        df_store: Frame with at least the columns ``city``, ``district``,
            ``category_id`` and ``reviews``.
        df_stats: Frame with at least the columns ``city``, ``district`` and
            ``category_id``; every row is kept (left joins).

    Returns:
        ``df_stats`` with two extra columns:
        ``p75_district_cat`` (P75 of ``reviews`` per city/district/category) and
        ``p75_city_cat`` (P75 of ``reviews`` per city/category). Groups that do
        not occur in ``df_store`` get NaN.
    """
    district_keys = ["city", "district", "category_id"]
    city_keys = ["city", "category_id"]

    # P75 of reviews at the (city, district, category) level.
    district_q = df_store.groupby(district_keys, as_index=False)["reviews"].quantile(0.75)
    district_q = district_q.rename(columns={"reviews": "p75_district_cat"})

    # P75 of reviews at the coarser (city, category) level.
    city_q = df_store.groupby(city_keys, as_index=False)["reviews"].quantile(0.75)
    city_q = city_q.rename(columns={"reviews": "p75_city_cat"})

    # Join the district-level P75 first, then the city-level P75.
    with_district = df_stats.merge(district_q, on=district_keys, how="left")
    with_both = with_district.merge(city_q, on=city_keys, how="left")

    return with_both


def T_merged_fillna(df: pd.DataFrame) -> pd.DataFrame:
    """
    Replace missing P75 values with 0 and return the result as a new frame.

    Args:
        df: Frame containing the columns ``p75_district_cat`` and
            ``p75_city_cat``.

    Returns:
        A new DataFrame with the same columns as ``df`` in which NaN values of
        the two P75 columns are replaced by 0. The input frame is left
        untouched so that re-running a notebook cell stays idempotent.

    Raises:
        KeyError: If either P75 column is missing from ``df``.
    """
    # ``assign`` works on a copy, so the caller's frame is not modified.
    return df.assign(
        p75_district_cat=df["p75_district_cat"].fillna(0),
        p75_city_cat=df["p75_city_cat"].fillna(0),
    )


def T_calculate_weight_and_mscore(df: pd.DataFrame, t: int) -> pd.DataFrame:
    """
    Blend district-level and city-level P75 into a single ``m`` value per row.

    The weight is ``w = n_district_cat / (n_district_cat + t)``: the more
    samples a district/category has, the more its own P75 is trusted. The
    blended value is ``m = w * p75_district_cat + (1 - w) * p75_city_cat``.

    Args:
        df: Frame with the columns ``city``, ``district``, ``category_id``,
            ``n_district_cat``, ``p75_district_cat`` and ``p75_city_cat``.
        t: Smoothing constant in the weight denominator. When
            ``n_district_cat == t`` both P75 values are weighted equally.
            Rows where ``n_district_cat + t`` equals 0 have an undefined
            weight (division by zero).

    Returns:
        A new DataFrame with the columns ``city``, ``district``,
        ``category_id`` and ``m_city_district_cat``. The input frame is not
        modified (no helper columns are added to it).
    """
    # Weight w: larger n_district_cat -> more trust in the district P75.
    weight = df["n_district_cat"] / (df["n_district_cat"] + t)

    # m = w * P75_district_cat + (1 - w) * P75_city_cat
    blended = weight * df["p75_district_cat"] + (1 - weight) * df["p75_city_cat"]

    # Selecting with a list and calling ``assign`` both produce new frames,
    # so the caller's ``df`` keeps exactly the columns it came in with.
    result = df[["city", "district", "category_id"]].assign(
        m_city_district_cat=blended
    )

    return result


def create_pymysql_connect():
    """
    Open a MySQL connection with pymysql and return the connection object.

    All settings are read from environment variables after ``load_dotenv()``
    has been called; put them in a ``.env`` file and never hard-code them:

    - ``MYSQL_USERNAME``, ``MYSQL_PASSWORD``, ``MYSQL_IP``, ``MYSQL_DB_NAME``
    - ``MYSQL_PORTT`` (legacy, misspelled key kept for backward
      compatibility) or, if that is unset or empty, ``MYSQL_PORT``

    Returns:
        The connection object returned by ``pymysql.connect``. The caller is
        responsible for closing it.

    Raises:
        ValueError: If no port is configured or the configured port is not an
            integer.
    """

    load_dotenv()

    username = os.getenv("MYSQL_USERNAME")
    password = os.getenv("MYSQL_PASSWORD")
    target_ip = os.getenv("MYSQL_IP")
    db_name = os.getenv("MYSQL_DB_NAME")

    # The legacy key wins so existing .env files keep behaving the same; the
    # correctly spelled key is accepted as a fallback.
    raw_port = os.getenv("MYSQL_PORTT") or os.getenv("MYSQL_PORT")
    if not raw_port:
        # Fail with an actionable message instead of int(None) -> TypeError.
        raise ValueError(
            "MySQL port is not configured: set MYSQL_PORT "
            "(or the legacy MYSQL_PORTT) in the environment or .env file."
        )
    target_port = int(raw_port)

    conn = pymysql.connect(
        host=target_ip,
        port=target_port,
        user=username,
        password=password,
        database=db_name,
        charset='utf8mb4'
    )

    return conn


def E_query_from_sql(sql: str) -> list[dict]:
    """
    Run a SQL query against MySQL and return the rows as a list of dicts.

    A fresh connection is opened via ``create_pymysql_connect()`` for every
    call and is always closed again, whether the query succeeds or fails.

    Args:
        sql: The SQL statement to execute.

    Returns:
        One dict per result row, keyed by column name
        (``DataFrame.to_dict(orient='records')``).

    Raises:
        Exception: If reading the query result fails; the original error is
            attached as ``__cause__``.
    """
    conn = create_pymysql_connect()

    try:
        df = pd.read_sql(sql, conn)
        return df.to_dict(orient='records')

    except Exception as e:
        raise Exception(f"執行指令時發生錯誤：{e}") from e

    finally:
        # Always release the connection. Closing is best-effort: a connection
        # that already dropped may raise on close, and that must not mask the
        # query error being propagated.
        try:
            conn.close()
        except Exception:
            pass


def T_transform_to_df(data: list[dict]) -> pd.DataFrame:
    """Build a DataFrame from a list of dicts, one dict per row."""
    return pd.DataFrame(data=data)


In [4]:
# --- Extract: store rows -----------------------------------------------------
# Rows from v_2fact_store_all left-joined to location on loc_id, exposing
# category_id, rating_total (aliased to `reviews`), city and district.
sql_stores = """
    select
        f.category_id,
        f.rating_total as reviews,
        l.city,
        l.district
    from v_2fact_store_all as f
    left join location as l
    on f.loc_id = l.loc_id;
    """

# NOTE: despite the `_dict` suffix, E_query_from_sql returns a list of record
# dicts (one per row); T_transform_to_df turns that list into a DataFrame.
df_store_dict = E_query_from_sql(sql=sql_stores)
df_store = T_transform_to_df(data=df_store_dict)

# --- Extract: per (city, district, category) sample counts --------------------
# n_district_cat is later used as the sample size in the weight formula.
sql_stats = """
    SELECT
        city, district, category_id, n_district_cat
    FROM v_3district_cat_stats;
    """

df_stats_dict = E_query_from_sql(sql=sql_stats)
df_stats = T_transform_to_df(data=df_stats_dict)

# --- Transform ----------------------------------------------------------------
# 1) Attach district-level and city-level P75 of `reviews` to the stats rows.
df_merged = T_calculate_P75(df_store=df_store, df_stats=df_stats)

# 2) Replace missing P75 values with 0 so the weighted sum below is defined.
df_merged = T_merged_fillna(df=df_merged)

# 3) Blend the two P75 values with w = n / (n + t). With t=30, a group with
#    30 samples weights its district P75 and the city P75 equally.
df_result = T_calculate_weight_and_mscore(df=df_merged, t=30)

  df = pd.read_sql(sql, conn)
  df = pd.read_sql(sql, conn)


In [5]:
df_result

Unnamed: 0,city,district,category_id,m_city_district_cat
0,高雄市,鹽埕區,7,379.000000
1,高雄市,鹽埕區,6,345.000000
2,高雄市,鹽埕區,5,308.161290
3,高雄市,鹽埕區,4,93.354839
4,高雄市,鹽埕區,3,156.000000
...,...,...,...,...
1101,桃園市,復興區,5,360.743902
1102,桃園市,復興區,4,95.000000
1103,桃園市,復興區,3,109.000000
1104,桃園市,復興區,2,932.166667
