In [1]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
import pandas as pd
import numpy as np
from pathlib import Path
import os
import itertools
import threading
import sys

In [2]:
def initialize_patstat():
    """
    Function to initialize Pastat with Selenium
    """
    
    initialized = False
    #Waiting animation
    def animate():
        for c in itertools.cycle(['   ','.  ', '.. ', '...']):
            if initialized:
                break
            sys.stdout.write('\rInitializing Patstat, please wait' + c)
            sys.stdout.flush()
            time.sleep(0.5)
        sys.stdout.write('\nPatstat initialized.')

    t = threading.Thread(target=animate, daemon=True)
    t.start()
    
    #Browser exposes an executable file
    #Through Selenium test we will invoke the executable file which will then invoke actual browser
    driver = webdriver.Firefox(executable_path="C:\Program Files\Mozilla Firefox\geckodriver.exe")
    # Maximize the browser window
    driver.maximize_window()
    # Method to launch the URL
    URL = driver.get("https://data.epo.org/expert-services/")
    # Close PatStat warning message
    time.sleep(1)
    while len([el for el in driver.find_elements(By.ID, 'dijit_form_Button_0_label') if 'Close' in el.text]) != 1:
        pass
    button_Close = [el for el in driver.find_elements(By.ID, 'dijit_form_Button_0_label') if 'Close' in el.text][0]
    button_Close.click()
    time.sleep(1)
    
    # Connect to Patstat using the credentials
    
    username='contact@chaireeconomieduclimat.org'
    password='cLnI7ZDA'
    
    button_Username = driver.find_element(By.ID,'username')
    button_Username.click()
    button_Username.send_keys(username)
    button_Password = driver.find_element(By.ID,'password')
    button_Password.click()
    button_Password.send_keys(password)

    button_Log_in = driver.find_element(By.ID,'buttonLogin_label').click()
    time.sleep(1)

    # PatStat Spring 2022
    objects = [el for el in driver.find_elements(By.CLASS_NAME, 'fullPart') if 'PATSTAT Online' in el.text]
    for obj in objects:
        try:
            obj.click()
            break
        except:
            pass

    # Go to the tab Table
    go_to_Table = driver.find_element(By.ID,'goToTableLink')
    go_to_Table.click()
    time.sleep(1)

    # Look for the query field in the elements of the page
    list_el = [el for el in driver.find_elements_by_xpath('//*') if 'tls201' in el.text]
    
    #If the function does not work, maybe the idenfication of the query field is wrong.
    #The second element (index 1) has been identified as the good one in previous trials.
    #Try to set query_el = range(len(list_el)) or identify the right element corresponding to the query field,
    #within 0 to len(list_el).
    
    query_field = list_el[1]
    list_query_field_click = [list_el[i] for i in range(13,22)]
    
    initialized = True
    
    return driver, query_field, list_query_field_click

In [3]:
def write_query(text):
    """
    Writes text to the query field.
    """
    query_field.click()
    try:
        #try_click(By.CLASS_NAME,'CodeMirror')
        query_field.click()
        #time.sleep(1)
        query_field.send_keys(Keys.CONTROL + "a")
        query_field.send_keys(text)
    except:
        pass

In [4]:
def try_click(by_what, text_search, condition=None, display=True):
    """
    A function that tries to click elements of type 'by_what' which names contain 'text_search'
    """
    # Look for elements
    objects = driver.find_elements(by_what, text_search)
    
    if condition:
        objects = [el for el in objects if condition in el.text]
    
    count = 0
    clicked_object = None
    
    # Try to click on found elements
    for obj in objects:
        try:
            obj.click()
            clicked_object = obj
            break
        except:
            count += 1
            pass
    if count == len(objects) and display:
        print('No element clicked')
    return clicked_object

