In [None]:
# pip install beautifulsoup4 -q
# pip install lxml -q


In [5]:
import os

def create_folder_if_not_exist(folder_name):
    """
    如果當前目錄中不存在指定名稱的資料夾，則創建它。
    
    參數:
        folder_name (str): 資料夾名稱。
    """
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)


# 建立股票清單 stoke_list_pd

In [3]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import time

In [3]:
'''抓清單
'''
# 股票列表網頁URL
url = "https://isin.twse.com.tw/isin/C_public.jsp?strMode=2"

# 用 requests 獲取網頁內容
response = requests.get(url)
# response.encoding = 'utf-8'  # 確保文字編碼正確

# 使用 BeautifulSoup 解析 HTML
soup = BeautifulSoup(response.text, 'html.parser')

# 定位到表格並解析出每一行
rows = soup.find_all('tr')

# 解析資料
stoke_list_data = []
for row in rows:
    cols = row.find_all('td')
    cols = [ele.text.strip() if ele.text.strip() else ' ' for ele in cols]  # 將空值替換為空格
    stoke_list_data.append(cols)

# 檢查並調整列表中最後一個欄位是空的情況
for data in stoke_list_data:
    if data[-1] == ' ':  # 檢查最後一個欄位是否是空的
        data[-1] = data[-2]  # 將倒數第二個欄位的值移動到最後一個欄位
        data[-2] = ' '  # 將倒數第二個欄位設置為空

# 刪除所有列表的倒數第二個位置的資料（如果列表長度足夠）
for data in stoke_list_data:
    if len(data) >= 2:
        del data[-2]

stoke_list_pending_pd= pd.DataFrame(stoke_list_data[1:])
stock_info = stoke_list_pending_pd[0]
stock_info = stock_info.str.split(r'\s+', n=1, expand=True)
stock_info.columns = ['股票代號', '公司名稱'] 
stoke_list_pending_pd.columns = ['有價證券代號及名稱','國際證券辨識號碼(ISIN Code)', '上市日','市場別', '產業別','CFICode'] 
stoke_list_pd = pd.concat([stock_info,stoke_list_pending_pd.iloc[:,1:]], axis=1)

stoke_list_pd['類別'] = ''

group_name = "test"  # 初始化 group_name 變數

for index, row in stoke_list_pd.iterrows():
    if pd.isna(row['公司名稱']):  # Check if '公司名稱' is not NaN
        group_name = row['股票代號']  # Update group_name to the current '股票代號'
        nan_indices = stoke_list_pd.index[stoke_list_pd['公司名稱'].isna()].tolist()
    elif pd.notna(row['公司名稱']) and group_name is not None:
        # Set '類別' to group_name where '公司名稱' is NaN and group_name has been set
        stoke_list_pd.at[index, '類別'] = group_name
    
# 删除这些行
stoke_list_pd.drop(nan_indices, inplace=True)
stoke_list_pd.head(5)

# stoke_list_pd.to_csv('stoke_list.csv', index=False,encoding='utf-8-sig')

Unnamed: 0,股票代號,公司名稱,國際證券辨識號碼(ISIN Code),上市日,市場別,產業別,CFICode,類別
1,1101,台泥,TW0001101004,1962/02/09,上市,水泥工業,ESVUFR,股票
2,1102,亞泥,TW0001102002,1962/06/08,上市,水泥工業,ESVUFR,股票
3,1103,嘉泥,TW0001103000,1969/11/14,上市,水泥工業,ESVUFR,股票
4,1104,環泥,TW0001104008,1971/02/01,上市,水泥工業,ESVUFR,股票
5,1108,幸福,TW0001108009,1990/06/06,上市,水泥工業,ESVUFR,股票


建立在當前目錄下的 以市場別命名的資料夾下的以類別名稱命名的資料匣 如果沒有就創建他
類別
    編號_股票名稱
        編號_股票名稱_年_月.csv


