## Stock catch

點選 Header，看 General 的 Request URL 及 Method 與 Response Headers 的 Content-Type 可以知道：
https://ithelp.ithome.com.tw/upload/images/20211006/20139358GVxM0ylTC2.png

- URL：https://www.twse.com.tw/exchangeReport/STOCK_DAY。
- 必要 Query：response=json&date=20211006&stockNo=2330。所以可以根據需求在 Query 的 response、date、stockNo 代入想要的值。
- HTTP Method 是 GET。
- Headers 的 Content-Type：application/json;charset=UTF-8 所以格式是 JSON，編碼為 UTF-8。
- 另外我們也知道了，不管我們 Query 的 date 是幾號，他的 server 是整個月份的日成交資訊都回給你，這點要注意一下喔！

![image.png](attachment:image.png)

In [None]:
# https://www.twse.com.tw/zh/page/trading/exchange/STOCK_DAY.html

In [None]:
import requests

url = "https://www.twse.com.tw/exchangeReport/STOCK_DAY"

# 
res = requests.get(url, params={
    "response": "json",
    "date": "20211006",
    "stockNo": "2330"
})

# 把 JSON 轉成 Python 可存取之型態
res_json = res.json()

# 我們要的每日成交資訊在 data 這個欄位
daily_price_list = res_json.get("data", [])

# 該欄位是 List 所以用 for 迴圈印出
for daily_price in daily_price_list:
    print(daily_price)

## Stock catch 2

根據前篇得到的資訊：

- URL：https://isin.twse.com.tw/isin/class_main.jsp。
- 必要的 Query：market=1&issuetype=1&Page=1&chklike=Y。
- 所以可以根據需求在 Query 的 market、issuetype、Page、chklike 代入自己要的值。
- HTTP Method 是 GET。
- Content-Type：text/html;charset=MS950，所以格式是 HTML，編碼為 MS950。

In [None]:
# project2
# pip install beautifulsoup4
# pip install lxml

In [None]:
from bs4 import BeautifulSoup
html = "<html>test test test</html>"
soup = BeautifulSoup(html, "lxml")
soup

In [None]:
from bs4 import BeautifulSoup
html = "<title>example1</title><title>example2</title>"
soup = BeautifulSoup(html, "lxml")

# find_all() 會搜尋整個 html 回傳符合的值
print(soup.find_all("title"))
# [<title>example1</title>, <title>example2</title>]

# find() 只會回傳"第一個"符合的值
print(soup.find("title"))
# <title>example1</title>

# 拿取 tag 之間的 text
print(soup.find("title").text)
# example1


In [None]:
import json
import requests
from bs4 import BeautifulSoup
# index constant，數字代表我們要的資料在 list 的位置
TARGET_TABLE_INDEX = 1
STOCK_NO_INDEX = 2
STOCK_NAME_INDEX = 3
STOCK_INDUSTRY_INDEX = 6

# JSON settings
TITLE = "stock"
JSON_INDENT = 4

# HTTP Request
url = "https://isin.twse.com.tw/isin/class_main.jsp"
res = requests.get(url, params={
    "market": "1",
    "issuetype": "1",
    "Page": "1",
    "chklike": "Y"
})

# 處理編碼，使用預設 utf-8 的話 res.text 的內容會有亂碼
res.encoding = "MS950"
res_html = res.text

# Parse
soup = BeautifulSoup(res_html, "lxml")

# HTML 裡面有兩張 table
tr_list = soup.find_all("table")[TARGET_TABLE_INDEX].find_all("tr") #find_all("table") 回傳的 list length 是 2
tr_list.pop(0)  # item 是欄位名稱用不到所以 pop 掉

# 開始處理資料
result = []
for tr in tr_list:
    td_list = tr.find_all("td")
    stock_no_val = td_list[STOCK_NO_INDEX].text  #Num
    stock_name_val = td_list[STOCK_NAME_INDEX].text #Name
    stock_industry_val = td_list[STOCK_INDUSTRY_INDEX].text #Industry

    # Save to dict
    result.append({
        "stockNo": stock_no_val,
        "stockName": stock_name_val,
        "stockIndustry": stock_industry_val
    })

