In [256]:
# Run parameters. Every value is a None placeholder here and has to be replaced
# with a real value before the cells below are executed.
# NOTE(review): presumably injected by a parameterized notebook runner — confirm.
AWS_ACCESS_KEY_ID = None
AWS_SECRET_ACCESS_KEY = None
EC2_PUBLIC_IP = None  # host of the HTTP query API called on port 8080
SNOWFLAKE_ACCOUNT = None
SNOWFLAKE_DATABASE = None
SNOWFLAKE_PASSWORD = None
SNOWFLAKE_SCHEMA = None
SNOWFLAKE_USER = None
SNOWFLAKE_WAREHOUSE = None
target_date = None  # date part of the query window; sent to SQL under a 'YYYY-MM-DD' template
ref_time_str = None  # end of the one-hour query window; parsed with "%H:%M"

In [259]:
import pandas as pd
import numpy as np
import boto3
import psycopg2
import snowflake.connector
from datetime import datetime, timedelta
import botocore.exceptions
import lightgbm as lgb
import json
from snowflake.connector.pandas_tools import write_pandas
import requests

In [260]:
# Shared S3 handle and bucket used by the model-loading helpers below.
bucket_name = "de6-team7-bucket"

# Credentials come from the run parameters defined at the top of the notebook.
aws_access_key_id = AWS_ACCESS_KEY_ID
aws_secret_access_key = AWS_SECRET_ACCESS_KEY

