In [10]:
import argparse


# Path to the JSON file mapping city names to their Skyscanner URL codes
CITY_CODES_PATH = '../static/codes.json'

# Number of processes for the multiprocessing library
N_PROCESSES = 2

# Number of web browsers opened at the same time (to tune empirically against internet speed)
N_BROWSERS = 1

# Number of consecutive departure days to look up, counted from the start date
SPAN_WINDOW = 15

# Number of days to look up before and after the expected number of days in stay
RETURN_WINDOW = 10

# Number of scraped trips between two intermediate saves to the workbook
SAVING_RATE = 12

# URL body for this situation : 1 adult, French website, EUR currency
PRE_URL = "https://www.skyscanner.fr/transport/vols/"
URL_PARAMS = {
    'adults': 1,
    'children': 0,
    'adultsv2': 1,
    'childrenv2': 0,
    'infants': 0,
    'cabinclass': 'economy',
    'rtn': 0,
    'preferdirects': 'false',
    'outboundaltsenabled': 'false',
    'inboundaltsenabled': 'false',
    'ref': 'home',
}
# Query string appended after the trip part of the URL
POST_URL = "/?" + '&'.join(f"{key}={value}" for key, value in URL_PARAMS.items())

# Column order must match the rows built by format_page_results, which writes
# for each leg : departure time, duration, direct flight label, then company
COLUMNS_ONEWAY = [
    'From', 'To', 'Date', 'Departure time', 'Duration', 'Direct flight', 'Company', 'Price', 'Currency',
]
COLUMNS_RETURN = [
    'From', 'To', 'Date - OneWay', 'Departure time - OneWay', 'Duration - OneWay', 'Direct flight - OneWay',
    'Company - OneWay', 'Date - Return', 'Departure time - Return', 'Duration - Return', 'Direct flight - Return',
    'Company - Return', 'Price', 'Currency',
]


