In [82]:
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta

# 設置MySQL資料庫連接
db_user = 'root'
db_password = '19970730'
db_host = '127.0.0.1'
db_name = 'sql_stock'

# 創建資料庫連接引擎
engine = create_engine(f'mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}')

# 設置日期範圍
#today = datetime(2024, 5, 25)
today = datetime.now().date()
one_year_ago = today - timedelta(days=720)

# 格式化日期
today_str = today.strftime('%Y%m%d')
one_year_ago_str = one_year_ago.strftime('%Y%m%d')

# 可自由調整select的區間
# today_str = '20240810'
# one_year_ago_str = '20210101'

# SQL 查詢 加權指數
query_twse = f"""
SELECT *
FROM daily_twse
WHERE date >= '{one_year_ago_str}' AND date <= '{today_str}'
"""

# SQL 查詢 個股
query_stock = f"""
SELECT *
FROM daily_price
WHERE date >= '{one_year_ago_str}' AND date <= '{today_str}'
"""

# SQL 查詢 細產業
query_sub_category = f"""
SELECT *
FROM sub_category
"""

# 使用 pd.read_sql 來執行查詢並讀取數據到 DataFrame
twse_df_raw = pd.read_sql(query_twse, engine)

stock_df_raw = pd.read_sql(query_stock, engine)

sub_category_list = pd.read_sql(query_sub_category, engine)



In [84]:
import pandas as pd
import pymysql

class MySQLUpdater:
    def __init__(self, db_params, table_name, df_stocks, condition, method):
        self.db = pymysql.connect(**db_params)
        self.cursor = self.db.cursor()
        self.table_name = table_name
        self.df_stocks = df_stocks
        self.condition = condition
        self.method = method
        


    def update_records(self):
        # 遍历 DataFrame 中的每一行

        for i, row in self.df_stocks.iterrows():
            try:
                if(self.method == '1'):
                    # 构建 SQL 更新语句
                    sql = f"""
                    UPDATE `{self.table_name}`
                    SET `5MA` = %s,
                        `8MA` = %s,
                        `10MA` = %s,
                        `20MA` = %s,
                        `20RS` = %s,
                        `60MA` = %s,
                        `60RS` = %s,
                        `240MA` = %s,
                        `240RS` = %s
                    WHERE `{self.condition}` = %s AND `Date` = %s
                    """
                    # 执行更新语句
                    self.cursor.execute(sql, (float(row['5MA']),float(row['8MA']),float(row['10MA']),float(row['20MA']),float(row['20RS']),float(row['60MA']),float(row['60RS']),float(row['240MA']),float(row['240RS']), row[self.condition], row['Date']))

                    # 提交更改
                    self.db.commit()
                    
                elif (self.method == '2'):

                    # 构建 SQL 更新语句 更新標準化RS
                    sql = f"""
                    UPDATE `{self.table_name}`
                    SET `20RS_sd` = %s,
                        `20RS_rank` = %s,
                        `60RS_sd` = %s,
                        `60RS_rank` = %s,
                        `240RS_sd` = %s,
                        `240RS_rank` = %s
                    WHERE `{self.condition}` = %s AND `Date` = %s
                    """

                    # 执行更新语句
                    self.cursor.execute(sql, (float(row['20RS_sd']),float(row['20RS_rank']),float(row['60RS_sd']),float(row['60RS_rank']),float(row['240RS_sd']),float(row['240RS_rank']), row[self.condition], row['Date']))

                    # 提交更改
                    self.db.commit()

                # elif (self.method == '3'):

                #     # 构建 SQL 更新语句 更新標準化RS
                #     sql = f"""
                #     UPDATE `{self.table_name}`
                #     SET `60RS_sd` = %s,
                #         `60RS_rank` = %s
                #     WHERE `{self.condition}` = %s AND `Date` = %s
                #     """

                #     # 执行更新语句
                #     self.cursor.execute(sql, (float(row['60RS_sd']),float(row['60RS_rank']), row[self.condition], row['Date']))

                #     # 提交更改
                #     self.db.commit()

            except Exception as e:
                print(f"Error updating row {i}: {e}")
                self.db.rollback()

    def close_connection(self):
        self.cursor.close()
        self.db.close()

# 示例使用
db_params = {
    'host': '127.0.0.1',
    'user': 'root',
    'password': '19970730',
    'database': 'sql_stock'
    # 設置MySQL資料庫連接
}

