Main path analysis (MPA)

Example #1: PCR (Kary Mullis, US4683202: Hall of Fame, Kelly et al. significant patents)

Plan: collect all patents in CPC group C12Q1/686 - Polymerase chain reaction [PCR]

In [None]:
import pandas as pd
import zipfile
import os
import sys

In [None]:
# Read the CPC assignment table straight out of its zip archive.
# The member is tab-separated and its first row holds the column names.
with zipfile.ZipFile('g_cpc_current.tsv.zip', 'r') as cpc_archive, \
        cpc_archive.open('g_cpc_current.tsv') as cpc_handle:
    df = pd.read_csv(cpc_handle, sep='\t', header=0)

In [None]:
# Preview the first rows of the CPC table to check the columns that were loaded.
df.head()

In [None]:
# Keep only the rows whose CPC group is exactly C12Q1/686 (PCR).
is_pcr_group = df['cpc_group'].eq('C12Q1/686')
pcr_patents = df[is_pcr_group]

In [None]:
# Display the PCR subset of the CPC table.
pcr_patents

In [None]:
# Collect the patent ids of the PCR rows as a plain Python list.
pcr_patent_ids = pcr_patents.loc[:, 'patent_id'].to_list()

In [None]:
# Read the US patent citation table straight out of its zip archive.
# The member is tab-separated and its first row holds the column names.
with zipfile.ZipFile('g_us_patent_citation.tsv.zip', 'r') as citation_archive, \
        citation_archive.open('g_us_patent_citation.tsv') as citation_handle:
    citations = pd.read_csv(citation_handle, sep='\t', header=0)

In [None]:
# Preview the first rows of the citation table to check the columns that were loaded.
citations.head()

In [None]:
# Store both id columns as strings so they compare consistently with the
# stringified PCR patent ids used for filtering.
for id_column in ('patent_id', 'citation_patent_id'):
    citations[id_column] = citations[id_column].astype(str)

In [None]:
# Turn every PCR patent id into a string so it matches the citation id columns.
pcr_patent_ids = list(map(str, pcr_patent_ids))

In [None]:
# Keep every citation row that touches a PCR patent in either id column.
pcr_in_patent_id = citations['patent_id'].isin(pcr_patent_ids)
pcr_in_citation_id = citations['citation_patent_id'].isin(pcr_patent_ids)
pcr_citations = citations[pcr_in_patent_id | pcr_in_citation_id]

In [None]:
# Display the citation rows that involve at least one PCR patent.
pcr_citations

In [None]:
# Save the PCR-related citations as a tab-separated file, without the DataFrame index.
pcr_citations.to_csv('pcr_citations.tsv', sep='\t', index=False)

In [None]:
# Display the PCR subset of the CPC table again for reference.
pcr_patents

In [None]:
# Narrow down to citations where BOTH id columns refer to PCR patents,
# i.e. the citation links that stay inside the PCR patent set.
both_ends_pcr = (
    pcr_citations['patent_id'].isin(pcr_patent_ids)
    & pcr_citations['citation_patent_id'].isin(pcr_patent_ids)
)
pcr_citations_between = pcr_citations[both_ends_pcr]

In [None]:
# Display the citations whose two ends are both PCR patents.
pcr_citations_between

In [None]:
import networkx as nx

In [None]:
# Build a directed graph with one edge per row, pointing from the
# 'patent_id' column to the 'citation_patent_id' column.
G = nx.from_pandas_edgelist(
    pcr_citations_between,
    source='patent_id',
    target='citation_patent_id',
    create_using=nx.DiGraph,
)

In [None]:
# Draw the citation graph using networkx's default drawing settings.
nx.draw(G)

In [None]:
# Main path analysis, step 1: find the end points of the graph.
# Sources have no incoming edge; sinks have no outgoing edge.
source_nodes = [node for node, degree in G.in_degree() if degree == 0]
sink_nodes = [node for node, degree in G.out_degree() if degree == 0]

In [None]:
# Count, for every edge, how many source-to-sink simple paths run through it.
weights = {}
for start_node in source_nodes:
    for end_node in sink_nodes:
        try:
            # Enumerate every simple path for this (source, sink) pair
            simple_paths = list(nx.all_simple_paths(G, start_node, end_node))
        except nx.NetworkXNoPath:
            continue

        # Credit each consecutive node pair (edge) along each path
        for node_sequence in simple_paths:
            for hop in zip(node_sequence, node_sequence[1:]):
                weights[hop] = weights.get(hop, 0) + 1

In [None]:
import networkx as nx
import pandas as pd
import numpy as np
from datetime import datetime

def _count_search_paths(G, source_nodes, sink_nodes):
    """Return the raw Search Path Count of every edge lying on a source-to-sink path.

    The result maps ``(u, v)`` to the number of simple paths that start at a
    source node, end at a sink node and traverse the edge ``u -> v``.
    """
    if nx.is_directed_acyclic_graph(G):
        # In a DAG every path is simple, so the number of source-to-sink paths
        # through (u, v) equals
        #   (#paths from any source to u) * (#paths from v to any sink).
        # Two sweeps in topological order give the same counts as enumerating
        # every path, without the exponential blow-up.
        order = list(nx.topological_sort(G))

        paths_from_sources = {}
        for node in order:
            if G.in_degree(node) == 0:
                paths_from_sources[node] = 1
            else:
                paths_from_sources[node] = sum(
                    paths_from_sources[pred] for pred in G.predecessors(node)
                )

        paths_to_sinks = {}
        for node in reversed(order):
            if G.out_degree(node) == 0:
                paths_to_sinks[node] = 1
            else:
                paths_to_sinks[node] = sum(
                    paths_to_sinks[succ] for succ in G.successors(node)
                )

        return {
            (u, v): paths_from_sources[u] * paths_to_sinks[v]
            for u, v in G.edges()
        }

    # The graph contains a cycle, so the product formula does not apply:
    # fall back to enumerating simple paths explicitly.
    counts = {}
    for source in source_nodes:
        for sink in sink_nodes:
            for path in nx.all_simple_paths(G, source, sink):
                for edge in zip(path, path[1:]):
                    counts[edge] = counts.get(edge, 0) + 1
    return counts