def get_args():
    """
    Parse the command line arguments of the scraper

    Returns the argparse namespace with attributes d (departure airport),
    a (arrival airport), s (start date) and days (stay in days, as an int,
    0 meaning a one way flight)
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', help='Departure airport', required=True)
    parser.add_argument('-a', help='Arrival airport', required=True)
    parser.add_argument('-s', help='Start date to look up', required=True)
    # type=int : without it a value given on the command line is a string,
    # so "--days 0" is truthy and never equal to 0
    parser.add_argument('--days', help='Stay in days before return (0 if one way flight)', type=int, default=0)
    return parser.parse_args()

from time import sleep

from selenium.common.exceptions import WebDriverException
from selenium.webdriver import Chrome, ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# TODO : class browser
def init_browser():
    """
    Start a Chrome browser in incognito mode and return it
    """
    options = ChromeOptions()
    options.add_argument("--incognito")
    return Chrome(options=options)


def cookie_banner(browser):
    """
    Dismiss the cookie banner by clicking the button whose text contains "OK"

    Returns 1 when the button was found and clicked, 0 when the lookup or
    the click failed with a WebDriver error (e.g. no banner on the page)
    """
    try:
        # find_element(By.XPATH, ...) replaces find_element_by_xpath, which
        # recent Selenium releases no longer provide
        browser.find_element(By.XPATH, '//button[contains(text(), "OK")]').click()
    except WebDriverException:
        # Best effort : only browser-side failures mean "no banner handled"
        return 0
    return 1


def covid_banner(browser):
    """
    Dismiss the covid banner by clicking the link whose text contains "Cancel"

    Returns 1 when the link was found and clicked, 0 when the lookup or
    the click failed with a WebDriver error (e.g. no banner on the page)
    """
    try:
        # find_element(By.XPATH, ...) replaces find_element_by_xpath, which
        # recent Selenium releases no longer provide
        browser.find_element(By.XPATH, '//a[contains(text(), "Cancel")]').click()
    except WebDriverException:
        # Best effort : only browser-side failures mean "no banner handled"
        return 0
    return 1


def price_alert_banner(browser):
    """
    Dismiss the price alert banner by clicking the button whose class
    contains "BpkCloseButton_bpk-close-button_"

    Returns 1 when the button was found and clicked, 0 when the lookup or
    the click failed with a WebDriver error (e.g. no banner on the page)
    """
    try:
        # find_element(By.XPATH, ...) replaces find_element_by_xpath, which
        # recent Selenium releases no longer provide
        browser.find_element(
            By.XPATH, '//button[contains(@class, "BpkCloseButton_bpk-close-button_")]'
        ).click()
    except WebDriverException:
        # Best effort : only browser-side failures mean "no banner handled"
        return 0
    return 1


def bot_is_detected(browser):
    """
    Tell whether the current page is the "person or robot" captcha page

    The page is recognised by this headline (French website) :
    <section class="App_App__headline__fdpD_">Êtes-vous une personne ou un robot ?</section>
    <section class="App_App__instruction__3GRTU">Cochez la case pour continuer :</section>
    <div class="recaptcha-checkbox-border" role="presentation" style=""></div>

    Returns 1 when the headline is found, 0 when the lookup failed with a
    WebDriver error (headline absent)
    """
    try:
        # find_element(By.XPATH, ...) replaces find_element_by_xpath, which
        # recent Selenium releases no longer provide
        browser.find_element(By.XPATH, '//section[contains(text(), "Êtes-vous une personne ou un robot ?")]')
    except WebDriverException:
        return 0
    return 1


def _load_results_page(browser, url):
    """ Open url and wait up to 25 seconds for the element with id "Layer_1" """
    browser.get(url)
    WebDriverWait(browser, 25).until(EC.presence_of_element_located((By.ID, "Layer_1")))


def _split_price(price_currency):
    """ Split a "<amount> <currency>" label into (amount as int, currency) """
    parts = price_currency.strip().split(' ')
    if len(parts) != 2:
        raise ValueError("Unexpected price label: " + repr(price_currency))
    return int(parts[0]), parts[1]


def _leg_company(leg):
    """ Airline of a leg : alt text of its logo image, else the text of the logo container """
    try:
        logo = leg.find_element(By.XPATH, './/div[contains(@class, "LegLogo_legImage_")]')
        return logo.find_element(By.TAG_NAME, "img").get_attribute("alt")
    except WebDriverException:
        return leg.find_element(By.XPATH, './/div[contains(@class, "TicketBody_legLogo_")]').text


def _leg_fields(leg):
    """ Return [departure time, duration, direct/stops label, company] read inside one leg element """
    # " .// " in every xpath, otherwise the whole page is searched instead of the leg
    direct_or_stops = leg.find_element(By.XPATH, './/div[contains(@class, "LegInfo_stopsLabelContainer_")]').text
    company = _leg_company(leg)
    departure_time = leg.find_element(By.XPATH, './/span[contains(@class, "LegInfo_routePartialTime_")]').text
    duration = leg.find_element(By.XPATH, './/span[contains(@class, "Duration_duration_")]').text
    # Only the first line of the stops label is kept
    return [departure_time, duration, direct_or_stops.split("\n")[0], company]


def format_page_results(browser, trip, df, banners, is_one_way):
    """
    For a given browser (Selenium object) and a given trip (string concatenating
    Departure, Arrival and Date(s), separated by "/"), appends the results of
    the web page to the DataFrame, one row per flight box, indexed by
    "<trip>_<box number>"

    banners is a dict with the boolean keys 'cookie', 'price_alert' and
    'covid' ; each banner is looked for until it has been dismissed once,
    then its flag is set to True in place

    Row values are positional : their order must match the columns of df
    (one way : from, to, date, then the leg fields, price, currency ;
    return : same with the return date and the second leg fields inserted
    before the price)

    Returns df. Raises ValueError when a price label is not of the form
    "<amount> <currency>" or when a return result does not hold two legs
    """
    url = PRE_URL + trip + POST_URL
    _load_results_page(browser, url)
    # Bot
    if bot_is_detected(browser):
        # NOTE(review): the new browser is only bound to the local name, the
        # caller still holds the closed one and its next use of it will fail.
        # The banner flags are not reset for the new browser either. Fixing
        # this needs a way to hand the new browser back to the caller
        browser.close()
        browser = init_browser()
        sleep(1)
        _load_results_page(browser, url)
    # Cookie banner
    if not banners['cookie']:
        sleep(1)
        if cookie_banner(browser):
            banners['cookie'] = True
    # Price alert banner
    if not banners['price_alert']:
        sleep(8)  # Security at set up to dodge banners
        if price_alert_banner(browser):
            banners['price_alert'] = True
    # Covid banner
    if not banners['covid']:
        sleep(15)  # Security at set up to dodge banners
        if covid_banner(browser):
            banners['covid'] = True
    # Results : click "more results" when the button is there
    try:
        browser.find_element(By.XPATH, '//button[contains(text(), "Plus de résultats")]').click()
    except WebDriverException:
        pass  # Button absent or not clickable : keep the results already shown
    sleep(2)
    # Flight boxes
    boxes = browser.find_elements(By.XPATH, '//div[starts-with(@class, "EcoTicketWrapper_itineraryContainer_")]')
    trip_parts = trip.split('/')
    for j, box in enumerate(boxes):
        price_label = box.find_element(By.XPATH, './/div[starts-with(@class, "Price_mainPriceContainer_")]').text
        price, currency = _split_price(price_label)
        if is_one_way:
            row = trip_parts[:3] + _leg_fields(box) + [price, currency]
        else:
            ways = box.find_elements(By.XPATH, './/div[starts-with(@class, "LegDetails_container_")]')
            if len(ways) != 2:
                raise ValueError("Expected 2 legs for a return trip, found " + str(len(ways)))
            row = (
                trip_parts[:3] + _leg_fields(ways[0])
                + [trip_parts[3]] + _leg_fields(ways[1])
                + [price, currency]
            )
        # .loc is the row-wise setter that appends a new label ; .at is the scalar accessor
        df.loc[trip + '_' + str(j)] = row
    return df


import pandas as pd
from openpyxl import load_workbook


def init_workbook(filename, stay_in_days = 0):
    """
    Write to filename an Excel workbook holding only the header row :
    return-trip columns when stay_in_days is truthy, one-way columns otherwise
    """
    header = COLUMNS_RETURN if stay_in_days else COLUMNS_ONEWAY
    pd.DataFrame(columns=header).to_excel(filename, index=False)
    return 1


def save_df_to_workbook(filename, df):
    """
    Append df as a new sheet of the existing workbook filename

    The sheet is requested as 'Sheet1' ; as that name is already taken, the
    engine stores it under a new name, so former sheets are kept (they are
    merged later by final_workbook)
    Returns 1. The workbook must already exist (see init_workbook)
    """
    # Append mode replaces the former "writer.book = book" / "writer.save()"
    # pair, which pandas 2.0 removed ; the context manager saves and closes
    # the file even if the write fails
    with pd.ExcelWriter(filename, engine='openpyxl', mode='a', if_sheet_exists='new') as writer:
        df.to_excel(writer, sheet_name='Sheet1', index=False)
    return 1


def final_workbook(filename):
    """
    Concat all non-empty sheets of the workbook (one per intermediate save)
    and write them to a csv file next to it : same name with the ".xlsx"
    extension replaced by ".csv"
    Returns 1
    """
    sheets = pd.read_excel(filename, sheet_name=None)
    frames = [frame for frame in sheets.values() if not frame.empty]
    if not frames:
        # Nothing scraped yet : keep one (empty) sheet so that the csv still gets its header row
        frames = list(sheets.values())[:1]
    df_final = pd.concat(frames, axis=0)
    # The extension was formerly spelled ".xslx" : the replacement never
    # happened and the csv content overwrote the workbook itself
    stem = filename[:-len('.xlsx')] if filename.endswith('.xlsx') else filename
    df_final.to_csv(stem + '.csv', index=False)
    return 1


from time import sleep

import pandas as pd
import json


def get_city_codes_on_skyscanner(departure, arrival):
    """
    Get the cities nomenclature used in Skyscanner's URLs by running a one
    way search from departure to arrival on the home page and reading the
    resulting URL

    Returns {'departure': {'name', 'code'}, 'arrival': {'name', 'code'}}
    The browser opened for the search is closed even if the search fails
    """
    browser = init_browser()
    try:
        browser.get("https://www.skyscanner.fr/")
        cookie_banner(browser)
        # Elements are looked up again before each action, as in the former
        # find_element_by_id calls (an API recent Selenium no longer provides)
        browser.find_element(By.ID, "fsc-origin-search").clear()
        browser.find_element(By.ID, "fsc-origin-search").send_keys(departure)
        browser.find_element(By.ID, "fsc-destination-search").send_keys(arrival)
        browser.find_element(By.ID, "fsc-trip-type-selector-one-way").click()
        browser.find_element(By.ID, "fsc-origin-search").click()
        browser.find_element(By.ID, "fsc-destination-search").click()
        browser.find_element(By.XPATH, '//button[contains(text(), "Trouver un vol")]').click()
        sleep(2.5)
        # Bot detection : manual intervention for this session
        if bot_is_detected(browser):
            print("You have 100 seconds to complete captcha test !")
            sleep(100)  # 100 seconds to pass captcha test
        # What remains of the URL once its fixed prefix and query string are
        # removed starts with "<departure code>/<arrival code>/"
        codes = browser.current_url.replace(PRE_URL, '').replace(POST_URL, '').split('/')
        return {
            'departure': {'name': departure, 'code': codes[0]},
            'arrival': {'name': arrival, 'code': codes[1]},
        }
    finally:
        browser.close()


def get_city_codes(departure, arrival):
    """
    Get the Skyscanner codes of both cities : from the JSON file at
    CITY_CODES_PATH when it knows both of them, otherwise from a search on
    the website

    Returns {'departure': {'name', 'code'}, 'arrival': {'name', 'code'}}
    """
    with open(CITY_CODES_PATH, 'r', encoding='utf-8') as f:
        codes = json.load(f)
    dict_cities = {
        'departure': {'name': departure, 'code': codes.get(departure, '')},
        'arrival': {'name': arrival, 'code': codes.get(arrival, '')},
    }
    if not (dict_cities['departure']['code'] and dict_cities['arrival']['code']):
        # At least one code is unknown locally : ask the website
        # (this fallback used to call get_city_codes itself and recursed forever)
        dict_cities = get_city_codes_on_skyscanner(departure, arrival)
    return dict_cities



def create_trips_timetable(dict_cities, start_date, stay_in_days=0, span_window=None, return_window=None):
    """ Create list of trips by incrementing start date given span window
        and incrementing stay in days if return is expected

        A trip is the string "<departure code>/<arrival code>/<yymmdd>",
        followed by "/<yymmdd>" (return date) when stay_in_days is not 0.
        Departure dates are start_date and the span_window - 1 following days.
        Return dates are the departure date plus stay_in_days + d days, for
        d from -return_window to return_window - 1 ; stays that would end
        before the departure are skipped.
        span_window and return_window default to SPAN_WINDOW and RETURN_WINDOW
    """
    if span_window is None:
        span_window = SPAN_WINDOW
    # Count from start_date itself : counting whole days from "now" started
    # one day too early whenever the current time was not exactly midnight
    first_day = pd.Timestamp(start_date)
    departure_days = [first_day + pd.Timedelta(days=i) for i in range(span_window)]
    route = dict_cities['departure']['code'] + '/' + dict_cities['arrival']['code'] + '/'
    if not stay_in_days:
        # One Way flight
        return [route + day.strftime('%y%m%d') for day in departure_days]
    # Return trip
    if return_window is None:
        return_window = RETURN_WINDOW
    trips = []
    for day in departure_days:
        for shift in range(-return_window, return_window):
            stay = stay_in_days + shift
            if stay < 0:
                continue  # Return date would be before the departure date
            return_day = day + pd.Timedelta(days=stay)
            trips.append(route + day.strftime('%y%m%d') + '/' + return_day.strftime('%y%m%d'))
    return trips


def setup_webscrap(departure, arrival, start_date, stay_in_days = 0):
    """
    Resolve the codes of both cities, then build the list of Trips to search
    for (a trip is a String object that concatenates Departure, Arrival and
    Date information)
    Returns False when the code of either city is missing
    """
    cities = get_city_codes(departure, arrival)
    departure_code = cities['departure']['code']
    arrival_code = cities['arrival']['code']
    if departure_code and arrival_code:
        return create_trips_timetable(cities, start_date, stay_in_days)
    return False


import pandas as pd
import multiprocessing as mp

def _scrape_trips(trips, columns, is_one_way):
    """ Scrape every trip with one browser, saving to the workbook every SAVING_RATE trips """
    df = pd.DataFrame(columns=columns)
    banners = {'cookie': False, 'covid': False, 'price_alert': False}
    browser = init_browser()
    try:
        for count, trip in enumerate(trips, start=1):
            df = format_page_results(browser, trip, df, banners, is_one_way=is_one_way)
            if count % SAVING_RATE == 0:
                save_df_to_workbook(filename, df)
                print("Intermediate save of data to workbook")
                # Start again from an empty frame : saved rows are not kept in memory
                df = pd.DataFrame(columns=columns)
    finally:
        # Do not leave the browser open when a page fails
        browser.close()
    save_df_to_workbook(filename, df)
    return df


def main_oneway(trips=None):
    """ Initialize a browser and search results for a list of one way trips,
    appends results to the workbook named by the module-level `filename`

    trips defaults to the module-level `trips` list, as before.
    Returns the DataFrame of the rows scraped since the last intermediate save """
    if trips is None:
        trips = globals()['trips']
    return _scrape_trips(trips, COLUMNS_ONEWAY, is_one_way=True)


def main_return(trips):
    """ Initialize a browser and search results for a list of return trips,
    appends results to the workbook named by the module-level `filename`

    Returns the DataFrame of the rows scraped since the last intermediate save """
    return _scrape_trips(trips, COLUMNS_RETURN, is_one_way=False)


In [11]:
class Args:
    """ Stand-in for the command line arguments, with the same attribute
    names : d (departure), a (arrival), s (start date), days (stay in days) """

    def __init__(self, departure, arrival, start_date, stay_in_days = 0):
        self.d, self.a = departure, arrival
        self.s, self.days = start_date, stay_in_days

args = Args('Paris', 'Bangkok', '20220801', 40)
# Convert once and use the same int everywhere (a command line value may be a string)
stay_in_days = int(args.days)
trips = setup_webscrap(args.d, args.a, args.s, stay_in_days)
if trips:
    # Workbook for results
    filename = (
        "./workbooks/Skyscanner_from_" + args.d + "_To_" + args.a + "_"
        + ("One-Way.xlsx" if stay_in_days == 0 else "Return.xlsx")
    )
    # Keep the workbook when it can be read, otherwise (missing or unreadable file) create it
    try:
        _ = pd.read_excel(filename)
    except Exception:
        # Not a bare except : Ctrl-C must still interrupt the cell
        init_workbook(filename, stay_in_days)

In [8]:
# Manual run of a return-trip scrape : empty results frame, no banner dismissed yet, fresh browser
df = pd.DataFrame(columns=COLUMNS_RETURN)
banners = dict.fromkeys(('cookie', 'covid', 'price_alert'), False)
browser = init_browser()



In [9]:
# Scrape each trip in turn ; every SAVING_RATE trips the rows are written
# to the workbook and collection restarts from an empty frame
for j, trip in enumerate(trips):
    df = format_page_results(browser, trip, df, banners, is_one_way=False)
    if (j + 1) % SAVING_RATE:
        continue
    save_df_to_workbook(filename, df)
    print("Intermediate save of data to workbook")
    df = pd.DataFrame(columns=COLUMNS_RETURN)

NoSuchWindowException: Message: Browsing context has been discarded


In [7]:
# Close the browser, save the rows collected since the last intermediate
# save, then merge all the sheets of the workbook into one csv file
browser.close()
save_df_to_workbook(filename, df)
final_workbook(filename)

Paris 40