In [85]:
twse_df = twse_df_raw[twse_df_raw['指數名稱'] == '發行量加權股價指數'].copy()

twse_df_na = twse_df.isna()
twse_df_update = twse_df[twse_df_na.any(axis=1)] #需要被更新的欄位

twse_df.loc[:, '5MA'] = twse_df['價格指數值'].rolling(5).mean()
twse_df.loc[:, '8MA'] = twse_df['價格指數值'].rolling(8).mean()
twse_df.loc[:, '10MA'] = twse_df['價格指數值'].rolling(10).mean()
twse_df.loc[:, '20MA']= twse_df['價格指數值'].rolling(20).mean()
twse_df.loc[:, '60MA']= twse_df['價格指數值'].rolling(60).mean()
twse_df.loc[:, '240MA']= twse_df['價格指數值'].rolling(240).mean()


twse_df.loc[:, '20RS'] = round((twse_df['價格指數值']/twse_df['20MA'])*100, 4)
twse_df.loc[:, '60RS'] = round((twse_df['價格指數值']/twse_df['60MA'])*100, 4)
twse_df.loc[:, '240RS'] = round((twse_df['價格指數值']/twse_df['240MA'])*100, 4)
twse_df = twse_df.fillna(0)
#twse_df_new = twse_df.iloc[-2:]
twse_df_new = twse_df.loc[twse_df_update.index]  # 只更新尚未計算的新欄位


# 初始化并更新数据库
updater = MySQLUpdater(db_params, 'daily_twse', twse_df_new, '指數名稱', '1')
updater.update_records()
updater.close_connection()

In [86]:
import numpy as np
stock_ids = stock_df_raw['證券代號'].unique().tolist()

stock_df_na = stock_df_raw.isna()
stock_df_update = stock_df_raw[stock_df_na.any(axis=1)] #需要被更新的欄位

for i in stock_ids:
        if((len(i)==4) ): #1. 排除債 ETF等等 2.排除新掛牌資料不到calculate_day天
                #print(i)
                # 計算個股 RS
                temp_df = stock_df_raw[stock_df_raw['證券代號']==i].copy()

                temp_df.loc[:, '5MA'] = temp_df['收盤價'].rolling(5).mean()
                temp_df.loc[:, '8MA'] = temp_df['收盤價'].rolling(8).mean()
                temp_df.loc[:, '10MA'] = temp_df['收盤價'].rolling(10).mean()
                temp_df.loc[:, '20MA']= temp_df['收盤價'].rolling(20).mean()
                temp_df.loc[:, '60MA']= temp_df['收盤價'].rolling(60).mean()
                temp_df.loc[:, '240MA']= temp_df['收盤價'].rolling(240).mean()

                # 避免 temp_df['20MA']為0 
                if (temp_df['收盤價'] == 0).any() or (temp_df['20MA'] == 0).any():
                        temp_df.loc[:, '20RS'] = 0
                        temp_df.loc[:, '60RS'] = 0
                        temp_df.loc[:, '240RS'] = 0
                else :
                        temp_df.loc[:, '20RS'] = round((temp_df['收盤價']/temp_df['20MA'])*100, 4)
                        temp_df.loc[:, '60RS'] = round((temp_df['收盤價']/temp_df['60MA'])*100, 4)
                        temp_df.loc[:, '240RS'] = round((temp_df['收盤價']/temp_df['240MA'])*100, 4)


                temp_df = temp_df.fillna(0)

                #只更新尚未update欄位
                temp_update_df = stock_df_update[stock_df_update['證券代號']==i].copy()
                temp_df_new = temp_df.loc[temp_update_df.index]  # 只更新尚未計算的新欄位
                # 初始化并更新数据库
                updater = MySQLUpdater(db_params, 'daily_price', temp_df_new, '證券代號','1')
                updater.update_records()
updater.close_connection()

In [87]:
# 

import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta

# 設置MySQL資料庫連接
db_user = 'root'
db_password = '19970730'
db_host = '127.0.0.1'
db_name = 'sql_stock'

# 創建資料庫連接引擎
engine = create_engine(f'mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}')

# 設置日期範圍
#today = datetime(2024, 5, 25)
start_date = pd.to_datetime(stock_df_update['Date']).unique().tolist()[0].date()
end_date = pd.to_datetime(stock_df_update['Date']).unique().tolist()[-1].date()

# 格式化日期
start_date = start_date.strftime('%Y%m%d')
end_date = end_date.strftime('%Y%m%d')

