Automatic data export 

In [14]:
import pandas as pd
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from wordcloud import WordCloud
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, ElementClickInterceptedException


the script below is a scraper class that automates the flow of finding and downloading patent listings from Espacenet in a few steps : 
- it launches an undetected chrome browser with special setting to look less like a robot . 
- based on the keywords and search fields the user provides it builds the correct search link .
- it navigates to the link , waits until the page is fully loaded and then pauses briefly to mimic human behaviour . 
- it clicks the 'more options' button , then the 'download' section and finall the 'list csv' option to start the search results download. 
- when the download pop-up appears , it sets the number of patents to the maximum amount allowed (500) and clicks the download button 
- if anything goes wrong and the attempt is timed out , it will refresh the page and try again up to 3 times 
- once the CSV download has been triggered successfully ,it shuts down the browser cleanly .  

In [15]:
class EspacenetScraper:
    def __init__(self, search_keywords, headless=True):
        """Initialize the scraper with configurable options and search keywords."""
        self.search_keywords = search_keywords
        options = uc.ChromeOptions()
        if headless:
            options.add_argument('--headless')  

        options.add_argument('--disable-blink-features=AutomationControlled')
        self.driver = uc.Chrome(options=options)
        self.driver.set_page_load_timeout(30)
        self.driver.set_window_size(1600, 1300)

    def construct_search_url(self):
        """Construct the search URL based on the provided keywords and their search fields."""
        base_url = 'https://worldwide.espacenet.com/patent/search?q='
        
        # Mapping of search fields to Espacenet query parameters
        field_mapping = {
            'title': 'ti',
            'abstract': 'ab',
            'claims': 'cl',
            'title,abstract or claims': 'ctxt' ,
            'all text fields' : 'ftxt',
            'title or abstract' : 'ta',
            'description' : 'desc',
            'all text fields or names' : 'nftxt',
            'title , abstract or names' : 'ntxt'
             
        }
        
        query_parts = []
        for keyword, field in self.search_keywords.items():
            field_param = field_mapping.get(field, 'ctxt')  # Default to 'ctxt' if field is unknown
            query_parts.append(f'{field_param} = "{keyword}"')
        
        query = ' AND '.join(query_parts)
        query += '&queryLang=en%3Ade%3Afr'
        
        return base_url + query

    def add_random_delay(self, min_seconds=1, max_seconds=3):
        """a random delay to mimic human behavior."""
        time.sleep(random.uniform(min_seconds, max_seconds))

    def get_page_html(self, retries=3):
        """
        Navigate to the constructed URL and return the page HTML.
        Retry the operation if a timeout occurs.

        Args:
            retries (int): Number of retry attempts.

        Returns:
            str: The page HTML, or None if all retries fail.
        """
        url = self.construct_search_url()
        for attempt in range(retries):
            try:
                print(f"Navigating to: {url} (Attempt {attempt + 1})")
                self.driver.get(url)
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Add a random delay to mimic human behavior
                self.add_random_delay(3, 5)

                # Return the page HTML
                return self.driver.page_source

            except TimeoutException:
                print(f"Timed out waiting for the page to load. Retrying ({attempt + 1}/{retries})...")
                if attempt == retries - 1:
                    print("Max retries reached. Unable to load the page.")
                    return None
            except Exception as e:
                print(f"An error occurred: {e}")
                return None

    def download_csv(self, retries=3, max_results=500):
        """
        Complete the sequence of clicking:
        1. More Options button
        2. Download dropdown
        3. List (CSV) option
        4. Handle download dialog by:
           - Setting the "To" value to max_results (e.g., 500)
           - Clicking the Download button
        
        Args:
            retries (int): Number of retry attempts for the entire sequence.
            max_results (int): Maximum number of results to download (1-500).

        Returns:
            bool: True if the download sequence was successful, False otherwise.
        """
        for attempt in range(retries):
            try:
                print(f"Attempting download sequence (Attempt {attempt + 1})...")
                
                # Step 1: Click "More Options" button
                print("Looking for More Options button...")
                more_options_selector = "#more-options-selector--publication-list-header"
                more_options_button = WebDriverWait(self.driver, 30).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, more_options_selector))
                )
                
                # Try to click, but handle intercepted clicks
                try:
                    print("More Options button found. Clicking...")
                    more_options_button.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#more-options-selector--publication-list-header").click()', more_options_button)
                    
                self.add_random_delay(2, 3)
                print('More Options clicked successfully')
                
                # Step 2: Click "Download" section in the dropdown
                print("Looking for Download section...")
                # Use a more general selector to find the Download section
                # This uses contains() to match the text rather than a fixed CSS path
                download_section_xpath = "/html/body/div[2]/div[3]/ul/section[1]"
                download_section = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, download_section_xpath))
                )
                
                try:
                    print("Download section found. Clicking...")
                    download_section.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#simple-dropdown > div.prod-jss1034.prod-jss966.prod-jss969.prod-jss1045 > ul > section:nth-child(1)").click()', download_section)
                    
                self.add_random_delay(1, 2)
                print('Download section clicked successfully')
                
                # Step 3: Click "List (CSV)" option
                print("Looking for List (CSV) option...")
                # Use contains() with the XPATH to find the CSV option based on text
                csv_option_xpath = "/html/body/div[2]/div[3]/ul/li[2]"
                csv_option = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, csv_option_xpath))
                )
                
                try:
                    print("List (CSV) option found. Clicking...")
                    csv_option.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("#simple-dropdown > div.prod-jss1034.prod-jss966.prod-jss969.prod-jss1045 > ul > li:nth-child(3)").click()', csv_option)
                    
                self.add_random_delay(2, 3)
                print('List (CSV) option clicked successfully')
                
                # Step 4: Handle the download dialog
                print("Waiting for download dialog to appear...")
                
                # Wait for the dialog to appear
                download_dialog_xpath = "/html/body/div[2]/div[3]/div/div"
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, download_dialog_xpath))
                )
                print("Download dialog appeared")
                
                # Find the "To" input field
                to_input_xpath = "/html/body/div[2]/div[3]/div/div/div/div[1]/input[2]"
                to_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, to_input_xpath))
                )
                
                # Clear the input and set it to max_results
                print(f"Setting maximum results to {max_results}...")
                to_input.clear()
                to_input.send_keys(str(max_results))
                self.add_random_delay(1, 2)
                
                # Click the Download button in the dialog
                download_button_xpath = "/html/body/div[2]/div[3]/div/div/div/button"
                download_button = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, download_button_xpath))
                )
                
                try:
                    print("Download button found. Clicking...")
                    download_button.click()
                except ElementClickInterceptedException:
                    print("Click intercepted, trying JavaScript click...")
                    self.driver.execute_script('document.querySelector("body > div.prod-jss12 > div.prod-jss15.prod-jss13 > div > div > div > button").click()', download_button)
                
                print("Download button clicked")
                
                # Wait for a moment to ensure the download starts
                self.add_random_delay(3, 5)
                
                # Check if there are any error messages
                try:
                    error_message = self.driver.find_element(By.XPATH, "//div[contains(@class, 'download-modal__validation')]//span")
                    if error_message.is_displayed() and error_message.text.strip():
                        print(f"Error in download dialog: {error_message.text}")
                        return False
                except:
                    # No error message found, continue
                    pass
                
                print("Download sequence completed successfully")
                return True
                
            except TimeoutException as e:
                print(f"Timeout during download sequence: {e}")
                if attempt == retries - 1:
                    print("Max retries reached. Download sequence failed.")
                    return False
            except Exception as e:
                print(f"Error during download sequence: {e}")
                if attempt == retries - 1:
                    print("Max retries reached. Download sequence failed.")
                    return False
                
            # If we reach here, there was an error and we need to try again
            # Refresh the page before the next attempt
            try:
                self.driver.refresh()
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                self.add_random_delay(3, 5)
            except Exception as e:
                print(f"Error refreshing page: {e}")

        return False

    def close(self):
        """Close the browser when done."""
        if self.driver:
            self.driver.quit()




