In [None]:
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, ElementClickInterceptedException

class EspacenetScraper:
    def __init__(self, headless=True):
        """Initialize the scraper with configurable options."""
        options = uc.ChromeOptions()
        if headless:
            options.add_argument('--headless')  # Run in headless mode

        options.add_argument('--disable-blink-features=AutomationControlled')
        self.driver = uc.Chrome(options=options)
        self.driver.set_page_load_timeout(30)
        self.driver.set_window_size(1600, 1300)

    def add_random_delay(self, min_seconds=1, max_seconds=3):
        """Add a random delay to mimic human behavior."""
        time.sleep(random.uniform(min_seconds, max_seconds))

    def get_page_html(self, url, retries=3):
        """
        Navigate to the given URL and return the page HTML.
        Retry the operation if a timeout occurs.

        Args:
            url (str): The URL to navigate to.
            retries (int): Number of retry attempts.

        Returns:
            str: The page HTML, or None if all retries fail.
        """
        for attempt in range(retries):
            try:
                print(f"Navigating to: {url} (Attempt {attempt + 1})")
                self.driver.get(url)
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Add a random delay to mimic human behavior
                self.add_random_delay(3, 5)

                # Return the page HTML
                return self.driver.page_source

            except TimeoutException:
                print(f"Timed out waiting for the page to load. Retrying ({attempt + 1}/{retries})...")
                if attempt == retries - 1:
                    print("Max retries reached. Unable to load the page.")
                    return None
            except Exception as e:
                print(f"An error occurred: {e}")
                return None

    def download_csv(self, retries=3, max_results=500):
        """
        Complete the sequence of clicking:
        1. More Options button
        2. Download dropdown
        3. List (CSV) option
        4. Handle download dialog by:
           - Setting the "To" value to max_results (e.g., 500)
           - Clicking the Download button
        
        Args:
            retries (int): Number of retry attempts for the entire sequence.
            max_results (int): Maximum number of results to download (1-500).

        Returns:
            bool: True if the download sequence was successful, False otherwise.
        """
        for attempt in range(retries):
            try:
                print(f"Attempting download sequence (Attempt {attempt + 1})...")
                
                # Step 1: Click "More Options" button
                print("Looking for More Options button...")
                more_options_selector = "#more-options-selector--publication-list-header"
                more_options_button = WebDriverWait(self.driver, 30).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, more_options_selector))
                )
                
                # Try to click, but handle intercepted clicks
                try:
                    print("More Options button found. Clicking...")
                    more_options_button.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#more-options-selector--publication-list-header")', more_options_button)
                    
                self.add_random_delay(2, 3)
                print('More Options clicked successfully')
                
                # Step 2: Click "Download" section in the dropdown
                print("Looking for Download section...")
                # Use a more general selector to find the Download section
                # This uses contains() to match the text rather than a fixed CSS path
                download_section_xpath = "/html/body/div[2]/div[3]/ul/section[1]"
                download_section = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, download_section_xpath))
                )
                
                try:
                    print("Download section found. Clicking...")
                    download_section.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#simple-dropdown > div.prod-jss1034.prod-jss966.prod-jss969.prod-jss1045 > ul > section:nth-child(1)")', download_section)
                    
                self.add_random_delay(1, 2)
                print('Download section clicked successfully')
                
                # Step 3: Click "List (CSV)" option
                print("Looking for List (CSV) option...")
                # Use contains() with the XPATH to find the CSV option based on text
                csv_option_xpath = "/html/body/div[2]/div[3]/ul/li[2]"
                csv_option = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, csv_option_xpath))
                )
                
                try:
                    print("List (CSV) option found. Clicking...")
                    csv_option.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#simple-dropdown > div.prod-jss1034.prod-jss966.prod-jss969.prod-jss1045 > ul > li:nth-child(3)")', csv_option)
                    
                self.add_random_delay(2, 3)
                print('List (CSV) option clicked successfully')
                
                # Step 4: Handle the download dialog
                print("Waiting for download dialog to appear...")
                
                # Wait for the dialog to appear
                download_dialog_xpath = "/html/body/div[2]/div[3]/div/div"
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, download_dialog_xpath))
                )
                print("Download dialog appeared")
                
                # Find the "To" input field
                to_input_xpath = "/html/body/div[2]/div[3]/div/div/div/div[1]/input[2]"
                to_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, to_input_xpath))
                )
                
                # Clear the input and set it to max_results
                print(f"Setting maximum results to {max_results}...")
                to_input.clear()
                to_input.send_keys(str(max_results))
                self.add_random_delay(1, 2)
                
                # Click the Download button in the dialog
                download_button_xpath = "/html/body/div[2]/div[3]/div/div/div/button"
                download_button = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, download_button_xpath))
                )
                
                try:
                    print("Download button found. Clicking...")
                    download_button.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("body > div.prod-jss12 > div.prod-jss15.prod-jss13 > div > div > div > button")', download_button)
                
                print("Download button clicked")
                
                # Wait for a moment to ensure the download starts
                self.add_random_delay(3, 5)
                
                # Check if there are any error messages
                try:
                    error_message = self.driver.find_element(By.XPATH, "//div[contains(@class, 'download-modal__validation')]//span")
                    if error_message.is_displayed() and error_message.text.strip():
                        print(f"Error in download dialog: {error_message.text}")
                        return False
                except:
                    # No error message found, continue
                    pass
                
                print("Download sequence completed successfully")
                return True
                
            except TimeoutException as e:
                print(f"Timeout during download sequence: {e}")
                if attempt == retries - 1:
                    print("Max retries reached. Download sequence failed.")
                    return False
            except Exception as e:
                print(f"Error during download sequence: {e}")
                if attempt == retries - 1:
                    print("Max retries reached. Download sequence failed.")
                    return False
                
            # If we reach here, there was an error and we need to try again
            # Refresh the page before the next attempt
            try:
                self.driver.refresh()
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                self.add_random_delay(3, 5)
            except Exception as e:
                print(f"Error refreshing page: {e}")

        return False

    def close(self):
        """Close the browser when done."""
        if self.driver:
            self.driver.quit()


if __name__ == '__main__':
    # Initialize the scraper
    scraper = EspacenetScraper(headless=False)  # Set headless to False to see the browser in action

    # Define the search URL
    search_url = 'https://worldwide.espacenet.com/patent/search?q=ctxt%20all%20"hydrogen"%20AND%20ctxt%20all%20"battery"%20AND%20ctxt%20%3D%20"electric%20car"%20AND%20ctxt%20all%20"charging"&queryLang=en%3Ade%3Afr'

    try:
        # Get the page HTML
        html = scraper.get_page_html(search_url, retries=3)
        if html:
            print("Page HTML retrieved successfully.")

            # Perform the download sequence with max 500 results
            if scraper.download_csv(retries=3, max_results=500):
                print("CSV download initiated successfully.")
                # Wait a bit to ensure the download starts
                time.sleep(10)
                print("Download should be complete or in progress.")
            else:
                print("Failed to download CSV.")

    finally:
        # Close the browser
        scraper.close()
        print("Scraper closed.")

read the results into a dataframe 

In [None]:
import os
import glob

# Get the Downloads folder path
downloads_folder = os.path.expanduser("~/Downloads")

# Get all CSV files in the Downloads folder
list_of_files = glob.glob(os.path.join(downloads_folder, "*.csv"))

if list_of_files:  # Ensure there are CSV files
    latest_file = max(list_of_files, key=os.path.getmtime)
    print("Latest downloaded file:", latest_file)

    # Read the latest CSV file into a DataFrame
    import pandas as pd
    df = pd.read_csv(latest_file,delimiter=';', skiprows=7)
    df.head()
else:
    print("No CSV files found in Downloads.")


Latest downloaded file: C:\Users\tasni/Downloads\Résultat_de_la_recherche_dans_Espacenet_20250416_1229.csv


In [5]:
df.head()

Unnamed: 0,No,Titre,Inventeurs,Demandeurs,Numéro de publication,Priorité la plus ancienne,CIB,CPC,Date de publication,Publication la plus ancienne,Numéro de famille,Unnamed: 11
0,1,Mobile charging vehicle based on hydrogen fuel...,ZHANG XIA \r\nSADAMASA TATSUTAKA \r\nZHANG DUO,INNER MONGOLIA YUANSU MENGTAI INTELLIGENT HYDR...,CN221914114U,2023-12-07,B60L53/54 \r\nB60P3/00,,2024-10-29,2024-10-29,93195451,
1,2,INTEGRATED SOLAR-POWERED HIGH-PRESSURE HYDROGE...,KELLY NELSON A [US] \r\nGIBSON THOMAS L [US] \...,GM GLOBAL TECH OPERATIONS INC [US] \r\nKELLY N...,US2010230292A1 \r\nUS8721868B2,2009-03-16,C25B1/02 \r\nC01B3/04 \r\nC25B1/04,"H01M8/04208 (EP,CN,US) \r\nH01M8/0656 (EP,CN,U...",2010-09-16 \r\n2014-05-13,2010-09-16,42729813,
2,3,Solar and hydrogen fuel battery field charging...,CHEN JIAN \r\nZHANG ZICHAO \r\nHAN YU \r\nWANG...,UNIV CHINA AGRICULTURAL,CN109606174A,2019-01-07,B60L53/31 \r\nB60L53/51 \r\nB60L53/53 \r\nB60L...,H02J7/0027 (CN) \r\nY02E60/36 (EP) \r\nY02T10/...,2019-04-12,2019-04-12,66016295,
3,4,NICKEL-HYDROGEN BATTERY FOR ELECTRIC VEHICLE,KISHIMOTO NORIYOSHI \r\nHINO YUZO \r\nITO TAKA...,YUASA BATTERY CO LTD \r\nHONDA MOTOR CO LTD,JP2000173645A,1998-12-07,B60L11/18 \r\nH01M10/30 \r\nB60L11/18 \r\nH01M...,Y02E60/10 (EP) \r\nY02T10/70 (EP),2000-06-23,2000-06-23,18386090,
4,5,HYBRID PLUG-IN BATTERY AND HYDROGEN FUEL ENGIN...,WANG YONGHUA [US],WANG YONGHUA [US],US12005772B2 \r\nUS2022402354A1,2021-06-17,B60K15/01 \r\nB60K15/03 \r\nF02B43/10 \r\nH01M...,"B60K15/00 (US) \r\nB60K15/01 (EP,US) \r\nB60K1...",2022-12-22 \r\n2024-06-11,2022-12-22,84489982,


changing the query to a list of keywords 

In [5]:
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
import undetected_chromedriver as ucwe
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, ElementClickInterceptedException

