In [None]:
from datetime import datetime, time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from io import StringIO
from pandas import DataFrame, Series
from smtplib import SMTP
from time import perf_counter, localtime, mktime, gmtime
from zoneinfo import ZoneInfo
import pandas as pd
import requests, logging, sys

KST = ZoneInfo('Asia/Seoul')

# UTILITIES

In [None]:
LAP     = lambda : datetime.now(tz=KST)
OPEN    = datetime.combine(LAP(), time(7,0), tzinfo=KST)
CLOSE   = datetime.combine(LAP(), time(22,0), tzinfo=KST)

def fetch(url:str, **kwargs):
    """
    url: https://api.bithumb.com 의 api 데이터 취득
    base url의 하위 주소를 입력하여 응답을 pandas Series 또는 DataFrame으로 변환

    Args:
        url     (str)   : 주소, base url이 제외된 하위 주소
        kwargs  (dict)  : Series 또는 DataFrame 변환 시 전달할 Keywoard Arguments

    Returns:
        Union[Series, DataFrame] : 
    """
    resp = requests.get(f"https://api.bithumb.com/v1{url}", headers={"accept": "application/json"})
    json = resp.json()
    return Series(json[0], **kwargs) if len(json) == 1 else DataFrame(json, **kwargs)


class Logger(logging.Logger):

    _runtime = None
    _buffer  = None

    @classmethod
    def kst(cls, *args):
        return localtime(mktime(gmtime()) + 9 * 3600)
    
    def __init__(self, name:str):
        super().__init__(name=name, level=logging.INFO)
        self.propagate = False
        formatter = logging.Formatter(
            fmt=f"%(asctime)s %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S"
        )
        formatter.converter = self.kst

        stream = logging.StreamHandler(stream=sys.stdout)
        stream.setLevel(logging.INFO)
        stream.setFormatter(formatter)
        self.addHandler(stream)

        self._buffer = StringIO()
        memory = logging.StreamHandler(stream=self._buffer)
        memory.setLevel(logging.INFO)
        memory.setFormatter(formatter)
        self.addHandler(memory)
        return
    
    def __call__(self, msg:str):
        self.info(msg=msg)
        return
    
    @property
    def stream(self) -> str:
        return self._buffer.getvalue()
    
    def clear(self):
        self._buffer.truncate(0)
        self._buffer.seek(0)
        return


class Mail(MIMEMultipart):
    """
    e-mail 전송 서비스
    """
    def __init__(self):
        super().__init__()
        self['From'] = 'snob.labwons@gmail.com'
        self.content = ""
        return

    @property
    def Subject(self) -> str:
        return self['Subject']

    @Subject.setter
    def Subject(self, _subject:str):
        self['Subject'] = _subject

    @property
    def To(self) -> str:
        return self['To']
    
    @To.setter
    def To(self, _to:str):
        self['To'] = _to

    def send(self):
        self.attach(MIMEText(self.content))
        with SMTP('smtp.gmail.com', 587) as server:
            server.ehlo()
            server.starttls()
            server.login(self['From'], "puiz yxql tnoe ivaa")
            server.send_message(self)
        return

