In [1]:
import json
import os
import time
from datetime import date, datetime, timedelta
from pathlib import Path

import pandas as pd
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

In [2]:
def read_or_build(folder, file):
    """Load ``folder/file`` as a DataFrame, or build an empty one if it is missing.

    Args:
        folder: Directory that should contain the CSV file.
        file: File name of the CSV inside ``folder``.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` when the file exists;
        otherwise an empty DataFrame whose columns come from ``get_col()``.
        Nothing is written to disk in either case.
    """
    file_path = os.path.join(folder, file)

    if not Path(file_path).exists():
        # No table on disk yet: start from an empty frame with the standard columns.
        return pd.DataFrame(columns=get_col())

    return pd.read_csv(file_path)


def get_col():
    """Return the ordered column names of the flights table as a new list."""
    flight_info = [
        'flight_NO',
        'flight_type',
        'flight_company',
        'fly_distance',
    ]
    route = [
        'departure_airport_code',
        'departure_city',
        'arrival_airport_code',
        'arrival_city',
    ]
    departure = [
        'departure_date',
        'leave_gate_estimate',
        'leave_gate_actual',
        'departure_time_estimate',
        'departure_time_actual',
        'departure_timezone',
    ]
    arrival = [
        'arrival_date',
        'landing_time_estimate',
        'landing_time_actual',
        'arrive_gate_estimate',
        'arrive_gate_actual',
        'arrive_timezone',
    ]
    # Concatenation builds a fresh list on every call, so callers may mutate it.
    return flight_info + route + departure + arrival + ['link']


def source_list_mask(df_list, df_table):
    """Select the rows of ``df_list`` that still need to be crawled.

    A row is kept when all of the following hold:
      * its ``sync`` value equals 0,
      * its ``query_date`` is at least two whole days before now,
      * its ``link`` does not appear in ``df_table['link']``.

    ``query_date`` is used through the ``.dt`` accessor after subtraction,
    so the column must already hold datetime values.

    Args:
        df_list: DataFrame with ``sync``, ``query_date`` and ``link`` columns.
        df_table: DataFrame with a ``link`` column.

    Returns:
        The filtered subset of ``df_list``.
    """
    now = pd.Timestamp.now()

    not_synced = df_list['sync'] == 0
    age_in_days = (now - df_list['query_date']).dt.days
    already_stored = df_list['link'].isin(df_table['link'])

    return df_list[not_synced & (age_in_days >= 2) & ~already_stored]


def get_page_source(url, driver):
    """Open ``url`` in ``driver`` and return the page's HTML source.

    The page builds part of its content with JavaScript, so this waits
    (up to 15 seconds) until at least one element with the class
    ``flightPageSummaryDepartureDay`` is present before reading the source.
    Whatever the wait raises when that does not happen propagates to the caller.

    Args:
        url: Address of the page to load.
        driver: Selenium WebDriver used to load the page.

    Returns:
        The value of ``driver.page_source`` after the wait succeeds.
    """
    driver.get(url)

    departure_day_locator = (By.CLASS_NAME, "flightPageSummaryDepartureDay")
    WebDriverWait(driver, 15).until(
        EC.presence_of_all_elements_located(departure_day_locator)
    )

    return driver.page_source


def trans_date_from_chinese(chinese_date):
    """Convert a Chinese-formatted date string into a ``datetime``.

    Everything from the first ``(`` onwards is discarded, and the remaining
    text is parsed with the format ``%Y年 %m月 %d日``.

    Args:
        chinese_date: Date text such as ``"2024年 05月 03日 (五)"``.

    Returns:
        The parsed ``datetime``.

    Raises:
        ValueError: If the text does not match the expected format.
    """
    date_part, _, _ = chinese_date.partition("(")
    return datetime.strptime(date_part.strip(), "%Y年 %m月 %d日")