# 依據清單內容將抓取資料，並根據類別建立資料夾，再根據代號以及名稱建立資料夾，最終講每月的交易量都放進去

In [1]:
import pandas as pd
# 指定檔案路徑
file_path = r'C:\Users\D\Desktop\project\股票程式碼\stoke_list.csv'

# 讀取CSV檔案到 pandas DataFrame
stoke_list_pd = pd.read_csv(file_path)

In [6]:
'''查歷年股價 stockNo_str
'''

stockNo_str = "2618"
stockName_str = stoke_list_pd.loc[stoke_list_pd["股票代號"] == stockNo_str, "公司名稱"].iloc[0]
financial_category_str = stoke_list_pd.loc[stoke_list_pd["股票代號"] == stockNo_str, "類別"].iloc[0]

year_list = [str(year) for year in range(2015, 2025)]
month_dict = {1: "01", 2: "02", 3: "03", 4: "04", 5: "05", 6: "06", 7: "07", 8: "08", 9: "09", 10: "10", 11: "11", 12: "12"}

lasr_stock_monthly_data_last_stroke_pd = pd.DataFrame()

for year in year_list:
    for month, month_str in month_dict.items():
        url = f"https://www.twse.com.tw/exchangeReport/STOCK_DAY?response=json&date={year}{month_str}01&stockNo={stockNo_str}"
        response = requests.get(url)
        data = response.json()

        try:
            response = requests.get(url)
            data = response.json()

            if data['stat'] == 'OK':
                stock_monthly_data_pd = pd.DataFrame(data['data'], columns=data['fields'])
            else:
                print("無法獲取數據，回應狀態:", data['stat'])
                continue

            columns_to_convert_float = ['成交股數', '成交金額', '開盤價', '最高價', '最低價', '收盤價', '漲跌價差', '成交筆數']
            for column in columns_to_convert_float:
                stock_monthly_data_pd[column] = pd.to_numeric(stock_monthly_data_pd[column].str.replace(',', '').replace('"', '', regex=True), errors='coerce')
                if column in ['成交股數', '成交筆數']:
                    stock_monthly_data_pd[column] = stock_monthly_data_pd[column].astype(float)  # 取整數位
                else:
                    stock_monthly_data_pd[column] = round(stock_monthly_data_pd[column], 2)  # 取小數點後二位
                # 轉換成NAN

            # 處理 NAN
            if stock_monthly_data_pd.isna().any(axis=1).any():
                stock_monthly_data_nan_iloc =stock_monthly_data_pd[stock_monthly_data_pd.isna().any(axis=1)].index.values[0]
                if stock_monthly_data_nan_iloc == 0 :
                    if lasr_stock_monthly_data_last_stroke_pd.empty:
                        stock_monthly_data_pd['漲跌價差'] = 0
                    else:
                        # lasr_stock_monthly_closing_price = lasr_stock_monthly_data_last_stroke_pd.loc[lasr_stock_monthly_data_last_stroke_pd, '收盤價']
                        lasr_stock_monthly_closing_price = lasr_stock_monthly_data_last_stroke_pd['收盤價']
                        stock_monthly_data_pd['漲跌價差'] = stock_monthly_data_pd.loc[stock_monthly_data_nan_iloc, '收盤價'] -lasr_stock_monthly_closing_price
                else:
                    stock_monthly_data_pd.loc[stock_monthly_data_nan_iloc, '漲跌價差'] = stock_monthly_data_pd.loc[stock_monthly_data_nan_iloc, '收盤價'] - stock_monthly_data_pd.loc[stock_monthly_data_nan_iloc-1, '收盤價']
                lasr_stock_monthly_data_last_stroke_pd = stock_monthly_data_pd.iloc[-1].copy()

            if stock_monthly_data_pd[columns_to_convert_float].isnull().any().any():
                anomaly_path = os.path.join(os.getcwd(), "異常")
                create_folder_if_not_exist(anomaly_path)
                file_name = f"{stockNo_str}_{year}_{month_str}_anomaly.csv"
                stock_monthly_data_pd.to_csv(os.path.join(anomaly_path, file_name), index=False)
            else:
                create_folder_if_not_exist((financial_category_str))
                stock_folder_name = f"{stockNo_str}_{stockName_str}"
                stock_folder_path = os.path.join(os.getcwd(), financial_category_str, stock_folder_name)
                create_folder_if_not_exist(stock_folder_path)
                file_name = f"{stockNo_str}_{stockName_str}_{year}_{month_str}.csv"
                stock_monthly_data_pd.to_csv(os.path.join(stock_folder_path, file_name), index=False)
        except requests.exceptions.RequestException as e:
            print("請求出現異常:", e)
        except ValueError as ve:
            print("解析 JSON 數據時出現錯誤:", ve)

        time.sleep(6)
        