class EspacenetScraper:
    def __init__(self, search_keywords, headless=True):
        """Initialize the scraper with configurable options and search keywords."""
        self.search_keywords = search_keywords
        options = uc.ChromeOptions()
        if headless:
            options.add_argument('--headless')  # Run in headless mode

        options.add_argument('--disable-blink-features=AutomationControlled')
        self.driver = uc.Chrome(options=options)
        self.driver.set_page_load_timeout(30)
        self.driver.set_window_size(1600, 1300)

    def construct_search_url(self):
        """Construct the search URL based on the provided keywords."""
        base_url = 'https://worldwide.espacenet.com/patent/search?q='
        query = ' AND '.join([f'ctxt all "{keyword}"' for keyword in self.search_keywords])
        query += '&queryLang=en%3Ade%3Afr'
        return base_url + query

    def add_random_delay(self, min_seconds=1, max_seconds=3):
        """Add a random delay to mimic human behavior."""
        time.sleep(random.uniform(min_seconds, max_seconds))

    def get_page_html(self, retries=3):
        """
        Navigate to the constructed URL and return the page HTML.
        Retry the operation if a timeout occurs.

        Args:
            retries (int): Number of retry attempts.

        Returns:
            str: The page HTML, or None if all retries fail.
        """
        url = self.construct_search_url()
        for attempt in range(retries):
            try:
                print(f"Navigating to: {url} (Attempt {attempt + 1})")
                self.driver.get(url)
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Add a random delay to mimic human behavior
                self.add_random_delay(3, 5)

                # Return the page HTML
                return self.driver.page_source

            except TimeoutException:
                print(f"Timed out waiting for the page to load. Retrying ({attempt + 1}/{retries})...")
                if attempt == retries - 1:
                    print("Max retries reached. Unable to load the page.")
                    return None
            except Exception as e:
                print(f"An error occurred: {e}")
                return None

    def download_csv(self, retries=3, max_results=500):
        """
        Complete the sequence of clicking:
        1. More Options button
        2. Download dropdown
        3. List (CSV) option
        4. Handle download dialog by:
           - Setting the "To" value to max_results (e.g., 500)
           - Clicking the Download button
        
        Args:
            retries (int): Number of retry attempts for the entire sequence.
            max_results (int): Maximum number of results to download (1-500).

        Returns:
            bool: True if the download sequence was successful, False otherwise.
        """
        for attempt in range(retries):
            try:
                print(f"Attempting download sequence (Attempt {attempt + 1})...")
                
                # Step 1: Click "More Options" button
                print("Looking for More Options button...")
                more_options_selector = "#more-options-selector--publication-list-header"
                more_options_button = WebDriverWait(self.driver, 30).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, more_options_selector))
                )
                
                # Try to click, but handle intercepted clicks
                try:
                    print("More Options button found. Clicking...")
                    more_options_button.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#more-options-selector--publication-list-header").click()', more_options_button)
                    
                self.add_random_delay(2, 3)
                print('More Options clicked successfully')
                
                # Step 2: Click "Download" section in the dropdown
                print("Looking for Download section...")
                # Use a more general selector to find the Download section
                # This uses contains() to match the text rather than a fixed CSS path
                download_section_xpath = "/html/body/div[2]/div[3]/ul/section[1]"
                download_section = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, download_section_xpath))
                )
                
                try:
                    print("Download section found. Clicking...")
                    download_section.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#simple-dropdown > div.prod-jss1034.prod-jss966.prod-jss969.prod-jss1045 > ul > section:nth-child(1)").click()', download_section)
                    
                self.add_random_delay(1, 2)
                print('Download section clicked successfully')
                
                # Step 3: Click "List (CSV)" option
                print("Looking for List (CSV) option...")
                # Use contains() with the XPATH to find the CSV option based on text
                csv_option_xpath = "/html/body/div[2]/div[3]/ul/li[2]"
                csv_option = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, csv_option_xpath))
                )
                
                try:
                    print("List (CSV) option found. Clicking...")
                    csv_option.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#simple-dropdown > div.prod-jss1034.prod-jss966.prod-jss969.prod-jss1045 > ul > li:nth-child(3)").click()', csv_option)
                    
                self.add_random_delay(2, 3)
                print('List (CSV) option clicked successfully')
                
                # Step 4: Handle the download dialog
                print("Waiting for download dialog to appear...")
                
                # Wait for the dialog to appear
                download_dialog_xpath = "/html/body/div[2]/div[3]/div/div"
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, download_dialog_xpath))
                )
                print("Download dialog appeared")
                
                # Find the "To" input field
                to_input_xpath = "/html/body/div[2]/div[3]/div/div/div/div[1]/input[2]"
                to_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, to_input_xpath))
                )
                
                # Clear the input and set it to max_results
                print(f"Setting maximum results to {max_results}...")
                to_input.clear()
                to_input.send_keys(str(max_results))
                self.add_random_delay(1, 2)
                
                # Click the Download button in the dialog
                download_button_xpath = "/html/body/div[2]/div[3]/div/div/div/button"
                download_button = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, download_button_xpath))
                )
                
                try:
                    print("Download button found. Clicking...")
                    download_button.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("body > div.prod-jss12 > div.prod-jss15.prod-jss13 > div > div > div > button").click()', download_button)
                
                print("Download button clicked")
                
                # Wait for a moment to ensure the download starts
                self.add_random_delay(3, 5)
                
                # Check if there are any error messages
                try:
                    error_message = self.driver.find_element(By.XPATH, "//div[contains(@class, 'download-modal__validation')]//span")
                    if error_message.is_displayed() and error_message.text.strip():
                        print(f"Error in download dialog: {error_message.text}")
                        return False
                except:
                    # No error message found, continue
                    pass
                
                print("Download sequence completed successfully")
                return True
                
            except TimeoutException as e:
                print(f"Timeout during download sequence: {e}")
                if attempt == retries - 1:
                    print("Max retries reached. Download sequence failed.")
                    return False
            except Exception as e:
                print(f"Error during download sequence: {e}")
                if attempt == retries - 1:
                    print("Max retries reached. Download sequence failed.")
                    return False
                
            # If we reach here, there was an error and we need to try again
            # Refresh the page before the next attempt
            try:
                self.driver.refresh()
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                self.add_random_delay(3, 5)
            except Exception as e:
                print(f"Error refreshing page: {e}")

        return False

    def close(self):
        """Close the browser when done."""
        if self.driver:
            self.driver.quit()




In [6]:
if __name__ == '__main__':
    # Define the search keywords
    search_keywords = ["hydrogen", "battery", "electric vehicle", "charging"]

    # Initialize the scraper with the search keywords
    scraper = EspacenetScraper(search_keywords, headless=False)  # Set headless to False to see the browser in action

    try:
        # Get the page HTML
        html = scraper.get_page_html(retries=3)
        if html:
            print("Page HTML retrieved successfully.")

            # Perform the download sequence with max 500 results
            if scraper.download_csv(retries=3, max_results=500):
                print("CSV download initiated successfully.")
                # Wait a bit to ensure the download starts
                time.sleep(10)
                print("Download should be complete or in progress.")
            else:
                print("Failed to download CSV.")

    finally:
        # Close the browser
        scraper.close()
        print("Scraper closed.")

NameError: name 'uc' is not defined

In [3]:
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, ElementClickInterceptedException

class EspacenetScraper:
    def __init__(self, search_keywords, headless=False):
        """Initialize the scraper with configurable options and search keywords."""
        self.search_keywords = search_keywords
        options = uc.ChromeOptions()
        if headless:
            options.add_argument('--headless')  # Run in headless mode

        options.add_argument('--disable-blink-features=AutomationControlled')
        self.driver = uc.Chrome(options=options)
        self.driver.set_page_load_timeout(30)
        self.driver.set_window_size(1600, 1300)

    def construct_search_url(self):
        """Construct the search URL based on the provided keywords and their search fields."""
        base_url = 'https://worldwide.espacenet.com/patent/search?q='
        
        # Mapping of search fields to Espacenet query parameters
        field_mapping = {
            'title': 'ti',
            'abstract': 'ab',
            'claims': 'cl',
            'title,abstract or claims': 'ctxt' ,
            'all text fields' : 'ftxt',
            'title or abstract' : 'ta',
            'description' : 'desc',
            'all text fields or names' : 'nftxt',
            'title , abstract or names' : 'ntxt'
              # Full text search
        }
        
        query_parts = []
        for keyword, field in self.search_keywords.items():
            field_param = field_mapping.get(field, 'ctxt')  # Default to 'ctxt' if field is unknown
            query_parts.append(f'{field_param} = "{keyword}"')
        
        query = ' AND '.join(query_parts)
        query += '&queryLang=en%3Ade%3Afr'
        
        return base_url + query

    def add_random_delay(self, min_seconds=1, max_seconds=3):
        """Add a random delay to mimic human behavior."""
        time.sleep(random.uniform(min_seconds, max_seconds))

    def get_page_html(self, retries=3):
        """
        Navigate to the constructed URL and return the page HTML.
        Retry the operation if a timeout occurs.

        Args:
            retries (int): Number of retry attempts.

        Returns:
            str: The page HTML, or None if all retries fail.
        """
        url = self.construct_search_url()
        for attempt in range(retries):
            try:
                print(f"Navigating to: {url} (Attempt {attempt + 1})")
                self.driver.get(url)
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Add a random delay to mimic human behavior
                self.add_random_delay(3, 5)

                # Return the page HTML
                return self.driver.page_source

            except TimeoutException:
                print(f"Timed out waiting for the page to load. Retrying ({attempt + 1}/{retries})...")
                if attempt == retries - 1:
                    print("Max retries reached. Unable to load the page.")
                    return None
            except Exception as e:
                print(f"An error occurred: {e}")
                return None

    def download_csv(self, retries=3, max_results=500):
        """
        Complete the sequence of clicking:
        1. More Options button
        2. Download dropdown
        3. List (CSV) option
        4. Handle download dialog by:
           - Setting the "To" value to max_results (e.g., 500)
           - Clicking the Download button
        
        Args:
            retries (int): Number of retry attempts for the entire sequence.
            max_results (int): Maximum number of results to download (1-500).

        Returns:
            bool: True if the download sequence was successful, False otherwise.
        """
        for attempt in range(retries):
            try:
                print(f"Attempting download sequence (Attempt {attempt + 1})...")
                
                # Step 1: Click "More Options" button
                print("Looking for More Options button...")
                more_options_selector = "#more-options-selector--publication-list-header"
                more_options_button = WebDriverWait(self.driver, 30).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, more_options_selector))
                )
                
                # Try to click, but handle intercepted clicks
                try:
                    print("More Options button found. Clicking...")
                    more_options_button.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#more-options-selector--publication-list-header").click()', more_options_button)
                    
                self.add_random_delay(2, 3)
                print('More Options clicked successfully')
                
                # Step 2: Click "Download" section in the dropdown
                print("Looking for Download section...")
                # Use a more general selector to find the Download section
                # This uses contains() to match the text rather than a fixed CSS path
                download_section_xpath = "/html/body/div[2]/div[3]/ul/section[1]"
                download_section = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, download_section_xpath))
                )
                
                try:
                    print("Download section found. Clicking...")
                    download_section.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#simple-dropdown > div.prod-jss1034.prod-jss966.prod-jss969.prod-jss1045 > ul > section:nth-child(1)").click()', download_section)
                    
                self.add_random_delay(1, 2)
                print('Download section clicked successfully')
                
                # Step 3: Click "List (CSV)" option
                print("Looking for List (CSV) option...")
                # Use contains() with the XPATH to find the CSV option based on text
                csv_option_xpath = "/html/body/div[2]/div[3]/ul/li[2]"
                csv_option = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, csv_option_xpath))
                )
                
                try:
                    print("List (CSV) option found. Clicking...")
                    csv_option.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#simple-dropdown > div.prod-jss1034.prod-jss966.prod-jss969.prod-jss1045 > ul > li:nth-child(3)").click()', csv_option)
                    
                self.add_random_delay(2, 3)
                print('List (CSV) option clicked successfully')
                
                # Step 4: Handle the download dialog
                print("Waiting for download dialog to appear...")
                
                # Wait for the dialog to appear
                download_dialog_xpath = "/html/body/div[2]/div[3]/div/div"
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, download_dialog_xpath))
                )
                print("Download dialog appeared")
                
                # Find the "To" input field
                to_input_xpath = "/html/body/div[2]/div[3]/div/div/div/div[1]/input[2]"
                to_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, to_input_xpath))
                )
                
                # Clear the input and set it to max_results
                print(f"Setting maximum results to {max_results}...")
                to_input.clear()
                to_input.send_keys(str(max_results))
                self.add_random_delay(1, 2)
                
                # Click the Download button in the dialog
                download_button_xpath = "/html/body/div[2]/div[3]/div/div/div/button"
                download_button = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, download_button_xpath))
                )
                
                try:
                    print("Download button found. Clicking...")
                    download_button.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("body > div.prod-jss12 > div.prod-jss15.prod-jss13 > div > div > div > button").click()', download_button)
                
                print("Download button clicked")
                
                # Wait for a moment to ensure the download starts
                self.add_random_delay(3, 5)
                
                # Check if there are any error messages
                try:
                    error_message = self.driver.find_element(By.XPATH, "//div[contains(@class, 'download-modal__validation')]//span")
                    if error_message.is_displayed() and error_message.text.strip():
                        print(f"Error in download dialog: {error_message.text}")
                        return False
                except:
                    # No error message found, continue
                    pass
                
                print("Download sequence completed successfully")
                return True
                
            except TimeoutException as e:
                print(f"Timeout during download sequence: {e}")
                if attempt == retries - 1:
                    print("Max retries reached. Download sequence failed.")
                    return False
            except Exception as e:
                print(f"Error during download sequence: {e}")
                if attempt == retries - 1:
                    print("Max retries reached. Download sequence failed.")
                    return False
                
            # If we reach here, there was an error and we need to try again
            # Refresh the page before the next attempt
            try:
                self.driver.refresh()
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                self.add_random_delay(3, 5)
            except Exception as e:
                print(f"Error refreshing page: {e}")

        return False

    def close(self):
        """Close the browser when done."""
        if self.driver:
            self.driver.quit()




        'title': 'ti',
            'abstract': 'ab',
            'claims': 'cl',
            'title,abstract or claims': 'ctxt' ,
            'all text fields' : 'ftxt',
            'title or abstract' : 'ta',
            'description' : 'desc',
            'all text fields or names' : 'nftxt',
            'title , abstract or names' : 'ntxt'

In [4]:
if __name__ == '__main__':
    # Define the search keywords with their search fields
    search_keywords = {
        "Autonomous ": "title,abstract and claims",
        "Vehicles":"title,abstract and claims"
        

    }

    # Initialize the scraper with the search keywords
    scraper = EspacenetScraper(search_keywords, headless=False)  # Set headless to False to see the browser in action

    try:
        # Construct and print the search URL
        search_url = scraper.construct_search_url()
        print("Constructed Search URL:", search_url)
        
        # Get the page HTML
        html = scraper.get_page_html(retries=3)
        if html:
            print("Page HTML retrieved successfully.")

            # Perform the download sequence with max 500 results
            if scraper.download_csv(retries=3, max_results=500):
                print("CSV download initiated successfully.")
                # Wait a bit to ensure the download starts
                time.sleep(10)
                print("Download should be complete or in progress.")
            else:
                print("Failed to download CSV.")

    finally:
        # Close the browser
        scraper.close()
        print("Scraper closed.")


2025-04-21 19:18:26,780 - INFO - patching driver executable C:\Users\tasni\appdata\roaming\undetected_chromedriver\undetected_chromedriver.exe