In [16]:
if __name__ == '__main__':
    # Define the search keywords with their search fields
    search_keywords = {
        "Silicon": "title,abstract or claims",
        "crystal": "title,abstract or claims",
        "lattice": "title,abstract or claims"
        
        
    }

    # Initialize the scraper with the search keywords
    scraper = EspacenetScraper(search_keywords, headless=False)  # Set headless to False to see the browser in action

    try:
        # Construct and print the search URL
        search_url = scraper.construct_search_url()
        print("Constructed Search URL:", search_url)
        
        # Get the page HTML
        html = scraper.get_page_html(retries=3)
        if html:
            print("Page HTML retrieved successfully.")

            # Perform the download sequence with max 500 results
            if scraper.download_csv(retries=3, max_results=500):
                print("CSV download initiated successfully.")
                # Wait a bit to ensure the download starts
                time.sleep(10)
                print("Download should be complete or in progress.")
            else:
                print("Failed to download CSV.")

    finally:
        # Close the browser
        scraper.close()
        print("Scraper closed.")


Constructed Search URL: https://worldwide.espacenet.com/patent/search?q=ctxt = "Silicon" AND ctxt = "crystal" AND ctxt = "lattice"&queryLang=en%3Ade%3Afr
Navigating to: https://worldwide.espacenet.com/patent/search?q=ctxt = "Silicon" AND ctxt = "crystal" AND ctxt = "lattice"&queryLang=en%3Ade%3Afr (Attempt 1)
Page HTML retrieved successfully.
Attempting download sequence (Attempt 1)...
Looking for More Options button...
More Options button found. Clicking...
More Options clicked successfully
Looking for Download section...
Download section found. Clicking...
Download section clicked successfully
Looking for List (CSV) option...
List (CSV) option found. Clicking...
List (CSV) option clicked successfully
Waiting for download dialog to appear...
Download dialog appeared
Setting maximum results to 500...
Download button found. Clicking...
Download button clicked
Download sequence completed successfully
CSV download initiated successfully.
Download should be complete or in progress.
Scraper