def find_tag(div_list, target_str: str):
    """Return the index of the first element whose text equals ``target_str``.

    Args:
        div_list: Iterable of objects exposing ``get_text()``.
        target_str: Exact text to look for.

    Returns:
        The zero-based position of the first match. When nothing matches,
        the number of elements iterated (0 for an empty input) is returned.
    """
    seen = 0
    for seen, node in enumerate(div_list, start=1):
        if node.get_text() == target_str:
            # ``seen`` is one-based here, so step back to the zero-based index.
            return seen - 1
    return seen


def safe_extract(func):
    """Call ``func`` and return its result, or ``None`` if the lookup fails.

    Only ``IndexError`` and ``AttributeError`` are treated as "value not
    present"; any other exception propagates.

    Args:
        func: Zero-argument callable that performs the extraction.

    Returns:
        Whatever ``func`` returns, or ``None`` when it raised one of the
        two exceptions above.
    """
    try:
        value = func()
    except (IndexError, AttributeError):
        value = None
    return value


def gate_exist(soup):
    """Report whether a flight page contains gate (departure/arrival) data.

    Every ``div`` returned by ``soup('div')`` is checked for either of the
    gate keywords (``閘口`` or ``停机位``) in its text.

    The previous condition ``'閘口' or "停机位" in i.text`` was always truthy
    because the non-empty literal short-circuited the ``or``; each keyword
    is now tested against the text explicitly.

    Args:
        soup: Callable such that ``soup('div')`` yields objects with a
            ``text`` attribute (a BeautifulSoup object in this file).

    Returns:
        1 if any ``div`` text contains a gate keyword, otherwise 0.
    """
    keywords = ('閘口', '停机位')
    for div in soup('div'):
        text = div.text
        if any(keyword in text for keyword in keywords):
            return 1
    return 0


def split_tz(time_str: str):
    """Split a time string carrying a timezone into ``(time, timezone)``.

    Anything from the first ``(`` onwards is discarded. The last
    whitespace-separated token of what remains is taken as the timezone and
    everything before it as the time.

    Missing data is tolerated instead of raising, because callers feed this
    function values that may be ``None`` when the page lacks the element:
      * ``None`` or blank input gives ``(None, None)``;
      * a single token with no timezone gives ``(token, None)``.

    Args:
        time_str: Text such as ``"10:30 CST"`` or ``"10:30 CST (+1)"``;
            may be ``None``.

    Returns:
        A 2-tuple ``(time, timezone)``; either element may be ``None``.
    """
    if not time_str:
        return None, None

    cleaned = time_str.split("(")[0].strip()
    # Splitting on any run of whitespace tolerates doubled spaces.
    parts = cleaned.rsplit(None, 1)

    if not parts:
        return None, None
    if len(parts) == 1:
        return parts[0], None

    time_, tz = parts
    return time_, tz


def _clean_time_text(text):
    """Trim a scraped time string, turn NBSP into a space and drop newlines/tabs."""
    return text.strip().replace('\xa0', ' ').replace('\n', '').replace('\t', '')


def _extract_time(getter):
    """Fetch raw time text via ``safe_extract`` and split it into (time, timezone).

    Returns ``(None, None)`` when the element is missing or its text is blank,
    so one absent field does not abort the whole row.
    """
    raw = safe_extract(getter)
    if raw is None:
        return None, None

    cleaned = _clean_time_text(raw)
    if not cleaned:
        return None, None

    return split_tz(cleaned)


