In [2]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark entry point via the session builder
# (getOrCreate), registered under the application name "PatentDataAnalysis".
spark = (
    SparkSession.builder
    .appName("PatentDataAnalysis")
    .getOrCreate()
)


In [34]:
# Load the weekly patent-grant CSV. The first row supplies the column names
# and Spark is asked to infer column types instead of using a declared schema.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("Weekly_Patent_Application_Granted.csv")
)

# Preview the first five rows.
df.show(n=5)

+------------------+----------------+------------+---------------------+------------------+--------------+--------------------+------------------+-----------+------------+
|PUBLICATION_NUMBER|PUBLICATION_DATE|IPO_LOCATION|APPLICATION_TYPE_DESC|APPLICATION_NUMBER|DATE_OF_FILING|  TITLE_OF_INVENTION|FIELD_OF_INVENTION|NO_OF_PAGES|NO_OF_CLAIMS|
+------------------+----------------+------------+---------------------+------------------+--------------+--------------------+------------------+-----------+------------+
|           50/2016|      02/12/2016|     Kolkata| CONVENTION APPLIC...|      108/KOL/2009|    20/01/2009|MULTI-SPEED TRANS...|        MECHANICAL|         26|          21|
|           50/2016|      02/12/2016|     Kolkata| PCT NATIONAL PHAS...|   1777/KOLNP/2010|    17/05/2010|PROCESS FOR PREPA...|         CHEMISTRY|         38|          12|
|           52/2016|      16/12/2016|     Kolkata| PCT NATIONAL PHAS...|   2277/KOLNP/2010|    22/06/2010|TEXTILE SEMIFINIS...|           PO

In [35]:
# Inspect the schema produced by the CSV load. In the captured output below
# every column is reported as string, including NO_OF_PAGES / NO_OF_CLAIMS and
# both date columns, so the later cells have to parse the dates explicitly.
df.printSchema()

root
 |-- PUBLICATION_NUMBER: string (nullable = true)
 |-- PUBLICATION_DATE: string (nullable = true)
 |-- IPO_LOCATION: string (nullable = true)
 |-- APPLICATION_TYPE_DESC: string (nullable = true)
 |-- APPLICATION_NUMBER: string (nullable = true)
 |-- DATE_OF_FILING: string (nullable = true)
 |-- TITLE_OF_INVENTION: string (nullable = true)
 |-- FIELD_OF_INVENTION: string (nullable = true)
 |-- NO_OF_PAGES: string (nullable = true)
 |-- NO_OF_CLAIMS: string (nullable = true)



In [36]:
from pyspark.sql.functions import col

# Two cleaning steps in one pass:
#   1. remove rows that are exact duplicates of another row;
#   2. discard rows that are missing the publication date, the filing date,
#      or the field of invention.
df = (
    df.dropDuplicates()
    .dropna(subset=["PUBLICATION_DATE", "DATE_OF_FILING", "FIELD_OF_INVENTION"])
)


In [37]:
from pyspark.sql.functions import expr

# Replace both date columns (read as 'dd/MM/yyyy' strings) with parsed
# timestamps. try_to_timestamp is used so that values which do not match the
# pattern are expected to come back as NULL instead of aborting the job
# (per the Spark SQL docs); the next cell filters those NULLs out.
df = (
    df
    .withColumn("PUBLICATION_DATE", expr("try_to_timestamp(PUBLICATION_DATE, 'dd/MM/yyyy')"))
    .withColumn("DATE_OF_FILING", expr("try_to_timestamp(DATE_OF_FILING, 'dd/MM/yyyy')"))
)


In [38]:
from pyspark.sql.functions import col

# Keep only rows where both date columns are non-null after parsing, i.e. drop
# a row if either PUBLICATION_DATE or DATE_OF_FILING is null.
df = df.dropna(how="any", subset=["PUBLICATION_DATE", "DATE_OF_FILING"])


In [39]:
from pyspark.sql.functions import year, month, datediff

# Derive calendar features and the filing-to-publication lag, then discard
# rows whose lag is negative (publication dated before filing).
#   YEAR          - year of PUBLICATION_DATE
#   FILING_YEAR   - year of DATE_OF_FILING
#   FILING_MONTH  - month of DATE_OF_FILING
#   Time_to_Grant - days from DATE_OF_FILING to PUBLICATION_DATE
df = (
    df
    .withColumn("YEAR", year("PUBLICATION_DATE"))
    .withColumn("FILING_YEAR", year("DATE_OF_FILING"))
    .withColumn("FILING_MONTH", month("DATE_OF_FILING"))
    .withColumn("Time_to_Grant", datediff("PUBLICATION_DATE", "DATE_OF_FILING"))
    .where(col("Time_to_Grant") >= 0)
)


In [40]:
# NOTE(review): this repeats the non-negative Time_to_Grant filter already
# applied at the end of the previous cell; re-applying it removes no further
# rows, so this cell looks like a leftover and could be deleted.
df = df.where("Time_to_Grant >= 0")

In [41]:
# Re-check the schema after cleaning. The output below shows the two date
# columns as timestamp and the four derived integer columns (YEAR,
# FILING_YEAR, FILING_MONTH, Time_to_Grant); NO_OF_PAGES and NO_OF_CLAIMS are
# still strings at this point.
df.printSchema()

root
 |-- PUBLICATION_NUMBER: string (nullable = true)
 |-- PUBLICATION_DATE: timestamp (nullable = true)
 |-- IPO_LOCATION: string (nullable = true)
 |-- APPLICATION_TYPE_DESC: string (nullable = true)
 |-- APPLICATION_NUMBER: string (nullable = true)
 |-- DATE_OF_FILING: timestamp (nullable = true)
 |-- TITLE_OF_INVENTION: string (nullable = true)
 |-- FIELD_OF_INVENTION: string (nullable = true)
 |-- NO_OF_PAGES: string (nullable = true)
 |-- NO_OF_CLAIMS: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- FILING_YEAR: integer (nullable = true)
 |-- FILING_MONTH: integer (nullable = true)
 |-- Time_to_Grant: integer (nullable = true)



In [42]:
import pandas as pd
# Collect the cleaned Spark DataFrame into a pandas DataFrame (kept in `pdf`)
# and write it to cleaned_data.csv without the pandas index column.
# NOTE(review): toPandas() brings every row to the driver; fine for a small
# extract, but confirm the dataset size before reusing this on larger inputs.
pdf = df.toPandas()
pdf.to_csv(path_or_buf="cleaned_data.csv", index=False)

In [43]:
# Plain status message only; it does not itself verify that cleaned_data.csv
# was written.
print("Saved output")

Saved output