def conduct_mpa(citation_data, weight_method='spc'):
    """
    Conduct Main Path Analysis on patent citation data.

    Edges point from the citing patent to the cited patent. "Source" nodes
    are therefore patents with no incoming edge (never cited within the data)
    and "sink" nodes are patents with no outgoing edge (citing nothing within
    the data).

    Parameters:
    citation_data (pd.DataFrame): DataFrame with columns 'citing_patent', 'cited_patent'
    weight_method (str): 'spc' for Search Path Count (default). Only SPC is
        implemented; the value is currently not inspected.

    Returns:
    tuple: (main_path_edges, edge_weights, path_significance)
        main_path_edges (set): edges (u, v) on the cheapest path of every
            connected (source, sink) pair.
        edge_weights (dict): SPC of each edge divided by the largest SPC.
        path_significance: mean normalized weight of the main-path edges,
            or 0 when no main path exists.
    """
    # Create directed graph (citing -> cited) directly from the two columns
    G = nx.DiGraph()
    G.add_edges_from(
        zip(citation_data['citing_patent'], citation_data['cited_patent'])
    )

    # Identify source and sink nodes
    source_nodes = [n for n in G.nodes() if G.in_degree(n) == 0]
    sink_nodes = [n for n in G.nodes() if G.out_degree(n) == 0]

    # Calculate traversal weights using SPC
    weights = _count_search_paths(G, source_nodes, sink_nodes)

    # Normalize weights so the most traversed edge has weight 1
    if weights:
        max_weight = max(weights.values())
        weights = {k: v / max_weight for k, v in weights.items()}

    # Transform weights for shortest path calculation
    # (higher traversal weight = shorter path)
    transformed_weights = {edge: 1 - weight for edge, weight in weights.items()}

    def edge_cost(u, v, _attrs):
        # Edges that lie on no source-to-sink simple path have no weight.
        # Returning None makes networkx ignore them instead of raising
        # KeyError; such edges cannot be part of a source-to-sink path anyway.
        return transformed_weights.get((u, v))

    # Find the cheapest path between every source and sink
    main_path_edges = set()
    for source in source_nodes:
        for sink in sink_nodes:
            try:
                path = nx.shortest_path(G, source, sink, weight=edge_cost)
            except nx.NetworkXNoPath:
                continue
            # Add edges from path to main path
            main_path_edges.update(zip(path, path[1:]))

    # Calculate path significance
    if main_path_edges:
        path_significance = (
            sum(weights[edge] for edge in main_path_edges) / len(main_path_edges)
        )
    else:
        path_significance = 0

    return main_path_edges, weights, path_significance

def analyze_pcr_patents(patent_data, citation_data):
    """
    Run Main Path Analysis on a patent set and summarize the result.

    Parameters:
    patent_data (pd.DataFrame): DataFrame with 'patent_id' and 'grant_date' columns
    citation_data (pd.DataFrame): DataFrame with citation relationships,
        passed unchanged to conduct_mpa

    Returns:
    dict: main path edges, edge weights, path significance, number of patents
        on the main path, those patents ordered by grant date, and the
        (earliest, latest) grant date among them
    """
    edges_on_path, edge_weights, significance = conduct_mpa(citation_data)

    # Every patent that appears at either end of a main-path edge
    patents_on_path = {patent for edge in edges_on_path for patent in edge}

    # Rows of the main-path patents, ordered by grant date
    on_path_mask = patent_data['patent_id'].isin(patents_on_path)
    ordered_rows = patent_data[on_path_mask].sort_values('grant_date')
    grant_dates = ordered_rows['grant_date']

    return {
        'main_path_edges': edges_on_path,
        'edge_weights': edge_weights,
        'path_significance': significance,
        'num_main_path_patents': len(patents_on_path),
        'chronological_path': ordered_rows['patent_id'].tolist(),
        'date_range': (grant_dates.min(), grant_dates.max()),
    }

# Demo on a five-patent toy data set
if __name__ == "__main__":
    # One row per patent: (patent_id, grant_date)
    patent_data = pd.DataFrame(
        [
            ('P1', '1987-01-01'),
            ('P2', '1988-01-01'),
            ('P3', '1989-01-01'),
            ('P4', '1990-01-01'),
            ('P5', '1991-01-01'),
        ],
        columns=['patent_id', 'grant_date'],
    )

    # One row per citation: (citing_patent, cited_patent)
    citation_data = pd.DataFrame(
        [
            ('P2', 'P1'),
            ('P3', 'P1'),
            ('P3', 'P2'),
            ('P4', 'P3'),
            ('P5', 'P3'),
        ],
        columns=['citing_patent', 'cited_patent'],
    )

    # Run analysis
    results = analyze_pcr_patents(patent_data, citation_data)

    # Print results
    main_path_edges = results['main_path_edges']
    print("Main Path Edges:", main_path_edges)
    path_significance = results['path_significance']
    print("Path Significance:", path_significance)
    chronological_path = results['chronological_path']
    print("Chronological Development:", chronological_path)