Constructed Search URL: https://worldwide.espacenet.com/patent/search?q=ctxt = "Autonomous " AND ctxt = "Vehicles"&queryLang=en%3Ade%3Afr
Navigating to: https://worldwide.espacenet.com/patent/search?q=ctxt = "Autonomous " AND ctxt = "Vehicles"&queryLang=en%3Ade%3Afr (Attempt 1)
Page HTML retrieved successfully.
Attempting download sequence (Attempt 1)...
Looking for More Options button...
More Options button found. Clicking...
More Options clicked successfully
Looking for Download section...
Download section found. Clicking...
Download section clicked successfully
Looking for List (CSV) option...
List (CSV) option found. Clicking...
List (CSV) option clicked successfully
Waiting for download dialog to appear...
Download dialog appeared
Setting maximum results to 500...
Download button found. Clicking...
Download button clicked
Download sequence completed successfully
CSV download initiated successfully.
Download should be complete or in progress.
Scraper closed.


In [5]:
import os
import glob

# Get the Downloads folder path
downloads_folder = os.path.expanduser("~/Downloads")

# Get all CSV files in the Downloads folder
list_of_files = glob.glob(os.path.join(downloads_folder, "*.csv"))

if list_of_files:  # Ensure there are CSV files
    latest_file = max(list_of_files, key=os.path.getmtime)
    print("Latest downloaded file:", latest_file)

    # Read the latest CSV file into a DataFrame
    import pandas as pd
    df = pd.read_csv(latest_file,delimiter=';', skiprows=7)
    df.head()
else:
    print("No CSV files found in Downloads.")


Latest downloaded file: C:\Users\tasni/Downloads\Résultat_de_la_recherche_dans_Espacenet_20250421_1918.csv


In [6]:
import pandas as pd

# Assuming df is your DataFrame
df.rename(columns={
    'Titre': 'Title',
    'Inventeurs': 'Inventors',
    'Demandeurs': 'Applicants',
    'Numéro de publication': 'Publication number',
    'Priorité la plus ancienne': 'Earliest priority',
    'CIB': 'IPC',
    'CPC': 'CPC',
    'Date de publication': 'Publication date',
    'Publication la plus ancienne': 'Earliest publication',
    'Numéro de famille': 'Family number'
}, inplace=True)


In [7]:

df[['first publication date','second publication date']] = df['Publication date'].str.split(' ' , n=1 , expand= True)
df['second publication date'] = df['second publication date'].str.strip('\n')
df['second publication date'] = df['second publication date'].str.strip('\r')
df['second publication date'] = df['second publication date'].str.strip('\n')


In [8]:
#first filing country 
df[['first publication number', 'second publication number']] = df['Publication number'].str.split(' ' , n=1 , expand=True)



In [8]:
df.head()

Unnamed: 0,No,Title,Inventors,Applicants,Publication number,Earliest priority,IPC,CPC,Publication date,Earliest publication,Family number,Unnamed: 11,first publication date,second publication date,first publication number,second publication number
0,1,APPAREIL D'AUTOMATISATION À ANALYSE DE RÉSEAU ...,HILSCHER HANS-JÜRGEN [DE],HILSCHER GES FUER SYSTEMAUTOMATION MBH [DE] \r...,EP3648416A1 \r\nEP3648416B1,2018-11-05,H04L12/40 \r\nH04L12/413,H04L12/40032 (EP) \r\nH04L12/413 (EP) \r\nH04L...,2020-05-06 \r\n2023-08-16,2020-05-06,68470224,,2020-05-06,2023-08-16,EP3648416A1,\r\nEP3648416B1
1,2,SYSTEM AND METHOD FOR SELECTION OF CLOUD SERVI...,RAMAMURTHY ARUN [IN] \r\nGHAROTE MANGESH SHARA...,TATA CONSULTANCY SERVICES LTD [IN],US11405415B2 \r\nUS2021176266A1,2019-12-06,H04L29/06 \r\nH04L29/08 \r\nG06F9/455 \r\nG06F...,"G06F9/45558 (US) \r\nG06F9/5072 (EP,US) \r\nG0...",2021-06-10 \r\n2022-08-02,2021-06-09,69941192,,2021-06-10,2022-08-02,US11405415B2,\r\nUS2021176266A1
2,3,SECURITY-ENHANCED CLOUD SYSTEM AND SECURITY MA...,LEE CHUNG JONG [KR],LEE CHUNG JONG [KR],US2014250500A1 \r\nUS9124579B2,2011-09-29,H04L29/06 \r\nG06F21/32 \r\nG06K9/00 \r\nH04L2...,G06F15/16 (KR) \r\nG06F21/30 (KR) \r\nG06F21/3...,2014-09-04 \r\n2015-09-01,2013-04-04,47996604,,2014-09-04,2015-09-01,US2014250500A1,\r\nUS9124579B2
3,4,"METHOD, APPARATUS, AND SYSTEM FOR PROTECTING C...",ZHANG JINGBIN [CN] \r\nHE CHENGDONG [CN],HUAWEI TECH CO LTD [CN],US2014126723A1 \r\nUS9203614B2,2011-11-09,H04L9/08 \r\nH04L29/06 \r\nH04L9/14,"H04L9/0822 (EP,US) \r\nH04L9/0825 (EP,US) \r\n...",2014-05-08 \r\n2015-12-01,2012-12-06,47258336,,2014-05-08,2015-12-01,US2014126723A1,\r\nUS9203614B2
4,5,TECHNIQUES FOR CLOUD SECURITY MONITORING AND T...,KIRTI GANESH [US] \r\nGUPTA ROHIT [US] \r\nBIS...,ORACLE INT CORP [US],US10958679B2 \r\nUS2017295199A1,2013-12-13,H04L29/06,"H04L63/1416 (EP,US) \r\nH04L63/1425 (US) \r\nH...",2017-10-12 \r\n2021-03-23,2015-06-18,53369924,,2017-10-12,2021-03-23,US10958679B2,\r\nUS2017295199A1


In [11]:
df_patents = df