def crawl_flight_data(soup, url, gate_exist: bool):
    """Extract one flight's data row from a parsed flight page.

    Handles both page layouts: with gate times (``gate_exist == 1``) and
    without. In the gate layout the ``flightPageDataTimesChild`` blocks are
    read as [leave gate, take-off, landing, arrive gate]; in the other layout
    as [take-off, landing] and the four gate fields are ``None``.

    Fields whose element cannot be found are stored as ``None``.

    Args:
        soup: Parsed page (BeautifulSoup object).
        url: Address of the page; stored as the last value of the row so the
            flight can be revisited.
        gate_exist: Truth flag (compared against 1) telling whether the page
            carries gate times.

    Returns:
        A list of 21 values in the same order as ``get_col()``.
    """
    has_gate = (gate_exist == 1)

    # Basic flight facts live in these rows; their positions are fixed indexes.
    rows = soup('div', class_='flightPageDataRow')
    times = soup('div', class_='flightPageDataTimesChild')
    actual_cls = 'flightPageDataActualTimeText'

    flight_data = [
        # Flight number, aircraft type, airline, flight distance
        safe_extract(lambda: soup(
            'div', class_='flightPageIdent')[0].h1.text.strip()),
        safe_extract(lambda: rows[0](
            'div', class_='flightPageData')[0].text.strip().replace('\xa0', ' ')),
        safe_extract(lambda: rows[2](
            'div', class_='flightPageData')[0].text.strip().split('\n')[0]),
        safe_extract(lambda: rows[5].span.text.strip().replace(
            ',', '').replace("\n", "").replace("\t", "")),
        # Departure airport, departure city
        safe_extract(lambda: soup('div', class_='flightPageSummaryOrigin')[
            0]('span', class_='displayFlexElementContainer')[0].text.strip()),
        safe_extract(lambda: soup('div', class_='flightPageSummaryOrigin')[
            0]('span', class_='flightPageSummaryCity')[0].text.strip()),
        # Arrival airport, arrival city
        safe_extract(lambda: soup('div', class_='flightPageSummaryDestination')[
            0]('span', class_='displayFlexElementContainer')[0].text.strip()),
        safe_extract(lambda: soup('div', class_='flightPageSummaryDestination')[
            0]('span', class_='destinationCity')[0].text.strip()),
        # Departure date
        safe_extract(lambda: soup(
            'span', class_='flightPageSummaryDepartureDay')[0].text),
    ]

    # Index of the take-off / landing blocks depends on whether gate blocks exist.
    if has_gate:
        takeoff_idx, landing_idx = 1, 2
    else:
        takeoff_idx, landing_idx = 0, 1

    # Estimated / actual time of leaving the gate
    if has_gate:
        lg_e_time, _ = _extract_time(lambda: times[0].span.text)
        # The original replaced the literal text "\\n"/"\\t" here instead of
        # real newline/tab characters; the shared cleaner fixes that.
        lg_a_time, _ = _extract_time(
            lambda: times[0]('div', class_=actual_cls)[0].text)
    else:
        lg_e_time = lg_a_time = None
    flight_data.append(lg_e_time)
    flight_data.append(lg_a_time)

    # Estimated / actual take-off time (second span = estimate, first = actual)
    d_e_time, _ = _extract_time(lambda: times[takeoff_idx]('span')[1].text)
    d_a_time, d_a_tz = _extract_time(lambda: times[takeoff_idx]('span')[0].text)
    flight_data.append(d_e_time)
    flight_data.append(d_a_time)

    # Departure timezone
    flight_data.append(d_a_tz)

    # Arrival date
    flight_data.append(safe_extract(lambda: soup(
        'span', class_='flightPageSummaryArrivalDay')[0].text))

    # Estimated / actual landing time
    a_e_time, _ = _extract_time(lambda: times[landing_idx]('span')[1].text)
    a_a_time, a_a_tz = _extract_time(lambda: times[landing_idx]('span')[0].text)
    flight_data.append(a_e_time)
    flight_data.append(a_a_time)

    # Estimated / actual time of arriving at the gate
    if has_gate:
        ag_e_time, _ = _extract_time(lambda: times[3]('span')[1].text)
        ag_a_time, _ = _extract_time(
            lambda: times[3]('div', class_=actual_cls)[0].span.text)
    else:
        ag_e_time = ag_a_time = None
    flight_data.append(ag_e_time)
    flight_data.append(ag_a_time)

    # Arrival timezone
    flight_data.append(a_a_tz)

    # Keep the page address so the flight can be revisited if needed.
    flight_data.append(url)

    return flight_data

