In [1]:
# 把1分K線資料轉換為日K線資料 output 原始
import os
import pandas as pd

# 根目錄路徑
root_folder = "data"

# 股票清單
stock_list = ["2308","2303","2317","2330","2382","2454","2881","2882","2891","3711"]
# 年份清單
year_list = ["2020", "2021", "2022", "2023", "2024"]

# 遍歷股票清單
for stock in stock_list:
    stock_folder = os.path.join(root_folder, stock+"_kbars")
    if not os.path.exists(stock_folder):
        print(f"Stock folder {stock} does not exist.")
        continue

    # 遍歷年份清單
    for year in year_list:
        year_folder = os.path.join(stock_folder, year)
        if not os.path.exists(year_folder):
            print(f"Year folder {year} for stock {stock} does not exist.")
            continue

        # 初始化存儲當前年份的結果
        yearly_data = []

        # 遍歷年份資料夾中的所有檔案
        for file_name in os.listdir(year_folder):
            if file_name.endswith("_1min.csv"):
                file_path = os.path.join(year_folder, file_name)
                try:
                    # 讀取檔案
                    df = pd.read_csv(file_path)

                    # 檢查資料是否有內容
                    if df.empty:
                        continue

                    # 確保必要欄位存在
                    required_columns = ['date', 'open', 'high', 'low', 'close', 'volume', 'tic']
                    if not all(col in df.columns for col in required_columns):
                        print(f"File {file_name} is missing required columns.")
                        continue

                    # 將 'date' 欄位轉換為日期格式並只保留年月日
                    df['date'] = pd.to_datetime(df['date']).dt.date

                    # 按日期分組，計算一日K
                    daily_data = df.groupby('date').agg(
                        date=('date', 'first'),
                        open=('open', 'first'),
                        high=('high', 'max'),
                        low=('low', 'min'),
                        close=('close', 'last'),
                        open_volume=('volume', 'first'),
                        end_volume=('volume', 'last'),
                        total_volume=('volume', 'sum'),
                        tic=('tic', 'first')
                    ).reset_index(drop=True)

                    # 確保數值型資料最多保留5位小數
                    numeric_columns = ['open', 'high', 'low', 'close', 'open_volume', 'avg_volume']
                    for col in numeric_columns:
                        if col in daily_data.columns:
                            daily_data[col] = daily_data[col].round(5)

                    # 添加到當前年份結果列表
                    yearly_data.append(daily_data)

                except Exception as e:
                    print(f"Error processing file {file_name} in {year_folder}: {e}")

        # 合併當前年份資料並儲存
        if yearly_data:
            yearly_result = pd.concat(yearly_data, ignore_index=True)
            output_file = os.path.join(stock_folder, f"{stock}_{year}_daily_data.csv")
            yearly_result.to_csv(output_file, index=False)
            print(f"Processed data for {stock} {year} saved to {output_file}")
        else:
            print(f"No valid data found for {stock} {year}.")


Processed data for 2308 2020 saved to data\2308_kbars\2308_2020_daily_data.csv
Processed data for 2308 2021 saved to data\2308_kbars\2308_2021_daily_data.csv
Processed data for 2308 2022 saved to data\2308_kbars\2308_2022_daily_data.csv
Processed data for 2308 2023 saved to data\2308_kbars\2308_2023_daily_data.csv
Processed data for 2308 2024 saved to data\2308_kbars\2308_2024_daily_data.csv
Processed data for 2303 2020 saved to data\2303_kbars\2303_2020_daily_data.csv
Processed data for 2303 2021 saved to data\2303_kbars\2303_2021_daily_data.csv
Processed data for 2303 2022 saved to data\2303_kbars\2303_2022_daily_data.csv
Processed data for 2303 2023 saved to data\2303_kbars\2303_2023_daily_data.csv
Processed data for 2303 2024 saved to data\2303_kbars\2303_2024_daily_data.csv
Processed data for 2317 2020 saved to data\2317_kbars\2317_2020_daily_data.csv
Processed data for 2317 2021 saved to data\2317_kbars\2317_2021_daily_data.csv
Processed data for 2317 2022 saved to data\2317_kbar