In [12]:
df_patents = df_patents.applymap(
    lambda x: x.replace('\r\n', ' ') if isinstance(x, str) else x
)


  df_patents = df_patents.applymap(


In [13]:
df_patents.rename(columns={'No': 'id'}, inplace=True)


In [14]:
if 'Unnamed: 11' in df_patents.columns:
    df_patents.drop(columns=['Unnamed: 11','Publication date'], inplace=True)


In [15]:
df_patents['Family number'] = pd.to_numeric(df_patents['Family number'], errors='coerce')


In [18]:
print(df_patents.head())
print(df_patents.shape)


   id                                              Title  \
0   1  APPAREIL D'AUTOMATISATION À ANALYSE DE RÉSEAU ...   
1   2  SYSTEM AND METHOD FOR SELECTION OF CLOUD SERVI...   
2   3  SECURITY-ENHANCED CLOUD SYSTEM AND SECURITY MA...   
3   4  METHOD, APPARATUS, AND SYSTEM FOR PROTECTING C...   
4   5  TECHNIQUES FOR CLOUD SECURITY MONITORING AND T...   

                                           Inventors  \
0                          HILSCHER HANS-JÜRGEN [DE]   
1  RAMAMURTHY ARUN [IN]  GHAROTE MANGESH SHARAD [...   
2                                LEE CHUNG JONG [KR]   
3              ZHANG JINGBIN [CN]  HE CHENGDONG [CN]   
4  KIRTI GANESH [US]  GUPTA ROHIT [US]  BISWAS KA...   

                                          Applicants  \
0  HILSCHER GES FUER SYSTEMAUTOMATION MBH [DE]  H...   
1                 TATA CONSULTANCY SERVICES LTD [IN]   
2                                LEE CHUNG JONG [KR]   
3                            HUAWEI TECH CO LTD [CN]   
4                     

In [18]:
#parallel processing chatgpt
import concurrent.futures
import os
import requests
import time
from urllib.parse import quote
import pandas as pd
from dotenv import load_dotenv

# Global token cache
TOKEN = None
TOKEN_EXPIRY = 0

# Constants for API endpoints
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"

# Load credentials from .env file
load_dotenv()
CONSUMER_KEY = os.getenv("CONSUMER_KEY").strip()
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET").strip()

def get_access_token() -> str:
    global TOKEN, TOKEN_EXPIRY
    if TOKEN and time.time() < TOKEN_EXPIRY:
        return TOKEN
    data = {
        "grant_type": "client_credentials",
        "client_id": CONSUMER_KEY,
        "client_secret": CONSUMER_SECRET
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=15)
    response.raise_for_status()
    TOKEN = response.json()["access_token"]
    TOKEN_EXPIRY = time.time() + 3500
    return TOKEN

def validate_patent_number(patent: str) -> bool:
    return bool(patent and len(patent.strip()) >= 4)

def extract_jurisdictions_and_members(data: dict) -> dict:
    jurisdictions = set()
    family_members = []
    world_data = data.get('ops:world-patent-data', {})
    patent_family = world_data.get('ops:patent-family', {})
    members = patent_family.get('ops:family-member', []) or []
    if isinstance(members, dict):
        members = [members]

    for member in members:
        pub_ref = member.get('publication-reference', {})
        docs = pub_ref.get('document-id', []) or []
        if isinstance(docs, dict):
            docs = [docs]
        for doc in docs:
            if doc.get('@document-id-type') == 'docdb':
                country = doc.get('country')
                country = country.get('$') if isinstance(country, dict) else country
                doc_number = doc.get('doc-number')
                doc_number = doc_number.get('$') if isinstance(doc_number, dict) else doc_number
                kind = doc.get('kind')
                kind = kind.get('$') if isinstance(kind, dict) else kind
                if country and doc_number and kind:
                    jurisdictions.add(country)
                    family_members.append(f"{country}{doc_number}{kind}")
    return {
        'jurisdictions': sorted(jurisdictions),
        'family_members': sorted(set(family_members))
    }

def process_patent(patent: str) -> dict:
    if not validate_patent_number(patent):
        return {'jurisdictions': None, 'family_members': None}
    try:
        token = get_access_token()
        url = f"{BASE_URL}/family/publication/docdb/{quote(patent)}"
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        response = requests.get(url, headers=headers, timeout=15)
        if response.status_code in (403, 404):
            return {'jurisdictions': None, 'family_members': None}
        response.raise_for_status()
        return extract_jurisdictions_and_members(response.json())
    except Exception:
        return {'jurisdictions': None, 'family_members': None}

def process_dataframe_parallel(df: pd.DataFrame, patent_col: str, max_workers: int = 10) -> pd.DataFrame:
    if patent_col not in df.columns:
        raise ValueError(f"Column '{patent_col}' not found in DataFrame")
    patents = df[patent_col].tolist()
    results = {}

    # Use ThreadPoolExecutor for I/O-bound API calls
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_patent = {executor.submit(process_patent, p): p for p in patents}
        for future in concurrent.futures.as_completed(future_to_patent):
            patent = future_to_patent[future]
            try:
                results[patent] = future.result()
            except Exception:
                results[patent] = {'jurisdictions': None, 'family_members': None}
            # Optional small sleep to space requests
            time.sleep(0.1)

    df['family_jurisdictions'] = df[patent_col].map(lambda p: results[p]['jurisdictions'])
    df['family_members'] = df[patent_col].map(lambda p: results[p]['family_members'])
    return df

# Example usage:
if __name__ == "__main__":
    #df = pd.read_csv('your_patents.csv')
    processed_df = process_dataframe_parallel(df, 'first publication number', max_workers=20)
    processed_df[['first publication number', 'family_jurisdictions', 'family_members']]



In [19]:
processed_df[['first publication number', 'family_jurisdictions', 'family_members']].head(30)

Unnamed: 0,first publication number,family_jurisdictions,family_members
0,EP3648416A1,"[DE, EP]","[DE102018008674A1, EP3648416A1, EP3648416B1]"
1,US11405415B2,,
2,US2014250500A1,,
3,US2014126723A1,"[CN, EP, US, WO]","[CN103262491A, EP2704389A1, EP2704389A4, EP270..."
4,US10958679B2,,
5,US2013066940A1,,
6,US11902329B2,,
7,US11546360B2,"[AU, CA, DK, EP, IL, JP, SG, US, WO, ZA]","[AU2019201137A1, AU2019201137B2, AU2020210203A..."
8,US9300660B1,,
9,WO2019006637A1,[WO],[WO2019006637A1]


In [None]:
import concurrent.futures
import os
import requests
import time
import logging
from urllib.parse import quote
import pandas as pd
from dotenv import load_dotenv
import random
from functools import lru_cache

# Set up logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global token cache
TOKEN = None
TOKEN_EXPIRY = 0

# Constants for API endpoints
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"

# Rate limiting settings
REQUEST_INTERVAL = 0.2  # seconds between requests
MAX_RETRIES = 3

# Load credentials from .env file
load_dotenv()
CONSUMER_KEY = os.getenv("CONSUMER_KEY").strip()
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET").strip()

def get_access_token() -> str:
    global TOKEN, TOKEN_EXPIRY
    if TOKEN and time.time() < TOKEN_EXPIRY:
        return TOKEN
    
    logger.info("Getting new access token")
    data = {
        "grant_type": "client_credentials",
        "client_id": CONSUMER_KEY,
        "client_secret": CONSUMER_SECRET
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=15)
            response.raise_for_status()
            TOKEN = response.json()["access_token"]
            TOKEN_EXPIRY = time.time() + 3500  # Slightly less than 1 hour
            logger.info("New token acquired")
            return TOKEN
        except Exception as e:
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Failed to get token (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}. Retrying in {wait_time:.2f}s")
            time.sleep(wait_time)
    
    logger.error("Failed to get access token after multiple attempts")
    raise Exception("Failed to get access token")

def validate_patent_number(patent: str) -> bool:
    return bool(patent and isinstance(patent, str) and len(patent.strip()) >= 4)

@lru_cache(maxsize=1000)
def process_patent_with_cache(patent: str, token: str) -> dict:
    """Cached version of process_patent to avoid redundant API calls"""
    return process_patent_internal(patent, token)

def process_patent_internal(patent: str, token: str) -> dict:
    """Internal function that actually processes the patent"""
    if not validate_patent_number(patent):
        logger.warning(f"Invalid patent number: '{patent}'")
        return {'jurisdictions': None, 'family_members': None}
    
    for attempt in range(MAX_RETRIES):
        try:
            url = f"{BASE_URL}/family/publication/docdb/{quote(patent)}"
            headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
            
            logger.debug(f"Requesting data for patent: {patent}")
            response = requests.get(url, headers=headers, timeout=15)
            
            if response.status_code == 401:  # Unauthorized - token expired
                logger.warning("Token expired, getting a new one")
                return None  # Signal to the caller that we need a new token
                
            if response.status_code == 403:
                logger.warning(f"Access forbidden for patent {patent}. Possibly rate limited.")
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
                continue
                
            if response.status_code == 404:
                logger.info(f"No data found for patent {patent}")
                return {'jurisdictions': None, 'family_members': None}
                
            response.raise_for_status()
            result = extract_jurisdictions_and_members(response.json())
            logger.debug(f"Successfully processed patent {patent}")
            return result
            
        except Exception as e:
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Error processing patent {patent} (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}. Retrying in {wait_time:.2f}s")
            time.sleep(wait_time)
    
    logger.error(f"Failed to process patent {patent} after {MAX_RETRIES} attempts")
    return {'jurisdictions': None, 'family_members': None}

def extract_jurisdictions_and_members(data: dict) -> dict:
    try:
        jurisdictions = set()
        family_members = []
        world_data = data.get('ops:world-patent-data', {})
        patent_family = world_data.get('ops:patent-family', {})
        members = patent_family.get('ops:family-member', []) or []
        if isinstance(members, dict):
            members = [members]

        for member in members:
            pub_ref = member.get('publication-reference', {})
            docs = pub_ref.get('document-id', []) or []
            if isinstance(docs, dict):
                docs = [docs]
            for doc in docs:
                if doc.get('@document-id-type') == 'docdb':
                    country = doc.get('country')
                    country = country.get('$') if isinstance(country, dict) else country
                    doc_number = doc.get('doc-number')
                    doc_number = doc_number.get('$') if isinstance(doc_number, dict) else doc_number
                    kind = doc.get('kind')
                    kind = kind.get('$') if isinstance(kind, dict) else kind
                    if country and doc_number and kind:
                        jurisdictions.add(country)
                        family_members.append(f"{country}{doc_number}{kind}")
        return {
            'jurisdictions': sorted(jurisdictions),
            'family_members': sorted(set(family_members))
        }
    except Exception as e:
        logger.error(f"Error extracting jurisdictions and members: {str(e)}")
        return {'jurisdictions': None, 'family_members': None}

def process_patent(patent: str) -> dict:
    """Wrapper function to process a patent with proper token management"""
    # Skip None/NaN values
    if pd.isna(patent) or patent is None:
        return {'jurisdictions': None, 'family_members': None}
    
    # Get the current token
    token = get_access_token()
    
    # Try to process the patent
    result = process_patent_with_cache(patent, token)
    
    # If we got None, the token expired, get a new one and retry
    if result is None:
        token = get_access_token()  # Force a token refresh
        result = process_patent_with_cache(patent, token)
    
    return result

def process_dataframe_parallel(df: pd.DataFrame, patent_col: str, max_workers: int = 5) -> pd.DataFrame:
    """Process patents in parallel with better rate limiting and error handling"""
    if patent_col not in df.columns:
        raise ValueError(f"Column '{patent_col}' not found in DataFrame")
    
    # Make a copy to avoid modifying the original
    result_df = df.copy()
    patents = df[patent_col].tolist()
    results = {}
    
    logger.info(f"Starting to process {len(patents)} patents with {max_workers} workers")
    
    # Get an access token before we start processing
    get_access_token()
    
    # Use ThreadPoolExecutor for I/O-bound API calls
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_patent = {executor.submit(process_patent, p): p for p in patents}
        
        completed = 0
        for future in concurrent.futures.as_completed(future_to_patent):
            patent = future_to_patent[future]
            try:
                results[patent] = future.result()
                completed += 1
                if completed % 10 == 0:
                    logger.info(f"Processed {completed}/{len(patents)} patents")
            except Exception as e:
                logger.error(f"Unhandled exception for patent {patent}: {str(e)}")
                results[patent] = {'jurisdictions': None, 'family_members': None}
            
            # Space out requests to avoid rate limiting
            time.sleep(REQUEST_INTERVAL)
    
    logger.info(f"Completed processing {len(patents)} patents")
    
    # Apply results to the dataframe
    result_df['family_jurisdictions'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('jurisdictions'))
    result_df['family_members'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('family_members'))
    
    # Log statistics
    success_count = sum(1 for r in results.values() if r['jurisdictions'] is not None)
    logger.info(f"Successfully processed {success_count} out of {len(patents)} patents ({success_count/len(patents)*100:.1f}%)")
    
    return result_df

# Example usage:
if __name__ == "__main__":
    # df = pd.read_csv('your_patents.csv')
    processed_df = process_dataframe_parallel(df, 'first publication number', max_workers=5)
    processed_df[['first publication number', 'family_jurisdictions', 'family_members']]

2025-04-21 18:16:52,832 - INFO - Starting to process 500 patents with 5 workers
2025-04-21 18:16:52,833 - INFO - Getting new access token


2025-04-21 18:16:55,326 - INFO - New token acquired
2025-04-21 18:17:23,306 - ERROR - Failed to process patent US10958679B2 after 3 attempts
2025-04-21 18:17:23,634 - INFO - Processed 10/500 patents
2025-04-21 18:17:35,343 - ERROR - Failed to process patent US11546360B2 after 3 attempts
2025-04-21 18:17:42,477 - ERROR - Failed to process patent US2015100357A1 after 3 attempts
2025-04-21 18:17:42,879 - INFO - Processed 20/500 patents
2025-04-21 18:17:53,709 - ERROR - Failed to process patent US10142346B2 after 3 attempts
2025-04-21 18:18:01,450 - ERROR - Failed to process patent US2014053280A1 after 3 attempts
2025-04-21 18:18:01,450 - INFO - Processed 30/500 patents
2025-04-21 18:18:14,835 - INFO - Processed 40/500 patents
2025-04-21 18:18:23,180 - ERROR - Failed to process patent US2020007553A1 after 3 attempts
2025-04-21 18:18:23,303 - ERROR - Failed to process patent US11122426B2 after 3 attempts
2025-04-21 18:18:31,767 - INFO - Processed 50/500 patents
2025-04-21 18:18:39,115 - INF

In [19]:
import concurrent.futures
import os
import requests
import time
import logging
from urllib.parse import quote
import pandas as pd
from dotenv import load_dotenv
import random
from functools import lru_cache
import threading

# Set up logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global token cache
TOKEN = None
TOKEN_EXPIRY = 0

# Constants for API endpoints
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"

# Rate limiting settings - much more conservative
REQUEST_INTERVAL = 1.0  # 1 second between requests
MAX_WORKERS = 3         # Limit concurrent connections
MAX_RETRIES = 5         # More retries
TIMEOUT = 30            # Longer timeout

# Rate limiter with token bucket algorithm
class RateLimiter:
    def __init__(self, rate=1, capacity=5):
        self.rate = rate            # tokens per second
        self.capacity = capacity    # maximum tokens
        self.tokens = capacity      # current tokens
        self.last_refill = time.time()
        self.lock = threading.Lock()
    
    def consume(self, tokens=1):
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            else:
                sleep_time = (tokens - self.tokens) / self.rate
                logger.debug(f"Rate limited, sleeping for {sleep_time:.2f}s")
                time.sleep(sleep_time)
                self._refill()
                self.tokens -= tokens
                return True
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

# Create a rate limiter - 1 request per second, max 3 burst
rate_limiter = RateLimiter(rate=0.5, capacity=3)

# Load credentials from .env file
load_dotenv()
CONSUMER_KEY = os.getenv("CONSUMER_KEY").strip()
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET").strip()

# Token lock to prevent multiple simultaneous token requests
token_lock = threading.Lock()

def get_access_token(force_refresh=False) -> str:
    global TOKEN, TOKEN_EXPIRY
    
    with token_lock:
        if not force_refresh and TOKEN and time.time() < TOKEN_EXPIRY:
            return TOKEN
        
        logger.info("Getting new access token")
        data = {
            "grant_type": "client_credentials",
            "client_id": CONSUMER_KEY,
            "client_secret": CONSUMER_SECRET
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        
        for attempt in range(MAX_RETRIES):
            try:
                response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=TIMEOUT)
                response.raise_for_status()
                TOKEN = response.json()["access_token"]
                TOKEN_EXPIRY = time.time() + 3500  # Slightly less than 1 hour
                logger.info("New token acquired")
                return TOKEN
            except Exception as e:
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Failed to get token (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}. Retrying in {wait_time:.2f}s")
                time.sleep(wait_time)
        
        logger.error("Failed to get access token after multiple attempts")
        raise Exception("Failed to get access token")

def validate_patent_number(patent: str) -> bool:
    return bool(patent and isinstance(patent, str) and len(patent.strip()) >= 4)

@lru_cache(maxsize=1000)
def process_patent_with_cache(patent: str) -> dict:
    """Cached version of process_patent to avoid redundant API calls"""
    return process_patent_internal(patent)

def backoff_sleep(attempt, base=2.0, max_sleep=60):
    """Calculate and sleep for exponential backoff time with jitter"""
    sleep_time = min(base ** attempt + random.uniform(0, 1), max_sleep)
    logger.debug(f"Backing off for {sleep_time:.2f}s")
    time.sleep(sleep_time)

def process_patent_internal(patent: str) -> dict:
    """Internal function that actually processes the patent"""
    if not validate_patent_number(patent):
        logger.warning(f"Invalid patent number: '{patent}'")
        return {'jurisdictions': None, 'family_members': None}
    
    for attempt in range(MAX_RETRIES):
        try:
            # Consume a token from the rate limiter
            rate_limiter.consume()
            
            # Get a fresh token if needed
            token = get_access_token()
            
            url = f"{BASE_URL}/family/publication/docdb/{quote(patent)}"
            headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
            
            logger.debug(f"Requesting data for patent: {patent}")
            response = requests.get(url, headers=headers, timeout=TIMEOUT)
            
            if response.status_code == 401:  # Unauthorized - token expired
                logger.warning("Token expired, getting a new one")
                get_access_token(force_refresh=True)
                backoff_sleep(attempt)
                continue
                
            if response.status_code == 403:
                logger.warning(f"Access forbidden for patent {patent}. Possibly rate limited.")
                backoff_sleep(attempt + 2)  # More aggressive backoff for rate limiting
                continue
                
            if response.status_code == 404:
                logger.info(f"No data found for patent {patent}")
                return {'jurisdictions': None, 'family_members': None}
                
            response.raise_for_status()
            result = extract_jurisdictions_and_members(response.json())
            logger.debug(f"Successfully processed patent {patent}")
            return result
            
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout for patent {patent} (attempt {attempt+1}/{MAX_RETRIES})")
            backoff_sleep(attempt)
            continue
            
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"Connection error for patent {patent} (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}")
            backoff_sleep(attempt + 1)  # More aggressive backoff for connection errors
            continue
            
        except Exception as e:
            logger.warning(f"Error processing patent {patent} (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}")
            backoff_sleep(attempt)
            continue
    
    logger.error(f"Failed to process patent {patent} after {MAX_RETRIES} attempts")
    return {'jurisdictions': None, 'family_members': None}

def extract_jurisdictions_and_members(data: dict) -> dict:
    try:
        jurisdictions = set()
        family_members = []
        world_data = data.get('ops:world-patent-data', {})
        patent_family = world_data.get('ops:patent-family', {})
        members = patent_family.get('ops:family-member', []) or []
        if isinstance(members, dict):
            members = [members]

        for member in members:
            pub_ref = member.get('publication-reference', {})
            docs = pub_ref.get('document-id', []) or []
            if isinstance(docs, dict):
                docs = [docs]
            for doc in docs:
                if doc.get('@document-id-type') == 'docdb':
                    country = doc.get('country')
                    country = country.get('$') if isinstance(country, dict) else country
                    doc_number = doc.get('doc-number')
                    doc_number = doc_number.get('$') if isinstance(doc_number, dict) else doc_number
                    kind = doc.get('kind')
                    kind = kind.get('$') if isinstance(kind, dict) else kind
                    if country and doc_number and kind:
                        jurisdictions.add(country)
                        family_members.append(f"{country}{doc_number}{kind}")
        return {
            'jurisdictions': sorted(jurisdictions),
            'family_members': sorted(set(family_members))
        }
    except Exception as e:
        logger.error(f"Error extracting jurisdictions and members: {str(e)}")
        return {'jurisdictions': None, 'family_members': None}

def process_patent(patent: str) -> dict:
    """Wrapper function to call the cached version"""
    # Skip None/NaN values
    if pd.isna(patent) or patent is None:
        return {'jurisdictions': None, 'family_members': None}
    return process_patent_with_cache(patent)

def process_dataframe_parallel(df: pd.DataFrame, patent_col: str, chunk_size=50) -> pd.DataFrame:
    """Process patents in smaller chunks to avoid overwhelming the API"""
    if patent_col not in df.columns:
        raise ValueError(f"Column '{patent_col}' not found in DataFrame")
    
    # Make a copy to avoid modifying the original
    result_df = df.copy()
    patents = df[patent_col].tolist()
    total_patents = len(patents)
    results = {}
    
    logger.info(f"Starting to process {total_patents} patents in chunks of {chunk_size}")
    
    # Get an access token before we start processing
    get_access_token()
    
    # Process in chunks to avoid overwhelming the API
    for chunk_start in range(0, total_patents, chunk_size):
        chunk_end = min(chunk_start + chunk_size, total_patents)
        current_chunk = patents[chunk_start:chunk_end]
        
        logger.info(f"Processing chunk {chunk_start//chunk_size + 1}/{(total_patents+chunk_size-1)//chunk_size} " +
                   f"(patents {chunk_start+1}-{chunk_end})")
        
        # Use ThreadPoolExecutor for I/O-bound API calls
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Submit all tasks for this chunk
            future_to_patent = {executor.submit(process_patent, p): p for p in current_chunk}
            
            for future in concurrent.futures.as_completed(future_to_patent):
                patent = future_to_patent[future]
                try:
                    results[patent] = future.result()
                except Exception as e:
                    logger.error(f"Unhandled exception for patent {patent}: {str(e)}")
                    results[patent] = {'jurisdictions': None, 'family_members': None}
        
        # Calculate success rate for this chunk
        chunk_success = sum(1 for p in current_chunk if results.get(p, {}).get('jurisdictions') is not None)
        logger.info(f"Chunk complete: {chunk_success}/{len(current_chunk)} successful " +
                   f"({chunk_success/len(current_chunk)*100:.1f}%)")
        
        # If success rate is too low, pause to avoid rate limiting
        if chunk_success < len(current_chunk) * 0.5:
            pause_time = 60  # 1 minute pause
            logger.warning(f"Low success rate detected, pausing for {pause_time} seconds")
            time.sleep(pause_time)
    
    logger.info(f"Completed processing {total_patents} patents")
    
    # Apply results to the dataframe
    result_df['family_jurisdictions'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('jurisdictions'))
    result_df['family_members'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('family_members'))
    
    # Log statistics
    success_count = sum(1 for r in results.values() if r['jurisdictions'] is not None)
    logger.info(f"Successfully processed {success_count} out of {total_patents} patents ({success_count/total_patents*100:.1f}%)")
    
    return result_df

# Example usage:
if __name__ == "__main__":
    # df = pd.read_csv('your_patents.csv')
    processed_df = process_dataframe_parallel(df, 'first publication number', chunk_size=50)
    processed_df[['first publication number', 'family_jurisdictions', 'family_members']]

2025-04-21 18:34:44,767 - INFO - Starting to process 500 patents in chunks of 50
2025-04-21 18:34:44,768 - INFO - Getting new access token
2025-04-21 18:34:45,074 - INFO - New token acquired
2025-04-21 18:34:45,075 - INFO - Processing chunk 1/10 (patents 1-50)
2025-04-21 18:37:10,882 - INFO - Chunk complete: 50/50 successful (100.0%)
2025-04-21 18:37:10,883 - INFO - Processing chunk 2/10 (patents 51-100)
2025-04-21 18:40:33,596 - INFO - Chunk complete: 50/50 successful (100.0%)
2025-04-21 18:40:33,596 - INFO - Processing chunk 3/10 (patents 101-150)
2025-04-21 18:42:11,940 - INFO - Chunk complete: 50/50 successful (100.0%)
2025-04-21 18:42:11,941 - INFO - Processing chunk 4/10 (patents 151-200)
2025-04-21 18:43:52,078 - INFO - Chunk complete: 50/50 successful (100.0%)
2025-04-21 18:43:52,079 - INFO - Processing chunk 5/10 (patents 201-250)
2025-04-21 18:45:31,923 - INFO - Chunk complete: 50/50 successful (100.0%)
2025-04-21 18:45:31,924 - INFO - Processing chunk 6/10 (patents 251-300)


In [20]:
processed_df[['first publication number', 'family_jurisdictions', 'family_members']].head(20)

Unnamed: 0,first publication number,family_jurisdictions,family_members
0,EP3648416A1,"[DE, EP]","[DE102018008674A1, EP3648416A1, EP3648416B1]"
1,US11405415B2,"[EP, US]","[EP3832464A1, EP3832464B1, EP3832464C0, US1140..."
2,US2014250500A1,"[CN, EP, JP, KR, US, WO]","[CN103842985A, CN103842985B, EP2763048A2, EP27..."
3,US2014126723A1,"[CN, EP, US, WO]","[CN103262491A, EP2704389A1, EP2704389A4, EP270..."
4,US10958679B2,"[EP, US, WO]","[EP3080741A2, EP3080741A4, EP3080741B1, US1006..."
5,US2013066940A1,"[CN, EP, US, WO]","[CN102255933A, CN102255933B, EP2574005A1, EP25..."
6,US11902329B2,"[EP, US]","[EP3839734A1, EP3839734B1, US11902329B2, US202..."
7,US11546360B2,"[AU, CA, DK, EP, IL, JP, SG, US, WO, ZA]","[AU2019201137A1, AU2019201137B2, AU2020210203A..."
8,US9300660B1,"[EP, US, WO]","[EP3304845A1, EP3304845B1, EP3745669A1, US1083..."
9,WO2019006637A1,[WO],[WO2019006637A1]


parallelism + single token

In [16]:
import concurrent.futures
import os
import requests
import time
from urllib.parse import quote
import pandas as pd
from dotenv import load_dotenv
import random
import logging
from functools import lru_cache

# Set up logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global token cache
TOKEN = None
TOKEN_EXPIRY = 0

# Constants for API endpoints
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"

# Fast but not too fast settings
MAX_WORKERS = 10  # High parallelism
TIMEOUT = 20      # Reasonable timeout
MAX_RETRIES = 3   # Quick retries

# Load credentials from .env file
load_dotenv()
CONSUMER_KEY = os.getenv("CONSUMER_KEY").strip()
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET").strip()

def get_access_token() -> str:
    global TOKEN, TOKEN_EXPIRY
    current_time = time.time()
    if TOKEN and current_time < TOKEN_EXPIRY:
        return TOKEN
    
    logger.info("Getting new access token")
    data = {
        "grant_type": "client_credentials",
        "client_id": CONSUMER_KEY,
        "client_secret": CONSUMER_SECRET
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=TIMEOUT)
            response.raise_for_status()
            TOKEN = response.json()["access_token"]
            TOKEN_EXPIRY = current_time + 3500  # Slightly less than 1 hour
            logger.info("New token acquired")
            return TOKEN
        except Exception as e:
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Failed to get token (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}. Retrying in {wait_time:.2f}s")
            time.sleep(wait_time)
    
    logger.error("Failed to get access token after multiple attempts")
    raise Exception("Failed to get access token")

def validate_patent_number(patent: str) -> bool:
    return bool(patent and isinstance(patent, str) and len(patent.strip()) >= 4)

@lru_cache(maxsize=1000)
def process_patent_with_cache(patent: str) -> dict:
    """Cached version of process_patent to avoid redundant API calls"""
    # Get token here once, not for every retry
    token = get_access_token()
    return process_patent_internal(patent, token)

def process_patent_internal(patent: str, token: str) -> dict:
    """Internal function that actually processes the patent"""
    if not validate_patent_number(patent):
        logger.warning(f"Invalid patent number: '{patent}'")
        return {'jurisdictions': None, 'family_members': None}
    
    for attempt in range(MAX_RETRIES):
        try:
            url = f"{BASE_URL}/family/publication/docdb/{quote(patent)}"
            headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
            
            response = requests.get(url, headers=headers, timeout=TIMEOUT)
            
            if response.status_code == 401:  # Unauthorized - token expired
                logger.warning("Token expired, getting a new one")
                token = get_access_token()  # Get a fresh token
                continue
                
            if response.status_code == 403:
                # Fast retry for rate limiting
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                logger.debug(f"Rate limited for patent {patent}, backing off for {wait_time:.2f}s")
                time.sleep(wait_time)
                continue
                
            if response.status_code == 404:
                logger.debug(f"No data found for patent {patent}")
                return {'jurisdictions': None, 'family_members': None}
                
            response.raise_for_status()
            result = extract_jurisdictions_and_members(response.json())
            return result
            
        except Exception as e:
            wait_time = 0.5 * (2 ** attempt) + random.uniform(0, 0.5)
            if attempt < MAX_RETRIES - 1:  # Only log if we'll retry
                logger.debug(f"Error processing patent {patent} (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}. Retrying in {wait_time:.2f}s")
                time.sleep(wait_time)
    
    logger.warning(f"Failed to process patent {patent} after {MAX_RETRIES} attempts")
    return {'jurisdictions': None, 'family_members': None}

def extract_jurisdictions_and_members(data: dict) -> dict:
    try:
        jurisdictions = set()
        family_members = []
        world_data = data.get('ops:world-patent-data', {})
        patent_family = world_data.get('ops:patent-family', {})
        members = patent_family.get('ops:family-member', []) or []
        if isinstance(members, dict):
            members = [members]

        for member in members:
            pub_ref = member.get('publication-reference', {})
            docs = pub_ref.get('document-id', []) or []
            if isinstance(docs, dict):
                docs = [docs]
            for doc in docs:
                if doc.get('@document-id-type') == 'docdb':
                    country = doc.get('country')
                    country = country.get('$') if isinstance(country, dict) else country
                    doc_number = doc.get('doc-number')
                    doc_number = doc_number.get('$') if isinstance(doc_number, dict) else doc_number
                    kind = doc.get('kind')
                    kind = kind.get('$') if isinstance(kind, dict) else kind
                    if country and doc_number and kind:
                        jurisdictions.add(country)
                        family_members.append(f"{country}{doc_number}{kind}")
        return {
            'jurisdictions': sorted(jurisdictions),
            'family_members': sorted(set(family_members))
        }
    except Exception as e:
        logger.debug(f"Error extracting jurisdictions and members: {str(e)}")
        return {'jurisdictions': None, 'family_members': None}

def process_patent(patent: str) -> dict:
    """Wrapper function to call the cached version"""
    # Skip None/NaN values
    if pd.isna(patent) or patent is None:
        return {'jurisdictions': None, 'family_members': None}
    return process_patent_with_cache(patent)

def process_dataframe_parallel(df: pd.DataFrame, patent_col: str, max_workers: int = MAX_WORKERS) -> pd.DataFrame:
    """Process patents in parallel with optimized speed and reliability"""
    if patent_col not in df.columns:
        raise ValueError(f"Column '{patent_col}' not found in DataFrame")
    
    # Make a copy to avoid modifying the original
    result_df = df.copy()
    patents = df[patent_col].tolist()
    results = {}
    
    logger.info(f"Starting to process {len(patents)} patents with {max_workers} workers")
    
    # Get a token before starting
    get_access_token()
    
    # Use ThreadPoolExecutor for I/O-bound API calls
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_patent = {executor.submit(process_patent, p): p for p in patents}
        
        # Track progress
        completed = 0
        successful = 0
        for future in concurrent.futures.as_completed(future_to_patent):
            patent = future_to_patent[future]
            try:
                result = future.result()
                results[patent] = result
                completed += 1
                
                # Count successful results
                if result['jurisdictions'] is not None:
                    successful += 1
                
                # Log progress at intervals
                if completed % 20 == 0:
                    success_rate = (successful / completed) * 100
                    logger.info(f"Processed {completed}/{len(patents)} patents - {success_rate:.1f}% success rate")
                    
                    # Optional: If success rate drops too low, slow down
                    if completed >= 50 and success_rate < 30:
                        logger.warning("Low success rate detected - consider reducing parallelism or spacing requests")
                
            except Exception as e:
                logger.error(f"Unhandled exception for patent {patent}: {str(e)}")
                results[patent] = {'jurisdictions': None, 'family_members': None}
    
    logger.info(f"Completed processing {len(patents)} patents")
    
    # Apply results to the dataframe
    result_df['family_jurisdictions'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('jurisdictions'))
    result_df['family_members'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('family_members'))
    
    # Log statistics
    success_count = sum(1 for r in results.values() if r['jurisdictions'] is not None)
    logger.info(f"Successfully processed {success_count} out of {len(patents)} patents ({success_count/len(patents)*100:.1f}%)")
    
    return result_df

def auto_recover_process(df: pd.DataFrame, patent_col: str) -> pd.DataFrame:
    """Process with automatic recovery for failures"""
    # First attempt with high parallelism
    logger.info("Starting high-speed processing run")
    result_df = process_dataframe_parallel(df, patent_col, max_workers=MAX_WORKERS)
    
    # Check for patents that failed and retry them with more conservative settings
    failed_patents = result_df[result_df['family_jurisdictions'].isna()]
    if not failed_patents.empty:
        failed_count = len(failed_patents)
        logger.info(f"First pass completed with {failed_count} failures. Running recovery pass...")
        
        # Create a new dataframe with just the failed patents
        recovery_df = failed_patents.copy()
        
        # Process with more conservative settings
        recovered_df = process_dataframe_parallel(recovery_df, patent_col, max_workers=3)
        
        # Update the original results with any recovered data
        for idx, row in recovered_df.iterrows():
            if row['family_jurisdictions'] is not None:
                result_df.at[idx, 'family_jurisdictions'] = row['family_jurisdictions']
                result_df.at[idx, 'family_members'] = row['family_members']
        
        # Log recovery statistics
        final_failed = len(result_df[result_df['family_jurisdictions'].isna()])
        logger.info(f"Recovery complete. Recovered {failed_count - final_failed} patents. Final failure count: {final_failed}")
    
    return result_df

# Example usage:
if __name__ == "__main__":
    # df = pd.read_csv('your_patents.csv')
    
    # Option 1: Fast but may have failures
    # processed_df = process_dataframe_parallel(df, 'first publication number')
    
    # Option 2: Fast with automatic recovery of failures
    processed_df = auto_recover_process(df, 'first publication number')
    
    processed_df[['first publication number', 'family_jurisdictions', 'family_members']]

2025-04-21 19:19:31,257 - INFO - Starting high-speed processing run
2025-04-21 19:19:31,259 - INFO - Starting to process 500 patents with 10 workers
2025-04-21 19:19:31,260 - INFO - Getting new access token
2025-04-21 19:19:31,633 - INFO - New token acquired
2025-04-21 19:19:32,746 - INFO - Processed 20/500 patents - 100.0% success rate
2025-04-21 19:19:33,316 - INFO - Processed 40/500 patents - 100.0% success rate
2025-04-21 19:19:34,119 - INFO - Processed 60/500 patents - 100.0% success rate
2025-04-21 19:19:34,754 - INFO - Processed 80/500 patents - 100.0% success rate
2025-04-21 19:19:35,344 - INFO - Processed 100/500 patents - 100.0% success rate
2025-04-21 19:19:35,993 - INFO - Processed 120/500 patents - 100.0% success rate
2025-04-21 19:19:36,869 - INFO - Processed 140/500 patents - 100.0% success rate
2025-04-21 19:19:39,489 - INFO - Processed 160/500 patents - 100.0% success rate
2025-04-21 19:19:42,544 - INFO - Processed 180/500 patents - 100.0% success rate
2025-04-21 19:19

In [17]:
processed_df[['first publication number', 'family_jurisdictions', 'family_members']].head(10)

Unnamed: 0,first publication number,family_jurisdictions,family_members
0,US10254766B2,"[CA, GB, MX, US, WO]","[CA3042744A1, GB201907387D0, GB2570843A, MX201..."
1,US6394231B1,"[AT, CA, DE, EP, JP, US]","[ATE271512T1, CA2306359A1, CA2306359C, DE50007..."
2,US10196117B2,"[US, WO]","[US10106233B2, US10196117B2, US2017081002A1, U..."
3,US2017059336A1,"[CN, TW, US]","[CN106485340A, TW201708996A, TWI611279B, US201..."
4,US2024242599A1,[US],[US2024242599A1]
5,US11836985B2,[US],"[US10755111B2, US11836985B2, US2019236379A1, U..."
6,US12056529B2,"[US, WO]","[US12056529B2, US2023088692A1, WO2023045493A1]"
7,US10216190B2,[US],"[US10216190B2, US10409285B2, US2018081360A1, U..."
8,US2020125120A1,[US],"[US10216196B2, US10303182B2, US10303183B2, US1..."
9,EP3605488A1,"[CN, DK, EP, US]","[CN110850866A, DK201870686A1, EP3605488A1, EP3..."


In [20]:
import concurrent.futures
import os
import requests
import time
from urllib.parse import quote
import pandas as pd
from dotenv import load_dotenv
import random
import logging
from functools import lru_cache

# Set up logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global token cache
TOKEN = None
TOKEN_EXPIRY = 0
TOKEN_REQUEST_COUNT = 0  # Track requests per token
TOKEN_REQUEST_LIMIT = 150  # Get new token after this many requests

# Constants for API endpoints
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"

# Fast but not too fast settings
MAX_WORKERS = 15  # High parallelism
TIMEOUT = 20      # Reasonable timeout
MAX_RETRIES = 3   # Quick retries

# Load credentials from .env file
load_dotenv()
CONSUMER_KEY = os.getenv("CONSUMER_KEY").strip()
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET").strip()

def get_access_token(force_refresh=False) -> str:
    global TOKEN, TOKEN_EXPIRY, TOKEN_REQUEST_COUNT
    current_time = time.time()
    
    # Refresh token if forced, expired, or reached request limit
    if force_refresh or TOKEN_REQUEST_COUNT >= TOKEN_REQUEST_LIMIT or not TOKEN or current_time >= TOKEN_EXPIRY:
        logger.info(f"Getting new access token (request count: {TOKEN_REQUEST_COUNT})")
        data = {
            "grant_type": "client_credentials",
            "client_id": CONSUMER_KEY,
            "client_secret": CONSUMER_SECRET
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        
        for attempt in range(MAX_RETRIES):
            try:
                response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=TIMEOUT)
                response.raise_for_status()
                TOKEN = response.json()["access_token"]
                TOKEN_EXPIRY = current_time + 3500  # Slightly less than 1 hour
                TOKEN_REQUEST_COUNT = 0  # Reset counter
                logger.info("New token acquired")
                return TOKEN
            except Exception as e:
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Failed to get token (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}. Retrying in {wait_time:.2f}s")
                time.sleep(wait_time)
        
        logger.error("Failed to get access token after multiple attempts")
        raise Exception("Failed to get access token")
    else:
        return TOKEN

def validate_patent_number(patent: str) -> bool:
    return bool(patent and isinstance(patent, str) and len(patent.strip()) >= 4)

def process_patent_internal(patent: str) -> dict:
    """Internal function that actually processes the patent"""
    global TOKEN_REQUEST_COUNT
    
    if not validate_patent_number(patent):
        logger.warning(f"Invalid patent number: '{patent}'")
        return {'jurisdictions': None, 'family_members': None}
    
    for attempt in range(MAX_RETRIES):
        try:
            # Get token and increment request counter
            token = get_access_token()
            TOKEN_REQUEST_COUNT += 1
            
            url = f"{BASE_URL}/family/publication/docdb/{quote(patent)}"
            headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
            
            response = requests.get(url, headers=headers, timeout=TIMEOUT)
            
            if response.status_code == 401:  # Unauthorized - token expired
                logger.warning("Token expired, getting a new one")
                get_access_token(force_refresh=True)
                continue
                
            if response.status_code == 403:
                # Fast retry for rate limiting
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                logger.debug(f"Rate limited for patent {patent}, backing off for {wait_time:.2f}s")
                
                # If we get rate limited, try rotating the token
                if attempt > 0:
                    get_access_token(force_refresh=True)
                    
                time.sleep(wait_time)
                continue
                
            if response.status_code == 404:
                logger.debug(f"No data found for patent {patent}")
                return {'jurisdictions': None, 'family_members': None}
                
            response.raise_for_status()
            result = extract_jurisdictions_and_members(response.json())
            return result
            
        except Exception as e:
            wait_time = 0.5 * (2 ** attempt) + random.uniform(0, 0.5)
            if attempt < MAX_RETRIES - 1:  # Only log if we'll retry
                logger.debug(f"Error processing patent {patent} (attempt {attempt+1}/{MAX_RETRIES}): {str(e)}. Retrying in {wait_time:.2f}s")
                time.sleep(wait_time)
    
    logger.warning(f"Failed to process patent {patent} after {MAX_RETRIES} attempts")
    return {'jurisdictions': None, 'family_members': None}

# Use a smaller cache size to prevent memory issues with the modified function
@lru_cache(maxsize=500)
def process_patent_with_cache(patent: str) -> dict:
    """Cached version of process_patent to avoid redundant API calls"""
    return process_patent_internal(patent)

def extract_jurisdictions_and_members(data: dict) -> dict:
    try:
        jurisdictions = set()
        family_members = []
        world_data = data.get('ops:world-patent-data', {})
        patent_family = world_data.get('ops:patent-family', {})
        members = patent_family.get('ops:family-member', []) or []
        if isinstance(members, dict):
            members = [members]

        for member in members:
            pub_ref = member.get('publication-reference', {})
            docs = pub_ref.get('document-id', []) or []
            if isinstance(docs, dict):
                docs = [docs]
            for doc in docs:
                if doc.get('@document-id-type') == 'docdb':
                    country = doc.get('country')
                    country = country.get('$') if isinstance(country, dict) else country
                    doc_number = doc.get('doc-number')
                    doc_number = doc_number.get('$') if isinstance(doc_number, dict) else doc_number
                    kind = doc.get('kind')
                    kind = kind.get('$') if isinstance(kind, dict) else kind
                    if country and doc_number and kind:
                        jurisdictions.add(country)
                        family_members.append(f"{country}{doc_number}{kind}")
        return {
            'jurisdictions': sorted(jurisdictions),
            'family_members': sorted(set(family_members))
        }
    except Exception as e:
        logger.debug(f"Error extracting jurisdictions and members: {str(e)}")
        return {'jurisdictions': None, 'family_members': None}

def process_patent(patent: str) -> dict:
    """Wrapper function to call the cached version"""
    # Skip None/NaN values
    if pd.isna(patent) or patent is None:
        return {'jurisdictions': None, 'family_members': None}
    return process_patent_with_cache(patent)

def process_dataframe_parallel(df: pd.DataFrame, patent_col: str, max_workers: int = MAX_WORKERS) -> pd.DataFrame:
    """Process patents in parallel with optimized speed and reliability"""
    if patent_col not in df.columns:
        raise ValueError(f"Column '{patent_col}' not found in DataFrame")
    
    # Make a copy to avoid modifying the original
    result_df = df.copy()
    patents = df[patent_col].tolist()
    results = {}
    
    logger.info(f"Starting to process {len(patents)} patents with {max_workers} workers")
    
    # Get a fresh token before starting
    get_access_token(force_refresh=True)
    
    # Use ThreadPoolExecutor for I/O-bound API calls
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_patent = {}
        for p in patents:
            future_to_patent[executor.submit(process_patent, p)] = p
            time.sleep(0.01)  # Small delay between submissions to avoid overwhelming the API
        
        # Track progress
        completed = 0
        successful = 0
        for future in concurrent.futures.as_completed(future_to_patent):
            patent = future_to_patent[future]
            try:
                result = future.result()
                results[patent] = result
                completed += 1
                
                # Count successful results
                if result['jurisdictions'] is not None:
                    successful += 1
                
                # Log progress at intervals
                if completed % 20 == 0:
                    success_rate = (successful / completed) * 100
                    logger.info(f"Processed {completed}/{len(patents)} patents - {success_rate:.1f}% success rate")
                    
                    # Force token refresh every chunk_size patents
                    if completed % TOKEN_REQUEST_LIMIT == 0:
                        logger.info(f"Reached {completed} patents, forcing token refresh")
                        get_access_token(force_refresh=True)
                
            except Exception as e:
                logger.error(f"Unhandled exception for patent {patent}: {str(e)}")
                results[patent] = {'jurisdictions': None, 'family_members': None}
    
    logger.info(f"Completed processing {len(patents)} patents")
    
    # Apply results to the dataframe
    result_df['family_jurisdictions'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('jurisdictions'))
    result_df['family_members'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('family_members'))
    
    # Log statistics
    success_count = sum(1 for r in results.values() if r['jurisdictions'] is not None)
    logger.info(f"Successfully processed {success_count} out of {len(patents)} patents ({success_count/len(patents)*100:.1f}%)")
    
    return result_df

def process_in_batches(df: pd.DataFrame, patent_col: str, batch_size: int = 200) -> pd.DataFrame:
    """Process the dataframe in smaller batches with fresh tokens for each batch"""
    total_rows = len(df)
    result_df = df.copy()
    
    logger.info(f"Processing {total_rows} patents in batches of {batch_size}")
    
    for start_idx in range(0, total_rows, batch_size):
        end_idx = min(start_idx + batch_size, total_rows)
        batch_df = df.iloc[start_idx:end_idx].copy()
        
        logger.info(f"Processing batch {start_idx//batch_size + 1} (rows {start_idx} to {end_idx-1})")
        
        # Force new token for each batch
        get_access_token(force_refresh=True)
        
        # Process this batch
        processed_batch = process_dataframe_parallel(batch_df, patent_col)
        
        # Update results in the main dataframe
        result_df.iloc[start_idx:end_idx, result_df.columns.get_loc('family_jurisdictions')] = processed_batch['family_jurisdictions']
        result_df.iloc[start_idx:end_idx, result_df.columns.get_loc('family_members')] = processed_batch['family_members']
        
        # Add a pause between batches to avoid rate limiting
        if end_idx < total_rows:
            pause_time = 5
            logger.info(f"Batch complete. Pausing for {pause_time} seconds before next batch.")
            time.sleep(pause_time)
    
    return result_df

def auto_recover_process(df: pd.DataFrame, patent_col: str) -> pd.DataFrame:
    """Process with automatic recovery for failures"""
    # First process in batches with token rotation
    logger.info("Starting batch processing with token rotation")
    result_df = process_in_batches(df, patent_col, batch_size=200)
    
    # Check for patents that failed and retry them with more conservative settings
    failed_patents = result_df[result_df['family_jurisdictions'].isna()]
    if not failed_patents.empty:
        failed_count = len(failed_patents)
        logger.info(f"First pass completed with {failed_count} failures. Running recovery pass...")
        
        # Create a new dataframe with just the failed patents
        recovery_df = failed_patents.copy()
        
        # Force new token for recovery
        get_access_token(force_refresh=True)
        
        # Process with more conservative settings
        recovered_df = process_dataframe_parallel(recovery_df, patent_col, max_workers=3)
        
        # Update the original results with any recovered data
        for idx, row in recovered_df.iterrows():
            if row['family_jurisdictions'] is not None:
                result_df.at[idx, 'family_jurisdictions'] = row['family_jurisdictions']
                result_df.at[idx, 'family_members'] = row['family_members']
        
        # Log recovery statistics
        final_failed = len(result_df[result_df['family_jurisdictions'].isna()])
        logger.info(f"Recovery complete. Recovered {failed_count - final_failed} patents. Final failure count: {final_failed}")
    
    return result_df

# Example usage:
if __name__ == "__main__":
    # df = pd.read_csv('your_patents.csv')
    
    # Initialize result columns
    if 'family_jurisdictions' not in df.columns:
        df['family_jurisdictions'] = None
    if 'family_members' not in df.columns:
        df['family_members'] = None
    
    # Process in batches with token rotation and recovery
    processed_df = auto_recover_process(df, 'first publication number')
    
    # Save results to avoid reprocessing
    processed_df.to_csv('processed_patents.csv', index=False)
    
    processed_df[['first publication number', 'family_jurisdictions', 'family_members']]

2025-04-22 09:56:42,642 - INFO - Starting batch processing with token rotation
2025-04-22 09:56:42,643 - INFO - Processing 500 patents in batches of 200
2025-04-22 09:56:42,644 - INFO - Processing batch 1 (rows 0 to 199)
2025-04-22 09:56:42,645 - INFO - Getting new access token (request count: 0)
2025-04-22 09:56:43,369 - INFO - New token acquired
2025-04-22 09:56:43,370 - INFO - Starting to process 200 patents with 15 workers
2025-04-22 09:56:43,370 - INFO - Getting new access token (request count: 0)
2025-04-22 09:56:43,862 - INFO - New token acquired
2025-04-22 09:56:45,955 - INFO - Processed 20/200 patents - 100.0% success rate
2025-04-22 09:56:46,285 - INFO - Processed 40/200 patents - 100.0% success rate
2025-04-22 09:56:47,191 - INFO - Processed 60/200 patents - 100.0% success rate
2025-04-22 09:56:49,003 - INFO - Processed 80/200 patents - 100.0% success rate
2025-04-22 09:56:50,006 - INFO - Getting new access token (request count: 101)
2025-04-22 09:56:50,729 - INFO - Getting 

In [21]:
processed_df[['first publication number', 'family_jurisdictions', 'family_members']].head(10)

Unnamed: 0,first publication number,family_jurisdictions,family_members
0,US10254766B2,"[CA, GB, MX, US, WO]","[CA3042744A1, GB201907387D0, GB2570843A, MX201..."
1,US6394231B1,"[AT, CA, DE, EP, JP, US]","[ATE271512T1, CA2306359A1, CA2306359C, DE50007..."
2,US10196117B2,"[US, WO]","[US10106233B2, US10196117B2, US2017081002A1, U..."
3,US2017059336A1,"[CN, TW, US]","[CN106485340A, TW201708996A, TWI611279B, US201..."
4,US2024242599A1,[US],[US2024242599A1]
5,US11836985B2,[US],"[US10755111B2, US11836985B2, US2019236379A1, U..."
6,US12056529B2,"[US, WO]","[US12056529B2, US2023088692A1, WO2023045493A1]"
7,US10216190B2,[US],"[US10216190B2, US10409285B2, US2018081360A1, U..."
8,US2020125120A1,[US],"[US10216196B2, US10303182B2, US10303183B2, US1..."
9,EP3605488A1,"[CN, DK, EP, US]","[CN110850866A, DK201870686A1, EP3605488A1, EP3..."


rotating tokens

In [22]:
import concurrent.futures
import os
import requests
import time
from urllib.parse import quote
import pandas as pd
from dotenv import load_dotenv
import random
import logging
from functools import lru_cache

# Set up logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load credentials from .env file
load_dotenv()
CREDENTIALS = [
    {
        "CONSUMER_KEY": os.getenv("CONSUMER_KEY_1").strip(),
        "CONSUMER_SECRET": os.getenv("CONSUMER_SECRET_1").strip()
    },
    {
        "CONSUMER_KEY": os.getenv("CONSUMER_KEY_2").strip(),
        "CONSUMER_SECRET": os.getenv("CONSUMER_SECRET_2").strip()
    }
]

# Token cache
TOKENS = [None] * len(CREDENTIALS)
TOKEN_EXPIRIES = [0] * len(CREDENTIALS)
TOKEN_COUNTS = [0] * len(CREDENTIALS)
CURRENT_INDEX = 0
TOKEN_REQUEST_LIMIT = 150  # Optional: rotate early

# API Endpoints and constants
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"
MAX_WORKERS = 15
TIMEOUT = 20
MAX_RETRIES = 3

# Token manager
def get_rotating_access_token(force_refresh=False):
    global CURRENT_INDEX, TOKENS, TOKEN_EXPIRIES, TOKEN_COUNTS
    num_keys = len(CREDENTIALS)
    now = time.time()

    for i in range(num_keys):
        idx = (CURRENT_INDEX + i) % num_keys
        cred = CREDENTIALS[idx]

        if (force_refresh or not TOKENS[idx] or 
            TOKEN_COUNTS[idx] >= TOKEN_REQUEST_LIMIT or 
            now >= TOKEN_EXPIRIES[idx]):
            
            logger.info(f"Fetching new token using key index {idx}")
            data = {
                "grant_type": "client_credentials",
                "client_id": cred["CONSUMER_KEY"],
                "client_secret": cred["CONSUMER_SECRET"]
            }
            headers = {"Content-Type": "application/x-www-form-urlencoded"}

            for attempt in range(MAX_RETRIES):
                try:
                    response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=TIMEOUT)
                    response.raise_for_status()
                    TOKENS[idx] = response.json()["access_token"]
                    TOKEN_EXPIRIES[idx] = now + 3500
                    TOKEN_COUNTS[idx] = 0
                    CURRENT_INDEX = (idx + 1) % num_keys
                    return TOKENS[idx]
                except Exception as e:
                    wait = (2 ** attempt) + random.random()
                    logger.warning(f"Token fetch failed (key {idx}, attempt {attempt+1}): {e}")
                    time.sleep(wait)
        else:
            TOKEN_COUNTS[idx] += 1
            CURRENT_INDEX = (idx + 1) % num_keys
            return TOKENS[idx]

    raise Exception("All credential sets failed to provide a valid token.")

