In [1]:
# input: google patents tables
# output: pat_metrics table

In [2]:
# PySpark names used throughout this notebook (F, Window, StringType, DateType, sp).
# Importing them here keeps a fresh kernel from depending on a startup script that
# injects them. Assumes `sp` is pandas API on Spark, which matches the
# sp.set_option("compute.default_index_type", ...) and sp.read_parquet usage below.
# NOTE(review): spark_init, spark_pq and rpq are project helpers whose module is not
# visible here -- TODO import them explicitly as well.
import pyspark.pandas as sp
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, StringType

spark = spark_init()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/24 17:29:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# pandas-on-Spark frames get the "distributed" default index: rows are numbered
# without collecting data onto one node, but the index values are not consecutive
# (they are not meaningful in this notebook).
sp.options.compute.default_index_type = "distributed"

In [4]:
import os

# Root directory of the Google Patents parquet tables (ipc, patent, citation, claims).
# Table locations are built as path + "<table>", so the value must end with a
# separator; os.path.join(..., "") appends one only when it is missing.
# Set GOOGLE_PATENT_DIR to point at another copy instead of editing this cell.
path = os.path.join(
    os.environ.get("GOOGLE_PATENT_DIR", "/mnt/nas/google_patent/google_patent/"), ""
)

In [5]:
# IPC classification table. Its 4-character code prefix (added as ipc4 in the next
# cell) serves as the technology class -- presumably the IPC subclass, e.g. "H01L".
ipc = spark_pq(f"{path}ipc")

In [6]:
# ipc4: first four characters of the IPC code.
ipc = ipc.withColumn("ipc4", F.col("ipc").substr(1, 4))

In [7]:
# scope: number of distinct 4-character IPC classes assigned to each patent.
# countDistinct already ignores repeated (patnum, ipc4) pairs, so no separate
# de-duplication pass is needed.
patent_scope = (
    ipc.groupBy("patnum")
    .agg(F.countDistinct("ipc4").alias("scope"))
    .to_pandas_on_spark()
)

In [8]:
# Patent grant dates. Rows whose grant_date is not positive are dropped
# (presumably placeholders for a missing date).
pat = (
    spark_pq(path + "patent")
    .select("patnum", "grant_date")
    .where(F.col("grant_date") > 0)
)

In [9]:
# grant_date is stored as a yyyyMMdd number; parse it into a DateType column.
pat = pat.withColumn(
    "grant_date", F.to_date(F.col("grant_date").cast("string"), "yyyyMMdd")
)

In [10]:
# Citation pairs: patnum = citing patent, citation = cited US publication number.
# Only citations with country = 'US' are kept. Casting pub_number to integer turns
# non-numeric numbers into NULL, and dropna() discards those rows; presumably this
# is what limits the cited side to utility patents (design/plant/reissue numbers
# carry letter prefixes). NOTE(review): with spark.sql.ansi.enabled the cast would
# raise instead of returning NULL -- confirm the session runs in non-ANSI mode.
# There is no ORDER BY: the rows are joined and shuffled right away, so a global
# sort here would add a full shuffle without affecting any result.
cite = spark.sql(
    f"""select patnum, cast(pub_number as integer) as citation
        from parquet.`{path}citation`
        where country = 'US'"""
).dropna()

In [11]:
# Attach the citing patent's grant date. The join is inner, so citations whose
# citing patent is missing from pat are dropped.
cite = cite.join(pat, on="patnum", how="inner")

In [12]:
# Relabel pat for the cited side of a citation pair.
# NOTE(review): this overwrites `pat`, so cite.join(pat, "patnum") cannot be re-run
# afterwards without reloading pat.
pat = (
    pat.withColumnRenamed("grant_date", "grant_date_cite")
    .withColumnRenamed("patnum", "citation")
)

In [13]:
# Attach the cited patent's grant date as grant_date_cite. The join is inner, so
# citations to patents missing from pat are dropped.
cite = cite.join(pat, on="citation", how="inner")

In [14]:
# days = citing patent's grant date minus the cited patent's grant date.
cite = cite.withColumn("days", F.datediff("grant_date", "grant_date_cite"))

In [15]:
# Move to the pandas-on-Spark API for the groupby/merge-based metrics below.
cite = cite.to_pandas_on_spark(index_col=None)

