In [1]:
import datetime
import json
import os
import pprint
import time
import warnings
from datetime import timezone, timedelta

import folium #GPS數據視覺化
import pandas as pd
import requests
from geopy.distance import geodesic #計算GPS兩端點距離
from global_land_mask import globe #確認GPS位置是否在陸地

pd.set_option("display.max_columns", None)
warnings.filterwarnings('ignore') # 關閉運行的警告
pd.set_option("mode.chained_assignment", None)

In [2]:
class TDX():
    """Small client for the TDX REST API hosted at tdx.transportdata.tw.

    Frequently used values are kept as class attributes so they are easy to
    reach from later cells.

    Attributes:
        client_id: OAuth client id; taken from the ``TDX_CLIENT_ID``
            environment variable when it is set.
        client_secret: OAuth client secret; taken from the
            ``TDX_CLIENT_SECRET`` environment variable when it is set.
        cities: city codes accepted by the per-city endpoints used in this
            notebook (each code listed once).
        token_url: endpoint that issues access tokens.
        timeout: seconds to wait for any single HTTP request.
    """
    # NOTE(security): the literal fallbacks below are real-looking credentials
    # committed in the notebook. They are kept only so that `TDX()` keeps
    # working unchanged; set TDX_CLIENT_ID / TDX_CLIENT_SECRET instead, then
    # rotate the key and delete the literals.
    client_id : str = os.environ.get("TDX_CLIENT_ID", "kc.hsu-e51d77a0-5d9b-418b")
    client_secret : str = os.environ.get("TDX_CLIENT_SECRET", "afbb6a0a-f69d-4556-bd0b-4e53eae46379")
    # "ChiayiCounty" used to appear twice, which made every per-city loop
    # fetch that county twice.
    cities = ["Taipei", "NewTaipei", "Taoyuan", "Taichung", "Tainan", "Kaohsiung", "Keelung", "Hsinchu", "HsinchuCounty", "MiaoliCounty", "ChanghuaCounty", "NantouCounty", "YunlinCounty", "ChiayiCounty", "Chiayi", "PingtungCounty", "YilanCounty", "HualienCounty", "TaitungCounty", "KinmenCounty", "PenghuCounty", "LienchiangCounty"]
    token_url = "https://tdx.transportdata.tw/auth/realms/TDXConnect/protocol/openid-connect/token"
    # Without a timeout a stalled connection would hang the cell forever.
    timeout = 30

    def __init__(self, client_id=client_id, client_secret=client_secret):
        """Store the credentials and immediately request an access token.

        Args:
            client_id: OAuth client id (defaults to the class attribute).
            client_secret: OAuth client secret (defaults to the class attribute).
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.token = self.get_token()

    def get_token(self):
        """Request a new access token with the client-credentials grant.

        Returns:
            str: the ``access_token`` field of the token response.

        Raises:
            requests.HTTPError: if the token endpoint answers with an error status.
        """
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }
        response = requests.post(self.token_url, headers=headers, data=data, timeout=self.timeout)
        # Fail loudly here instead of with a confusing KeyError on 'access_token'.
        response.raise_for_status()
        return response.json()['access_token']

    def _authorized_get(self, url):
        """Send one GET carrying the current bearer token; return the raw response."""
        headers = {"authorization": f"Bearer {self.token}"}
        return requests.get(url, headers=headers, timeout=self.timeout)

    def get_response(self, url):
        """GET ``url`` and return the decoded JSON body.

        A 401 answer triggers one token refresh and one retry, so long-running
        polling loops keep working after the token expires.

        Args:
            url: full URL of the API resource.

        Returns:
            The parsed JSON payload (list or dict, depending on the endpoint).

        Raises:
            requests.HTTPError: if the final answer still has an error status.
        """
        response = self._authorized_get(url)
        if response.status_code == 401:
            self.token = self.get_token()
            response = self._authorized_get(url)
        # Never hand an error payload to callers as if it were data.
        response.raise_for_status()
        return response.json()
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        

### 市區公車業者名單抓取

In [None]:
# Download the operator list of every city bus system and export it to Excel.
tdx = TDX()  # one client (one token request) shared by all cities
base_url = "https://tdx.transportdata.tw/api/basic"
endpoint = "/v2/Bus/Operator/City/"
# A city code listed more than once would be fetched (and exported) more than
# once; dict.fromkeys drops repeats while keeping the original order.
cities = list(dict.fromkeys(TDX.cities))
dfs = []
for city in cities:
    url = f"{base_url}{endpoint}{city}"
    operators = tdx.get_response(url)
    dfs.append(pd.DataFrame.from_records(operators))
df = pd.concat(dfs)
# OperatorName is a nested record; keep its Zh_tw entry as a flat column.
df["OperatorNameZh_tw"] = df["OperatorName"].map(lambda x: x["Zh_tw"])
df.to_excel(r"D:\kc.hsu\OneDrive - Bridgestone\數據\TDX\20221026_市區公車業者名單.xlsx", index=False)

### 國道客運業者名單抓取

In [None]:
# Download the inter-city bus operator list and export it to Excel.
# TDX() uses the class-level default credentials; the earlier
# `TDX(client_id, client_secret)` relied on notebook globals that no visible
# cell defines, which raises NameError on a fresh kernel.
tdx = TDX()
base_url = "https://tdx.transportdata.tw/api/basic"
endpoint = "/v2/Bus/Operator/InterCity"
url = f"{base_url}{endpoint}"
operators = tdx.get_response(url)
intercity_operators = pd.DataFrame.from_records(operators)
# OperatorName is a nested record; keep its Zh_tw entry as a flat column.
intercity_operators["OperatorNameZh_tw"] = intercity_operators["OperatorName"].map(lambda x: x["Zh_tw"])
intercity_operators.to_excel(r"D:\kc.hsu\OneDrive - Bridgestone\數據\TDX\20221026_國道客運業者名單.xlsx", index=False)

## 國道客運即時數據抓取

In [7]:
# Fetch a single snapshot of the inter-city real-time feed and keep only the
# columns of interest, in a fixed order.
tdx = TDX()
base_url = "https://tdx.transportdata.tw/api/basic"
endpoint = "/v2/Bus/RealTimeByFrequency/InterCity"
url = f"{base_url}{endpoint}"
snapshot_records = tdx.get_response(url)
snapshot_columns = [
    'PlateNumb', 'OperatorID', 'RouteUID', 'RouteID',
    'SubRouteUID', 'SubRouteID', 'Direction', 'BusPosition',
    'Speed', 'Azimuth', 'DutyStatus', 'BusStatus', 'MessageType', 'GPSTime',
    'SrcRecTime', 'SrcTransTime', 'UpdateTime',
]
result = pd.DataFrame.from_records(snapshot_records)
result = result[snapshot_columns]

In [9]:
# Flatten the nested BusPosition record into two plain coordinate columns.
for flat_column, position_key in (("Longitude", "PositionLon"), ("Latitude", "PositionLat")):
    result[flat_column] = result["BusPosition"].map(
        lambda position, key=position_key: position[key]
    )

In [11]:
# Replace BusPosition by the flat Longitude/Latitude pair in the column order.
flat_columns = [
    'PlateNumb', 'OperatorID', 'RouteUID', 'RouteID',
    'SubRouteUID', 'SubRouteID', 'Direction', 'Longitude', 'Latitude',
    'Speed', 'Azimuth', 'DutyStatus', 'BusStatus', 'MessageType', 'GPSTime',
    'SrcRecTime', 'SrcTransTime', 'UpdateTime',
]
result = result[flat_columns]

In [16]:
# Pack each row's (Longitude, Latitude) pair into one tuple-valued column.
lon_lat = result[["Longitude", "Latitude"]]
result["Coordinates"] = lon_lat.apply(tuple, axis="columns")

In [21]:
# Preview: for every row, the Coordinates value of the following row that
# belongs to the same OperatorID (NaN for the last row of each group).
coordinates_by_operator = result.groupby(["OperatorID"])["Coordinates"]
coordinates_by_operator.shift(periods=-1)

0       (120.786511666667, 24.4588066666667)
1       (121.537478333333, 24.9586483333333)
2       (121.438336666667, 25.0859366666667)
3               (120.298941666667, 23.56911)
4             (120.326695, 22.6297216666667)
                        ...                 
1816                                     NaN
1817            (121.696346666667, 25.15598)
1818                                     NaN
1819    (121.756978333333, 25.1282183333333)
1820                                     NaN
Name: Coordinates, Length: 1821, dtype: object

In [14]:
# Save the flattened snapshot as an Excel sample of the API content.
sample_export_path = r"C:\Users\kc.hsu\Desktop\國道客運API範例內容.xlsx"
result.to_excel(sample_export_path, index=False)

In [None]:
def time_range_data(st_year=0, st_month=0, st_day=0, st_hour=0, st_min=0, end_year=0, end_month=0, end_day=0, end_hour=0, end_min=0):
    """Poll the TDX inter-city bus real-time feed during a time window.

    The start and end arguments are read as local wall-clock time, i.e. the
    clock reported by ``datetime.datetime.now()``. The call blocks until the
    window opens, then fetches one snapshot about every 60 seconds (the feed
    is a batch update with a 60 s frequency) until the window has closed.

    Args:
        st_year, st_month, st_day, st_hour, st_min: start of the window.
        end_year, end_month, end_day, end_hour, end_min: end of the window.

    Returns:
        pandas.DataFrame: every snapshot concatenated in fetch order, with
        ``Longitude`` and ``Latitude`` columns extracted from ``BusPosition``.
        An empty DataFrame when no snapshot was collected, for example
        because the window had already passed.

    Raises:
        ValueError: if the date parts do not form a valid datetime (the
            all-zero defaults are not a valid date).
    """
    poll_seconds = 60   # the feed itself only refreshes every 60 seconds
    retry_seconds = 5   # pause before retrying a failed request
    url = "https://tdx.transportdata.tw/api/basic" + "/v2/Bus/RealTimeByFrequency/InterCity"

    # Compare real datetime objects on one clock. The earlier code compared
    # ISO strings whose UTC offsets differed, which only worked by accident of
    # character-by-character ordering.
    st_time = datetime.datetime(st_year, st_month, st_day, st_hour, st_min)
    end_time = datetime.datetime(end_year, end_month, end_day, end_hour, end_min)

    dfs = []
    tdx = None
    while True:
        current_time = datetime.datetime.now()
        if current_time > end_time:
            print("抓取結束")
            break
        if current_time < st_time:
            # Sleep until the window opens instead of spinning at full CPU.
            time.sleep(min(poll_seconds, (st_time - current_time).total_seconds()))
            continue

        if tdx is None:
            # Created lazily so that an already-closed window costs no token request.
            tdx = TDX()
        try:
            result = pd.DataFrame.from_records(tdx.get_response(url))
        except Exception:
            # `except Exception` lets Ctrl-C / kernel interrupt stop the loop.
            # Most likely the token expired: wait briefly, refresh, try again.
            time.sleep(retry_seconds)
            tdx.token = tdx.get_token()
            continue

        # Flatten the nested BusPosition record into plain coordinate columns.
        result["Longitude"] = result["BusPosition"].map(lambda x: x["PositionLon"])
        result["Latitude"] = result["BusPosition"].map(lambda x: x["PositionLat"])
        dfs.append(result)
        time.sleep(poll_seconds)

    # pd.concat([]) raises, so return an empty frame when nothing was fetched.
    return pd.concat(dfs) if dfs else pd.DataFrame()
    

In [None]:
def icb_summary(df):
    """Summarise inter-city bus real-time records per vehicle.

    Steps: drop fixes that are not on land, translate the coded columns to
    readable labels, measure the distance from each fix to the vehicle's next
    fix, drop hops longer than 2.5 km, then aggregate per vehicle.

    Args:
        df: real-time records with at least the columns ``PlateNumb``,
            ``OperatorID``, ``RouteUID``, ``Direction``, ``BusStatus``,
            ``DutyStatus``, ``MessageType``, ``Latitude``, ``Longitude`` and
            ``UpdateTime``. The frame is not modified.

    Returns:
        tuple: ``(summary, detail)``.
            ``summary`` has one row per (PlateNumb, Operator) with
            ``Distance`` (km), ``Time`` (hours between first and last
            record), ``Date`` and ``AVG. Speed`` (km/h, 0 when the time span
            is zero).
            ``detail`` is the filtered, enriched record-level frame.
    """
    summary_columns = ["PlateNumb", "Operator", "Distance", "Time", "Date", "AVG. Speed"]
    if df.empty:
        # Nothing to summarise; avoid KeyErrors and file reads further down.
        return pd.DataFrame(columns=summary_columns), df.copy()

    direction_dict = {0 : "去程(南下)", 1 : "返程(北上)", 2 : "迴圈(雙向)"}
    busStatus_dict = {0: "正常", 1 : "車禍", 2 : "故障", 3 : "塞車", 4 : "緊急救援", 5 : "加油", 98 : "偏移路線", 99 : "非營業狀態", 100 : "客滿", 101 : "包車出租", 255 : "未知"}
    busDutyStatus_dict = {0: "正常", 1 : "開始", 2 : "結束"}
    operator = pd.read_excel(r"D:\kc.hsu\OneDrive - Bridgestone\數據\TDX\20221026_國道客運業者名單.xlsx")
    # Both sides of the lookup are cast to str so IDs match regardless of how
    # Excel stored them.
    operator_dict = dict(zip(operator["OperatorID"].astype(str), operator["OperatorNameZh_tw"]))
    route = pd.read_csv(r"D:\kc.hsu\OneDrive - Bridgestone\數據\TDX\公路客運路線代碼表.csv")
    route_dict = dict(zip(route["路線唯一識別代碼"], route[["起站中文名稱", "終站中文名稱"]].apply(tuple, axis=1)))
    message_dict = {0 : "未知", 1 : "定期", 2 : "非定期"}

    # Work on a copy: the caller's frame must not gain columns as a side effect.
    df = df.copy()
    # Flag whether each GPS fix lies on land; fixes at sea are treated as bad data.
    df["Land"] = globe.is_land(df["Latitude"], df["Longitude"])
    df = df[df["Land"] == True].copy()
    if df.empty:
        return pd.DataFrame(columns=summary_columns), df

    df["Operator"] = df["OperatorID"].astype(str).map(operator_dict)
    df["車輛方向"] = df["Direction"].map(direction_dict)
    df["車輛狀態"] = df["BusStatus"].map(busStatus_dict)
    df["車輛勤務狀態"] = df["DutyStatus"].map(busDutyStatus_dict)
    df["執勤路線"] = df["RouteUID"].map(route_dict)
    df["班車資訊"] = df["MessageType"].map(message_dict)

    df["Coordinates"] = df[["Latitude", "Longitude"]].apply(tuple, axis=1)
    # Next fix of the same vehicle; NaN on each vehicle's last record.
    # NOTE(review): assumes rows arrive in chronological order per vehicle.
    df["NextCoordinates"] = df.groupby("PlateNumb")["Coordinates"].shift(-1)
    # Geodesic distance to the next fix, converted from metres to kilometres.
    df["Distance"] = df.apply(
        lambda row: geodesic(row["Coordinates"], row["NextCoordinates"]).meters
        if isinstance(row["NextCoordinates"], tuple) else 0,
        axis=1,
    ) / 1000
    # Hops above 2.5 km between consecutive records are discarded
    # (presumably GPS glitches - confirm the threshold with the data owner).
    df = df[df["Distance"] <= 2.5].copy()
    # Drop the UTC offset and let pandas infer the ISO layout. The former
    # explicit format "%Y-%m-%dT%H:%M" has no seconds field.
    # NOTE(review): UpdateTime presumably carries seconds; with strict format
    # matching such values are rejected - confirm against a live payload.
    df["Time"] = pd.to_datetime(df["UpdateTime"].str.split("+", n=1).str[0])

    result = df.groupby(["PlateNumb", "Operator"]).agg({"Distance": "sum", "Time": lambda x: (x.max() - x.min()) / pd.Timedelta(hours=1)}).reset_index()
    result["Date"] = df["Time"].max().strftime("%Y-%m-%d")
    hours = result["Time"]
    # A zero time span would yield inf or NaN; report 0 km/h for those vehicles.
    result["AVG. Speed"] = (result["Distance"] / hours).where(hours > 0, 0)

    return result, df

In [None]:
# Collect inter-city real-time data for the 18:00-18:01 window on 2023-01-05.
# The raw frame can optionally be exported to Excel afterwards.
df = time_range_data(
    st_year=2023, st_month=1, st_day=5, st_hour=18, st_min=0,
    end_year=2023, end_month=1, end_day=5, end_hour=18, end_min=1,
)

In [None]:
# Display the collected real-time frame.
df

In [None]:
# icb_summary returns (per-vehicle summary, enriched record-level frame).
result, raw = icb_summary(df)

In [None]:
# Today's local date as a compact YYYYMMDD stamp for file names.
today = datetime.date.today().strftime("%Y%m%d")

In [None]:
# File name of the raw-data pickle, prefixed with the date stamp.
path = f"{today}國道客運即時數據.pkl"

In [None]:
# Save the enriched records (pickle) and the per-vehicle summary (Excel).
# The file names now use the `today` / `path` values computed in the cells
# above; the former hard-coded "20221109" prefix made every run overwrite the
# same two files regardless of the run date.
output_dir = r"D:\kc.hsu\OneDrive - Bridgestone\數據\TDX"
raw.to_pickle(output_dir + "\\" + path)
result.to_excel(output_dir + "\\" + today + "國道客運即時匯整數據.xlsx", index=False)

In [None]:
# icb_summary returns a (summary, detail) pair; unpack it so that `result`
# is the summary DataFrame rather than a tuple (a tuple has no .to_excel).
result, raw = icb_summary(df)
# The summary can optionally be exported to Excel afterwards.


## 測試區

In [None]:
def cb_time_range_data(st_year=0, st_month=0, st_day=0, st_hour=0, st_min=0, end_year=0, end_month=0, end_day=0, end_hour=0, end_min=0, city=None):
    """Poll the TDX city bus real-time feed of one city during a time window.

    The start and end arguments are read as local wall-clock time, i.e. the
    clock reported by ``datetime.datetime.now()``. The call blocks until the
    window opens, then fetches one snapshot about every 60 seconds until the
    window has closed.

    Args:
        st_year, st_month, st_day, st_hour, st_min: start of the window.
        end_year, end_month, end_day, end_hour, end_min: end of the window.
        city: city code used in the endpoint path. When omitted, a numbered
            menu is printed and the number is read with ``input()``. Pass it
            explicitly when running without a console, e.g. in worker threads.

    Returns:
        pandas.DataFrame: every snapshot concatenated with a fresh 0..n-1
        index, or an empty DataFrame when no snapshot was collected.

    Raises:
        ValueError: if the typed menu entry is not a listed number, or if the
            date parts do not form a valid datetime.
    """
    cities = ["Taipei", "NewTaipei", "Taoyuan", "Taichung", "Tainan", "Kaohsiung", "Keelung", "Hsinchu", "HsinchuCounty", "MiaoliCounty", "ChanghuaCounty", "NantouCounty", "YunlinCounty", "ChiayiCounty", "Chiayi", "PingtungCounty", "YilanCounty", "HualienCounty", "TaitungCounty", "KinmenCounty", "PenghuCounty", "LienchiangCounty"]
    if city is None:
        for no, name in enumerate(cities):
            print(no, name)
        # int() instead of eval(): typed text must never be executed as code.
        choice = int(input("Please type the no. of the city: "))
        if not 0 <= choice < len(cities):
            raise ValueError(f"city no. must be between 0 and {len(cities) - 1}, got {choice}")
        city = cities[choice]

    poll_seconds = 60   # the feed itself only refreshes every 60 seconds
    retry_seconds = 5   # pause before retrying a failed request
    base_url = "https://tdx.transportdata.tw/api/basic"
    end_point = "/v2/Bus/RealTimeByFrequency/City/{}".format(city)
    url = base_url + end_point

    # Compare real datetime objects on one clock. The earlier code compared
    # ISO strings whose UTC offsets differed, which only worked by accident of
    # character-by-character ordering.
    st_time = datetime.datetime(st_year, st_month, st_day, st_hour, st_min)
    end_time = datetime.datetime(end_year, end_month, end_day, end_hour, end_min)

    dfs = []
    tdx = None
    while True:
        current_time = datetime.datetime.now()
        if current_time > end_time:
            print("抓取結束")
            break
        if current_time < st_time:
            # Sleep until the window opens instead of spinning at full CPU.
            time.sleep(min(poll_seconds, (st_time - current_time).total_seconds()))
            continue

        if tdx is None:
            # Created lazily so that an already-closed window costs no token request.
            tdx = TDX()
        try:
            result = pd.DataFrame.from_records(tdx.get_response(url))
        except Exception:
            # `except Exception` lets Ctrl-C / kernel interrupt stop the loop.
            # Most likely the token expired: wait briefly, refresh, try again.
            time.sleep(retry_seconds)
            tdx.token = tdx.get_token()
            continue

        dfs.append(result)
        time.sleep(poll_seconds)

    # pd.concat([]) raises, so return an empty frame when nothing was fetched.
    return pd.concat(dfs).reset_index(drop=True) if dfs else pd.DataFrame()

In [None]:
# Fetch the live vehicle-detector (VD) traffic records for Taipei.
# A multi-city variant would loop over `cities`, skip the cities whose request
# fails and concatenate the remaining frames; only Taipei is fetched here.
cities = ["Taipei", "NewTaipei", "Taoyuan", "Taichung", "Tainan", "Kaohsiung", "Keelung", "Hsinchu", "HsinchuCounty", "MiaoliCounty", "ChanghuaCounty", "NantouCounty", "YunlinCounty", "ChiayiCounty", "ChiayiCounty", "Chiayi", "PingtungCounty", "YilanCounty", "HualienCounty", "TaitungCounty", "KinmenCounty", "PenghuCounty", "LienchiangCounty"]
base_url = "https://tdx.transportdata.tw/api/basic"
end_point = "/v2/Road/Traffic/Live/VD/City/{}"
tdx = TDX()
url = f"{base_url}{end_point.format('Taipei')}"
vd_payload = tdx.get_response(url)
df = pd.DataFrame.from_records(vd_payload)
    
    

In [None]:
# Expand the nested VDLives records into columns, then expand LinkFlows.
vd_lives = df["VDLives"].apply(pd.Series)
vd_lives["LinkFlows"].apply(pd.Series)

In [None]:
# Drill down one level at a time: VDLives -> LinkFlows -> first link -> Lanes.
vd_lives = df["VDLives"].apply(pd.Series)
link_flows = vd_lives["LinkFlows"].apply(pd.Series)
first_link = link_flows[0].apply(pd.Series)
first_link["Lanes"].apply(pd.Series)

In [None]:
from concurrent.futures import ThreadPoolExecutor

In [None]:
# Run two city collections side by side over the same time window.
# Each job asks for its city through input() when it starts; leaving the
# `with` block waits for both jobs to finish.
collection_window = (2022, 11, 17, 15, 11, 2022, 11, 17, 15, 52)
with ThreadPoolExecutor() as executor:
    tpe = executor.submit(cb_time_range_data, *collection_window)
    ntp = executor.submit(cb_time_range_data, *collection_window)

In [None]:
# Collect city bus data for the 15:11-15:40 window on 2022-11-17.
test_data = cb_time_range_data(
    st_year=2022, st_month=11, st_day=17, st_hour=15, st_min=11,
    end_year=2022, end_month=11, end_day=17, end_hour=15, end_min=40,
)

In [None]:
# Work on a copy with a fresh 0..n-1 index; the old index is discarded.
test = test_data.reset_index(drop=True)

In [None]:
# Show the records of two specific vehicles run by operator "100".
is_operator_100 = test["OperatorID"] == "100"
is_selected_plate = test["PlateNumb"].isin(["KKA-1225", "KKA-1232"])
test[is_operator_100 & is_selected_plate]

In [None]:
# Translate the coded columns of `df` into readable labels.
direction_dict = {0 : "去程(南下)", 1 : "返程(北上)", 2 : "迴圈(雙向)"}
busStatus_dict = {0: "正常", 1 : "車禍", 2 : "故障", 3 : "塞車", 4 : "緊急救援", 5 : "加油", 98 : "偏移路線", 99 : "非營業狀態", 100 : "客滿", 101 : "包車出租", 255 : "未知"}
busDutyStatus_dict = {0: "正常", 1 : "開始", 2 : "結束"}
operator = pd.read_excel(r"D:\kc.hsu\OneDrive - Bridgestone\數據\TDX\20221026_國道客運業者名單.xlsx")
# Cast both sides of the operator lookup to str, as icb_summary does: the
# feed's OperatorID is compared as a string elsewhere in this notebook, and
# IDs read back from Excel may come out numeric, so an uncast lookup would
# map every row to NaN.
operator_dict = dict(zip(operator["OperatorID"].astype(str), operator["OperatorNameZh_tw"]))
route = pd.read_csv(r"D:\kc.hsu\OneDrive - Bridgestone\數據\TDX\公路客運路線代碼表.csv")
route_dict = dict(zip(route["路線唯一識別代碼"], route[["起站中文名稱", "終站中文名稱"]].apply(tuple, axis=1)))
message_dict = {0 : "未知", 1 : "定期", 2 : "非定期"}
df["Operator"] = df["OperatorID"].astype(str).map(operator_dict)
df["車輛方向"] = df["Direction"].map(direction_dict)
df["車輛狀態"] = df["BusStatus"].map(busStatus_dict)
df["車輛勤務狀態"] = df["DutyStatus"].map(busDutyStatus_dict)
df["執勤路線"] = df["RouteUID"].map(route_dict)
df["班車資訊"] = df["MessageType"].map(message_dict)