- sportsnaviプロ野球ページからのwebスクレイピングスクリプト

In [1]:
import os
import re
import requests
import datetime
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup

In [87]:
def fetch_game_links(date, timeout=30):
    """Return the stats-page and score-page links for every game on ``date``.

    Downloads the NPB schedule page with ``date`` as the ``date`` query
    parameter and reads the ``href`` of every ``a.bb-score__content`` anchor.
    Each href is turned into two links by replacing the substring ``index``
    with ``stats`` and with ``score`` respectively.

    Args:
        date: Value sent as the ``date`` query parameter (the script entry
            point in this file passes a ``datetime.date``).
        timeout: Seconds to wait for the HTTP response before
            ``requests`` raises a timeout error instead of hanging forever.

    Returns:
        A tuple ``(game_links, detail_links)`` of two equally long lists,
        in the order the anchors appear on the page.
    """
    schedule_page = requests.get(
        'https://baseball.yahoo.co.jp/npb/schedule',
        params={'date': date},
        timeout=timeout,
    )
    soup_schedule = BeautifulSoup(schedule_page.text, 'html.parser')
    hrefs = [
        anchor['href']
        for anchor in soup_schedule.find_all('a', class_='bb-score__content')
    ]
    game_links = [href.replace('index', 'stats') for href in hrefs]
    detail_links = [href.replace('index', 'score') for href in hrefs]
    return game_links, detail_links

def fetch_score_stats(game_link, dat, id_, timeout=30):
    """Return the inning-by-inning line score of one game as a list of rows.

    The first returned row is the header
    ``["チーム", "1", ..., "12", "日付", "ID"]``. Each following row holds a
    team label, the scores read from the ``td.bb-gameScoreTable__data``
    cells, ``"X"`` padding up to 13 entries, then ``dat`` and ``id_``.

    Cells are consumed in page order. The first cell starts the first team
    row. A cell that cannot be parsed as an integer ends the first team row
    and starts the second one; the next unparsable cell stops the scan.
    A trailing ``X`` on a number is stripped before parsing.
    NOTE(review): presumably that marks a walk-off inning -- confirm.

    Args:
        game_link: URL of the page to download.
        dat: Value stored in the ``日付`` column of every data row.
        id_: Value stored in the ``ID`` column of every data row.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        A list of rows (lists). If the page contains no score cells only the
        header row is returned.
    """
    game_page = requests.get(game_link, timeout=timeout)
    soup_game = BeautifulSoup(game_page.text, 'html.parser')

    header = ["チーム"] + [str(i) for i in range(1, 13)] + ["日付", "ID"]
    stats_list = [header]

    cells = [
        cell.text.replace('\n', '')
        for cell in soup_game.find_all('td', class_='bb-gameScoreTable__data')
    ]
    if not cells:
        # Nothing to parse; avoid indexing into an empty cell list.
        return stats_list

    def close_row(row):
        # Pad missing innings with "X", then tag the row and store it.
        row += ["X"] * (13 - len(row))
        row += [dat, id_]
        stats_list.append(row)

    stats = [cells[0]]
    on_first_team = True
    for ele in cells[1:]:
        try:
            # IndexError: empty cell; ValueError: non-numeric cell.
            score = int(ele[:-1]) if ele[-1] == 'X' else int(ele)
        except (ValueError, IndexError):
            if not on_first_team:
                break
            # The unparsable cell becomes the label of the second row.
            close_row(stats)
            stats = [ele]
            on_first_team = False
        else:
            stats.append(score)
    close_row(stats)

    return stats_list