In [16]:
# bcite: number of distinct patents each patent cites (backward citations).
# fcite: number of distinct patents citing each patent, keyed by the cited id.
bcite = cite.groupby("patnum")["citation"].nunique().rename("bcite").reset_index()
fcite = cite.groupby("citation")["patnum"].nunique().rename("fcite").reset_index()

In [17]:
# fcite is keyed by the cited patent; relabel that key as patnum for merging.
fcite = fcite.rename({"citation": "patnum"}, axis=1)

In [18]:
# Sanity check: pat has been relabelled for the cited side of a citation
# (patnum -> citation, grant_date -> grant_date_cite) before it is reused below.
assert pat.columns == ["citation", "grant_date_cite"], pat.columns
pat

DataFrame[citation: int, grant_date_cite: date]

In [19]:
# (patnum, tech) pairs, where tech is the 4-character IPC prefix.
x = ipc.select(F.col("patnum"), F.col("ipc").substr(1, 4).alias("tech"))

In [20]:
# Forward-citation count of each cited patent, paired with each of its tech classes
# and its own grant date. pat is keyed by "citation", so that key is relabelled back
# to patnum here; the date column keeps the name grant_date_cite.
# NOTE(review): fcite only holds patents cited at least once, so uncited patents are
# left out of the percentile population below -- confirm this is intended.
breakthru = (
    fcite.to_spark()
    .join(x, on="patnum")
    .join(pat.withColumnRenamed("citation", "patnum"), on="patnum")
    .dropDuplicates()
)

In [21]:
# Cohort year = grant year of the patent itself (grant_date_cite here holds that
# patent's own grant date, carried over from pat).
breakthru = breakthru.withColumn("year", F.year(F.col("grant_date_cite")))

In [22]:
# Breakthrough flag: the patent ranks in the top 1% by forward citations within a
# (tech, year) cohort, i.e. percent_rank < 0.01 with fcite ordered descending.
# Qualifying in any one of its tech classes is enough.
# NOTE(review): percent_rank = (rank - 1) / (cohort size - 1), so the top-ranked
# patent(s) of every cohort get 0 and are always flagged; in cohorts of about 100
# patents or fewer, only they qualify. Confirm small cohorts should count this way.
breakthru = (
    breakthru.select(
        "patnum",
        "fcite",
        F.percent_rank()
        .over(Window.partitionBy("tech", "year").orderBy(F.desc("fcite")))
        .alias("perc"),
    )
    .where(F.col("perc") < 0.01)
    .select("patnum")
    .distinct()
    .withColumn("breakthru", F.lit(1))
)

In [23]:
# pandas-on-Spark copy of the IPC table (patnum, full code, 4-character prefix)
# for the generality and originality computations.
ipc = ipc.select(["patnum", "ipc", "ipc4"]).to_pandas_on_spark()

In [24]:
# Forward citations used for generality: only citations made within 3650 days
# (~10 years) of the cited patent's grant (days = citing grant - cited grant).
gener = cite[cite.days < 3650]
# Attach the citing patent's IPC codes, then swap the two id labels so that from
# here on patnum is the cited patent and citation is the citing one. An explicit
# rename cannot mislabel columns if the merge output order ever changes, unlike a
# positional `columns = [...]` assignment.
gener = (
    gener[["patnum", "citation"]]
    .merge(ipc)
    .rename({"patnum": "citation", "citation": "patnum"}, axis=1)
)

In [25]:
# Generality of each cited patent i. In `gener`, patnum is the cited patent,
# citation the citing patent, and ipc/ipc4 the citing patent's codes.
#   Tin        = number of distinct full IPC codes across i's forward citations
#   Tjin       = how many of those codes fall in 4-character class j
#   beta       = Tjin / Tin  (sums to 1 over j: each code has exactly one ipc4)
#   generality = 1 - sum_j beta ** 2
# beta depends only on (patnum, ipc4), so it is built from the two grouped tables
# directly. There is no need to broadcast it onto every citation row and average
# it back, which would cost extra joins over the large citation-level data.
beta = (
    gener.groupby(["patnum", "ipc4"]).ipc.nunique().rename("Tjin").reset_index()
    .merge(gener.groupby(["patnum"]).ipc.nunique().rename("Tin").reset_index())
)
beta["beta"] = beta.Tjin / beta.Tin
beta = beta[["patnum", "ipc4", "beta"]]

gener = beta.assign(beta=beta.beta ** 2)
gener = (1 - gener.groupby("patnum").beta.sum().rename("generality")).reset_index()

