In [1]:
import plugins.database.models as models
# Bootstrap step for the "margin_trading" table. Presumably this creates the
# table only when it is missing (per the helper's name) — confirm in
# plugins.database.models.
models.create_table_if_not_exists("margin_trading")

In [1]:
from airflow.configuration import conf


In [2]:
# Read the `db_uri` option from the `at_web` section of the Airflow config.
# NOTE(review): the displayed value embeds the database password — clear this
# cell's output before sharing the notebook.
conf.get('at_web','db_uri')

'postgresql+psycopg2://postgres:test@localhost:5432/investment'

In [1]:
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Build a hook bound to the Airflow connection id "postgres_investment".
# The hook is not used by any other cell visible in this notebook.
hook = PostgresHook(postgres_conn_id="postgres_investment")


In [1]:
from mongoengine import DateField, Document, ListField, StringField, connect, disconnect


class CynesNews(Document):
    """MongoEngine document describing a single news item.

    Fields:
        date: publication date of the item.
        headline: headline text.
        tags: list of tags attached to the item.
        content: body text.
    """

    date = DateField()
    headline = StringField()
    tags = ListField()
    content = StringField()

    # `meta` must be a class attribute: MongoEngine only reads it from the
    # class body, so a module-level `meta` is silently ignored and neither the
    # shard key nor the compound index is ever registered.
    # `("date",)` is a real 1-tuple; `("date")` is just the string "date",
    # which would be iterated character by character.
    meta = {"shard_key": ("date",), "indexes": [("date", "headline")]}

import os

# Credentials are taken from the environment so real secrets never have to be
# committed with the notebook. The fallbacks are the original local-development
# values, so an unconfigured environment behaves exactly as before.
username = os.environ.get("MONGO_USERNAME", "root")
password = os.environ.get("MONGO_PASSWORD", "example")

# Register the default MongoEngine connection to the local "News" database.
connect(
    db="News",
    username=username,
    password=password,
    host="localhost",
    port=27018,
)

MongoClient(host=['localhost:27018'], document_class=dict, tz_aware=False, connect=True, read_preference=Primary(), uuidrepresentation=3)

In [2]:
# Upsert a news item keyed by its headline: the queryset filters on the
# headline and `modify(upsert=True, ...)` applies the same fields as the update.
item_dict = {"headline": "123"}
CynesNews.objects(headline=item_dict["headline"]).modify(upsert=True, **item_dict)

In [6]:
import io

import httpx
import pandas as pd
import pendulum

from after_trading.processor.utils import turn_year_to_ROC_year

# Maps the Chinese column headers of the listed-market CSV to the output
# column names used by clean_df.
# Headers suffixed with ".1" are second occurrences of a repeated header —
# presumably the suffix pandas adds when de-duplicating column names for the
# short-sale section; confirm against the downloaded CSV.
# Order matters: clean_df casts every value except the first two and the last
# one to int, and returns the columns in this order.
LISTED_COL_MAPPING = {
    "代號": "stock_code",
    "名稱": "stock_name",
    "買進": "MarginPurchaseBuy",
    "賣出": "MarginPurchaseSell",
    "現金償還": "MarginPurchaseCashRepayment",
    "前日餘額": "MarginPurchaseYesterdayBalance",
    "今日餘額": "MarginPurchaseTodayBalance",
    "限額": "MarginPurchaseLimit",
    "買進.1": "ShortSaleBuy",
    "賣出.1": "ShortSaleSell",
    "現券償還": "ShortSaleCashRepayment",
    "前日餘額.1": "ShortSaleYesterdayBalance",
    "今日餘額.1": "ShortSaleTodayBalance",
    "限額.1": "ShortSaleLimit",
    "資券互抵": "OffsetLoanAndShort",
    "註記": "Note",
}

# Maps the Chinese column headers of the OTC CSV to the same output column
# names as LISTED_COL_MAPPING.
# Order matters: clean_df casts every value except the first two and the last
# one to int, and returns the columns in this order. Note that the short-sale
# entries are ordered differently from LISTED_COL_MAPPING, so the two markets
# yield the same column names in a different column order.
OTC_COL_MAPPING = {
    "代號": "stock_code",
    "名稱": "stock_name",
    "資買": "MarginPurchaseBuy",
    "資賣": "MarginPurchaseSell",
    "現償": "MarginPurchaseCashRepayment",
    "前資餘額(張)": "MarginPurchaseYesterdayBalance",
    "資餘額": "MarginPurchaseTodayBalance",
    "資限額": "MarginPurchaseLimit",
    "券賣": "ShortSaleSell",
    "券買": "ShortSaleBuy",
    "券償": "ShortSaleCashRepayment",
    "券餘額": "ShortSaleTodayBalance",
    "前券餘額(張)": "ShortSaleYesterdayBalance",
    "券限額": "ShortSaleLimit",
    "資券相抵(張)": "OffsetLoanAndShort",
    "備註": "Note",
}


# Today's date as a date string, computed once when this cell runs.
# NOTE(review): not referenced by any code visible in this notebook.
default_date_str = pendulum.today().to_date_string()