In [2]:
#把輸入資料取前30筆資料，並計算漲幅，存成新檔案
import os
import pandas as pd

# 根目錄路徑
root_folder = "data"
output_folder = "processed_data"

# 股票清單
stock_list = ["2308","2303","2317","2330","2382","2454","2881","2882","2891","3711"]
# 年份清單
year_list = ["2020", "2021", "2022", "2023", "2024"]

# 確保輸出資料夾存在
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 遍歷股票清單
for stock in stock_list:
    stock_folder = os.path.join(root_folder, stock+"_kbars")
    if not os.path.exists(stock_folder):
        print(f"Stock folder {stock} does not exist.")
        continue

    # 確保股票分類資料夾存在於輸出資料夾
    stock_output_folder = os.path.join(output_folder, stock)
    if not os.path.exists(stock_output_folder):
        os.makedirs(stock_output_folder)

     # 遍歷年份清單
    for year in year_list:
        year_folder = os.path.join(stock_folder, year)
        if not os.path.exists(year_folder):
            print(f"Year folder {year} for stock {stock} does not exist.")
            continue

        # 確保年份資料夾存在於輸出資料夾
        year_output_folder = os.path.join(stock_output_folder, year)
        if not os.path.exists(year_output_folder):
            os.makedirs(year_output_folder)

        # 遍歷年份資料夾中的所有檔案
        for file_name in os.listdir(year_folder):
            if file_name.endswith("_1min.csv"):
                file_path = os.path.join(year_folder, file_name)
                try:
                    # 讀取檔案
                    df = pd.read_csv(file_path)

                    # 檢查資料是否有內容
                    if df.empty:
                        continue

                    # 確保必要欄位存在
                    required_columns = ['date', 'open', 'high', 'low', 'close', 'volume', 'tic']
                    if not all(col in df.columns for col in required_columns):
                        print(f"File {file_name} is missing required columns.")
                        continue

                    # 將 'date' 欄位轉換為日期格式
                    df['datetime'] = pd.to_datetime(df['date'])
                    df['date'] = df['datetime'].dt.date

                    # 按日期分組，保留前30筆資料
                    grouped = df.groupby('date')
                    for date, group in grouped:
                        top_30 = group.head(30).copy()  # 明確建立副本

                        # 計算漲幅並取代原始值 (high, low, close, open)
                        reference_price = top_30.iloc[0]['open']
                        for col in ['high', 'low', 'close', 'open']:
                            top_30[col] = (((top_30[col] - reference_price) / reference_price) ).round(10)

                        # 儲存每一天的資料
                        output_file = os.path.join(year_output_folder, f"{stock}_{date}_top30_data.csv")
                        top_30.to_csv(output_file, index=False)

                except Exception as e:
                    print(f"Error processing file {file_name} in {year_folder}: {e}")


In [3]:
# 把日K線資料轉換為漲幅資料
import os
import pandas as pd

# 根目錄路徑
daily_data_root = "data"
output_folder = "processed_data"

# 股票清單
stock_list = ["2308","2303","2317","2330","2382","2454","2881","2882","2891","3711"]
# 年份清單
year_list = ["2020", "2021", "2022"]

