In [2]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
import re
from typing import Dict, List
import pyprind
from math import cos, pi, floor
from datetime import datetime

In [5]:
def parse_challenge(page):
    """
    Extract the anti-bot challenge embedded in a page served by mmi and
    mavat's web servers and compute the answer they expect back.

    The values are read from the lines following the first ``<script>`` tag:
    the text after ``=`` and before ``;`` on the second line is the challenge,
    and the same on the third line is the challenge id.
    The approach is pretty much copied from
    https://github.com/R3dy/jigsaw-rails/blob/master/lib/breakbot.rb

    Returns a dict with the keys 'challenge', 'challenge_id' and
    'challenge_result' (the solved value from get_challenge_answer).
    """
    script_lines = page.split('<script>')[1].split('\n')

    def assigned_value(statement):
        # "name=value;..." -> "value"
        return statement.split(';')[0].split('=')[1]

    token = assigned_value(script_lines[1])
    token_id = assigned_value(script_lines[2])
    return {
        'challenge': token,
        'challenge_id': token_id,
        'challenge_result': get_challenge_answer(token),
    }

def telegram_bot_sendtext(bot_message: str) -> None:
    """
    Send bot_message to a fixed Telegram chat via the Bot API sendMessage
    endpoint, using Markdown parse mode.

    The message is handed to requests through ``params`` so it is
    percent-encoded; concatenating it into the URL by hand lets characters
    such as '&', '#' or '+' in the text truncate or corrupt the request.

    The response is not inspected. Network failures (including the timeout
    below) propagate as requests exceptions.
    """
    # NOTE(review): the bot token and chat id are hard-coded credentials;
    # consider loading them from configuration outside version control.
    bot_token = '1172952527:AAGoM74Rx25DPBpmQhEwacs_AQ9GWI8Oybk'
    chat_id = "839266998"
    endpoint = 'https://api.telegram.org/bot' + bot_token + '/sendMessage'
    query = {
        'chat_id': chat_id,
        'parse_mode': 'Markdown',
        'text': bot_message,
    }
    # Bounded wait so a stalled notification cannot hang the scraper forever.
    requests.get(endpoint, params=query, timeout=30)

def get_challenge_answer(challenge):
    """
    Compute the response string for a challenge given as a string of digits.

    The three smallest digits (after sorting the characters) and the final
    digit of the original string feed an arithmetic formula; the floored
    result is returned as a string with a two-part digit suffix appended.
    """
    final_digit = int(challenge[-1])
    ordered = sorted(challenge)

    lowest = int(ordered[0])
    doubled_third = 2 * int(ordered[2])
    second = int(ordered[1])

    combined = doubled_third + second
    # Textual concatenation, not addition: e.g. 6 and "2" -> "62"
    suffix = str(doubled_third) + ordered[1]
    exponentiated = (lowest * 1 + 2) ** second

    scaled = int(challenge) * 3 + combined
    # cos(pi * k) acts as a sign flip for integer k (+1 for even, -1 for odd)
    value = scaled * cos(pi * combined) - exponentiated + (lowest - final_digit)

    return str(int(floor(value))) + suffix

def make_hkjc_request(url: str) -> str:
    """
    Fetch url and return the body of a final POST made with the cookies
    obtained from an initial GET.

    If the first response body mentions 'X-AA-Challenge', the challenge is
    solved with parse_challenge and the GET is repeated with the three
    X-AA-Challenge* headers; the cookies of that second response are used.
    Otherwise the cookies of the first response are used.
    """
    browser_headers = {'User-agent': 'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.1) Gecko/2008071615 Fedora/3.0.1-1.fc9 Firefox/3.0.1'}

    with requests.Session() as session:
        first_response = session.get(url, headers=browser_headers)

        if 'X-AA-Challenge' not in first_response.text:
            jar = first_response.cookies
        else:
            solved = parse_challenge(first_response.text)
            challenge_headers = {
                'X-AA-Challenge': solved['challenge'],
                'X-AA-Challenge-ID': solved['challenge_id'],
                'X-AA-Challenge-Result': solved['challenge_result'],
            }
            # The retry carries only the challenge headers (no User-agent)
            jar = session.get(url, headers=challenge_headers).cookies

        final_response = session.post(url, cookies=jar)

    return final_response.text

def evaluate_course(title):
    """
    Map a text mentioning a racecourse to its code.

    Spaces are removed and case is ignored before matching. Returns "ST"
    when the text contains "shatin", otherwise "HV" when it contains
    "happyvalley", otherwise None.
    """
    squashed = title.lower().replace(" ", "")
    # Checked in order, so "shatin" wins if both names are present
    for needle, code in (("shatin", "ST"), ("happyvalley", "HV")):
        if needle in squashed:
            return code
    return None