# Patent processing functions
def validate_patent_number(patent: str) -> bool:
    return bool(patent and isinstance(patent, str) and len(patent.strip()) >= 4)

def process_patent_internal(patent: str) -> dict:
    if not validate_patent_number(patent):
        logger.warning(f"Invalid patent number: '{patent}'")
        return {'jurisdictions': None, 'family_members': None}

    for attempt in range(MAX_RETRIES):
        try:
            token = get_rotating_access_token()
            url = f"{BASE_URL}/family/publication/docdb/{quote(patent)}"
            headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
            response = requests.get(url, headers=headers, timeout=TIMEOUT)

            if response.status_code == 401:
                logger.warning("Token expired. Refreshing...")
                get_rotating_access_token(force_refresh=True)
                continue

            if response.status_code == 403:
                wait_time = (2 ** attempt) + random.random()
                logger.warning(f"Rate limit hit for patent {patent}, backing off for {wait_time:.2f}s")
                time.sleep(wait_time)
                continue

            if response.status_code == 404:
                logger.debug(f"No data found for {patent}")
                return {'jurisdictions': None, 'family_members': None}

            response.raise_for_status()
            return extract_jurisdictions_and_members(response.json())

        except Exception as e:
            wait_time = 0.5 * (2 ** attempt) + random.random()
            logger.debug(f"Error processing patent {patent} (attempt {attempt+1}/{MAX_RETRIES}): {e}. Retrying in {wait_time:.2f}s")
            time.sleep(wait_time)

    logger.warning(f"Failed to process {patent} after {MAX_RETRIES} attempts")
    return {'jurisdictions': None, 'family_members': None}