In [None]:
class Coin:

    rename = {
        "candle_date_time_kst": "datetime",
        "opening_price": "open",
        "high_price": "high",
        "low_price": "low",
        "trade_price": "close",
        "candle_acc_trade_price": "amount",
        "candle_acc_trade_volume": "volume",
        "trade_volume": 'volume',
        "ask_bid": "quote",
        "datetime": "datetime"
    }

    def __init__(self, ticker:str):
        self.ticker = ticker
        return

    def __repr__(self):
        return repr(self.snapShot())
    
    def snapShot(self) -> Series:
        """
        market                               KRW-BTC
        trade_date                          20251128
        trade_time                            021901
        trade_date_kst                      20251128
        trade_time_kst                        111901
        trade_timestamp                1764328741157
        opening_price                      135796000
        high_price                         137125000
        low_price                          135372000
        trade_price                        135442000
        prev_closing_price                 135796000
        change                                  FALL
        change_price                          354000
        change_rate                           0.0026
        signed_change_price                  -354000
        signed_change_rate                   -0.0026
        trade_volume                        0.000073
        acc_trade_price           29804029073.873871
        acc_trade_price_24h      111366853330.307678
        acc_trade_volume                  218.675501
        acc_trade_volume_24h              815.641639
        highest_52_week_price              179734000
        highest_52_week_date              2025-10-10
        lowest_52_week_price               110000000
        lowest_52_week_date               2024-12-04
        timestamp                      1764328741157
        Name: KRW-BTC, dtype: object        
        """
        return fetch(f"/ticker?markets={self.ticker}", name=self.ticker)

    def ohlcv(self, period:str='d', *args, **kwargs) -> DataFrame:
        period = {'d':'days', 'min':'minutes', 'w':'weeks', 'm':'months'}[period.lower()]
        if period == 'minutes':
            unit = args[0] if args else kwargs.get("unit", 60)
            query = f'/candles/{period}/{unit}?market={self.ticker}&count=200'
        else:
            query = f'/candles/{period}?market={self.ticker}&count=200'
        
        data = fetch(query)
        cols = {k:v for k, v in self.rename.items() if k in data.columns}
        data = data.rename(columns=cols)[cols.values()]
        data = data.set_index(keys='datetime')
        data['volume'] = data['volume'] * 1e+6
        return data

    def execution(self, count:int=100) -> DataFrame:
        data = fetch(f'/trades/ticks?market={self.ticker}&count={min(count, 500)}')
        data['datetime'] = pd.to_datetime(data['trade_date_utc'].astype(str) + ' ' + data['trade_time_utc'].astype(str))
        data['datetime'] = data['datetime'] + pd.Timedelta(hours=9)
        cols = {k:v for k, v in self.rename.items() if k in data.columns}
        
        data = data.rename(columns=cols)[cols.values()]
        data = data.set_index(keys='datetime')
        return data
    
    def order(self) -> DataFrame:
        base = fetch(f'/orderbook?markets={self.ticker}')
        data = DataFrame(base['orderbook_units'])
        data['datetime'] = pd.to_datetime(base['timestamp'], unit='ms') + pd.Timedelta(hours=9)
        data['datetime'] = data['datetime'].dt.strftime("%Y-%m-%d %H:%M:%S")
        return data.set_index(keys='datetime')


# coin = Coin('KRW-BTC')
# coin.snapShot()
# coin.ohlcv('min', 30)
# coin.execution()
# coin.order()
# coin

In [None]:
class Coins:

    _tickers = DataFrame()
    _baseline = DataFrame()
    rename = {
        'market':'ticker',
        'english_name': 'eng',
        # 'market_warning': 'warning',
        'korean_name':'kor',
        'warning_type': 'warning',
        'end_date': 'warning_end'
    }

    def __init__(self):
        self._lap = perf_counter()
        return
    
    def __iter__(self):
        for ticker in self.tickers.index:
            yield ticker

    def __getitem__(self, ticker:str) -> DataFrame:
        return Coin(ticker=ticker)
    
    def _repr_html_(self):
        return self.tickers._repr_html_()
    
    def capture_baseline(self, period:str='d', *args, **kwargs ) -> DataFrame:
        self._baseline = pd.concat(
            {t: Coin(t).ohlcv(period, *args, **kwargs) for t in self.tickers.index},
            axis=1
        ).sort_index(ascending=True).tail(200)
        return self._baseline
    
    @property
    def baseline(self) -> DataFrame:
        if self._baseline.empty:
            self.capture_baseline(period='min', unit=60)
        return self._baseline
    
    @property
    def tickers(self) -> DataFrame:
        if self._tickers.empty or ((perf_counter() - self._lap) >= 600):
            self._tickers = self.fetch_tickers().join(self.fetch_warnings())
            self._lap = perf_counter()
        return self._tickers
    
    @classmethod
    def fetch_tickers(cls) -> DataFrame:
        data = fetch('/market/all?isDetails=true')
        cols = {k:v for k, v in cls.rename.items() if k in data.columns}
        data = data.rename(columns=cols)[cols.values()]
        data = data[data['ticker'].str.startswith('KRW')]
        # data["warning"] = data["warning"].replace("NONE", None)
        return data.set_index(keys='ticker')

    @classmethod
    def fetch_warnings(cls) -> DataFrame:
        data = fetch('/market/virtual_asset_warning')
        cols = {k:v for k, v in cls.rename.items() if k in data.columns}
        data = data.rename(columns=cols)[cols.values()]
        data = data[data['ticker'].str.startswith('KRW')]
        return data.set_index(keys='ticker')
    

# coins = Coins()
# coins.capture_baseline(period='min', unit=60)
# coins.baseline
# coins