In [5]:
# Airlines whose flight lists are processed.
flight_corp = ["EVA", "CAL", "SJX", "TTW"]
today = date.today()
# Input and output files are keyed by the date two days before today.
read_date = today - timedelta(days=2)
read_date_str = read_date.strftime("%Y%m%d")

for corp in flight_corp:
    # Path of the flights table (output); an empty table is built if missing.
    table_folder = r"C:\Users\add41\Documents\Data_Engineer\Project\Flights-Data-Crawler\Data"
    table_file = f"{read_date_str}_{corp}_FlightsTable.csv"
    df_table = read_or_build(folder=table_folder, file=table_file)

    # Path of the flight list (input); skip this airline if it does not exist.
    list_folder = r"C:\Users\add41\Documents\Data_Engineer\Project\Flights-Data-Crawler\Data"
    list_file = f"{read_date_str}_{corp}_FlightList.csv"
    list_path = os.path.join(list_folder, list_file)
    path = Path(list_path)

    if path.exists():
        df_list = pd.read_csv(list_path)
    else:
        print(f"目前尚無{corp}資料可供查詢")
        continue

    # Filtering by sync flag / age is currently disabled; every listed link is crawled.
    # df_list['query_date'] = pd.to_datetime(df_list['query_date'])
    # source = source_list_mask(df_list, df_table)
    source = df_list.copy()

    # Rows collected for this airline.
    data = []

    # Selenium connection settings.
    selenium_url = "http://localhost:4444/wd/hub"
    options = Options()
    options.add_argument("--headless")
    print('建立連線')

    # Visit every link in the list and fetch the rendered HTML.
    for url in source['link']:
        with webdriver.Remote(command_executor=selenium_url, options=options) as driver:
            try:
                page_source = get_page_source(url, driver)

            except Exception as e:
                print(f"無法存取 {url}: {e}")
                continue

        soup = BeautifulSoup(page_source, 'html.parser')

        try:
            # Inside the try so a page without the ident block is reported, not fatal.
            flight_no = soup('div', class_='flightPageIdent')[0].h1.text.strip()
            print(f'開始查詢{flight_no}班機資訊資訊...')

            # Use a separate name: assigning to ``gate_exist`` would replace the
            # function with an int and break every later iteration.
            has_gate = gate_exist(soup)
            print(f'{flight_no}航班有無閘門資訊：{has_gate}')
            flight_data = crawl_flight_data(
                soup, url, gate_exist=has_gate)

        except Exception as e:
            print(f'發生錯誤：{e}')

        else:
            # Only store the row when parsing succeeded; otherwise a stale row
            # from the previous URL (or an undefined name) would be appended.
            data.append(flight_data)
            print(f'完成存取{flight_no}航班資料')

        # Pause between page visits, whether or not parsing succeeded.
        time.sleep(7)

    columns = get_col()
    df_flight = pd.DataFrame(columns=columns, data=data)

    # Drop flights that have no actual landing time yet; they are left for a later run.
    df_flight = df_flight.dropna(subset=['landing_time_actual'])

    # Merge the newly crawled rows with the existing table.
    df_combine = pd.concat([df_table, df_flight], ignore_index=True)

    # Remove possible duplicates, identified by the link column.
    df_combine = df_combine.drop_duplicates(subset='link', keep='first')

    # Save the de-duplicated table.
    table_path = os.path.join(table_folder, table_file)
    df_combine.to_csv(table_path, index=False)

    # Disabled: mark crawled flights as synced (sync = 1) to avoid crawling them again.
    # mask_done = df_list['link'].isin(df_flight['link'])
    # df_list.loc[mask_done, 'sync'] = 1

    # Disabled: write the modified df_list back to the FlightList.csv file.
    # list_path = os.path.join(list_folder, list_file)
    # df_list.to_csv(list_path, index=False)

print('已更新所有資料！')

目前尚無EVA資料可供查詢
目前尚無CAL資料可供查詢
目前尚無SJX資料可供查詢
目前尚無TTW資料可供查詢
已更新所有資料！