#Download
stock_list_dict = {TITLE: result}
with open("stock_info_list.json", "w", encoding="utf-8") as f:
    f.write(json.dumps(stock_list_dict, indent=JSON_INDENT, ensure_ascii=False))

## User-Agent  Change 偽裝

F12 -> Network -> Request -> Headers -> User Agent

In [None]:
import requests
res = requests.get("https://www.google.com/")  #your browser
print(res.request.headers)

In [None]:
# pip install fake-useragent

In [None]:
# Sned Request User-agent。
from fake_useragent import UserAgent
ua = UserAgent()

# browser User-Agent
ua.ie
ua.google
ua.firefox
ua.safari
ua.random

In [None]:
import requests
from fake_useragent import UserAgent

ua = UserAgent()
res = requests.get("https://www.google.com/", headers={
    "User-Agent": ua.random
})

print(res.request.headers)

## multithreading

In [None]:
import time
def req1():
    print("req1: 開始送 request") 
    time.sleep(3)
    print("req1: 接收到 response") 
def req2():
    print("req2: 開始送 request") 
    time.sleep(3)
    print("req2: 接收到 response")

if __name__ == '__main__':
    req1()
    req2()

In [None]:
# import python 的多執行緒套件
import time
from threading import Thread

def req1():
    print("req1: 開始送 request") 
    time.sleep(3)
    print("req1: 接收到 response") 

def req2():
    print("req2: 開始送 request") 
    time.sleep(3)
    print("req2: 接收到 response")

if __name__ == '__main__':
    # 先將 funtion 包裝成 thread 的物件，再執行
    Thread(target=req1).start()
    Thread(target=req2).start()

In [None]:
import time
import requests
from threading import Thread

# 將流程先用 function 封裝起來，接下來才能帶入 thread 中
def daily_price_req(date, stock_no):
    res = requests.get("https://www.twse.com.tw/exchangeReport/STOCK_DAY",
                       params={
                           "response": "json",
                           "date": date,
                           "stockNo": stock_no
                       })

    # 把 JSON 轉成 Python 可存取之型態
    res_json = res.json()

    # 我們要的每日成交資訊在 data 這個欄位
    daily_price_list = res_json['data']

    # 印出資料
    print("{} 每日成交資訊: {}".format(stock_no, daily_price_list))


if __name__ == '__main__':
    # 日期什麼的，可以依照需求設定
    req_info_list = [
        {
            "date": "20211011",
            "stockNo": "2330"
        },
        {
            "date": "20211011",
            "stockNo": "2603"
        },
        {
            "date": "20211011",
            "stockNo": "2609"
        }
    ]

    for req_info in req_info_list:
        req_date = req_info.get("date")
        req_stock_no = req_info.get("stockNo")
        if req_date and req_stock_no:
            # args 為 function 會用到的參數
            req_thread = Thread(target=daily_price_req,
                                args=(req_date, req_stock_no))
            req_thread.start()

            # 每個 req 間隔最好 3 秒以上，不然會被證交所鎖 IP 一段時間
            time.sleep(3)

In [None]:
req_thread_list = []
for req_info in req_info_list:
        req_date = req_info.get("date")
        req_stock_no = req_info.get("stockNo")
        if req_date and req_stock_no:
            req_thread = Thread(target=daily_price_req,
                                args=(req_date, req_stock_no))
            req_thread.start()
            req_thread_list.append(req_thread)

            # 每個 req 間隔最好 3 秒以上，不然會被證交所鎖 IP 一段時間
            time.sleep(3)

for req_thread in req_thread_list:
    # join() 就是讓程式等著，確定該 thread 已經執行完畢之後再繼續往下
    # 所以這邊的用法是說：
    # 我的程式會在這邊等所有 req thread（daily_price_req 裡的那些動作）都執行完畢後再往下做
    req_thread.join()

# do something...
print("Ending...")

## Project 自動更新每日個股日成交資訊

In [None]:
#進度條 progress bar