def scrape_prize_pool(date) -> pd.DataFrame:
    """
    Scrape the dividend tables from the HKJC local results pages for date.

    The results page for the date is requested first without a race number.
    The count of links in its race-card table and the racecourse code found
    on it are then used to request further race pages. From every fetched
    page the table inside the "dividend_tab f_clear" div is read, tagged
    with the race number shown in the page's header row and with date, and
    all values are converted to str.

    date is interpolated verbatim into the RaceDate query parameter.

    Returns the per-page tables stacked into one DataFrame (original row
    indexes kept), or an empty DataFrame if no page yielded a table.

    Lookups of the race-card table and the header row are not guarded; a
    page lacking them raises (e.g. AttributeError) to the caller.
    """
    # Local import keeps this block self-contained; read_html is given a
    # file-like object because passing literal HTML is deprecated in
    # recent pandas releases.
    from io import StringIO

    base_url = "https://racing.hkjc.com/racing/information/english/Racing/LocalResults.aspx"
    header_row_class = re.compile("bg_blue.+font_wb$")

    html_list = []
    # first race
    first_race = make_hkjc_request(f"{base_url}?RaceDate={date}")
    soup = BeautifulSoup(first_race, "lxml")
    html_list.append(soup)

    # extract info
    racecard = soup.find("table", attrs={"class": re.compile("f_fs12.+racecard$")})
    num_of_races = len(racecard.find_all("a"))
    course = evaluate_course(str(soup.find("span", attrs={"class": "f_fl f_fs13"})))

    # extract info for all other races and append to html_list
    # NOTE(review): range(2, num_of_races) stops at num_of_races - 1. Whether
    # that covers the last race depends on how many <a> links the race-card
    # table holds per race - confirm against a live page.
    for i in range(2, num_of_races):
        url = f"{base_url}?RaceDate={date}&Racecourse={course}&RaceNo={i}"
        race_info = make_hkjc_request(url)
        html_list.append(BeautifulSoup(race_info, "lxml"))

    # get prize pool and race_number
    frames = []
    for soup in html_list:
        try:
            dividend_html = str(soup.find("div", attrs={"class": "dividend_tab f_clear"}))
            # thousands="placeholder" presumably keeps read_html from treating
            # commas as thousands separators - TODO confirm
            df_temp = pd.read_html(StringIO(dividend_html), thousands="placeholder")[0]
            # Keep only the second level of the two-level column header
            df_temp.columns = df_temp.columns.get_level_values(1)
            overhead_text = soup.find("tr", attrs={"class": header_row_class}).get_text()
            # Race number is the number in parentheses in the header row
            race_no = re.search(r"\((\d+)\)", overhead_text).group(1)
            df_temp["race_no"] = race_no
            df_temp["date"] = date

            for col in df_temp.columns:
                df_temp[col] = [str(x) for x in df_temp[col]]
        except ValueError:
            # Best effort: read_html raises ValueError when the page has no
            # dividend table; skip that page and keep the others.
            continue

        frames.append(df_temp)

    # DataFrame.append was removed in pandas 2.0; concatenate once instead.
    # pd.concat rejects an empty list, so fall back to an empty frame.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames)

def remove_scraped_date(date_list: list) -> list:
    """
    Read status_prize_pool.txt and drop the dates already scraped successfully.

    Each line of the status file is taken as one finished date. A missing
    status file (e.g. on the very first run, before any date has been
    recorded) is treated as "nothing scraped yet".

    Returns a new list with the remaining dates, in their original order;
    date_list itself is not modified.
    """
    try:
        with open("status_prize_pool.txt", "r", encoding='utf-8') as txt:
            # A set makes each membership test O(1) instead of scanning a list
            scraped = {line.replace('\n', '') for line in txt}
    except FileNotFoundError:
        scraped = set()
    return [x for x in date_list if x not in scraped]

def update_status(date: str) -> None:
    """
    Mark date as done by appending it, on its own line, to
    status_prize_pool.txt (the file is created if it does not exist).
    """
    with open("status_prize_pool.txt", "a+", encoding='utf-8') as status_file:
        status_line = date + "\n"
        status_file.write(status_line)



In [6]:
if __name__ == "__main__":
    # Driver: scrape the prize pool tables for every race date that is not yet
    # listed in the status file, append them to an HDF5 table, and report
    # progress on the console and via Telegram.

    # fetch date list
    telegram_bot_sendtext(f"Prize pool scraper program started at {str(datetime.now())}.")
    error_list = []
    date_list = list(pd.read_pickle("1_horse_id_data.pkl")["date"])
    date_list = remove_scraped_date(date_list)

    print(f"Total {str(len(date_list))} race days.")
    bar = pyprind.ProgBar(len(date_list))

    # Minimum string widths per column for the HDF5 table; the same for every
    # date, so built once outside the loop.
    prize_col = {'Pool': 40,
                 'Winning Combination': 50,
                 'Dividend (HK$)': 40,
                 'race_no': 10,
                 'date': 20}

    # Number of dates processed so far (successes and failures). It must live
    # outside the loop: resetting it per iteration kept it at 0 whenever the
    # check below ran, so the periodic Telegram report could never be sent.
    status_int = 0

    for date in date_list:
        try:
            df = scrape_prize_pool(date)

            # write to hd5 -> hd5 defaults to append mode
            df.to_hdf(r"data_20200822\prize_pool.h5", key = "prize_pool", append = True, format = "table", min_itemsize = prize_col)

            # report status to telegram
            if status_int % 200 == 0 and status_int > 0:
                telegram_bot_sendtext(f"Completed {str(status_int)} out of {str(len(date_list))} at {str(datetime.now())} with {len(error_list)} errors.")

            bar.update()
            update_status(date)
            status_int += 1

        except Exception as e:
            # Top-level boundary: record the failure and move on to the next
            # date; the date is not written to the status file, so it will be
            # retried on the next run.
            status_int +=1
            print(f"{date}: {str(e)}")
            error_list.append((date, str(e)))
            bar.update()

Total 1 race days.


  check_attribute_name(name)
  check_attribute_name(name)
  check_attribute_name(name)
  check_attribute_name(name)
  check_attribute_name(name)
  check_attribute_name(name)
0% [#] 100% | ETA: 00:00:00
Total time elapsed: 00:00:14