In [17]:
import os
import glob


downloads_folder = os.path.expanduser("~/Downloads")

list_of_files = glob.glob(os.path.join(downloads_folder, "*.csv"))

if list_of_files:  
    latest_file = max(list_of_files, key=os.path.getmtime)
    print("Latest downloaded file:", latest_file)

   
    
    df = pd.read_csv(latest_file,delimiter=';', skiprows=7)
    df.head()
else:
    print("No CSV files found in Downloads.")


Latest downloaded file: C:\Users\tasni/Downloads\Résultat_de_la_recherche_dans_Espacenet_20250515_2004.csv


In [18]:
import pandas as pd


df.rename(columns={
    'Titre': 'Title',
    'Inventeurs': 'Inventors',
    'Demandeurs': 'Applicants',
    'Numéro de publication': 'Publication number',
    'Priorité la plus ancienne': 'Earliest priority',
    'CIB': 'IPC',
    'CPC': 'CPC',
    'Date de publication': 'Publication date',
    'Publication la plus ancienne': 'Earliest publication',
    'Numéro de famille': 'Family number'
}, inplace=True)

df[['first publication date','second publication date']] = df['Publication date'].str.split(' ' , n=1 , expand= True)
df['second publication date'] = df['second publication date'].str.strip('\n')
df['second publication date'] = df['second publication date'].str.strip('\r')
df['second publication date'] = df['second publication date'].str.strip('\n')


df['first publication date'] = pd.to_datetime(
    df['first publication date'].str.strip(), 
    format='mixed'
)
#first filing country 
df[['first publication number', 'second publication number']] = df['Publication number'].str.split(' ' , n=1 , expand=True)

df['second publication number']=df['second publication number'].str.strip('\n')
df['first publication country'] = df['first publication number'].str[:2]
df['second publication country'] = df['second publication number'].str[:2]
if 'Unnamed: 11' in df.columns:
    df = df.drop('Unnamed: 11', axis=1)
    
df['first filing year'] = df['first publication date'].dt.year


df['Earliest priority'] = pd.to_datetime(df['Earliest priority'])
df['earliest priority year'] = df['Earliest priority'].dt.year

df['applicant country'] = df['Applicants'].str.extract(r'\[([A-Z]{2})\]')

df = df.dropna(subset=['Inventors'])
df['Applicants'] = df['Applicants'].fillna(df['Inventors'])
df = df.reset_index(drop=True)
df = df.dropna(subset=['Inventors','Applicants','IPC'])

#filling missing CPC values 
df['CPC'] = df['CPC'].fillna('unkown')
df['IPC'] = df['IPC'].str.split(r'\s+')
import re

