In [1]:
import pandas as pd

In [2]:
dtypes = {
    "patent_id": "string",
    "forward_citations": "int64",
}
fwd_citations = pd.read_csv("./data/final_fwdcitation.csv", dtype=dtypes)

fwd_citations.head()

Unnamed: 0,patent_id,patent_type,patent_date,patent_title,wipo_kind,forward_citations
0,10000000,utility,2018-06-19,Coherent LADAR using intra-pixel quadrature de...,B2,13
1,10000001,utility,2018-06-19,Injection molding machine and mold thickness c...,B2,0
2,10000002,utility,2018-06-19,Method for manufacturing polymer film and co-e...,B2,0
3,10000003,utility,2018-06-19,Method for producing a container from a thermo...,B2,2
4,10000004,utility,2018-06-19,"Process of obtaining a double-oriented film, c...",B2,0


In [4]:
dtypes = {
    "patent_id": "string",
    "cpc_sequence": "int64",
}
cpc = pd.read_csv(
    "./data/g_cpc_current.tsv", 
    sep="\t",
    dtype=dtypes,
    )
cpc.head()

Unnamed: 0,patent_id,cpc_sequence,cpc_section,cpc_class,cpc_subclass,cpc_group,cpc_type
0,3950000,0,A,A63,A63C,A63C9/001,inventional
1,3950000,1,A,A63,A63C,A63C9/00,inventional
2,3950000,2,A,A63,A63C,A63C9/002,inventional
3,3950000,3,A,A63,A63C,A63C9/081,inventional
4,3950001,0,A,A63,A63C,A63C9/086,inventional


In [5]:
citations_dtypes = {
    "patent_id": "string",
    "citation_patent_id": "string",
}
citations = pd.read_csv(
    "./data/g_us_patent_citation.tsv", 
    sep="\t",
    dtype=citations_dtypes,
    )