In [26]:
# Originality of each citing patent p (patnum = citing patent, ipc = full IPC code
# of a cited patent):
#   spj         = number of distinct patents cited by p that carry IPC code j
#   originality = 1 - sum_j (spj / sum_k spk) ** 2
# NOTE(review): full IPC codes are the classes here, whereas generality uses ipc4 --
# confirm the asymmetry is intended. The output column keeps its existing spelling
# "orginality" because the saved pat_metrics table exposes that name.
orgin = cite[["patnum", "citation"]].merge(ipc.rename({"patnum": "citation"}, axis=1))
orgin = (
    orgin[["patnum", "citation", "ipc"]]
    .drop_duplicates()
    .groupby(["patnum", "ipc"])
    .size()
    .rename("spj")
    .reset_index()
)
orgin = orgin.merge(orgin.groupby("patnum").spj.sum().rename("sum_spj").reset_index())
orgin["share_sq"] = (orgin.spj / orgin.sum_spj) ** 2
orgin = (1 - orgin.groupby("patnum").share_sq.sum().rename("orginality")).reset_index()

In [27]:
# num_claims: number of non-null claims rows per patent.
claims = (
    spark.read.parquet(path + "claims")
    .groupBy("patnum")
    .agg(F.count("claims").alias("num_claims"))
    .to_pandas_on_spark()
)

In [28]:
# Per-patent metrics keyed by patnum, starting from patents that have IPC codes.
# fillna(0) runs before the later merges, so only scope, fcite and bcite are
# zero-filled; generality, originality and num_claims stay missing when absent.
patstat = (
    patent_scope
    .merge(fcite, on="patnum", how="left")
    .merge(bcite, on="patnum", how="left")
    .fillna(0)
    .merge(gener, on="patnum", how="left")
    .merge(orgin, on="patnum", how="left")
    .merge(claims, on="patnum", how="left")
)

In [29]:
# Number of IPC codes per (patent, 4-character class), sorted by patnum and then
# by count descending.
# NOTE(review): Spark's dropDuplicates does not promise to keep the first row of
# this ordering, so the sort alone does not pick the dominant class.
ipc = (
    spark.read.parquet(path + "ipc")
    .groupBy("patnum", F.substring("ipc", 1, 4).alias("ipc4"))
    .agg(F.count(F.substring("ipc", 1, 4)).alias("n"))
    .orderBy("patnum", F.desc("n"))
)

In [30]:
# One row per patent: its dominant ipc4, i.e. the class with the largest n. Ties are
# broken alphabetically by ipc4. A window is used because dropDuplicates(["patnum"])
# keeps an arbitrary row per key, so no upstream ordering is relied upon.
ipc = (
    ipc.withColumn(
        "rk",
        F.row_number().over(
            Window.partitionBy("patnum").orderBy(F.col("n").desc(), F.col("ipc4"))
        ),
    )
    .filter(F.col("rk") == 1)
    .select("patnum", "ipc4")
    .to_pandas_on_spark()
)

In [31]:
# All rows of the patent table (no grant_date filter), with grant_date as stored.
# These rows form the base of the final table.
grant = sp.read_parquet(f"{path}patent", columns=["patnum", "grant_date"])

In [32]:
# Final table: the grant rows left-joined with the per-patent metrics, the dominant
# ipc4 and the breakthrough flag, all on patnum.
df = (
    grant
    .merge(patstat, on="patnum", how="left")
    .merge(ipc, on="patnum", how="left")
    .merge(breakthru.to_pandas_on_spark(), on="patnum", how="left")
)

In [33]:
# Patents absent from the breakthrough table are not breakthroughs: fill with 0.
# Assign the filled column back rather than calling fillna(inplace=True) on the
# attribute-accessed Series. That form depends on the Series staying linked to df,
# and plain pandas under Copy-on-Write does not propagate it.
df["breakthru"] = df["breakthru"].fillna(0)

In [None]:
# Persist the metrics table.
# NOTE(review): the output location is relative to the kernel's working directory,
# while the inputs are read from an absolute path.
df.to_parquet(path="pat_metrics.pa")

[Stage 81:(119 + 81) / 200][Stage 84:>(0 + 47) / 200][Stage 86:> (0 + 0) / 200]]

In [None]:
# Reload the saved metrics with the project's rpq helper.
# NOTE(review): this rebinds df to whatever rpq returns (its type is not visible here).
df = rpq("pat_metrics.pa/")