def split_cpc(classification):
    # Split only at ") " but keep the ")"
    parts = re.split(r'\)\s+', classification)  
    return [p + ')' if not p.endswith(')') else p for p in parts]  # Ensure each part ends with ')'


df['CPC'] = df['CPC'].apply(split_cpc)



this script automates pulling patent-family data from espacenet using API . 
first it loads the consumer key and consumer_secret from .enf and caches an authentification token , then for each patent number in the dataframe it : 
- validates the format with a quick length check . 
- builds the request URL for the family endpoint (URL encoding the publication number)
- sends the request to fetch patent family info in JSON 
- parses the JSON to collect all family member publication numbers and unique country codes .
- batches these calls in groups of 100 with small delays between requests to respect rate limit .
- maps the results back into two new dataframe columns family_jurisdictions and family_members and returns the enriched dataframe for export or further analysis .  

In [19]:
import os
import requests
import time
from urllib.parse import quote
import pandas as pd
from dotenv import load_dotenv

# Global token cache
TOKEN = None
TOKEN_EXPIRY = 0

# Constants for API endpoints
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"

# Load credentials from .env file
load_dotenv()
CONSUMER_KEY = os.getenv("CONSUMER_KEY").strip()
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET").strip()

def get_access_token() -> str:
    """Get or refresh the OAuth access token."""
    global TOKEN, TOKEN_EXPIRY
    if TOKEN and time.time() < TOKEN_EXPIRY:
        return TOKEN
    data = {
        "grant_type": "client_credentials",
        "client_id": CONSUMER_KEY,
        "client_secret": CONSUMER_SECRET
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=15)
    response.raise_for_status()
    TOKEN = response.json()["access_token"]
    TOKEN_EXPIRY = time.time() + 3500  # approximately 58 minutes
    return TOKEN

def validate_patent_number(patent: str) -> bool:
    """Perform a basic validation for the patent number format."""
    if not patent or len(patent.strip()) < 4:
        return False
    return True

def extract_jurisdictions_and_members(data: dict) -> dict:
    """
    Extract jurisdictions (as a sorted list) and family member publication numbers
    (formatted as country+doc-number+kind) from the JSON response.
    """
    try:
        jurisdictions = set()
        family_members = []
        world_data = data.get('ops:world-patent-data', {})
        patent_family = world_data.get('ops:patent-family', {})
        members = patent_family.get('ops:family-member', [])
        if isinstance(members, dict):
            members = [members]

        for member in members:
            pub_ref = member.get('publication-reference', {})
            docs = pub_ref.get('document-id', [])
            if isinstance(docs, dict):
                docs = [docs]

            for doc in docs:
                if doc.get('@document-id-type') == 'docdb':
                    country = doc.get('country')
                    if isinstance(country, dict):
                        country = country.get('$')
                    doc_number = doc.get('doc-number')
                    if isinstance(doc_number, dict):
                        doc_number = doc_number.get('$')
                    kind = doc.get('kind')
                    if isinstance(kind, dict):
                        kind = kind.get('$')

                    if country and doc_number and kind:
                        jurisdictions.add(country)
                        family_members.append(f"{country}{doc_number}{kind}")

        return {
            'jurisdictions': sorted(jurisdictions),
            'family_members': sorted(set(family_members))
        }

    except Exception as e:
        print(f"Error parsing response: {e}")
        return {'jurisdictions': None, 'family_members': None}

def process_patent(patent: str) -> dict:
    """
    Process a single patent by sending a request to the patent family endpoint,
    then extract family jurisdictions and family member publication numbers.
    Returns a dict with two keys: 'jurisdictions' and 'family_members'.
    """
    if not validate_patent_number(patent):
        print(f"Invalid patent number: {patent}")
        return {'jurisdictions': None, 'family_members': None}
    try:
        token = get_access_token()
        url = f"{BASE_URL}/family/publication/docdb/{quote(patent)}" 
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json"
        }
        response = requests.get(url, headers=headers, timeout=15)
        if response.status_code == 403:
            print(f"Access forbidden for patent {patent}")
            return {'jurisdictions': None, 'family_members': None}
        if response.status_code == 404:
            print(f"Patent {patent} not found")
            return {'jurisdictions': None, 'family_members': None}
        response.raise_for_status()
        data = response.json()
        return extract_jurisdictions_and_members(data)
    except Exception as e:
        print(f"Error processing patent {patent}: {e}")
        return {'jurisdictions': None, 'family_members': None}