citations.head()

  citations = pd.read_csv(


Unnamed: 0,patent_id,citation_sequence,citation_patent_id,citation_date,record_name,wipo_kind,citation_category
0,10000000,0,5093563,1992-03-01,Small,A,cited by examiner
1,10000000,1,5751830,1998-05-01,Hutchinson,A,cited by applicant
2,10000001,0,7804268,2010-09-01,Park,B2,cited by examiner
3,10000001,1,9022767,2015-05-01,Oono,B2,cited by examiner
4,10000001,2,9090016,2015-07-01,Takeuchi,B2,cited by examiner


In [6]:
fwd_citations = fwd_citations[["patent_id", "forward_citations"]]
cpc = cpc[["patent_id", "cpc_sequence", "cpc_subclass"]]
citations = citations[["patent_id", "citation_sequence", "citation_patent_id"]]

In [7]:
num_patents = fwd_citations["patent_id"].nunique()
print(f"Number of patents: {num_patents}")

Number of patents: 7507819


In [109]:
# Count the number of occurences of each unique pair of cpc_subclasse accross all patents
from itertools import combinations
import os

# Step 1: Group by patent_id and collect all cpc_subclasses for each patent
# This will create a dataframe where each patent_id maps to a list of its cpc_subclasses
cpc_subclasses = cpc.groupby("patent_id")["cpc_subclass"].apply(set)
cpc_subclasses = pd.merge(
    cpc_subclasses,
    fwd_citations.set_index("patent_id"), 
    left_index=True, 
    right_index=True,
    how="right",
    validate="1:1"
)
cpc_subclasses = cpc_subclasses["cpc_subclass"]
cpc_subclasses = cpc_subclasses.rename("cpc_subclasses")
cpc_subclasses.head()


patent_id
10000000                                  {G01S}
10000001                            {G05B, B29C}
10000002    {B29K, B29D, B32B, B29C, B29L, B60C}
10000003                {B29L, B29K, B29D, B29C}
10000004                      {B29L, B29K, B29C}
Name: cpc_subclasses, dtype: object

In [41]:
cpc_subclasses.astype("object")
cpc_subclassses = cpc_subclasses.fillna(set)
cpc_subclasses.head()

patent_id
10000000                                  {G01S}
10000001                            {G05B, B29C}
10000002    {B29K, B29D, B32B, B29C, B29L, B60C}
10000003                {B29L, B29K, B29D, B29C}
10000004                      {B29L, B29K, B29C}
Name: cpc_subclass, dtype: object

In [90]:
import ast
# Step 2: Generate all unique pairs of cpc_subclasses for each patent
def generate_pairs(cpc_subclass: set) -> list:
    if isinstance(cpc_subclass, float):
        return []
    if not isinstance(cpc_subclass, set):
        cpc_subclass = set(cpc_subclass)
    if not cpc_subclass:
        return []
    return list(combinations(sorted(cpc_subclass), 2))

patent_cpc_subclass_pairs = None
if not os.path.exists("./data/cpc_subclass_pairs.csv"):
    patent_cpc_subclass_pairs = cpc_subclasses.apply(generate_pairs)
    patent_cpc_subclass_pairs = patent_cpc_subclass_pairs.rename("cpc_subclass_pairs")
    patent_cpc_subclass_pairs.to_csv("./data/cpc_subclass_pairs.csv")
else:
    patent_cpc_subclass_pairs = pd.read_csv("./data/cpc_subclass_pairs.csv", index_col=0)
    patent_cpc_subclass_pairs["cpc_subclass_pairs"] = patent_cpc_subclass_pairs["cpc_subclass_pairs"].apply(ast.literal_eval, meta=('cpc_subclass_pairs', 'object'))
patent_cpc_subclass_pairs.head()

patent_id
10000000                                                   []
10000001                                       [(B29C, G05B)]
10000002    [(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...
10000003    [(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...
10000004           [(B29C, B29K), (B29C, B29L), (B29K, B29L)]
Name: cpc_subclass_pairs, dtype: object

In [91]:
# Step 3: Flatten the list of pairs into a DataFrame
patent_cpc_subclass_pairs_long = patent_cpc_subclass_pairs.explode()
patent_cpc_subclass_pairs_long = patent_cpc_subclass_pairs_long.rename("cpc_subclass_pair")
patent_cpc_subclass_pairs_long = patent_cpc_subclass_pairs_long.to_frame().reset_index()
patent_cpc_subclass_pairs_long.head()

Unnamed: 0,patent_id,cpc_subclass_pair
0,10000000,
1,10000001,"(B29C, G05B)"
2,10000002,"(B29C, B29D)"
3,10000002,"(B29C, B29K)"
4,10000002,"(B29C, B29L)"


In [92]:
# Step 4: Count occurrences of each unique pair
if not os.path.exists("./data/cpc_subclass_pair_counts.csv"):
    pair_counts = patent_cpc_subclass_pairs_long.groupby("cpc_subclass_pair").size()
    pair_counts = pair_counts.to_frame(name="count")
    pair_counts.to_csv("./data/cpc_subclass_pair_counts.csv")
else:
    pair_counts = pd.read_csv("./data/cpc_subclass_pair_counts.csv", index_col=0)

pair_counts.head()

Unnamed: 0_level_0,count
cpc_subclass_pair,Unnamed: 1_level_1
"(A01B, A01C)",1662
"(A01B, A01D)",1209
"(A01B, A01F)",133
"(A01B, A01G)",468
"(A01B, A01H)",15


In [93]:
total_pairs = pair_counts.sum()
print(f"Total number of pairs in the dataset: {total_pairs}")

num_unique_pairs = pair_counts.index.nunique()
print(f"Number of unique pairs in the dataset: {num_unique_pairs}")

Total number of pairs in the dataset: count    14580344
dtype: int64
Number of unique pairs in the dataset: 95614


In [94]:
import numpy as np
# Step 5: Assign each pair an atypicality score
def atypicality_score(n):
    if n > 0:
        return -np.log(n)
    else:
        # This should not happen since the pairs are generated from existing data. If it does, raise an error.
        raise ValueError("Count must be greater than 0")

if not os.path.exists("./data/cpc_subclass_pair_counts_with_atypicality.csv"):
    pair_counts["atypicality_score"] = pair_counts["count"].apply(atypicality_score)
    pair_counts.to_csv("./data/cpc_subclass_pair_counts_with_atypicality.csv")
else: 
    pair_counts = pd.read_csv("./data/cpc_subclass_pair_counts_with_atypicality.csv")
pair_counts.head()




Unnamed: 0_level_0,count,atypicality_score
cpc_subclass_pair,Unnamed: 1_level_1,Unnamed: 2_level_1
"(A01B, A01C)",1662,-7.415777
"(A01B, A01D)",1209,-7.097549
"(A01B, A01F)",133,-4.890349
"(A01B, A01G)",468,-6.148468
"(A01B, A01H)",15,-2.70805


In [95]:
pair_counts["atypicality_score"].describe()

count    95614.000000
mean        -2.389566
std          1.963772
min        -12.005736
25%         -3.637586
50%         -2.079442
75%         -0.693147
max         -0.000000
Name: atypicality_score, dtype: float64

In [96]:
pair_counts_dict = pair_counts.to_dict(orient="index")

In [103]:
# Step 6: Calculate the atypicality score for each patent
def calculate_atypicality_score(cpc_group_pairs):
    if isinstance(cpc_group_pairs, str):
        cpc_group_pairs = ast.literal_eval(cpc_group_pairs)

    if len(cpc_group_pairs) == 0:
        return 0.0
    # Get the counts for each pair
    scores = [pair_counts_dict.get(tuple(pair), {}).get("atypicality_score", 0) for pair in cpc_group_pairs]
    if len(scores) == 0:
        raise RuntimeError("No matching pairs found for the given cpc_group_pairs.")
    
    # Calculate the atypicality score for each pair
    atypicality_score = np.mean(scores)
    
    return atypicality_score

if not os.path.exists("./data/patent_with_atypicality.csv"):
    if not isinstance(patent_cpc_subclass_pairs, pd.DataFrame):
        patent_cpc_subclass_pairs = patent_cpc_subclass_pairs.to_frame()
    # Group by patent_id and calculate the atypicality score for each patent
    patent_cpc_subclass_pairs["atypicality_score"] = patent_cpc_subclass_pairs["cpc_subclass_pairs"].apply(calculate_atypicality_score)
    patent_cpc_subclass_pairs.to_csv("./data/patent_with_atypicality.csv")
else:
    patent_cpc_subclass_pairs = pd.read_csv("./data/patent_with_atypicality.csv", index_col=0)
    patent_cpc_subclass_pairs["cpc_subclass_pairs"] = patent_cpc_subclass_pairs["cpc_subclass_pairs"].apply(ast.literal_eval)


patent_cpc_subclass_pairs.head()

Unnamed: 0_level_0,cpc_subclass_pairs,atypicality_score
patent_id,Unnamed: 1_level_1,Unnamed: 2_level_1
10000000,[],0.0
10000001,"[(B29C, G05B)]",-7.510978
10000002,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-8.119844
10000003,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-9.504956
10000004,"[(B29C, B29K), (B29C, B29L), (B29K, B29L)]",-10.255515


In [110]:
# Step 8: Merge the atypicality scores with the list of cpc subclasses to get the full output
if not os.path.exists("./data/patent_cpc_subclass_pairs.csv"):
    patent_cpc_subclass_pairs_with_atypicality = pd.merge(patent_cpc_subclass_pairs, cpc_subclasses, right_index=True, left_index=True, how="inner", validate="1:1")
patent_cpc_subclass_pairs_with_atypicality.head()

Unnamed: 0_level_0,cpc_subclass_pairs,atypicality_score,cpc_subclasses
patent_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
10000000,[],0.0,{G01S}
10000001,"[(B29C, G05B)]",-7.510978,"{G05B, B29C}"
10000002,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-8.119844,"{B29K, B29D, B32B, B29C, B29L, B60C}"
10000003,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-9.504956,"{B29L, B29K, B29D, B29C}"
10000004,"[(B29C, B29K), (B29C, B29L), (B29K, B29L)]",-10.255515,"{B29L, B29K, B29C}"


In [112]:
# Step 9: Save the final DataFrame to a CSV file
if not os.path.exists("./data/patents_with_atypicality.csv"):
    patent_cpc_subclass_pairs_with_atypicality.to_csv("./data/patents_with_atypicality.csv")

In [114]:
# 10. Validate that the output dataset contains the expected number of patents
num_patents = patent_cpc_subclass_pairs_with_atypicality.shape[0]
print(f"Number of patents in the output dataset: {num_patents}")

Number of patents in the output dataset: 7507819


In [137]:
patent_cpc_subclass_pairs_with_atypicality.isna().sum()

cpc_subclass_pairs         0
atypicality_score          0
cpc_subclasses        763040
dtype: int64

# Technological Leap Score

## Steps:
1. Write the list of CPC subclasses for the new patent
2. Write the list of CPC subclasses for all patents it cites
3. Count the number of overlapping CPC subclasses between these two lists
4. Count the number of total CPC subclasses in the new patent and all patents it cites (union of both lists)
5. Similarity score = 
$$\text{Similarity Score} = \frac{\text{Number of Overlapping CPC Subclasses}}{\text{Total CPC Subclasses in New Patent and Cited Patents}}$$
6. Technological Leap Score = 1 - Similarity Score

In [119]:
# 1. Create a dictionary mapping patent IDs to a list of patent IDs they cite
citations_list = citations.groupby("patent_id")["citation_patent_id"].apply(list)
citations_dict = citations_list.to_dict()

In [138]:
# 2. Create a dictionary mapping patent IDs to a list of their CPC subclasses
cpc_subclasses_dict = cpc_subclasses.fillna("n/a").to_dict()

In [140]:
# 3. Get the intersection of CPC subclasses between a patent and its backward citations
def get_intersect_cpc_subclasses(row):
    patent_id = row.name
    patent_cpc_subclasses = cpc_subclasses_dict.get(patent_id, {})
    if not patent_cpc_subclasses or patent_cpc_subclasses == "n/a":
        return []
    backward_citations = citations_dict.get(patent_id, [])
    if not backward_citations:
        return []
    backward_citations_cpc_subclasses = []
    for citation in backward_citations:
        citation_cpc_subclasses = cpc_subclasses_dict.get(citation, {})
        if not citation_cpc_subclasses or patent_cpc_subclasses == "n/a":
            continue
        if citation_cpc_subclasses:
            backward_citations_cpc_subclasses.extend(citation_cpc_subclasses)
    if not backward_citations_cpc_subclasses:
        return []
    return list(set(patent_cpc_subclasses) & set(backward_citations_cpc_subclasses))

patent_cpc_subclass_pairs_with_atypicality["intersect_cpc_subclasses"] = patent_cpc_subclass_pairs_with_atypicality.apply(get_intersect_cpc_subclasses, axis=1)
patent_cpc_subclass_pairs_with_atypicality.head()

Unnamed: 0_level_0,cpc_subclass_pairs,atypicality_score,cpc_subclasses,intersect_cpc_subclasses
patent_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
10000000,[],0.0,{G01S},[G01S]
10000001,"[(B29C, G05B)]",-7.510978,"{G05B, B29C}",[B29C]
10000002,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-8.119844,"{B29K, B29D, B32B, B29C, B29L, B60C}","[B29K, B29C, B32B]"
10000003,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-9.504956,"{B29L, B29K, B29D, B29C}","[B29L, B29C]"
10000004,"[(B29C, B29K), (B29C, B29L), (B29K, B29L)]",-10.255515,"{B29L, B29K, B29C}","[B29L, B29C]"


In [143]:
# 4. Get the union of CPC subclasses between a patent and its backward citations
def get_union_cpc_subclasses(row):
    patent_id = row.name
    patent_cpc_subclasses = cpc_subclasses_dict.get(patent_id, {})
    if not patent_cpc_subclasses or patent_cpc_subclasses == "n/a":
        return []
    backward_citations = citations_dict.get(patent_id, [])
    if not backward_citations:
        return patent_cpc_subclasses
    backward_citations_cpc_subclasses = []
    for citation in backward_citations:
        citation_cpc_subclasses = cpc_subclasses_dict.get(citation, {})
        if citation_cpc_subclasses and citation_cpc_subclasses != "n/a":
            backward_citations_cpc_subclasses.extend(citation_cpc_subclasses)
    return list(set(patent_cpc_subclasses) | set(backward_citations_cpc_subclasses))

patent_cpc_subclass_pairs_with_atypicality["union_cpc_subclasses"] = patent_cpc_subclass_pairs_with_atypicality.apply(get_union_cpc_subclasses, axis=1)
patent_cpc_subclass_pairs_with_atypicality.head()

Unnamed: 0_level_0,cpc_subclass_pairs,atypicality_score,cpc_subclasses,intersect_cpc_subclasses,union_cpc_subclasses
patent_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
10000000,[],0.0,{G01S},[G01S],"[Y02A, G01S]"
10000001,"[(B29C, G05B)]",-7.510978,"{G05B, B29C}",[B29C],"[G05B, B29C]"
10000002,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-8.119844,"{B29K, B29D, B32B, B29C, B29L, B60C}","[B29K, B29C, B32B]","[B29K, B29D, C09J, B32B, B29C, B29L, H05K, B60..."
10000003,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-9.504956,"{B29L, B29K, B29D, B29C}","[B29L, B29C]","[B29K, B29D, B29C, B29L, B60K]"
10000004,"[(B29C, B29K), (B29C, B29L), (B29K, B29L)]",-10.255515,"{B29L, B29K, B29C}","[B29L, B29C]","[B29K, B32B, B65B, B65D, B29C, B29L]"


In [144]:
# 5. Calculate the Technological Leap Score
# Technological Leap Score = 1 - (Jaccard Similarity)
def calculate_tech_leap_score(patent):
    # If a patent and all its backward citations have no CPC subclasses, this means it has no technological leap.
    if not patent["union_cpc_subclasses"]:
        return 0.0
    # If a patent has no CPC subclasses in common with its backward citations, it has maximum technological leap.
    elif not patent["intersect_cpc_subclasses"]:
        return 1.0
    else:
        return 1 - (len(patent["intersect_cpc_subclasses"]) / len(patent["union_cpc_subclasses"]))
patent_cpc_subclass_pairs_with_atypicality["tech_leap"] = patent_cpc_subclass_pairs_with_atypicality.apply(calculate_tech_leap_score, axis=1)
patent_cpc_subclass_pairs_with_atypicality.head()

Unnamed: 0_level_0,cpc_subclass_pairs,atypicality_score,cpc_subclasses,intersect_cpc_subclasses,union_cpc_subclasses,tech_leap
patent_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
10000000,[],0.0,{G01S},[G01S],"[Y02A, G01S]",0.5
10000001,"[(B29C, G05B)]",-7.510978,"{G05B, B29C}",[B29C],"[G05B, B29C]",0.5
10000002,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-8.119844,"{B29K, B29D, B32B, B29C, B29L, B60C}","[B29K, B29C, B32B]","[B29K, B29D, C09J, B32B, B29C, B29L, H05K, B60...",0.666667
10000003,"[(B29C, B29D), (B29C, B29K), (B29C, B29L), (B2...",-9.504956,"{B29L, B29K, B29D, B29C}","[B29L, B29C]","[B29K, B29D, B29C, B29L, B60K]",0.6
10000004,"[(B29C, B29K), (B29C, B29L), (B29K, B29L)]",-10.255515,"{B29L, B29K, B29C}","[B29L, B29C]","[B29K, B32B, B65B, B65D, B29C, B29L]",0.666667


In [145]:
# 6. Save the final DataFrame to a CSV file
if not os.path.exists("./data/patents_with_atypicality_and_tech_leap.csv"):
    patent_cpc_subclass_pairs_with_atypicality.to_csv("./data/patents_with_atypicality_and_tech_leap.csv")