無法獲取數據，回應狀態: 查詢日期大於今日，請重新查詢!
無法獲取數據，回應狀態: 查詢日期大於今日，請重新查詢!
無法獲取數據，回應狀態: 查詢日期大於今日，請重新查詢!
無法獲取數據，回應狀態: 查詢日期大於今日，請重新查詢!
無法獲取數據，回應狀態: 查詢日期大於今日，請重新查詢!
無法獲取數據，回應狀態: 查詢日期大於今日，請重新查詢!
無法獲取數據，回應狀態: 查詢日期大於今日，請重新查詢!


In [None]:
import pandas as pd

# 指定檔案路徑
file_path = r'C:\Users\D\Desktop\project\股票\異常\2330_2010_07_anomaly.csv'

# 讀取CSV檔案到 pandas DataFrame
stock_data = pd.read_csv(file_path)

# 顯示 DataFrame 的前幾行以確認數據讀取正確
print(stock_data.head())


In [None]:
print("父目錄:",parent_directory_path)
print("當前:",os.getcwd())

# 將資料合併

In [None]:
import os
import pandas as pd

# 定義函數合併並檢查資料格式
def merge_and_check_format(folder_path, output_csv_path):
    """
    合併指定資料夾中的所有 CSV 檔案，並檢查合併後的 DataFrame 中每個欄位的資料格式。
    
    參數:
        folder_path (str): 包含 CSV 檔案的資料夾路徑。
        output_csv_path (str): 輸出合併後 CSV 檔案的路徑。
    """
    # 獲取資料夾下的所有檔案
    files = os.listdir(folder_path)
    dfs = []

    # 讀取每個檔案並合併到一個 DataFrame 中
    for file in files:
        file_path = os.path.join(folder_path, file)
        df = pd.read_csv(file_path, encoding='utf-8')
        dfs.append(df)

    merged_df = pd.concat(dfs, ignore_index=True)

    # 檢查每個資料格的資料格式
    different_formats = {}

    for col in merged_df.columns:
        formats = merged_df[col].apply(type).unique()
        if len(formats) > 1:
            different_formats[col] = formats

    if different_formats:
        print("下列格子的資料格式不同：")
        for col, formats in different_formats.items():
            print(f"列名: {col}, 不同的資料格式: {formats}")
    else:
        print("所有格子的資料格式相同：")
        for col in merged_df.columns:
            print(f"列名: {col}, 資料格式: {merged_df[col].dtype}")

    # 將合併後的 DataFrame 存儲為 CSV 檔案
    merged_df.to_csv(output_csv_path, index=False, encoding='utf-8-sig')

# 指定資料夾路徑和輸出 CSV 檔案路徑
folder_path = r"C:\Users\D\Desktop\project\股票程式碼\股票\2330_台積電"
output_csv_path = "合併資料.csv"

# 呼叫函數合併並檢查資料格式，並將結果存儲為 CSV 檔案
merge_and_check_format(folder_path, output_csv_path)