def process_dataframe(df: pd.DataFrame, patent_col: str) -> pd.DataFrame:
    """
    For a DataFrame containing a column of patent numbers,
    process each patent (in batches) and add two new columns:
      - 'family_jurisdictions': sorted list of jurisdictions for the patent's family
      - 'family_members': sorted list of publication numbers for family members
    """
    if patent_col not in df.columns:
        raise ValueError(f"Column '{patent_col}' not found in DataFrame")
    result_df = df.copy()
    patents = result_df[patent_col].tolist()
    total = len(patents)
    batch_size = 100
    request_delay = 1.2  # seconds delay between requests
    results = {}

    for i in range(0, total, batch_size):
        batch = patents[i:i + batch_size]
        print(f"\nProcessing batch {i//batch_size + 1}/{(total - 1)//batch_size + 1}")
        for patent in batch:
            results[patent] = process_patent(patent)
            time.sleep(request_delay)
        if i + batch_size < total:
            print("Pausing between batches...")
            time.sleep(1)
            
    # Map the processed results to new DataFrame columns
    result_df['family_jurisdictions'] = result_df[patent_col].map(
        lambda p: results.get(p, {}).get('jurisdictions')
    )
    result_df['family_members'] = result_df[patent_col].map(
        lambda p: results.get(p, {}).get('family_members')
    )
    return result_df

if __name__ == "__main__":

    try:
        processed_df = process_dataframe(df, 'first publication number')
        print("\nFinal Results:")
        print(processed_df[['first publication number', 'family_jurisdictions', 'family_members']])
        # Optionally, export the results to CSV
        #processed_df.to_csv('patent_jurisdictions.csv', index=False)
    except Exception as e:
        print(f"Processing failed: {e}")



Processing batch 1/5
Pausing between batches...

Processing batch 2/5
Pausing between batches...

Processing batch 3/5
Error processing patent CN110488413A: HTTPSConnectionPool(host='ops.epo.org', port=443): Max retries exceeded with url: /3.2/rest-services/family/publication/docdb/CN110488413A (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x000001FF8434C290>, 'Connection to ops.epo.org timed out. (connect timeout=15)'))
Pausing between batches...

Processing batch 4/5
Pausing between batches...

Processing batch 5/5
Error processing patent CN101070621A: HTTPSConnectionPool(host='ops.epo.org', port=443): Max retries exceeded with url: /3.2/rest-services/family/publication/docdb/CN101070621A (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x000001FF84659D90>, 'Connection to ops.epo.org timed out. (connect timeout=15)'))
Error processing patent JPS6023934A: HTTPSConnectionPool(host='ops.epo.org', port=443): Max retries exceeded

the script below looks in your /Downloads folder for any files ending in .csv , pics out the one with the newest modification timestamp , and prints its path. 
if it finds one , it then uses pandas to read that csv into a dataframe . 

In [20]:
df.head()

Unnamed: 0,No,Title,Inventors,Applicants,Publication number,Earliest priority,IPC,CPC,Publication date,Earliest publication,Family number,first publication date,second publication date,first publication number,second publication number,first publication country,second publication country,first filing year,earliest priority year,applicant country
0,1,DISPOSITIFS SPINTRONIQUES CONTENANT DES DOPANT...,DUKOVSKI ILIJA [US] \r\nMEARS ROBERT J [US] \r...,MEARS TECHNOLOGIES INC [US],CA2646325A1,2006-03-17,"[H01L29/15, H01L29/66, H10N50/10]","[B82Y10/00 (EP), B82Y25/00 (EP), B82Y40/00 (EP...",2007-09-27,2007-09-27,38278899,2007-09-27,,CA2646325A1,,CA,,2007,2006,US
1,2,SILICON SINGLE CRYSTAL,ADACHI SADAO,NIPPON TELEGRAPH & TELEPHONE,JPS58140112A,1982-02-16,"[H01L21/84, H01L27/12, H01L21/20, H01L21/205, ...","[H01L21/0242 (EP), H01L21/02532 (EP)]",1983-08-19,1983-08-19,12073855,1983-08-19,,JPS58140112A,,JP,,1983,1982,
2,3,Method for improving silicon surface lattice s...,LI XIANG,ZHEJIANG FORTUNE ENERGY CO LTD,CN105762223A,2014-12-17,"[C30B33/10, H01L31/18]",[Y02P70/50 (EP)],2016-07-13,2016-07-13,56336973,2016-07-13,,CN105762223A,,CN,,2016,2014,
3,4,"SILICON CARBIDE SINGLE CRYSTAL, METHOD FOR PRO...",KOYANAGI NAOKI \r\nKOKOI HISAO,SHOWA DENKO KK,JP2011111372A \r\nJP5398492B2,2009-11-27,[C30B29/36],[unkown)],2011-06-09 \r\n2014-01-29,2011-06-09,44233971,2011-06-09,2014-01-29,JP2011111372A,\r\nJP5398492B2,JP,\r\n,2011,2009,
4,5,Method for preparing high electron mobility hy...,SHEN WENZHONG CHEN [CN] \r\nWENZHONG SHEN [CN]...,UNIV SHANGHAI JIAOTONG [CN],CN100442443C \r\nCN1737999A,2005-09-08,"[C23C16/24, C23C16/44, H01L21/205]",[unkown)],2006-02-22 \r\n2008-12-10,2006-02-22,36080747,2006-02-22,2008-12-10,CN100442443C,\r\nCN1737999A,CN,\r\n,2006,2005,CN