@lru_cache(maxsize=500)
def process_patent_with_cache(patent: str) -> dict:
    return process_patent_internal(patent)

def extract_jurisdictions_and_members(data: dict) -> dict:
    try:
        jurisdictions = set()
        family_members = []
        world_data = data.get('ops:world-patent-data', {})
        patent_family = world_data.get('ops:patent-family', {})
        members = patent_family.get('ops:family-member', []) or []
        if isinstance(members, dict):
            members = [members]

        for member in members:
            pub_ref = member.get('publication-reference', {})
            docs = pub_ref.get('document-id', []) or []
            if isinstance(docs, dict):
                docs = [docs]
            for doc in docs:
                if doc.get('@document-id-type') == 'docdb':
                    country = doc.get('country')
                    doc_number = doc.get('doc-number')
                    kind = doc.get('kind')
                    if isinstance(country, dict): country = country.get('$')
                    if isinstance(doc_number, dict): doc_number = doc_number.get('$')
                    if isinstance(kind, dict): kind = kind.get('$')
                    if country and doc_number and kind:
                        jurisdictions.add(country)
                        family_members.append(f"{country}{doc_number}{kind}")
        return {
            'jurisdictions': sorted(jurisdictions),
            'family_members': sorted(set(family_members))
        }
    except Exception as e:
        logger.debug(f"Error extracting family info: {e}")
        return {'jurisdictions': None, 'family_members': None}