def fetch_batter_stats(game_link, dat, id_, timeout=30):
    """Return the batting table of one game as a list of rows.

    The header row is built from the first half of the
    ``p.bb-statsTable__headLabel`` texts found on the page, with ``"名"``
    inserted at index 1 and ``"自"``, ``"相"``, ``"H/V"``, ``"日付"``,
    ``"ID"`` appended.

    Every ``tr.bb-statsTable__row`` is then scanned in page order. A row
    containing header labels switches the current team: the first such row
    selects the first ``h1.bb-head03__title`` as own team and marks rows
    ``"V"``; every later one selects the second title and marks rows ``"H"``.
    A row containing ``td.bb-statsTable__data`` cells becomes one data row.

    Args:
        game_link: URL of the page to download.
        dat: Value stored in the ``日付`` column of every data row.
        id_: Value stored in the ``ID`` column of every data row.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        A list of rows (lists); the first row is the header.

    Raises:
        ValueError: If a cell from index 3 onward is not an integer string.
    """
    game_page = requests.get(game_link, timeout=timeout)
    soup_game = BeautifulSoup(game_page.text, 'html.parser')

    labels = [
        label.text
        for label in soup_game.find_all('p', class_='bb-statsTable__headLabel')
    ]
    # The labels appear once per team table; keep a single copy.
    header = labels[:len(labels) // 2]
    header.insert(1, "名")
    header += ["自", "相", "H/V", "日付", "ID"]

    team_names = [
        title.text
        for title in soup_game.find_all('h1', class_='bb-head03__title')
    ]

    stats_list = [header]
    seen_first_header = False
    for row in soup_game.find_all('tr', class_='bb-statsTable__row'):
        if row.find_all('p', class_='bb-statsTable__headLabel'):
            if seen_first_header:
                my, enemy, team = team_names[1][:2], team_names[0][:2], "H"
            else:
                my, enemy, team = team_names[0][:2], team_names[1][:2], "V"
                seen_first_header = True

        cells = row.find_all('td', class_='bb-statsTable__data')
        if not cells:
            continue

        stats = [cell.text for cell in cells][:14]
        # Strip parentheses and the pinch-hit / pinch-run markers; a player
        # with nothing left (pinch appearance only) is recorded as "指".
        # NOTE(review): the earlier comment said only the first position is
        # kept, but the whole remaining string is kept -- confirm intent.
        stats[0] = re.sub("[()打走]", '', stats[0]) or '指'
        stats[3:] = [int(value) for value in stats[3:]]
        stats += [my, enemy, team, dat, id_]
        stats_list.append(stats)
    return stats_list

def fetch_pitcher_stats(game_link, dat, id_, timeout=30):
    """Return the pitching table of one game as a list of rows.

    The header row is the first half of the ``th.bb-scoreTable__head``
    texts with its first entry replaced by ``"位"`` and ``"自"``, ``"相"``,
    ``"H/V"``, ``"日付"``, ``"ID"`` appended.

    ``tr``/``h1`` elements carrying class ``bb-scoreTable__row`` or
    ``bb-head03__title`` are scanned in page order. An element whose text
    contains one of the known team-name fragments switches the current
    team: the first match marks following rows ``"V"``, every later match
    marks them ``"H"`` and restarts the pitcher counter. The first pitcher
    of a team is labelled ``"先発"``, the rest ``"中継"``.

    NOTE(review): the team-name test is applied to data rows as well, so a
    pitcher row whose text contains one of the fragments would also switch
    the team -- confirm this cannot happen on real pages.

    Args:
        game_link: URL of the page to download.
        dat: Value stored in the ``日付`` column of every data row.
        id_: Value stored in the ``ID`` column of every data row.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        A list of rows (lists); the first row is the header.

    Raises:
        ValueError: If cell 3 is not a float string or a later cell is not
            an integer string.
    """
    game_page = requests.get(game_link, timeout=timeout)
    soup_game = BeautifulSoup(game_page.text, 'html.parser')

    heads = [
        head.text
        for head in soup_game.find_all('th', class_='bb-scoreTable__head')
    ]
    # The column heads appear once per team table; keep a single copy.
    header = heads[:len(heads) // 2]
    header[0] = "位"
    header += ["自", "相", "H/V", "日付", "ID"]

    team_names = [
        title.text
        for title in soup_game.find_all('h1', class_='bb-head03__title')
    ]

    rows = soup_game.find_all(
        ['tr', 'h1'], class_=['bb-scoreTable__row', 'bb-head03__title']
    )
    words = ["北海道", "東北", "埼玉", "千葉ロッテ", "オリックス", "ソフトバンク", "横浜", "東京", "読売", "中日", "阪神", "広島"]

    stats_list = [header]
    seen_first_team = False
    pitcher_no = 0
    for row in rows:
        row_text = row.text
        if any(word in row_text for word in words):
            if seen_first_team:
                my, enemy, team = team_names[1][:2], team_names[0][:2], "H"
                pitcher_no = 0
            else:
                my, enemy, team = team_names[0][:2], team_names[1][:2], "V"
                seen_first_team = True

        cells = row.find_all('td', class_='bb-scoreTable__data')
        if not cells:
            continue

        stats = [cell.text for cell in cells][:14]
        stats[0] = '先発' if pitcher_no == 0 else '中継'
        stats[1] = stats[1].replace('\n', '')
        stats[2] = stats[2].replace('\n', '')
        stats[3] = float(stats[3])  # innings pitched
        stats[4:] = [int(value) for value in stats[4:]]
        stats += [my, enemy, team, dat, id_]
        stats_list.append(stats)
        pitcher_no += 1
    return stats_list


def get_inning_links(detail_link, timeout=30):
    """Collect ``[inning, url]`` pairs for the per-inning pages of one game.

    Every ``td.bb-gameScoreTable__data`` cell that contains an
    ``a.bb-gameScoreTable__score`` anchor yields a start link. The inning
    number is parsed from the two characters at positions ``-7:-5`` of the
    href. Starting from that link the trailing counter is raised in steps
    of 100 and each resulting URL is requested; URLs are collected until
    the server answers 404.
    NOTE(review): presumably each step of 100 addresses the next batter of
    the half inning -- confirm against the site's URL scheme.

    The counter stops at 9900: a five-digit value would no longer fit the
    four-character slot it is written into and would corrupt the URL.

    Args:
        detail_link: URL of the game's score page.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        A sorted list of ``[inning, url]`` lists.
    """
    base_url = "https://baseball.yahoo.co.jp"
    game_page = requests.get(detail_link, timeout=timeout)
    soup_game = BeautifulSoup(game_page.text, 'html.parser')

    link_to_each_inning = []
    for cell in soup_game.find_all('td', class_="bb-gameScoreTable__data"):
        anchor = cell.find('a', class_='bb-gameScoreTable__score')
        if anchor is None:
            continue

        text = anchor.get('href')
        inning = int(text[-7:-5])

        # A link ending in "000" is rewritten to start at counter 100.
        if text[-3:] == "000":
            text = text[:-3] + "100"
        link_to_each_inning.append([inning, base_url + text])

        for count in range(200, 10000, 100):
            # Three-digit counters replace the last three characters,
            # four-digit counters the last four.
            width = 4 if count >= 1000 else 3
            text = text[:-width] + str(count)
            url = base_url + text
            res = requests.get(url, timeout=timeout)
            if res.status_code == 404:
                break
            link_to_each_inning.append([inning, url])
            print(url)

    return sorted(link_to_each_inning)

def fetch_details(inning_links, dat, id_, timeout=30):
    """Download pitch-by-pitch data for each page in ``inning_links``.

    The header row consists of five people columns, ``type_1..type_20``,
    ``speed_1..speed_20``, ``result_1..result_20`` and
    ``"イニング"``, ``"日付"``, ``"ID"``. One data row is produced per link.

    The people columns come from the text of the first
    ``table.bb-splitsTable.tracked_mods`` on the page. When the first token
    is ``"投手"`` the row is flagged ``"T"``, otherwise the token groups are
    swapped and the row is flagged ``"B"``.
    NOTE(review): presumably T/B means top/bottom of the inning -- confirm.

    Pitch type, speed and result lists are padded with NaN or truncated to
    exactly 20 entries, so a row can never be wider than the header.

    Args:
        inning_links: Iterable of ``(inning, url)`` pairs.
        dat: Value stored in the ``日付`` column of every data row.
        id_: Value stored in the ``ID`` column of every data row.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        A list of rows (lists); the first row is the header.

    Raises:
        IndexError: If a page does not contain the expected table.
    """
    N = 20
    people_col = ["投手", "投", "打者", "打", "T/B"]
    balltype_col = ["type_" + str(i) for i in range(1, N + 1)]
    speed_col = ["speed_" + str(i) for i in range(1, N + 1)]
    result_col = ["result_" + str(i) for i in range(1, N + 1)]
    element_list = [
        people_col + balltype_col + speed_col + result_col
        + ["イニング", "日付", "ID"]
    ]

    def fixed_width(values):
        # Exactly N entries: cut extras, fill the remainder with NaN.
        return values[:N] + [np.nan] * (N - len(values))

    for inning, link_ in inning_links:
        game_page = requests.get(link_, timeout=timeout)
        soup_game = BeautifulSoup(game_page.text, 'html.parser')

        tables = soup_game.find_all('table', {'class': 'bb-splitsTable tracked_mods'})
        people = re.sub(r" ", '', tables[0].text)
        # Collapse every run of separators into one space, then drop the
        # empty first and last tokens.
        people = re.sub(r"[¥\n]+", " ", people)
        people = people.split(" ")[1:-1]
        if people[0] == "投手":
            people = people[2:]
            people.append("T")
        else:
            people = people[4:] + people[2:4]
            people.append("B")

        balltype = fixed_width([
            cell.text
            for cell in soup_game.find_all('td', class_="bb-splitsTable__data--ballType")
        ])
        speed = fixed_width([
            cell.text
            for cell in soup_game.find_all('td', class_="bb-splitsTable__data--speed")
        ])
        result = fixed_width([
            cell.text.replace('\n', '').replace(' ', '')
            for cell in soup_game.find_all('td', class_="bb-splitsTable__data--result")
        ])

        element_list.append(people + balltype + speed + result + [inning, dat, id_])

    return element_list

def connect_data(list_):
    """Stack several header-prefixed tables into one DataFrame.

    Each element of ``list_`` is a table given as a list of rows whose first
    row is the header. The header of the first table becomes the column
    labels; the header rows of all later tables are dropped.

    Args:
        list_: List of tables (lists of row lists).

    Returns:
        A DataFrame with a fresh 0..n-1 index containing all data rows in
        input order. An empty DataFrame is returned when ``list_`` is empty.
    """
    if not list_:
        return pd.DataFrame()

    frames = [pd.DataFrame(list_[0])]
    # Later tables repeat the header in their first row; skip it.
    frames.extend(pd.DataFrame(table).iloc[1:] for table in list_[1:])

    # Concatenate once instead of growing the frame table by table.
    main_df = pd.concat(frames, axis=0)
    main_df.columns = main_df.iloc[0]
    return main_df.iloc[1:].reset_index(drop=True)

def make_score_result(df):
    """Add ``total`` and ``result`` columns to a line-score table.

    Rows are expected in pairs: the even-positioned row is treated as the
    visiting team and the following row as the home team. ``"X"`` entries
    in the inning columns ``"1"``..``"12"`` are replaced by NaN, rows whose
    inning columns are all NaN (cancelled games) are dropped, innings
    ``"1"``..``"8"`` are cast to int, and ``total`` is the row sum over all
    inning columns. ``result`` is ``"W"``, ``"L"`` or ``"D"`` from comparing
    the two totals of each pair.

    The input DataFrame is not modified; a new one is returned.

    Args:
        df: DataFrame containing at least the columns ``"1"``..``"12"``.

    Returns:
        A new DataFrame with the cancelled rows removed, a fresh index, and
        the additional ``total`` and ``result`` columns.

    Raises:
        ValueError: If an odd number of rows remains, or if one of innings
            1-8 is missing for a remaining row.
    """
    columns = [str(i) for i in range(1, 13)]

    # Work on a copy so the caller's frame keeps its original "X" markers.
    df = df.copy()
    df[columns] = df[columns].replace("X", np.nan)

    # Drop cancelled games (no inning was played at all).
    played = ~df[columns].isnull().all(axis=1)
    df = df[played].reset_index(drop=True)

    df[columns[:-4]] = df[columns[:-4]].astype(int)
    df["total"] = df[columns].sum(axis=1).astype(int)

    if len(df) % 2:
        raise ValueError(
            "score rows must come in visitor/home pairs, got {} rows".format(len(df))
        )

    totals = df["total"].to_numpy()
    visitor, home = totals[0::2], totals[1::2]
    tied = visitor == home
    visitor_result = np.where(tied, "D", np.where(visitor > home, "W", "L"))
    home_result = np.where(tied, "D", np.where(home > visitor, "W", "L"))

    # Interleave back into row order: visitor, home, visitor, home, ...
    df["result"] = np.column_stack([visitor_result, home_result]).ravel()
    return df

In [92]:
if __name__ == '__main__':
    # Scrape every game of one day and store four CSV files under
    # ./data/<date>/ (batter, pitcher, score, detail).
    # The date is fixed here; use datetime.date.today() for the current day.
    d_today = datetime.date(2022, 4, 7)
    date_str = str(d_today)
    game_links, detail_links = fetch_game_links(d_today)

    if not game_links:
        # Without any game there is no header row to build the tables from.
        print("No games found for " + date_str)
    else:
        batter_list = []
        pitcher_list = []
        score_list = []
        detail_list = []

        for id_, (game_link, detail_link) in enumerate(zip(game_links, detail_links)):
            print(id_)
            # Batting, pitching and line-score tables come from the stats page.
            batter_list.append(fetch_batter_stats(game_link, date_str, id_))
            pitcher_list.append(fetch_pitcher_stats(game_link, date_str, id_))
            score_list.append(fetch_score_stats(game_link, date_str, id_))
            # Pitch-by-pitch data comes from the per-inning pages.
            inning_links = get_inning_links(detail_link)
            detail_list.append(fetch_details(inning_links, date_str, id_))

        batter_df = connect_data(batter_list)
        pitcher_df = connect_data(pitcher_list)
        score_df = make_score_result(connect_data(score_list))
        detail_df = connect_data(detail_list)

        # exist_ok also covers a missing ./data directory and re-runs.
        out_dir = os.path.join("./data", date_str)
        os.makedirs(out_dir, exist_ok=True)

        batter_df.to_csv(os.path.join(out_dir, "batter.csv"), index=False)
        pitcher_df.to_csv(os.path.join(out_dir, "pitcher.csv"), index=False)
        score_df.to_csv(os.path.join(out_dir, "score.csv"), index=False)
        detail_df.to_csv(os.path.join(out_dir, "detail.csv"), index=False)

0
1
2
3


参考
- https://qiita.com/Jun-T/items/6a641b6e28c487127484
- https://gammasoft.jp/support/solutions-of-requests-get-failed/
- https://tonari-it.com/python-response-status-code/