In [1]:
# Dawnstar, a patent expiration workflow.
#
# Programmatically find projected expiration dates 
# and other assumed status details for compound patents.
#
#   Requiring Python == 3.11.

import requests
from io import StringIO
import aiohttp
import asyncio
import os
import sys
import time
from datetime import datetime
import argparse
import json
import pandas as pd
from collections import defaultdict
import bs4
from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem
import random

import psycopg2
from secret import db_info

# Folder that holds the compound input files (e.g. ./compound_data/metformin.txt).
DATA_FOLDER = './compound_data'

# Browser-like headers for outgoing HTTP requests.
# The user-agent is built from adjacent string literals: a backslash
# continuation *inside* a quoted string keeps the next line's leading
# indentation, which would put a run of spaces in the header value.
HEADERS = {
    'user-agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/110.0.0.0 Safari/537.36'
    )
}

def get_compounds(infile: str, canonical: bool) -> list:
    """Read molecules from a user input file and return their SMILES strings.

    Args:
        infile: Path to the input file. A file whose last dot-separated
            component is ``sdf`` (matched case-insensitively) is read with
            ``Chem.SDMolSupplier``; any other file is read as text with one
            SMILES string per line.
        canonical: When true, SMILES are written with ``canonical=True``;
            otherwise with ``isomericSmiles=True, kekuleSmiles=True``.

    Returns:
        A list of SMILES strings, in input order. Records that fail to parse
        (falsy molecules) and empty SMILES strings are dropped silently.
    """
    # Lower-case the extension so 'LIBRARY.SDF' is not mistaken for a
    # plain-text SMILES file.
    if infile.split('.')[-1].lower() == 'sdf':
        all_mols = Chem.SDMolSupplier(infile)
    else:
        with open(infile, 'r') as f:
            all_mols = [Chem.MolFromSmiles(line.strip()) for line in f]

    compounds = []
    for mol in all_mols:
        if not mol:
            # Parsing failed for this record; skip it.
            continue
        if canonical:
            smi = Chem.MolToSmiles(mol, canonical=True)
        else:
            # NOTE(review): `canonical` is not passed here, so RDKit's default
            # for that flag applies -- confirm whether a truly non-canonical
            # SMILES was intended for this branch.
            smi = Chem.MolToSmiles(mol, isomericSmiles=True, kekuleSmiles=True)
        if smi:
            compounds.append(smi)
    return compounds

def validate_date(date: str) -> bool:
    """Check that ``date`` is a usable ``YYYYMMDD`` cutoff.

    A valid cutoff is a string of exactly eight ASCII digits that names a real
    calendar day on or after 1970-01-01 and strictly before the current local
    time.

    Args:
        date: Candidate cutoff, e.g. ``'20200131'``.

    Returns:
        True when the cutoff is valid, False otherwise. Never raises for bad
        input; an impossible calendar date has its reason printed.
    """
    if not isinstance(date, str) or len(date) != 8:
        return False
    # int() tolerates whitespace and signs ('2020 1 1', '2020+1+1' would both
    # parse as 2020-01-01), so insist on plain digits before slicing.
    if not (date.isascii() and date.isdigit()):
        return False
    try:
        parsed = datetime(year=int(date[0:4]), month=int(date[4:6]), day=int(date[6:8]))
    except ValueError as e:
        # Eight digits that are not a real day (e.g. month 13, year 0000).
        print(e)
        return False
    return datetime(year=1970, month=1, day=1) <= parsed < datetime.now()

def safe_select(event: bs4.element.Tag, item: str) -> str | None:
    """Return the stripped text of the first element under ``event`` matching ``item``.

    Args:
        event: Tag to search within.
        item: CSS selector handed to ``event.select_one``.

    Returns:
        The matched element's text with surrounding whitespace removed, or
        ``None`` when nothing matches, the text is empty, or the lookup fails.
    """
    try:
        val = event.select_one(item).text.strip()
    except Exception:
        # Best-effort lookup: a missing element (select_one gave None) or any
        # other lookup failure means "no value". Catching Exception instead of
        # a bare `except:` lets KeyboardInterrupt/SystemExit propagate.
        return None
    # Normalise empty text to None so callers only need one "missing" check.
    return val or None

def build_query(compound: str, is_smiles: bool, search_type: str, base_filters: bool, after: None | str, before: None | str) -> tuple:
    """Assemble the two Google Patents URLs for one compound.

    Args:
        compound: Query text placed inside ``CL=(...)``.
        is_smiles: When true, ``%``, ``+``, ``=``, ``/`` and ``#`` in
            ``compound`` are percent-escaped first.
        search_type: ``'substructure'`` adds the ``SSS=`` prefix,
            ``'similarity'`` adds ``~``; any other value adds nothing.
        base_filters: When true, appends the US / English / patent filters.
        after: Optional value for the ``after=priority:`` filter.
        before: Optional value for the ``before=priority:`` filter.

    Returns:
        ``(search_url, download_url)``: the search page URL and the XHR
        download URL that carries the same query, escaped once more.
    """
    if is_smiles:
        # '%' goes first so the escapes added afterwards are not re-escaped.
        for raw, escaped in (('%', r'%25'), ('+', r'%2b'), ('=', r'%3d'), ('/', r'%2f'), ('#', r'%23')):
            compound = compound.replace(raw, escaped)

    encoded_eq = r'%3d'

    # Operator placed between "CL=" and the parenthesised compound.
    if search_type == 'similarity':
        operator = '~'
    elif search_type == 'substructure':
        operator = 'SSS' + encoded_eq
    else:
        operator = ''

    filters = '&country=US&language=ENGLISH&type=PATENT' if base_filters else ''
    if after:
        filters += f'&after=priority:{after}'
    if before:
        filters += f'&before=priority:{before}'

    query = f'q=CL{encoded_eq}{operator}({compound}){filters}'
    search_url = f'https://patents.google.com/?{query}'

    # The download endpoint takes the whole query as the value of `url=`, so
    # the query is escaped a second time. Order matters: the already-encoded
    # '%3d' is rewritten before the literal '=' characters.
    nested = query
    second_pass = (
        (encoded_eq, r'%253d'),
        ('=', r'%3D'),
        ('&', r'%26'),
        (':', r'%3A'),
        ('[', r'%5B'),
        (']', r'%5D'),
        (r'%2b', r'%252b'),
    )
    for raw, escaped in second_pass:
        nested = nested.replace(raw, escaped)
    download_url = f'https://patents.google.com/xhr/query?url={nested}&exp=&download=true'

    return search_url, download_url

def parse_timeline(page_text: str) -> tuple:
    """Extract the title token and legal-status events from one patent page.

    Args:
        page_text: HTML text of a single patent page.

    Returns:
        ``(title, date, status, status_detail)``. ``title`` is the first
        whitespace-separated token of the page ``<title>`` (presumably the
        publication number -- confirm), or ``None`` when the page has no
        usable ``<title>``. ``status`` is the title of the legal-status event
        whose date field reads ``'Status'``. ``date`` and ``status_detail``
        come from the last legal-status event that carries any other date.
        Any field that is not found is ``None``.
    """
    title = date = status = status_detail = None
    soup = bs4.BeautifulSoup(page_text, 'html.parser')

    # A page without <title>, or with an empty one, must not raise: the caller
    # already treats a falsy title as "no result".
    title_tag = soup.find('title')
    title_words = title_tag.text.split() if title_tag is not None else []
    if title_words:
        title = title_words[0]

    for event in soup.find_all('dd', {'itemprop': "events"}):
        if safe_select(event, 'span[itemprop="type"]') != 'legal-status':
            continue
        e_title = safe_select(event, 'span[itemprop="title"]')
        e_date = safe_select(event, 'time[itemprop="date"]')
        if e_date is None:
            continue
        if e_date == 'Status':
            # The overall status row has the literal text "Status" in its date slot.
            status = e_title
        else:
            # Dated legal-status rows overwrite each other; the last one wins.
            status_detail = e_title
            date = e_date
    return title, date, status, status_detail

async def get_url_async(session: aiohttp.ClientSession, url: str, headers: dict) -> str:
    """Fetch one patent page asynchronously and return its body as text.

    Args:
        session: Open client session used to issue the GET request.
        url: Page address.
        headers: Request headers sent with the GET.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status.
    """
    async with session.get(url=url, headers=headers) as resp:
        # Fail on an error status before spending time reading the body.
        resp.raise_for_status()
        body = await resp.text()
    return body

async def get_timeline_async(session: aiohttp.ClientSession, url: str, headers: dict) -> tuple | None:
    """Download one patent page and reduce it to its timeline summary.

    Returns:
        ``(title, (date, status, status_detail))`` when the page yields a
        title, a date and at least one of status / status detail; otherwise
        ``None``.
    """
    html = await get_url_async(session=session, url=url, headers=headers)
    title, date, status, status_detail = parse_timeline(page_text=html)

    # A record is only useful with a title, a date and some status text.
    has_status = status or status_detail
    if not (title and date and has_status):
        return None
    return (title, (date, status, status_detail))

async def get_all_async(urls: list, headers: dict) -> list:
    """Fetch and parse every patent page concurrently over one shared session.

    Args:
        urls: Patent page addresses.
        headers: Request headers sent with every request.

    Returns:
        One entry per URL, in the same order as ``urls``; each entry is the
        value returned by ``get_timeline_async`` (a tuple or ``None``).

    Raises:
        The first exception raised by any individual fetch
        (``asyncio.gather`` default behaviour).
    """
    async with aiohttp.ClientSession() as session:

        async def _fetch(index: int, url: str):
            """Fetch one URL and report progress once it has really finished."""
            result = await get_timeline_async(session=session, url=url, headers=headers)
            # Report on completion. Printing while the coroutines are merely
            # being created would claim progress before any request was sent.
            if index % 5 == 0 and index > 0:
                print(f'Processed URL {index}.')
            return result

        return await asyncio.gather(*(_fetch(i, url) for i, url in enumerate(urls)))

In [39]:
# Run parameters for this session.
infile = './compound_data/metformin.txt'  # compound file handed to get_compounds
is_smiles = True
search_type = 'exact'  # alternatives: 'similarity', 'substructure'
# Optional cutoffs (None = unset); presumably meant for build_query's
# after/before priority-date filters -- confirm.
after, before = None, None

In [40]:
# Parse the input file into a list of SMILES strings (written with canonical=True).
compounds = get_compounds(infile=infile, canonical=True)

In [41]:
# Take the first parsed compound and display it.
# Raises IndexError when the input file produced no valid compounds.
s = compounds[0]
s

'CN(C)C(=N)N=C(N)N'

In [48]:
import base64

In [51]:
# Connection settings for psycopg2.connect(**db_info).
# Credentials must not be committed in a notebook, so the password is read
# from the environment. The libpq variable names (PGUSER, PGPASSWORD,
# PGDATABASE, PGHOST, PGPORT) are used as overrides. psycopg2 drops keyword
# arguments whose value is None, so an unset PGPASSWORD leaves libpq to use
# its own fallbacks (e.g. ~/.pgpass).
# NOTE: a password was previously hardcoded in this cell and should be rotated.
db_info = {
    'user': os.environ.get('PGUSER', 'vpolyakov'),
    'password': os.environ.get('PGPASSWORD'),
    'dbname': os.environ.get('PGDATABASE', 'waffles'),
    'host': os.environ.get('PGHOST', 'hyperion'),
    'port': int(os.environ.get('PGPORT', '5432')),
}

In [8]:
# Open a PostgreSQL connection with the settings in db_info, then make `sc`
# the schema that unqualified table names in later queries resolve against.
conn = psycopg2.connect(**db_info)
curs = conn.cursor()
curs.execute('set search_path to sc')

In [53]:
# Sanity check: count the rows of cdb_compound that have a non-null id.
curs.execute('select count(id) from cdb_compound')
curs.fetchall()

[(4808,)]

In [3]:
# Sanity check: count the rows of compound that have a non-null id.
curs.execute('select count(id) from compound')
curs.fetchall()

[(7404489,)]

In [9]:
# Exact-structure lookup: collect patent numbers (p.num) for compounds whose
# `smiles` column matches the query SMILES `s` under the `@=` operator.
# compound and patent are joined through field_freq, restricted to
# field_id = 2.
# NOTE(review): `@=` looks like the RDKit cartridge exact-match operator and
# field_id = 2 is an unexplained magic number -- confirm both against the schema.
# The SMILES is passed as a bound parameter, not formatted into the SQL.
s = 'CN(C)C(=N)N=C(N)N'
curs.execute('select p.num \
             from compound c, patent p, field_freq f \
             where c.id = f.compound_id and p.id = f.patent_id and f.field_id = 2 \
             and c.smiles @= %s', (s, ))

# Flatten the one-column result rows into a plain list of patent numbers.
r = [x[0] for x in curs.fetchall()]
r

[]

In [10]:
# Same join as the exact-match lookup, but using the `@>` operator between
# the `smiles` column and the query SMILES `s` (defined in an earlier cell).
# NOTE(review): `@>` looks like the RDKit cartridge substructure operator
# ("column contains query") -- confirm.
curs.execute('select p.num \
             from compound c, patent p, field_freq f \
             where c.id = f.compound_id and p.id = f.patent_id and f.field_id = 2 \
             and c.smiles@>%s', (s, ))

# Flatten the one-column result rows; show only the first three hits.
r = [x[0] for x in curs.fetchall()]
r[0:3]

['US20150050241A1', 'US20150111731A1', 'US20150072859A1']

In [33]:
# Similarity lookup: build a Morgan bit-vector fingerprint of the query
# (radius 2, 1024 bits), serialise it to binary text and compare it against
# the `mfp` column after setting rdkit.tanimoto_threshold to `thresh`.
# NOTE(review): the comparison is only meaningful if `mfp` was generated with
# the same fingerprint type, radius and bit size as used here -- confirm
# against how the column was populated.
thresh = 0.40
m = Chem.MolFromSmiles(s)
b_mfp = DataStructs.BitVectToBinaryText( AllChem.GetMorganFingerprintAsBitVect(m, 2, nBits=1024) )

# `thresh` is a local constant, so formatting it into the SET statement does
# not expose the query to outside input.
curs.execute(f'set rdkit.tanimoto_threshold={thresh}')
# '%%' is an escaped literal '%' (the operator applied to c.mfp); '%s' is the
# bound fingerprint parameter.
curs.execute('select p.num \
             from compound c, patent p, field_freq f \
             where c.id = f.compound_id and p.id = f.patent_id and f.field_id = 2 \
             and c.mfp%%bfp_from_binary_text(%s)', (b_mfp, ))

# Flatten the one-column result rows; show only the first three hits.
r = [x[0] for x in curs.fetchall()]
r[0:3]

[]

In [54]:
# Release the cursor first, then the database connection.
curs.close()
conn.close()

# Using USPTO PEDS API to find patent information

In [12]:
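# USPTO PEDS API (draft, currently commented out; expects a `patents` list of publication numbers to be defined first)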
# url = 'https://ped.uspto.gov/api/queries'
# headers = {
#     'Content-Type': 'application/json',
#     'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) \
#         AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
#     }
# solr_query = {
#     'df': 'appEarlyPubNumber',
#     'mm': '100%',
#     'fl': '*',
#     'facet': 'false',
#     }

# found = []
# cols = ['appEarlyPubNumber', 'patentNumber', 'appStatus', 'patentTitle', 'patentIssueDate']
# with requests.session() as s:
#     for patent in patents:
#         for status in ['Patented Case', 'Abandoned']:
#             solr_query['searchText'] = 'appEarlyPubNumber:' + patent
#             solr_query['fq'] = ['appStatus:\"' + status + '\"']

#             response = s.post(url, json=solr_query, headers=headers)
#             if response:
#                 d = response.json()
                
#                 res_d = d['queryResults']['searchResponse']['response']
#                 if res_d['numFound'] >= 1:
#                     for doc in res_d['docs']:
#                         found.append([doc[x] for x in cols])

# pd.DataFrame(found, columns=cols)