In [21]:
OR_df = df.head(50)
#OR_df.to_csv('OR_df.csv' , index=False)
result_or_df=df.head(50)

this script uses EPO automates retrieval and processing of patent citation and classification data from Espacenet , it loads API credentials . 
first it loads the API credentials from a .env file and maintains an authentification token to authenticate requests . 
for each publication number in the dataframe , the get_patent_biblio function sends a request to EPO API endpoint to fetch the raw XML bibliographic records , then parses that XML to pull out every cited patent's publication number by grouping country code + document number + kind code . 
next , it loops over these publication number and fetches each patent's XML using the same enpoint and extract all IPC class codes  and aggregates these IPC codes back into our dataframe . 
finally it filters for any rows that yeilded at least one IPC , ensure we have at least 30 rows and display a sample of the results . 

In [22]:
import os
import time
import requests
import pandas as pd
import xml.etree.ElementTree as ET
from dotenv import load_dotenv

# Global token cache
TOKEN = None
TOKEN_EXPIRY = 0

# Constants for API endpoints
TOKEN_URL = "https://ops.epo.org/3.2/auth/accesstoken"
BASE_URL = "https://ops.epo.org/3.2/rest-services"

# Load credentials from .env file
load_dotenv()
CONSUMER_KEY = os.getenv("CONSUMER_KEY").strip()
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET").strip()

def get_access_token() -> str:
    """Get or refresh the OAuth access token."""
    global TOKEN, TOKEN_EXPIRY
    if TOKEN and time.time() < TOKEN_EXPIRY:
        return TOKEN
    data = {
        "grant_type": "client_credentials",
        "client_id": CONSUMER_KEY,
        "client_secret": CONSUMER_SECRET
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = requests.post(TOKEN_URL, data=data, headers=headers, timeout=15)
    response.raise_for_status()
    TOKEN = response.json()["access_token"]
    # Token expires in ~58 minutes; refresh slightly before expiry.
    TOKEN_EXPIRY = time.time() + 3500  
    return TOKEN

def get_patent_biblio(publication_number: str) -> str:
    """
    Fetch bibliographic data for a given patent number from the EPO OPS API.
    
    Args:
        publication_number (str): The publication number (e.g., "CN112508743A")
        
    Returns:
        str: The XML response text.
    """
    token = get_access_token()
    # Construct the static endpoint URL using the provided publication number
    url = f"{BASE_URL}/published-data/publication/docdb/{publication_number}/biblio"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/xml"
    }
    response = requests.get(url, headers=headers, timeout=15)
    response.raise_for_status()
    return response.text