s3_client = boto3.client(
    service_name='s3',
    region_name='us-east-1',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

In [261]:
def try_convert(s, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse ``s`` as a datetime and render it with ``fmt``.

    Args:
        s: Value handed to ``pd.to_datetime``.
        fmt: ``strftime`` pattern used for the output.

    Returns:
        The formatted value, or ``None`` when ``s`` is ``None`` or cannot be
        parsed/formatted (``ValueError`` / ``TypeError``).
    """
    try:
        dt = pd.to_datetime(s, errors='raise')
        # A None input comes back as None; calling .strftime on it would raise
        # AttributeError, which the handler below does not cover.
        if dt is None:
            return None
        return dt.strftime(fmt)
    except (ValueError, TypeError):
        return None

In [285]:
def load_hourly_data(target_date: str, ref_time_str: str, timeout: float = 30.0) -> pd.DataFrame:
    """Fetch the hour of raw ticks ending at the reference time and aggregate it hourly.

    The query window is ``[reference - 1 hour, reference)``. The window is
    computed on full datetimes, so a reference time before 01:00 correctly
    reaches back into the previous calendar day.

    Args:
        target_date: Date of the reference time, formatted ``YYYY-MM-DD``.
        ref_time_str: Reference time of day, formatted ``HH:MM``.
        timeout: Seconds to wait for the query API before giving up.

    Returns:
        One row per stock and hourly bucket with OHLC prices, cumulative
        volume/amount, VWAP, change rate and mean spread. An empty DataFrame
        is returned when the request fails or yields no rows.

    Raises:
        ValueError: If ``target_date`` / ``ref_time_str`` do not match the
            expected formats.
    """
    url = f"http://{EC2_PUBLIC_IP}:8080/query"

    # Combine date and time BEFORE subtracting the hour; subtracting from the
    # time of day alone wraps to e.g. "23:30" on the same date and produces an
    # inverted (always empty) window.
    window_end = datetime.strptime(f"{target_date} {ref_time_str}", "%Y-%m-%d %H:%M")
    window_start = window_end - timedelta(hours=1)

    # Rendered from parsed datetimes, so only well-formed timestamps reach the SQL text.
    start_datetime_utc = window_start.strftime("%Y-%m-%d %H:%M")
    end_datetime_utc = window_end.strftime("%Y-%m-%d %H:%M")

    query = {
        "sql": f"""
            SELECT
                timestamp_ms, stock_code, open_price, high_price, low_price,
                close_price, cum_volume, cum_amount, (ask_price_1 - bid_price_1) as spread,
                prev_close_price
            FROM raw_data.stock_raw_info
            WHERE
                TO_TIMESTAMP(timestamp_ms) >= TO_TIMESTAMP('{start_datetime_utc}', 'YYYY-MM-DD HH24:MI')
                AND TO_TIMESTAMP(timestamp_ms) < TO_TIMESTAMP('{end_datetime_utc}', 'YYYY-MM-DD HH24:MI')
            ORDER BY timestamp_ms
        """
    }

    try:
        # Without a timeout an unresponsive API would block the notebook forever.
        response = requests.post(url, json=query, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except Exception as e:
        print(f"🚨 API 요청 또는 JSON 변환 중 오류 발생: {e}")
        return pd.DataFrame()

    if not data:
        print(f"✅ API는 정상 응답했으나, '{start_datetime_utc}' ~ '{end_datetime_utc}' (UTC) 사이의 데이터가 없습니다.")
        return pd.DataFrame()

    # Must stay in the same order as the SELECT list above.
    column_names = [
        'timestamp_ms', 'stock_code', 'open_price', 'high_price', 'low_price',
        'close_price', 'cum_volume', 'cum_amount', 'spread', 'prev_close_price'
    ]
    df = pd.DataFrame(data, columns=column_names)

    # NOTE(review): the column is named *_ms but is interpreted as epoch
    # seconds here — confirm against the table definition.
    df['prediction_time'] = pd.to_datetime(df['timestamp_ms'], unit='s')

    # Rows arrive ordered by timestamp, so 'first'/'last' are the earliest and
    # latest ticks of each bucket.
    agg_logic = {
        'open_price': 'first', 'high_price': 'max', 'low_price': 'min',
        'close_price': 'last', 'cum_volume': 'last', 'cum_amount': 'last',
        'spread': 'mean'
    }

    hourly_df = (
        df.set_index('prediction_time')
        .groupby('stock_code')
        .resample('1h')
        .agg(agg_logic)
        .reset_index()
    )

    # Zero volume becomes NaN so the division cannot yield inf; buckets without
    # a usable VWAP are dropped.
    hourly_df['vwap_price'] = hourly_df['cum_amount'] / hourly_df['cum_volume'].replace(0, np.nan)
    hourly_df = hourly_df.dropna(subset=['vwap_price']).copy()

    # Previous close is taken from each stock's first row in the window.
    prev_close_map = df.groupby('stock_code')['prev_close_price'].first()
    hourly_df['prev_close_price'] = hourly_df['stock_code'].map(prev_close_map)
    hourly_df['change_rate'] = (hourly_df['close_price'] / hourly_df['prev_close_price'] - 1) * 100

    final_cols = [
        'prediction_time', 'stock_code', 'open_price', 'high_price', 'low_price',
        'close_price', 'cum_volume', 'cum_amount', 'vwap_price', 'change_rate', 'spread'
    ]

    return hourly_df[final_cols]

In [263]:
def make_feature(df):
    """Sort the hourly frame and encode ``stock_code`` as a categorical column.

    Args:
        df: Frame containing at least ``prediction_time`` and ``stock_code``.

    Returns:
        A new DataFrame sorted by ``prediction_time`` then ``stock_code`` with
        ``stock_code`` cast to ``category``. An empty DataFrame is returned when
        the input is ``None``/empty or when the transformation fails, so the
        result can always be tested with ``.empty``. The input frame is left
        unmodified.
    """
    # Check first: there is nothing to sort or cast, and callers rely on
    # getting a DataFrame back rather than None.
    if df is None or df.empty:
        print("Empty dataframe")
        return pd.DataFrame()
    try:
        return (
            df.sort_values(by=['prediction_time', 'stock_code'])
            .assign(stock_code=lambda frame: frame['stock_code'].astype('category'))
        )
    except Exception as e:
        # Best effort: report the problem and hand back an empty frame.
        print(e)
        return pd.DataFrame()

In [264]:
def check_booster_exists(bucket_name: str, s3_key: str) -> bool:
    """Report whether ``s3_key`` exists in ``bucket_name``.

    Issues a HEAD request through the module-level ``s3_client``.

    Returns:
        ``True`` when the request succeeds, ``False`` when it fails with error
        code ``'404'``.

    Raises:
        botocore.exceptions.ClientError: For any other client error.
    """
    try:
        s3_client.head_object(Bucket=bucket_name, Key=s3_key)
    except botocore.exceptions.ClientError as err:
        # Only "not found" means absent; every other failure is re-raised.
        if err.response['Error']['Code'] != '404':
            raise
        return False
    return True

In [265]:
def load_booster(model_type='clf'):
    """Download a LightGBM booster and its parameter file from S3 and load both.

    Args:
        model_type: ``'reg'`` selects the regression model; any other value
            selects the classification model.

    Returns:
        Tuple ``(booster, params)``: the loaded ``lgb.Booster`` and the dict
        parsed from the accompanying JSON parameter file.

    Raises:
        FileNotFoundError: If the booster file is not present in the bucket.
        Exception: Errors from the S3 download, LightGBM or JSON parsing
            propagate unchanged.
    """
    local_dir = "/tmp/"
    if model_type == 'reg':
        booster_name = "stock_reg_booster.txt"
        params_name = "stock_reg_params.json"
    else:
        booster_name = "stock_clf_booster.txt"
        params_name = "stock_clf_params.json"

    s3_booster_key = f'models/stock/{booster_name}'
    s3_params_key = f'models/stock/{params_name}'

    # A bare `raise` here has no active exception to re-raise and only yields an
    # opaque RuntimeError; name the missing object instead.
    if not check_booster_exists(bucket_name, s3_booster_key):
        raise FileNotFoundError(
            f"Booster not found in S3: s3://{bucket_name}/{s3_booster_key}"
        )

    local_booster_path = local_dir + booster_name
    local_params_path = local_dir + params_name

    s3_client.download_file(bucket_name, s3_booster_key, local_booster_path)
    s3_client.download_file(bucket_name, s3_params_key, local_params_path)

    booster = lgb.Booster(model_file=local_booster_path)
    with open(local_params_path, 'r', encoding='utf-8') as f:
        params = json.load(f)

    return booster, params

In [266]:
def predict_and_save(df):
    """Run both boosters on ``df`` and upload the predictions to Snowflake.

    Adds ``future_avg_price`` (regression output), ``target_direction``
    (arg-max over the classifier output) and ``future_return`` (relative
    difference between predicted and closing price), then appends the rows to
    ``STOCK_PREDICTION_TABLE``.

    Args:
        df: Feature frame; ``None`` or an empty frame is skipped.

    Returns:
        None. The input frame is left unmodified.
    """
    # Also tolerate None so a caller that got no frame back does not crash here.
    if df is None or df.empty:
        print("데이터프레임이 비어있어 예측을 건너뜁니다.")
        return

    # NOTE(review): Index.difference sorts its result by default, so features
    # are fed in alphabetical order — confirm this matches the training order.
    feature_cols = df.columns.difference(['prediction_time'])
    features = df[feature_cols]

    # The parameter dicts returned alongside the boosters are not needed here.
    clf_booster, _ = load_booster()
    reg_booster, _ = load_booster('reg')

    pred_price = reg_booster.predict(features)
    # Assumes the classifier returns one score column per class — TODO confirm.
    pred_up = np.argmax(clf_booster.predict(features), axis=1)

    # Build a new frame instead of writing prediction columns into the caller's.
    result = df.assign(future_avg_price=pred_price, target_direction=pred_up)
    result['future_return'] = (result['future_avg_price'] - result['close_price']) / result['close_price']

    output_cols = [
        'prediction_time', 'stock_code', 'open_price', 'high_price', 'low_price',
        'close_price', 'cum_volume', 'cum_amount', 'vwap_price', 'change_rate',
        'spread', 'future_avg_price', 'future_return', 'target_direction'
    ]

    # Pass an independent copy: the uploader may rename and rewrite columns.
    # `target_date` is read from the notebook's module-level parameters.
    upload_to_snowflake(result[output_cols].copy(), "STOCK_PREDICTION_TABLE", target_date)
    return

In [290]:
def snowflake_connection():
    """Open a Snowflake connection using the module-level SNOWFLAKE_* settings.

    Returns:
        The connection object returned by ``snowflake.connector.connect``.
    """
    settings = {
        'user': SNOWFLAKE_USER,
        'password': SNOWFLAKE_PASSWORD,
        'account': SNOWFLAKE_ACCOUNT,
        'warehouse': SNOWFLAKE_WAREHOUSE,
        'database': SNOWFLAKE_DATABASE,
        'schema': SNOWFLAKE_SCHEMA,
    }
    return snowflake.connector.connect(**settings)


def upload_to_snowflake(df, table_name, target_dt):
    """Append ``df`` to an existing Snowflake table.

    Column names are upper-cased (spaces become underscores) and
    ``PREDICTION_TIME`` is rendered as ``YYYY-MM-DD HH:MM:SS`` text on a private
    copy, so the caller's frame is never modified.

    Args:
        df: Rows to upload; must contain a datetime ``prediction_time`` column.
            An empty frame is skipped.
        table_name: Target table; it is upper-cased and must already exist.
        target_dt: Currently unused; kept for interface compatibility.
            NOTE(review): rows are appended with no prior delete, so re-running
            the same window inserts them again — confirm whether this argument
            was meant to drive a delete-before-insert.

    Returns:
        None. Connection and upload errors are reported on stdout, not raised.
    """
    if df.empty:
        print("업로드할 데이터 없음")
        return

    # Work on a copy: renaming columns / rewriting PREDICTION_TIME on the
    # caller's object would silently change the caller's data.
    payload = df.copy()
    payload.columns = [col.strip().upper().replace(' ', '_') for col in payload.columns]
    payload['PREDICTION_TIME'] = payload['PREDICTION_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')

    conn = None
    try:
        print(f"\n--- Snowflake에 결과 적재 시작 ---")
        print(f"대상 테이블: {table_name}")
        # Either yields a connection or raises; a failure lands in the handler below.
        conn = snowflake_connection()

        print(f"\n--- DataFrame을 본 테이블로 직접 적재 ---")
        write_pandas(conn=conn, df=payload, table_name=table_name.upper(), auto_create_table=False, overwrite=False)
        print(f"✅ Snowflake 테이블 적재 성공: {payload.shape[0]} 행")
    except Exception as e:
        # Deliberate best effort: report the failure and continue.
        print(f"🚨 Snowflake 적재 오류: {e}")
    finally:
        if conn is not None:
            conn.close()

In [293]:
if __name__ == "__main__":
    # Pipeline entry point: load -> feature engineering -> predict and upload.
    df = load_hourly_data(target_date, ref_time_str)  # pass the run parameters
    if df.empty:
        print("처리할 데이터가 없어 모든 과정을 건너뜁니다.")
    else:
        df = make_feature(df)
        predict_and_save(df)


--- Snowflake에 결과 적재 시작 ---
대상 테이블: STOCK_PREDICTION_TABLE

--- DataFrame을 본 테이블로 직접 적재 ---


  write_pandas(conn=conn, df=df, table_name=table_name.upper(), auto_create_table=False, overwrite=False)


✅ Snowflake 테이블 적재 성공: 2631 행