# 確保輸出資料夾存在
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 遍歷股票清單
for stock in stock_list:
    stock_folder = os.path.join(daily_data_root, stock+"_kbars")
    if not os.path.exists(stock_folder):
        print(f"Stock folder {stock} does not exist.")
        continue

    # 確保股票分類資料夾存在於輸出資料夾
    stock_output_folder = os.path.join(output_folder, stock)
    if not os.path.exists(stock_output_folder):
        os.makedirs(stock_output_folder)

    # 遍歷年份清單
    for year in year_list:
        input_file = os.path.join(stock_folder, f"{stock}_{year}_daily_data.csv")
        if not os.path.exists(input_file):
            print(f"File {input_file} does not exist.")
            continue

        try:
            # 讀取每日K資料
            df = pd.read_csv(input_file)

            # 檢查資料是否有內容
            if df.empty:
                continue

            # 確保必要欄位存在
            required_columns = ['date', 'open', 'high', 'low', 'close']
            if not all(col in df.columns for col in required_columns):
                print(f"File {input_file} is missing required columns.")
                continue

            # 計算漲幅並取代原始值 (high, low, close)
            df['high'] = (((df['high'] - df['open']) / df['open']) ).round(10)
            df['low'] = (((df['low'] - df['open']) / df['open']) ).round(10)
            df['close'] = (((df['close'] - df['open']) / df['open'])).round(10)
            df['open'] = 0.0

            # 儲存更新後的資料
            output_file = os.path.join(stock_output_folder, f"{stock}_{year}_daily_data.csv")
            df.to_csv(output_file, index=False)
            print(f"Processed data saved to {output_file}")

        except Exception as e:
            print(f"Error processing file {input_file}: {e}")


Processed data saved to processed_data\2308\2308_2020_daily_data.csv
Processed data saved to processed_data\2308\2308_2021_daily_data.csv
Processed data saved to processed_data\2308\2308_2022_daily_data.csv
Processed data saved to processed_data\2303\2303_2020_daily_data.csv
Processed data saved to processed_data\2303\2303_2021_daily_data.csv
Processed data saved to processed_data\2303\2303_2022_daily_data.csv
Processed data saved to processed_data\2317\2317_2020_daily_data.csv
Processed data saved to processed_data\2317\2317_2021_daily_data.csv
Processed data saved to processed_data\2317\2317_2022_daily_data.csv
Processed data saved to processed_data\2330\2330_2020_daily_data.csv
Processed data saved to processed_data\2330\2330_2021_daily_data.csv
Processed data saved to processed_data\2330\2330_2022_daily_data.csv
Processed data saved to processed_data\2382\2382_2020_daily_data.csv
Processed data saved to processed_data\2382\2382_2021_daily_data.csv
Processed data saved to processed_

In [4]:
# 把日K線資料轉換為Z-score資料 並計算大量交易日
import os
import pandas as pd

# 根目錄路徑
daily_data_root = "processed_data"
output_folder = "final_output"


# 股票清單
stock_list = ["2308","2303","2317","2330","2382","2454","2881","2882","2891","3711"]
# 年份清單
year_list = ["2020", "2021", "2022"]

# 確保輸出資料夾存在
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 初始化統計數據
all_high = []
all_low = []
all_close = []

# 收集所有股票所有年份的數據
for stock in stock_list:
    stock_folder = os.path.join(daily_data_root, stock)
    if not os.path.exists(stock_folder):
        print(f"Stock folder {stock} does not exist.")
        continue

    for year in year_list:
        input_file = os.path.join(stock_folder, f"{stock}_{year}_daily_data.csv")
        if not os.path.exists(input_file):
            print(f"File {input_file} does not exist.")
            continue

        try:
            # 讀取每日K資料
            df = pd.read_csv(input_file)

            # 檢查資料是否有內容
            if df.empty:
                continue

            # 確保必要欄位存在
            if not all(col in df.columns for col in ['high', 'low', 'close']):
                print(f"File {input_file} is missing required columns.")
                continue

            # 添加到全域數據列表
            all_high.extend(df['high'].tolist())
            all_low.extend(df['low'].tolist())
            all_close.extend(df['close'].tolist())

        except Exception as e:
            print(f"Error processing file {input_file}: {e}")

# 計算全域 Z-score 參數
zscore_params = {
    "high_mean": pd.Series(all_high).mean(),
    "high_std": pd.Series(all_high).std(),
    "low_mean": pd.Series(all_low).mean(),
    "low_std": pd.Series(all_low).std(),
    "close_mean": pd.Series(all_close).mean(),
    "close_std": pd.Series(all_close).std(),
}