def retrieve_citation_publication_numbers(xml_string: str) -> list:
    """
    Parses an EPO patent XML string and retrieves citation publication numbers 
    from each citation's <document-id> element with document-id-type="docdb".
    The publication number is constructed as: country + doc-number + kind.
    
    Args:
        xml_string (str): The XML string containing patent data.
    
    Returns:
        list of str: A list of citation publication numbers.
    """
    ns = {
        'ex': "http://www.epo.org/exchange",
        'ops': "http://ops.epo.org"
    }
    
    publication_numbers = []
    root = ET.fromstring(xml_string)
    
    citations = root.findall(".//ex:bibliographic-data/ex:references-cited/ex:citation", ns)
    
    for citation in citations:
        docdb = citation.find(".//ex:document-id[@document-id-type='docdb']", ns)
        if docdb is not None:
            country = docdb.findtext("ex:country", default="", namespaces=ns)
            doc_number = docdb.findtext("ex:doc-number", default="", namespaces=ns)
            kind = docdb.findtext("ex:kind", default="", namespaces=ns)
            pub_number = f"{country}{doc_number}{kind}"
            if pub_number:
                publication_numbers.append(pub_number)
    
    return publication_numbers

def retrieve_ipc_classifications(xml_string: str) -> list:
    """
    Parses the given patent XML string and extracts the IPC classification texts
    from the <classifications-ipcr> element. For each classification text:
      - Everything after (and including) the '/' character is removed.
      - All spaces are removed from the remaining text.
      
    Args:
        xml_string (str): The XML string from the OPS API.
        
    Returns:
        list of str: A list of cleaned IPC classification texts.
    """
    ns = {
        'ex': "http://www.epo.org/exchange",
        'ops': "http://ops.epo.org"
    }
    
    ipcs = []
    root = ET.fromstring(xml_string)
    
    for cl in root.findall(".//ex:classifications-ipcr/ex:classification-ipcr", ns):
        text = cl.findtext("ex:text", default="", namespaces=ns)
        if text:
            # Remove everything after the first '/'
            cleaned_text = text.strip().split('/')[0].strip()
            # Remove all spaces from the cleaned text
            cleaned_text = cleaned_text.replace(" ", "")
            ipcs.append(cleaned_text)
    
    return ipcs

def get_citations_ipc_for_patent(publication_number: str) -> list:
    """
    For a given citation publication number, fetch bibliographic data and
    return its IPC classifications.
    
    Args:
        publication_number (str): A citation publication number.
        
    Returns:
        list: A list of cleaned IPC classification texts.
    """
    try:
        xml_data = get_patent_biblio(publication_number)
        ipc_classifications = retrieve_ipc_classifications(xml_data)
        return ipc_classifications
    except Exception as e:
        print(f"Error fetching IPC for {publication_number}: {e}")
        return []

def get_all_citations_ipc(citation_nums: list) -> list:
    """
    Given a list of citation publication numbers, retrieve the IPC classifications
    for each citation and aggregate them into one list.
    
    Args:
        citation_nums (list): List of citation publication numbers.
        
    Returns:
        list: Aggregated list of cleaned IPC classification texts from the citations.
    """
    ipc_results = []
    for num in citation_nums:
        ipc = get_citations_ipc_for_patent(num)
        ipc_results.extend(ipc)
    return ipc_results




In [None]:
if __name__ == "__main__":
   

    # Step 1: get citation numbers and IPC lists
    result_or_df['citation_numbers'] = (
        result_or_df['first publication number']
        .apply(lambda pub: retrieve_citation_publication_numbers(get_patent_biblio(pub)))
    )
    result_or_df['citations_ipc'] = (
        result_or_df['citation_numbers']
        .apply(get_all_citations_ipc)
    )

    # Step 2: keep only rows where we actually got at least one IPC
    valid_df = result_or_df[result_or_df['citations_ipc'].apply(lambda lst: len(lst) > 0)]

    # Step 3: check we have ≥ 30
    n_valid = len(valid_df)
    if n_valid < 30:
        raise RuntimeError(
            f"Only found {n_valid} rows with citations IPC data. "
            "You need at least 30 — please add more publication numbers."
        )

    # Step 4: pick exactly 30 of them
    # — random sample for diversity; or use .head(30) for the first 30 in order
    final_df = valid_df.sample(n=30, random_state=42).reset_index(drop=True)

    # (Optional) inspect
    final_df[['first publication number', 'citations_ipc']].head()



ConnectionError: HTTPSConnectionPool(host='ops.epo.org', port=443): Max retries exceeded with url: /3.2/auth/accesstoken (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x000001FF84653440>: Failed to resolve 'ops.epo.org' ([Errno 11001] getaddrinfo failed)"))

: 