def get_listed_margin_trading_data(orignal_date_str: str = "", listed: bool = True):
    """Download one day's margin-trading CSV and return it as a cleaned frame.

    Despite its name, the function serves both markets: the TWSE endpoint when
    ``listed`` is true and the TPEx endpoint otherwise.

    Args:
        orignal_date_str: Date to fetch. It is sent unchanged to the TWSE
            endpoint and passed through ``turn_year_to_ROC_year`` for the TPEx
            endpoint. When empty, today's date from ``get_date_str`` is used.
        listed: Selects the TWSE (True) or TPEx (False) endpoint, together
            with the matching header/footer row counts and column mapping.

    Returns:
        The frame produced by ``clean_df`` with an ``occured_date`` column
        added, or an empty ``DataFrame`` when the request does not return
        HTTP 200 or the response carries no CSV data.
    """
    if not orignal_date_str:
        orignal_date_str, date_str = get_date_str(listed)
    elif listed:
        date_str = orignal_date_str
    else:
        date_str = turn_year_to_ROC_year(orignal_date_str)

    if listed:
        api_url = f"https://www.twse.com.tw/rwd/zh/marginTrading/MI_MARGN?date={date_str}&selectType=ALL&response=csv"
        kwargs = {"skiprows": 7, "skipfooter": 7}

    else:
        api_url = f"https://www.tpex.org.tw/web/stock/margin_trading/margin_balance/margin_bal_result.php?l=zh-tw&o=csv&d={date_str}&s=0"
        kwargs = {"skiprows": 2, "skipfooter": 20}

    print(api_url)
    with httpx.Client() as client:
        response = client.get(api_url)

    # A 200 response with a blank body carries no rows to parse.
    if response.status_code != 200 or not response.content.strip():
        return pd.DataFrame()

    try:
        # Parse the payload that was already downloaded instead of letting
        # pandas fetch the URL a second time: this saves a request and
        # guarantees that the parsed bytes are the ones whose status was checked.
        # `skipfooter` is only supported by the python engine, so request it
        # explicitly rather than relying on pandas' fallback (and its warning).
        df = pd.read_csv(
            io.BytesIO(response.content),
            encoding="cp950",
            engine="python",
            **kwargs,
        )
    except pd.errors.EmptyDataError:
        # Nothing left once the header/footer rows are skipped.
        return pd.DataFrame()

    df["occured_date"] = orignal_date_str
    df["occured_date"] = pd.to_datetime(df["occured_date"], utc=True)
    return clean_df(df, listed)


def get_date_str(listed: bool):
    """Build today's date strings for the selected market.

    Returns a ``(original, request)`` pair. For the listed market both items
    are today's date with the dashes removed. Otherwise the first item is
    today's date with slashes instead of dashes and the second is that string
    passed through ``turn_year_to_ROC_year``.
    """
    today_iso = pendulum.today().to_date_string()

    if not listed:
        slashed = today_iso.replace("-", "/")
        return slashed, turn_year_to_ROC_year(slashed)

    compact = today_iso.replace("-", "")
    return compact, compact


def clean_df(df, listed):
    """Normalise a raw margin-trading frame into the common output schema.

    Args:
        df: Raw frame as parsed from the downloaded CSV. It must already
            contain an ``occured_date`` column. The caller's frame is not
            modified.
        listed: Selects ``LISTED_COL_MAPPING`` (True) or ``OTC_COL_MAPPING``
            (False) for renaming the source headers.

    Returns:
        A new frame holding the mapped columns, in mapping order, followed by
        ``occured_date``. Rows without a stock name are dropped, remaining
        missing values become 0, and every mapped column except the first two
        and the last is cast to int.
    """
    col_mapping = LISTED_COL_MAPPING if listed else OTC_COL_MAPPING

    # Strip stray whitespace from the headers *before* applying the mapping.
    # The mapping must go original -> stripped; the reverse direction never
    # matches an existing column, so the headers would stay unstripped.
    stripped_names = {col: col.strip() for col in df.columns if isinstance(col, str)}
    renamed = df.rename(columns=stripped_names).rename(columns=col_mapping)

    # Build new objects instead of using chained `inplace=True` calls, which
    # do not reliably write back to the frame under pandas copy-on-write.
    cleaned = (
        renamed.dropna(subset=["stock_name"])
        .assign(
            # Remove the `=` and `"` characters wrapped around stock codes.
            stock_code=lambda frame: frame["stock_code"].replace(
                r'=|"', "", regex=True
            )
        )
        .fillna(0)
    )

    use_cols = list(col_mapping.values())
    # Everything between (stock_code, stock_name) and Note is numeric.
    int_cols = use_cols[2:-1]
    # Drop thousands separators before casting.
    cleaned[int_cols] = cleaned[int_cols].replace(",", "", regex=True).astype(int)

    return cleaned[use_cols + ["occured_date"]]


In [7]:
# Fetch the listed-market data; no date is passed, so the function falls back
# to today's date via get_date_str.
df = get_listed_margin_trading_data(listed=True)

https://www.twse.com.tw/rwd/zh/marginTrading/MI_MARGN?date=20231205&selectType=ALL&response=csv


In [12]:
# Summarise the fetched frame. An empty frame means the fetch took the early
# `pd.DataFrame()` return in get_listed_margin_trading_data, i.e. no data was
# loaded for the requested date.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 0 entries
Empty DataFrame