def process_patent(patent: str) -> dict:
    if pd.isna(patent) or patent is None:
        return {'jurisdictions': None, 'family_members': None}
    return process_patent_with_cache(patent)

def process_dataframe_parallel(df: pd.DataFrame, patent_col: str, max_workers: int = MAX_WORKERS) -> pd.DataFrame:
    if patent_col not in df.columns:
        raise ValueError(f"Column '{patent_col}' not found in DataFrame")
    
    result_df = df.copy()
    patents = df[patent_col].tolist()
    results = {}
    logger.info(f"Processing {len(patents)} patents with {max_workers} workers")
    get_rotating_access_token(force_refresh=True)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_patent = {executor.submit(process_patent, p): p for p in patents}
        completed = 0
        successful = 0
        for future in concurrent.futures.as_completed(future_to_patent):
            patent = future_to_patent[future]
            try:
                result = future.result()
                results[patent] = result
                completed += 1
                if result['jurisdictions']:
                    successful += 1
                if completed % 20 == 0:
                    logger.info(f"Processed {completed}/{len(patents)} patents - {successful/completed*100:.1f}% success")
            except Exception as e:
                logger.error(f"Unhandled error for patent {patent}: {e}")
                results[patent] = {'jurisdictions': None, 'family_members': None}

    result_df['family_jurisdictions'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('jurisdictions'))
    result_df['family_members'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('family_members'))
    return result_df

def process_in_batches(df: pd.DataFrame, patent_col: str, batch_size: int = 200) -> pd.DataFrame:
    total_rows = len(df)
    result_df = df.copy()
    logger.info(f"Processing {total_rows} patents in batches of {batch_size}")
    for start_idx in range(0, total_rows, batch_size):
        end_idx = min(start_idx + batch_size, total_rows)
        batch_df = df.iloc[start_idx:end_idx].copy()
        logger.info(f"Processing batch {start_idx//batch_size + 1}: rows {start_idx}–{end_idx-1}")
        get_rotating_access_token(force_refresh=True)
        processed_batch = process_dataframe_parallel(batch_df, patent_col)
        result_df.iloc[start_idx:end_idx, result_df.columns.get_loc('family_jurisdictions')] = processed_batch['family_jurisdictions']
        result_df.iloc[start_idx:end_idx, result_df.columns.get_loc('family_members')] = processed_batch['family_members']
        if end_idx < total_rows:
            logger.info("Pausing before next batch...")
            time.sleep(5)
    return result_df

def auto_recover_process(df: pd.DataFrame, patent_col: str) -> pd.DataFrame:
    logger.info("Starting primary batch processing...")
    result_df = process_in_batches(df, patent_col)
    failed = result_df[result_df['family_jurisdictions'].isna()]
    if not failed.empty:
        logger.info(f"Recovery pass for {len(failed)} failed patents...")
        recovery_df = failed.copy()
        get_rotating_access_token(force_refresh=True)
        recovered_df = process_dataframe_parallel(recovery_df, patent_col, max_workers=3)
        for idx, row in recovered_df.iterrows():
            if row['family_jurisdictions']:
                result_df.at[idx, 'family_jurisdictions'] = row['family_jurisdictions']
                result_df.at[idx, 'family_members'] = row['family_members']
    return result_df