In [5]:
def download_all():
    """
    A function that downloads all files.
    """
    
    button_download_menu = driver.find_element(By.ID,'download_text')
    button_download_menu.click()
    time.sleep(0.5)
    button_download_menu_2 = driver.find_element(By.ID,'downloadManagerMenuItem_text')
    button_download_menu_2.click()
    
    while len([q for q in driver.find_elements(By.CLASS_NAME, 'exportStatusMessage') if 'Ready' in q.text]) != 1:
        pass
    
    for download_button in driver.find_elements(By.XPATH,"//*[contains(@class, 'buttonDownloadSave') and not(contains(@class, 'buttonDownloadDelete'))]"):
        try:
            download_button.click()
        except:
            pass

In [6]:
show_again = True
def clear_all_downloads():
    """
    Clears all the downloads.
    """
    
    global show_again
    
    button_download_menu = driver.find_element(By.ID,'download_text')
    button_download_menu.click()
    time.sleep(1)
    button_download_menu_2 = driver.find_element(By.ID,'downloadManagerMenuItem_text')
    button_download_menu_2.click()
    time.sleep(1)
    try_click(By.CLASS_NAME, 'dijitReset', condition='Clear list', display=False)
    
    # Clicking the confirmation dialogue
    if show_again:
        try_click(By.CLASS_NAME, 'doNotShowAgain', display=False)
        try_click(By.CLASS_NAME, 'dijitReset', condition='Yes', display=False)
        show_again = False

    # Closing the download manager
    close_dialogue()

In [7]:
def close_dialogue():
    """
    Function to close any dialogue.
    """
    for close_button in driver.find_elements(By.CLASS_NAME,'dijitDialogCloseIcon'):
        try:
            close_button.click()
        except:
            pass

In [8]:
def search():
    """
    Function to launch the query.
    """
    _ = try_click(By.ID,'queryBlockContentResultTable_btLaunchQuery')

In [9]:
def find_nb_rows():
    """
    Function to find the number of rows of a query. Uses the red figure displayed on the top left.
    """
    try:
        idx = driver.page_source.find(' rows')
        text_around = driver.page_source[idx-100:idx]
        idx = text_around.find(""""onDblClick:stopEvent">""")
        text_around = text_around[idx+23:]
        nb_rows = int(text_around.replace("&nbsp;",''))
        return nb_rows
    
    except ValueError:
        print('No query found.')

In [10]:
def prepare_download():
    """
    Function to prepare the download of the actual query.
    """
    
    nb_rows = find_nb_rows()
    delta_row = 700000 # maximum entry for a download in Patstat
    row = 1
    count_downloads = 0
    while row + delta_row < nb_rows:
        
        count_downloads += 1

        button_download_menu = driver.find_element(By.ID,'download_text')
        button_download_menu.click()
        time.sleep(1)
        button_download_menu_2 = driver.find_element(By.ID,'downloadMenuItem_text')
        button_download_menu_2.click()
        time.sleep(1)

        content_from = driver.find_element(By.ID,'dijit_form_TextBox_1')
        content_from.click()
        content_from.send_keys(Keys.CONTROL + "a") #select all
        content_from.send_keys(row)

        content_to = driver.find_element(By.ID,'dijit_form_TextBox_2')
        content_to.click()
        content_to.send_keys(Keys.CONTROL + "a") #select all
        content_to.send_keys(row+delta_row-1)

        button_OK = driver.find_element(By.ID,'btDownload_label').click()

        row += delta_row

        time.sleep(1)

    # Last query
    
    count_downloads += 1

    button_download_menu = driver.find_element(By.ID,'download_text')
    button_download_menu.click()
    time.sleep(1)
    button_download_menu_2 = driver.find_element(By.ID,'downloadMenuItem_text')
    button_download_menu_2.click()
    time.sleep(1)

    content_from = driver.find_element(By.ID,'dijit_form_TextBox_1')
    content_from.click()
    content_from.send_keys(Keys.CONTROL + "a") #select all
    content_from.send_keys(row)

    content_to = driver.find_element(By.ID,'dijit_form_TextBox_2')
    content_to.click()
    content_to.send_keys(Keys.CONTROL + "a") #select all
    content_to.send_keys(nb_rows)

    button_OK = driver.find_element(By.ID,'btDownload_label').click()
    
    print(count_downloads,'download(s) ready')
    return count_downloads