class ProgressBar:
    bar_string_fmt = '\rProgress: [{}{}] {:.2%} {}/{}'
    cnt = 0

    def __init__(self, total, bar_total=20):
        self.total = total # task 的總數
        self.bar_total = bar_total  # progress bar 的長度

    def update(self, step=1):
        total = self.total
        self.cnt += step           # 更新 progress bar 的進度
        bar_cnt = (int((self.cnt/total)*self.bar_total))  # bar 的數量
        space_cnt = self.bar_total - bar_cnt  # 空白的數量

        # 顯示 progress bar '\r' 的意思代表 replace，print 出來的字串不會印在新的一行而是 replace 原本那行同位置的字符
        # {:.2%}，表示 format 進來的值會以百分比顯示，並只取到小數點後兩位
        progress = self.bar_string_fmt.format(
            '█' * bar_cnt,
            ' ' * space_cnt,
            self.cnt/total,
            self.cnt,
            total
        )
        print(progress, end='')

        # 100%
        if self.cnt/total == 1:
            print('\n')
            
if __name__ == '__main__':
    total = 10000
    progress_bar = ProgressBar(total)
    for _ in range(total):
        progress_bar.update()

In [None]:
import json
import time
import requests
from pathlib import Path
from threading import Thread
from datetime import datetime
from fake_useragent import UserAgent

class DailyPriceSpider:
    def __init__(self, progress_bar=None):
        
        # 重複使用 TCP 連線
        self.req = requests.Session()
        self.url = "https://www.twse.com.tw/exchangeReport/STOCK_DAY"
        self.headers = self.req.headers
        
        # 偽裝 User-Agent
        ua = UserAgent()
        self.headers["User-Agent"] = ua.random
        
        # 加入 progress bar
        if progress_bar:
            self.progress_bar = progress_bar
 
    # 供內部使用
    def __get(self, date, stock_no):
        res = self.req.get(self.url,
                           headers = self.headers,
                           params={
                               "response": "csv",  # 這次抓的是 csv 格式
                               "date": date,
                               "stockNo": stock_no
                           })
        return res.text
    
    # 供內部使用
    def __save_file(self, res_text, path):  # 去掉 res_text 裡多餘的空白行
        res_text = '\n'.join(
            filter(None,res_text.splitlines())
        )
        path = Path(path)
        
        # parents=True，如果父資料夾不存在則會一併創建
        # exist_ok=True，創建資料夾時，該資料夾已存在則不會 throw exception
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'w', encoding="utf-8") as file:
            file.write(res_text)
        return
 
    
    def scrape(self, date, stock_no, save_path=""):
        # 把上面的兩個 function 整合
        res_text = self.__get(date, stock_no)
        if save_path:
            self.__save_file(res_text, save_path)
            
        # 更新 progress bar
        if self.progress_bar:
            self.progress_bar.update()


if __name__ == '__main__':
    SAVE_PATH_ROOT = "./daily_stock_price/"
    SLEEP_TIME = 3
    stock_info_list_file = {}
    with open("./stock_info_list.json", "r", encoding="utf-8") as f:
        stock_info_list_file = json.load(f)

    # 不管日期是幾號，他回傳都是給我們一整個月的，所以就固定設成1號就好
    # strftime()，把 datetime 輸出成我們要的格式
    today_date = "{}01".format(datetime.now().strftime("%Y%m"))

    stock_info_list = stock_info_list_file.get("stock", [])
    
    # 加入 progress bar
    progress_bar = ProgressBar(len(stock_info_list))
    dps = DailyPriceSpider(progress_bar=progress_bar)
    req_thread_list = []

    for stock_info in stock_info_list:

        stock_no = stock_info.get("stockNo")
        stock_name = stock_info.get("stockName")
        stock_industry = stock_info.get("stockIndustry")
        file_name = "{}_{}_daily_price.csv".format(
            today_date[:-2],  # 字串只需要用到年跟月
            stock_no+stock_name)

        save_path = "{}/{}/{}/{}".format(SAVE_PATH_ROOT,
                                         stock_industry,
                                         stock_no+stock_name,
                                         file_name)

        if stock_no and stock_name and stock_industry:
            req_thread = Thread(target=dps.scrape,
                                args=(today_date, stock_no, save_path),
                                daemon=True)
            req_thread.start()
            req_thread_list.append(req_thread)
            time.sleep(SLEEP_TIME)

    for req_thread in req_thread_list:
        req_thread.join()

    print("Finished")
