In [None]:
# 引入requests庫
import requests  
# TWSE OpenAPI endpoint queried by this notebook.
url = 'https://openapi.twse.com.tw/v1/exchangeReport/STOCK_DAY_ALL'
# Send the GET request. requests applies no timeout unless one is given, so an
# explicit value keeps this cell from hanging forever on a stalled connection.
res = requests.get(url, timeout=30)
# Fail right here on a 4xx/5xx reply instead of later, when the body is parsed as JSON.
res.raise_for_status()
res

In [None]:
import pandas as pd

# Build a DataFrame from the decoded JSON body of the response.
df = pd.DataFrame(data=res.json())

# Write the table to a CSV file without the index column, encoded as BOM-prefixed UTF-8.
df.to_csv(path_or_buf="stock_data.csv", encoding='utf-8-sig', index=False)

In [None]:
# Build one "tse_<code>.tw" identifier per row of df and join them all with "|".
# No code-based filtering is applied: `filtered` is simply an independent copy of df.
filtered = df.copy()
tse_list = filtered['Code'].apply(lambda code: f"tse_{code}.tw")
joined_str = "|".join(tse_list)

joined_str

In [None]:
# One "tse_<code>.tw" identifier per entry of df['Code'], as a plain list.
tse_all = [f"tse_{code}.tw" for code in df['Code']]

# Slice the identifiers into consecutive groups of at most 50.
chunks = [tse_all[start:start + 50] for start in range(0, len(tse_all), 50)]

# Collapse each group into a single "|"-delimited string.
chunked_strings = list(map("|".join, chunks))

# Show the number of groups together with the last group.
len(chunked_strings), chunked_strings[-1:]

In [None]:
# URL template: the "{}" after "ex_ch=" receives one "|"-joined group of identifiers.
base_url = "https://mis.twse.com.tw/stock/api/getStockInfo.jsp?json=1&delay=0&ex_ch={}"
urls = list(map(base_url.format, chunked_strings))
urls


In [None]:
def chunk_list(lst, chunk_size):
    """Yield consecutive slices of ``lst`` holding at most ``chunk_size`` items each.

    The final slice is shorter when ``len(lst)`` is not a multiple of
    ``chunk_size``; an empty ``lst`` yields nothing.
    """
    offsets = range(0, len(lst), chunk_size)
    yield from (lst[offset:offset + chunk_size] for offset in offsets)

def fetch_url_data(url, timeout=5):
    """Send a GET request to ``url`` and return the decoded JSON body.

    This is a best-effort helper: every failure is reported with ``print`` and
    turned into ``None`` so that a caller looping over many URLs keeps going.

    Args:
        url: Address to request.
        timeout: Value passed to ``requests.get`` as its ``timeout`` argument.
            Defaults to 5, the value that used to be hard-coded.

    Returns:
        The result of ``response.json()`` when the status code is 200;
        ``None`` for any other status code or when an exception is raised
        while requesting or decoding.
    """
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to fetch {url} - status: {response.status_code}")
            return None
        return response.json()
    except Exception as e:  # deliberately broad: one bad URL must not abort a batch run
        print(f"Error fetching {url}: {e}")
        return None
 
    


In [None]:
import time
import random
import json
from datetime import datetime


def fetchListedCompanyPrice():
    """Fetch every URL in the global ``urls`` and save the merged rows as JSON.

    Each URL is requested through ``fetch_url_data``; whenever the reply contains
    a ``'msgArray'`` entry, its items are appended to one combined list. The list
    is then written to ``twse_data_<YYYYMMDD_HHMMSS>.json`` (a relative path, so
    it lands in the current working directory) and the file name is printed.

    Returns:
        None.
    """
    all_data = []

    for url in urls:
        print(f"Fetching: {url}")
        data = fetch_url_data(url)
        if data and 'msgArray' in data:
            all_data.extend(data['msgArray'])

        # Throttle after every request, failed ones included, so that an error
        # reply is not followed immediately by the next request to the server.
        time.sleep(random.uniform(3, 6))

    # Current time, used to give each run its own file name.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"twse_data_{timestamp}.json"
    # ensure_ascii=False keeps non-ASCII text readable in the saved file.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(all_data, f, ensure_ascii=False, indent=2)

    print(f"✅ 資料儲存完成，檔案名稱：{filename}")
    

In [None]:
from apscheduler.schedulers.background import BackgroundScheduler
import logging

# Enable logging so that APScheduler's own messages, including job errors, are shown.
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

# Register fetchListedCompanyPrice to run on a 2-minute interval, then start the scheduler.
scheduler = BackgroundScheduler()
scheduler.add_job(fetchListedCompanyPrice, 'interval', minutes=2)
scheduler.start()

# Keep this cell alive while the scheduled jobs run; interrupt the kernel to stop.
# NOTE(review): one run sleeps 3-6 s per URL, so with many URLs a run may outlast
# the 2-minute interval — confirm how the scheduler handles overlapping runs.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    pass  # a manual interrupt is the expected way to leave the loop
finally:
    # Always stop the scheduler, whether the loop ended through an interrupt
    # or through an unexpected error, so no scheduled job is left running.
    scheduler.shutdown()

In [None]:
import time
import random 

# Flat list that receives the 'msgArray' items of every successful reply.
all_data = []

for url in urls:  # one request per prepared batch URL
    print(f"Fetching: {url}")
    data = fetch_url_data(url)
    if data and 'msgArray' in data:
        all_data += data['msgArray']

    # Random 4-6 second pause between requests to avoid being blocked for polling too often.
    time.sleep(random.uniform(4, 6))

In [None]:
import json
from datetime import datetime

# Current time as YYYYMMDD_HHMMSS, embedded in the output file name.
timestamp = format(datetime.now(), "%Y%m%d_%H%M%S")
filename = f"twse_data_{timestamp}.json"

# Dump the collected records as UTF-8 JSON: non-ASCII text kept as-is, 2-space indent.
with open(filename, mode="w", encoding="utf-8") as f:
    json.dump(all_data, f, indent=2, ensure_ascii=False)

print(f"✅ 資料儲存完成，檔案名稱：{filename}")