# Main entry
if __name__ == "__main__":
    # Example:
    # df = pd.read_csv("your_patents.csv")
    
    if 'family_jurisdictions' not in df.columns:
        df['family_jurisdictions'] = None
    if 'family_members' not in df.columns:
        df['family_members'] = None

    processed_df = auto_recover_process(df, 'first publication number')
    #processed_df.to_csv("processed_patents.csv", index=False)


2025-04-22 10:15:48,165 - INFO - Starting primary batch processing...
2025-04-22 10:15:48,167 - INFO - Processing 500 patents in batches of 200
2025-04-22 10:15:48,168 - INFO - Processing batch 1: rows 0–199
2025-04-22 10:15:48,168 - INFO - Fetching new token using key index 0
2025-04-22 10:15:48,527 - INFO - Processing 200 patents with 15 workers
2025-04-22 10:15:48,528 - INFO - Fetching new token using key index 1
2025-04-22 10:15:49,860 - INFO - Processed 20/200 patents - 100.0% success
2025-04-22 10:15:50,276 - INFO - Processed 40/200 patents - 100.0% success
2025-04-22 10:15:50,932 - INFO - Processed 60/200 patents - 100.0% success
2025-04-22 10:15:51,520 - INFO - Processed 80/200 patents - 100.0% success
2025-04-22 10:15:52,130 - INFO - Processed 100/200 patents - 100.0% success
2025-04-22 10:15:52,742 - INFO - Processed 120/200 patents - 100.0% success
2025-04-22 10:15:53,570 - INFO - Processed 140/200 patents - 100.0% success
2025-04-22 10:15:54,315 - INFO - Processed 160/200 p

In [23]:
processed_df[['first publication number', 'family_jurisdictions', 'family_members']].head(10)

Unnamed: 0,first publication number,family_jurisdictions,family_members
0,US10254766B2,"[CA, GB, MX, US, WO]","[CA3042744A1, GB201907387D0, GB2570843A, MX201..."
1,US6394231B1,"[AT, CA, DE, EP, JP, US]","[ATE271512T1, CA2306359A1, CA2306359C, DE50007..."
2,US10196117B2,"[US, WO]","[US10106233B2, US10196117B2, US2017081002A1, U..."
3,US2017059336A1,"[CN, TW, US]","[CN106485340A, TW201708996A, TWI611279B, US201..."
4,US2024242599A1,[US],[US2024242599A1]
5,US11836985B2,[US],"[US10755111B2, US11836985B2, US2019236379A1, U..."
6,US12056529B2,"[US, WO]","[US12056529B2, US2023088692A1, WO2023045493A1]"
7,US10216190B2,[US],"[US10216190B2, US10409285B2, US2018081360A1, U..."
8,US2020125120A1,[US],"[US10216196B2, US10303182B2, US10303183B2, US1..."
9,EP3605488A1,"[CN, DK, EP, US]","[CN110850866A, DK201870686A1, EP3605488A1, EP3..."


rotating 3 keys

In [None]:
import concurrent.futures
import os
import requests
import time
from urllib.parse import quote
import pandas as pd
from dotenv import load_dotenv
import random
import logging
from functools import lru_cache

# Logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load credentials
load_dotenv()
CREDENTIALS = [
    {
        "CONSUMER_KEY": os.getenv("CONSUMER_KEY_1").strip(),
        "CONSUMER_SECRET": os.getenv("CONSUMER_SECRET_1").strip()
    },
    {
        "CONSUMER_KEY": os.getenv("CONSUMER_KEY_2").strip(),
        "CONSUMER_SECRET": os.getenv("CONSUMER_SECRET_2").strip()
    },
    {
        "CONSUMER_KEY": os.getenv("CONSUMER_KEY_3").strip(),
        "CONSUMER_SECRET": os.getenv("CONSUMER_SECRET_3").strip()
    }
]

# Constants
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"
MAX_WORKERS = 15
TIMEOUT = 20
MAX_RETRIES = 3
TOKEN_REQUEST_LIMIT = 150

# Token state
TOKENS = [None] * len(CREDENTIALS)
TOKEN_EXPIRIES = [0] * len(CREDENTIALS)
TOKEN_COUNTS = [0] * len(CREDENTIALS)
CURRENT_INDEX = 0

# Rotate tokens
def get_rotating_access_token(force_refresh=False):
    global CURRENT_INDEX, TOKENS, TOKEN_EXPIRIES, TOKEN_COUNTS
    now = time.time()

    for i in range(len(CREDENTIALS)):
        idx = (CURRENT_INDEX + i) % len(CREDENTIALS)
        cred = CREDENTIALS[idx]

        if (force_refresh or not TOKENS[idx] or 
            TOKEN_COUNTS[idx] >= TOKEN_REQUEST_LIMIT or 
            now >= TOKEN_EXPIRIES[idx]):

            logger.info(f"Fetching new token using key index {idx}")
            data = {
                "grant_type": "client_credentials",
                "client_id": cred["CONSUMER_KEY"],
                "client_secret": cred["CONSUMER_SECRET"]
            }
            headers = {"Content-Type": "application/x-www-form-urlencoded"}

            for attempt in range(MAX_RETRIES):
                try:
                    response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=TIMEOUT)
                    response.raise_for_status()
                    TOKENS[idx] = response.json()["access_token"]
                    TOKEN_EXPIRIES[idx] = now + 3500
                    TOKEN_COUNTS[idx] = 0
                    CURRENT_INDEX = (idx + 1) % len(CREDENTIALS)
                    return TOKENS[idx]
                except Exception as e:
                    wait = (2 ** attempt) + random.random()
                    logger.warning(f"Token fetch failed (key {idx}, attempt {attempt+1}): {e}")
                    time.sleep(wait)
        else:
            TOKEN_COUNTS[idx] += 1
            CURRENT_INDEX = (idx + 1) % len(CREDENTIALS)
            return TOKENS[idx]

    raise Exception("All credential sets failed to provide a valid token.")

def validate_patent_number(patent: str) -> bool:
    return bool(patent and isinstance(patent, str) and len(patent.strip()) >= 4)

def process_patent_internal(patent: str) -> dict:
    if not validate_patent_number(patent):
        logger.warning(f"Invalid patent number: '{patent}'")
        return {'jurisdictions': None, 'family_members': None}

    for attempt in range(MAX_RETRIES):
        try:
            token = get_rotating_access_token()
            url = f"{BASE_URL}/family/publication/docdb/{quote(patent)}"
            headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
            response = requests.get(url, headers=headers, timeout=TIMEOUT)

            if response.status_code == 401:
                logger.warning("Token expired. Refreshing...")
                get_rotating_access_token(force_refresh=True)
                continue

            if response.status_code == 403:
                wait_time = (5 * (2 ** attempt)) + random.uniform(1, 2)
                logger.warning(f"Rate limit hit for patent {patent}, waiting {wait_time:.1f}s before retry")
                time.sleep(wait_time)
                continue

            if response.status_code == 404:
                logger.debug(f"No data found for {patent}")
                return {'jurisdictions': None, 'family_members': None}

            response.raise_for_status()
            return extract_jurisdictions_and_members(response.json())

        except Exception as e:
            wait_time = 0.5 * (2 ** attempt) + random.random()
            logger.debug(f"Error processing patent {patent} (attempt {attempt+1}): {e}. Retrying in {wait_time:.2f}s")
            time.sleep(wait_time)

    logger.warning(f"Failed to process {patent} after {MAX_RETRIES} attempts")
    return {'jurisdictions': None, 'family_members': None}

@lru_cache(maxsize=500)
def process_patent_with_cache(patent: str) -> dict:
    return process_patent_internal(patent)

def extract_jurisdictions_and_members(data: dict) -> dict:
    try:
        jurisdictions = set()
        family_members = []
        world_data = data.get('ops:world-patent-data', {})
        patent_family = world_data.get('ops:patent-family', {})
        members = patent_family.get('ops:family-member', []) or []
        if isinstance(members, dict):
            members = [members]

        for member in members:
            pub_ref = member.get('publication-reference', {})
            docs = pub_ref.get('document-id', []) or []
            if isinstance(docs, dict):
                docs = [docs]
            for doc in docs:
                if doc.get('@document-id-type') == 'docdb':
                    country = doc.get('country')
                    doc_number = doc.get('doc-number')
                    kind = doc.get('kind')
                    if isinstance(country, dict): country = country.get('$')
                    if isinstance(doc_number, dict): doc_number = doc_number.get('$')
                    if isinstance(kind, dict): kind = kind.get('$')
                    if country and doc_number and kind:
                        jurisdictions.add(country)
                        family_members.append(f"{country}{doc_number}{kind}")
        return {
            'jurisdictions': sorted(jurisdictions),
            'family_members': sorted(set(family_members))
        }
    except Exception as e:
        logger.debug(f"Error extracting family info: {e}")
        return {'jurisdictions': None, 'family_members': None}

def process_patent(patent: str) -> dict:
    if pd.isna(patent) or patent is None:
        return {'jurisdictions': None, 'family_members': None}
    return process_patent_with_cache(patent)

def process_dataframe_parallel(df: pd.DataFrame, patent_col: str, max_workers: int = MAX_WORKERS) -> pd.DataFrame:
    if patent_col not in df.columns:
        raise ValueError(f"Column '{patent_col}' not found in DataFrame")

    result_df = df.copy()
    patents = df[patent_col].tolist()
    results = {}
    logger.info(f"Processing {len(patents)} patents with {max_workers} workers")
    get_rotating_access_token(force_refresh=True)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_patent = {}
        for p in patents:
            future_to_patent[executor.submit(process_patent, p)] = p
            if max_workers <= 5:
                time.sleep(1)  # Throttle more in recovery mode

        completed = 0
        successful = 0
        for future in concurrent.futures.as_completed(future_to_patent):
            patent = future_to_patent[future]
            try:
                result = future.result()
                results[patent] = result
                completed += 1
                if result['jurisdictions']:
                    successful += 1
                if completed % 20 == 0:
                    logger.info(f"Processed {completed}/{len(patents)} patents - {successful/completed*100:.1f}% success")
            except Exception as e:
                logger.error(f"Unhandled error for patent {patent}: {e}")
                results[patent] = {'jurisdictions': None, 'family_members': None}

    result_df['family_jurisdictions'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('jurisdictions'))
    result_df['family_members'] = result_df[patent_col].map(lambda p: results.get(p, {}).get('family_members'))
    return result_df

def process_in_batches(df: pd.DataFrame, patent_col: str, batch_size: int = 200) -> pd.DataFrame:
    total_rows = len(df)
    result_df = df.copy()
    logger.info(f"Processing {total_rows} patents in batches of {batch_size}")
    for start_idx in range(0, total_rows, batch_size):
        end_idx = min(start_idx + batch_size, total_rows)
        batch_df = df.iloc[start_idx:end_idx].copy()
        logger.info(f"Processing batch {start_idx//batch_size + 1}: rows {start_idx}–{end_idx-1}")
        get_rotating_access_token(force_refresh=True)
        processed_batch = process_dataframe_parallel(batch_df, patent_col)
        result_df.iloc[start_idx:end_idx, result_df.columns.get_loc('family_jurisdictions')] = processed_batch['family_jurisdictions']
        result_df.iloc[start_idx:end_idx, result_df.columns.get_loc('family_members')] = processed_batch['family_members']
        if end_idx < total_rows:
            logger.info("Pausing before next batch...")
            time.sleep(5)
    return result_df

def auto_recover_process(df: pd.DataFrame, patent_col: str) -> pd.DataFrame:
    logger.info("Starting primary batch processing...")
    result_df = process_in_batches(df, patent_col)

    failed = result_df[result_df['family_jurisdictions'].isna()]
    if not failed.empty:
        logger.info(f"Recovery pass for {len(failed)} failed patents...")

        failed_patents = failed.copy()
        for i in range(0, len(failed_patents), 20):
            mini_df = failed_patents.iloc[i:i+20].copy()
            get_rotating_access_token(force_refresh=True)
            recovered = process_dataframe_parallel(mini_df, patent_col, max_workers=3)
            for idx, row in recovered.iterrows():
                if row['family_jurisdictions']:
                    result_df.at[idx, 'family_jurisdictions'] = row['family_jurisdictions']
                    result_df.at[idx, 'family_members'] = row['family_members']
            logger.info(f"Processed recovery batch {i}-{min(i+20, len(failed_patents))}")
            time.sleep(10)

    return result_df

# Main runner
if __name__ == "__main__":
    # df = pd.read_csv("your_patents.csv")

    if 'family_jurisdictions' not in df.columns:
        df['family_jurisdictions'] = None
    if 'family_members' not in df.columns:
        df['family_members'] = None

    processed_df = auto_recover_process(df, 'first publication number')
    #processed_df.to_csv("processed_patents.csv", index=False)


In [24]:
# Count the number of None (NaN) values in the specified columns
none_counts = processed_df[['first publication number', 'family_jurisdictions', 'family_members']].isnull().sum()

# Display the counts
print(none_counts)

first publication number     0
family_jurisdictions        89
family_members              89
dtype: int64