In [None]:
class Indicator:

    def __init__(self, baseline:DataFrame):
        self.data = baseline.copy()
        return 
    
    def __iter__(self):
        for col in self.data.columns.get_level_values(0).unique():
            yield col
    
    def __contains__(self, col:str):
        return col in self.data['KRW-BTC'].columns
    
    def __getitem__(self, col:str):
        return self.data.xs(col, axis=1, level=1)
    
    def __setitem__(self, col:str, series):
        series.columns = pd.MultiIndex.from_product([series.columns, [col]])
        self.data = pd.concat([self.data, series], axis=1).sort_index(axis=1)
        return
    
    def __delitem__(self, col:str):
        if not col in self:
            return
        mask = self.data.columns.get_level_values(1) == col
        self.data = self.data.drop(columns=self.data.columns[mask])
        return
    
    def _repr_html_(self):
        return self.data._repr_html_()
    
    def _set_columns(self, **kwargs):
        for column, series in kwargs.items():
            self[column] = series
        return
    
    def _del_columns(self, *cols):
        for col in cols:
            del self[col]
        return
    
    def install(self):
        self.add_tp()
        self.add_bb()
        return 
    
    def add_tp(self):
        self['tp'] = round((self['high'] + self['low'] + self['close']) / 3, 4)
        return
    
    def add_bb(self, basis:str='tp', window:int=20, std:int=2):
        basis = basis if basis in self else 'close'
        dev = self[basis].rolling(window).std()
        mid = self[basis].rolling(window).mean()
        up = mid + std * dev
        dn = mid - std * dev
        self._set_columns(
            mid=mid,
            bb_upper=up,
            bb_lower=dn,
            tr_upper=mid + (std/2) * dev,
            tr_lower=mid - (std/2) * dev,
            bb_width=((up - dn) / mid) * 100
        )
        return    

# indicator = Indicator(coins.baseline)
# indicator.install()
# indicator.add_tp()
# indicator.add_bb()
# indicator

In [None]:
class Strategy(Indicator):

    @classmethod
    def _rank(cls, x, pct=True, **kwargs):
        rank = Series(x).rank(pct=pct)
        n = kwargs.get('n', None)
        if n:
            return rank.iloc[n]
        return rank
    
    def to_report(self, sig:DataFrame) -> DataFrame:
        lap = pd.to_datetime(LAP()).tz_localize(None)
        sig = sig[pd.to_datetime(sig.index) >= (lap - pd.Timedelta(hours=48))]
        sig.columns = [c.replace("KRW-", "") for c in sig.columns]
        return sig.apply(lambda row: ','.join(sig.columns[row.notna()]), axis=1) \
                  .replace("", None) \
                  .dropna()

    def squeeze_expand(self, window:int=-1):
        if window == -1:
            self['_width_rank'] = self['bb_width'].rank(pct=True)
        else:    
            self['_width_rank'] = self['bb_width'].rolling(window).apply(self._rank, kwargs={'n':-1})
        self['sig_squeeze_expand'] = (
            (self['_width_rank'] < 0.25) & 
            (self['close'] >= self['bb_upper']) & 
            ((self['close'] - self['open']) >= (self['bb_upper'] - self['mid'])) &
            (self['volume'] >= self['volume'].rolling(7).mean())
        ).astype(int).replace(0, None)
            
        del self['_width_rank']
        del self['_volume_rank']
        return self['sig_squeeze_expand']
    

# coins = Coins()
# strategy = Strategy(coins.baseline)
# strategy.install()
# strategy.squeeze_expand()
# strategy.to_report(strategy['sig_squeeze_expand'])



# AUTO-DETECTION

In [None]:
DEBUG = False
ONCE = True

try:
    """
    
    """
    while True:

        lap = LAP()
        if (lap.minute == lap.second == 0) or DEBUG or ONCE:
            console = Logger('TRADER V1')
            coins = Coins()
            email = Mail()
            email.Subject = f'{lap.strftime("%Y-%m-%d %H:%M:%S")} STRATEGY REPORT'

            console.clear()
            tic = perf_counter()
            coins.capture_baseline(period='min', unit=60)
            console.info(f'CAPTURE BASELINE ... Elapsed: {perf_counter() - tic:.2f}s')

            strategy = Strategy(coins.baseline)
            strategy.install()

            console('Squeeze & Expand ' + '-' * 30)
            strategy_sig = strategy.squeeze_expand()
            strategy_rep = strategy.to_report(strategy_sig)
            strategy_rep.index.name = ""
            console("\n".join(str(strategy_rep).splitlines()[:-1]))
            console('-' * 30 + ' Squeeze & Expand')

            email.To = 'jhlee_0319@naver.com'
            email.content = console.stream
            email.send()
            if ONCE:
                ONCE = False
        
        if DEBUG or LAP() >= CLOSE:
            break

except KeyboardInterrupt:
    console("SYSTEM ABORTED")
finally:
    console("SYSTEM END")
    pass