# 遍歷每隻股票並應用 Z-score 和判斷
for stock in stock_list:
    stock_folder = os.path.join(daily_data_root, stock)
    if not os.path.exists(stock_folder):
        continue

    large_business_days = 0
    total_days = 0
    total_volume_sum = 0

    for year in year_list:
        input_file = os.path.join(stock_folder, f"{stock}_{year}_daily_data.csv")
        if not os.path.exists(input_file):
            continue

        try:
            # 讀取每日K資料
            df = pd.read_csv(input_file)
            if df.empty:
                continue

            # 計算當前年份的平均交易量
            if 'total_volume' in df.columns:
                year_total_volume = df['total_volume'].sum()
                year_total_days = len(df)
                average_volume = year_total_volume / year_total_days if year_total_days > 0 else 0

                # 更新全域交易量統計
                total_volume_sum += year_total_volume
                total_days += year_total_days

                # 判斷大量交易
                df['large_business'] = (df['total_volume'] > 2 * average_volume).astype(int)
                large_business_days += df['large_business'].sum()

            # 應用全域 Z-score
            for col in ['high', 'low', 'close']:
                mean = zscore_params[f"{col}_mean"]
                std = zscore_params[f"{col}_std"]
                df[col] = ((df[col] - mean) / std).round(10)

            # 儲存更新後的資料
            output_file = os.path.join(output_folder, stock, f"{stock}_{year}_daily_data.csv")
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            df.to_csv(output_file, index=False)

        except Exception as e:
            print(f"Error processing file {input_file} for stock {stock}: {e}")

    # 輸出統計結果
    no_large_business_days = total_days - large_business_days
    print(f"Stock: {stock}")
    print(f"Average daily volume: {total_volume_sum / total_days if total_days > 0 else 0:.2f}")
    print(f"Days with large business: {large_business_days}")
    print(f"Days without large business: {no_large_business_days}")

# 輸出全域 Z-score 參數
print(f"Global Z-score parameters: {zscore_params}")


Stock: 2308
Average daily volume: 7216.22
Days with large business: 38
Days without large business: 663
Stock: 2303
Average daily volume: 149270.82
Days with large business: 52
Days without large business: 649
Stock: 2317
Average daily volume: 40681.68
Days with large business: 51
Days without large business: 650
Stock: 2330
Average daily volume: 39221.11
Days with large business: 40
Days without large business: 661
Stock: 2382
Average daily volume: 9153.91
Days with large business: 24
Days without large business: 677
Stock: 2454
Average daily volume: 8281.57
Days with large business: 36
Days without large business: 665
Stock: 2881
Average daily volume: 21734.45
Days with large business: 48
Days without large business: 653
Stock: 2882
Average daily volume: 26536.95
Days with large business: 56
Days without large business: 645
Stock: 2891
Average daily volume: 41030.00
Days with large business: 40
Days without large business: 661
Stock: 3711
Average daily volume: 16327.26
Days with larg

In [5]:
import os
import pandas as pd

# 根目錄路徑
daily_data_root = "processed_data"
output_folder = "final_input"
# 股票清單
stock_list = ["2308","2303","2317","2330","2382","2454","2881","2882","2891","3711"]
# 年份清單
year_list = ["2020", "2021", "2022"]

# 確保輸出資料夾存在
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 初始化統計數據
all_open = []
all_high = []
all_low = []
all_close = []
all_volume = []