In [11]:
def wait():
    '''
    A function that waits for a task to end on Patstat.
    
    The screen usually darken while some tasks are performed on Patstat,
    preventing the background to be interacted with.
    This function simply tries to interact with the background continuously.
    If it is not available, the tasks are considered to be still in progress.
    '''
    
    flag = True
    while flag:
        try:
            list_query_field_click[0].click()
            flag = False
            break
        except:
            pass


In the following example, the script launches three different queries. One for the patent count between 1990 and 1999, one between 2000 and 2009, and the last one between 2010 and 2019. Notice that this could have been done in one shot, from 1990 to 2019, but this just serves as an example here.

In [12]:
driver, query_field, list_query_field_click = initialize_patstat()

# If TEST is displayed in the query field. Identification worked properly.
write_query('TEST')
print('\nCheck if the word TEST is displayed in the query field.')

Initializing Patstat, please wait   
Patstat initialized.
Check if the word TEST is displayed in the query field.


In [13]:
def example_query(date_start, date_end):
    text = f"""
SELECT a.appln_filing_year, COUNT(DISTINCT a.appln_id) AS NumberOfPatentApplications
FROM tls201_appln a
JOIN tls209_appln_ipc i ON a.appln_id = i.appln_id
WHERE i.ipc_class_symbol LIKE 'B08B%' -- Change IPC symbol here
AND a.appln_filing_year BETWEEN {str(date_start)} AND {str(date_end)} -- Define year range here
GROUP BY a.appln_filing_year
ORDER BY a.appln_filing_year
"""
    return text

In [14]:
print(example_query(2012,2013))


SELECT a.appln_filing_year, COUNT(DISTINCT a.appln_id) AS NumberOfPatentApplications
FROM tls201_appln a
JOIN tls209_appln_ipc i ON a.appln_id = i.appln_id
WHERE i.ipc_class_symbol LIKE 'B08B%' -- Change IPC symbol here
AND a.appln_filing_year BETWEEN 2012 AND 2013 -- Define year range here
GROUP BY a.appln_filing_year
ORDER BY a.appln_filing_year



In [18]:
clear_all_downloads()

for date_start in [1990,2000,2010]:

    wait()
    
    # Write the query to the field
    write_query(example_query(date_start, date_start+9))
    
    # Launch the query
    search()
    
    # Wait for query to end
    wait()
    print(find_nb_rows(), 'entries found')
    
    # Prepare downloads of the actual query
    count_downloads = prepare_download()
    
    # Wait for preparation and download all
    wait()
    download_all()
    close_dialogue()
    
    # Rename the downloads
    
    # Folder in which files are downloaded
    download_path = Path('C:/Users/Dhorne/Downloads')
    
    # Wait for the download to end
    while len([file for file in download_path.iterdir() if 'resulttable' in file.name]) == 0:
        pass
    downloaded_files = [file for file in download_path.iterdir() if 'resulttable' in file.name]
    
    # Identifier to rename the generic file name. Here, the queries correspond to different dates.
    # Dates are therefore used to rename the files (do not use 'resulttable' in the name!)
    identifier = str(date_start)+'_'+str(date_start+9)

    if count_downloads != len(downloaded_files):
        raise Exception('Found '+str(len(downloaded_files))+' files, expected '+str(count_downloads))
    else:
        if len(downloaded_files) == 1:
            os.rename(downloaded_files[0], str(downloaded_files[0].parent)+'/'+identifier+downloaded_files[0].suffix)
        else:
            count = 1
            for file in downloaded_files:
                os.rename(file, str(file.parent)+'/'+identifier+'_'+str(count)+file.suffix)
                count += 1
                
    clear_all_downloads()

10 entries found
1 download(s) ready


FileExistsError: [WinError 183] Impossible de créer un fichier déjà existant: 'C:\\Users\\Dhorne\\Downloads\\resulttable-20221209112325.zip' -> 'C:\\Users\\Dhorne\\Downloads/1990_1999.zip'