# SQL 查詢 個股
query_stock = f"""
SELECT *
FROM daily_price
WHERE date >= '{start_date}' AND date <= '{end_date}'
"""

stock_df_update_done = pd.read_sql(query_stock, engine)


In [97]:
# 計算個股對大盤RS
#先轉置取Date
stock_df_update_done['Date'] = pd.to_datetime(stock_df_update_done['Date'])

#stock_date_list = stock_df_raw['Date'].unique().tolist()
stock_date_list =  pd.to_datetime(stock_df_update['Date']).unique().tolist()  # 指查詢要更新的日期

# 要注意一下twse_df_new是不是有資料，沒有的話twse_df_new會空掉就沒辦法更新RS 因為是跟大盤在比 所以要改用註解這段

# twse_df = twse_df_raw[twse_df_raw['指數名稱'] == '發行量加權股價指數'].copy()
# twse_df['Date'] = pd.to_datetime(twse_df['Date'])
# twse_df.rename(columns={'20RS': '20RS_twse','60RS': '60RS_twse','240RS': '240RS_twse'}, inplace=True)
# cols = ['Date', '價格指數值', '20RS_twse', '60RS_twse', '240RS_twse']
# twse_df_forRS = twse_df[cols]


twse_df_new['Date'] = pd.to_datetime(twse_df_new['Date']) 
twse_df_new.rename(columns={'20RS': '20RS_twse','60RS': '60RS_twse','240RS': '240RS_twse'}, inplace=True)
cols = ['Date', '價格指數值', '20RS_twse', '60RS_twse', '240RS_twse']
twse_df_forRS = twse_df_new[cols]

merged_df = pd.merge(stock_df_update_done, twse_df_forRS, on='Date', how='inner')
merged_df['20RS_sd'] = merged_df['20RS']/merged_df['20RS_twse']  #20MA計算RS
merged_df['60RS_sd'] = merged_df['60RS']/merged_df['60RS_twse']  #60MA計算RS
merged_df['240RS_sd'] = merged_df['240RS']/merged_df['240RS_twse']  #240MA計算RS
merged_df = merged_df.fillna(0)

col2 = ['Date', '證券代號', '20RS_sd','20RS_rank','60RS_sd','60RS_rank','240RS_sd','240RS_rank']
for i in range(len(stock_date_list)) :
    temp_merged_df = merged_df[merged_df['Date']==stock_date_list[i]].copy()
    # #標準化成0-100名
    temp_merged_df['20RS_rank'] = (temp_merged_df['20RS_sd'].rank(axis=0,method='first')/len(temp_merged_df))*100
    temp_merged_df['60RS_rank'] = (temp_merged_df['60RS_sd'].rank(axis=0,method='first')/len(temp_merged_df))*100
    temp_merged_df['240RS_rank'] = (temp_merged_df['240RS_sd'].rank(axis=0,method='first')/len(temp_merged_df))*100
    temp_merged_df = temp_merged_df[col2]


    # 初始化并更新数据库
    updater = MySQLUpdater(db_params, 'daily_price', temp_merged_df, '證券代號','2')
    updater.update_records()
updater.close_connection()



# df_rs_week = pd.concat([df_rs_week,df_rs70])
# df_rsall_week = pd.concat([df_rsall_week,df_rs])
# # 順便統計細產業佔比
# rs70_sub_category = get_sub_category_list(sub_category_list,df_rs70,day)                        
# df_rs70_category_week = pd.concat([df_rs70_category_week,rs70_sub_category])


  merged_df = merged_df.fillna(0)


In [None]:
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta

# 設置MySQL資料庫連接
db_user = 'root'
db_password = '19970730'
db_host = '127.0.0.1'
db_name = 'sql_stock'

# 創建資料庫連接引擎
engine = create_engine(f'mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}')

# 設置日期範圍
#today = datetime(2024, 5, 25)
today = datetime.now().date()
one_year_ago = today - timedelta(days=365)

# 格式化日期
today_str = today.strftime('%Y%m%d')
one_year_ago_str = one_year_ago.strftime('%Y%m%d')



# SQL 查詢 個股
query_stock = f"""
SELECT *
FROM daily_price
WHERE date >= '{one_year_ago_str}' AND date <= '{today_str}'
"""

# 使用 pd.read_sql 來執行查詢並讀取數據到 DataFrame
stock_df_raw = pd.read_sql(query_stock, engine)