# 收集所有股票所有年份的數據
for stock in stock_list:
    stock_folder = os.path.join(daily_data_root, stock)
    if not os.path.exists(stock_folder):
        print(f"Stock folder {stock} does not exist.")
        continue

    for year in year_list:
        year_folder = os.path.join(stock_folder, year)
        if not os.path.exists(year_folder):
            print(f"Year folder {year} for stock {stock} does not exist.")
            continue

        # 遍歷年份資料夾中的所有檔案
        for file_name in os.listdir(year_folder):
            if file_name.endswith(".csv"):
                file_path = os.path.join(year_folder, file_name)

                try:
                    # 讀取每日K資料
                    df = pd.read_csv(file_path)
                    print(file_name)

                    # 檢查資料是否有內容
                    if df.empty:
                        continue

                    # 確保必要欄位存在
                    if not all(col in df.columns for col in ['open', 'high', 'low', 'close', 'volume']):
                        print(f"File {file_path} is missing required columns.")
                        continue

                    # 添加到全域數據列表
                    all_open.extend(df['open'].tolist())
                    all_high.extend(df['high'].tolist())
                    all_low.extend(df['low'].tolist())
                    all_close.extend(df['close'].tolist())
                    all_volume.extend(df['volume'].tolist())

                except Exception as e:
                    print(f"Error processing file {file_path}: {e}")

# 計算全域 Z-score 參數
zscore_params = {
    "open_mean": pd.Series(all_open).mean(),
    "open_std": pd.Series(all_open).std(),
    "high_mean": pd.Series(all_high).mean(),
    "high_std": pd.Series(all_high).std(),
    "low_mean": pd.Series(all_low).mean(),
    "low_std": pd.Series(all_low).std(),
    "close_mean": pd.Series(all_close).mean(),
    "close_std": pd.Series(all_close).std(),
    "volume_mean": pd.Series(all_volume).mean(),
    "volume_std": pd.Series(all_volume).std(),
}

# 遍歷每隻股票並應用 Z-score
for stock in stock_list:
    stock_folder = os.path.join(daily_data_root, stock)
    if not os.path.exists(stock_folder):
        continue

    for year in year_list:
        year_folder = os.path.join(stock_folder, year)
        if not os.path.exists(year_folder):
            continue

        # 遍歷年份資料夾中的所有檔案
        for file_name in os.listdir(year_folder):
            if file_name.endswith(".csv"):
                input_file = os.path.join(year_folder, file_name)
                try:
                    # 讀取每日K資料
                    df = pd.read_csv(input_file)
                    if df.empty:
                        continue

                    # 應用全域 Z-score
                    for col in ['open', 'high', 'low', 'close', 'volume']:
                        mean = zscore_params[f"{col}_mean"]
                        std = zscore_params[f"{col}_std"]
                        df[col] = ((df[col] - mean) / std).round(8)

                    # 儲存更新後的資料
                    output_file = os.path.join(output_folder, stock, year, file_name)
                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
                    df.to_csv(output_file, index=False)

                except Exception as e:
                    print(f"Error processing file {input_file} for stock {stock}: {e}")

# 輸出全域 Z-score 參數
print(f"Global Z-score parameters: {zscore_params}")


2308_2020-03-02_top30_data.csv
2308_2020-03-03_top30_data.csv
2308_2020-03-04_top30_data.csv
2308_2020-03-05_top30_data.csv
2308_2020-03-06_top30_data.csv
2308_2020-03-09_top30_data.csv
2308_2020-03-10_top30_data.csv
2308_2020-03-11_top30_data.csv
2308_2020-03-12_top30_data.csv
2308_2020-03-13_top30_data.csv
2308_2020-03-16_top30_data.csv
2308_2020-03-17_top30_data.csv
2308_2020-03-18_top30_data.csv
2308_2020-03-19_top30_data.csv
2308_2020-03-20_top30_data.csv
2308_2020-03-23_top30_data.csv
2308_2020-03-24_top30_data.csv
2308_2020-03-25_top30_data.csv
2308_2020-03-26_top30_data.csv
2308_2020-03-27_top30_data.csv
2308_2020-03-30_top30_data.csv
2308_2020-03-31_top30_data.csv
2308_2020-04-01_top30_data.csv
2308_2020-04-06_top30_data.csv
2308_2020-04-07_top30_data.csv
2308_2020-04-08_top30_data.csv
2308_2020-04-09_top30_data.csv
2308_2020-04-10_top30_data.csv
2308_2020-04-13_top30_data.csv
2308_2020-04-14_top30_data.csv
2308_2020-04-15_top30_data.csv
2308_2020-04-16_top30_data.csv
2308_202