# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark DataFrames
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful, as is [this reference on doing joins with Spark DataFrames](http://www.learnbymarketing.com/1100/pyspark-joins-by-example/).

[Databricks has one of the better reference manuals for PySpark](https://docs.databricks.com/spark/latest/dataframes-datasets/index.html); it shows how to perform many common data operations, such as joins and aggregations following `groupBy`.

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

The following aggregation functions may be useful; [these can be used to aggregate the results of `groupBy` operations](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html#example-aggregations-using-agg-and-countdistinct). More documentation is in the [PySpark SQL Functions manual](https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#module-pyspark.sql.functions). Feel free to use other functions from that library.

In [3]:
from pyspark.sql.functions import col, count, countDistinct

Create our session as described in the tutorials.

In [4]:
spark = SparkSession \
    .builder \
    .appName("Lab4-Dataframe") \
    .master("local[*]")\
    .getOrCreate()

Read in the citations and patents data and check that the data makes sense. Note that, unlike in the RDD solution, the column types are inferred automatically because `inferSchema` is enabled; the numeric columns here come out as integers rather than strings.

In [5]:
citations = spark.read.load('cite75_99.txt.gz',
            format="csv", sep=",", header=True,
            compression="gzip",
            inferSchema="true")

In [6]:
citations.show(5)

+-------+-------+
| CITING|  CITED|
+-------+-------+
|3858241| 956203|
|3858241|1324234|
|3858241|3398406|
|3858241|3557384|
|3858241|3634889|
+-------+-------+
only showing top 5 rows
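
To make the contrast with manual parsing concrete, here is a plain-Python sketch of the conversion that schema inference does for us. It does not use Spark and is not the RDD solution itself. The `sample` text is a toy excerpt built from the rows shown above, and its header format is an assumption rather than a copy of the real file.

```python
import csv
import io

# Toy excerpt built from the rows shown above (assumed layout, not the real file).
sample = "CITING,CITED\n3858241,956203\n3858241,1324234\n"

# Read as plain text: every field comes back as a string.
records = list(csv.DictReader(io.StringIO(sample)))

# Manual conversion step that inferSchema makes unnecessary.
typed = [(int(r["CITING"]), int(r["CITED"])) for r in records]

print(type(records[0]["CITING"]).__name__, type(typed[0][0]).__name__)
```

Without the conversion step, comparisons and joins would operate on strings rather than numbers.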



In [7]:
patents = spark.read.load('apat63_99.txt.gz',
            format="csv", sep=",", header=True,
            compression="gzip",
            inferSchema="true")

In [8]:
patents.show(5)

+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
|3070801| 1963| 1096|   NULL|     BE|   NULL|    NULL|      1|  NULL|   269|  6|    69| NULL|       1|    NULL|    0.0|    NULL|    NULL|    NULL|    NULL|    NULL|    NULL|    NULL|
|3070802| 1963| 1096|   NULL|     US|     TX|    NULL|      1|  NULL|     2|  6|    63| NULL|       0|    NULL|   NULL|    NULL|    NULL|    NULL|    NULL|    NULL|    NULL|    NULL|
|3070803| 1963| 1096|   NULL|     US|     IL|    NULL|      1|  NULL|     2|  6|    6

### We create a smaller DataFrame called "patents_states" containing only PATENT, COUNTRY, POSTATE

### These are the columns we’ll need later to check if citations happen within the same state.  


In [9]:
patents_states = patents.select("PATENT", "COUNTRY", "POSTATE")
patents_states.show(5)

+-------+-------+-------+
| PATENT|COUNTRY|POSTATE|
+-------+-------+-------+
|3070801|     BE|   NULL|
|3070802|     US|     TX|
|3070803|     US|     IL|
|3070804|     US|     OH|
|3070805|     US|     CA|
+-------+-------+-------+
only showing top 5 rows



### We rename columns in patents_states to avoid conflicts later (PC_PATENT, PC_POSTATE, PC_COUNTRY).  
### Then we do a left join, so every citation row is kept. 
- Match citations.CITED with PC_PATENT.  
- This gives us the state information of the cited patent, or NULL when no state is available.  

Now each citation row includes where the cited patent is from.  


In [10]:
pc = patents_states.withColumnRenamed("PATENT", "PC_PATENT") \
                   .withColumnRenamed("POSTATE", "PC_POSTATE") \
                   .withColumnRenamed("COUNTRY", "PC_COUNTRY")

cited_join = citations.join(pc, citations.CITED == col("PC_PATENT"), how="left")

# show the joined table (example)
cited_join.select("CITING", "CITED", "PC_POSTATE").show(10, truncate=False)


+-------+-------+----------+
|CITING |CITED  |PC_POSTATE|
+-------+-------+----------+
|3858242|1515701|NULL      |
|3858242|3319261|OH        |
|3858241|3634889|OH        |
|3858241|956203 |NULL      |
|3858241|1324234|NULL      |
|3858243|2949611|NULL      |
|3858243|3146465|MI        |
|3858241|3398406|FL        |
|3858241|3557384|MA        |
|3858242|3668705|WI        |
+-------+-------+----------+
only showing top 10 rows
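
As a rough mental model of what this left join computes, the sketch below does the same thing in plain Python on a few rows taken from the outputs above. It is an illustration only, not how Spark executes the join. The names `state_of` and `left_join_state` are hypothetical, and `None` stands in for NULL.

```python
# (CITING, CITED) pairs copied from the outputs above.
citations = [
    (3858241, 956203),
    (3858241, 3398406),
    (3858241, 3634889),
    (3858242, 3319261),
]

# PC_PATENT -> PC_POSTATE for the cited patents whose state is shown above.
# 956203 is left out on purpose to model a citation with no state available.
state_of = {3398406: "FL", 3634889: "OH", 3319261: "OH"}

def left_join_state(rows, lookup):
    """Keep every citation row; attach the cited patent's state or None."""
    return [(citing, cited, lookup.get(cited)) for citing, cited in rows]

cited_join = left_join_state(citations, state_of)
for row in cited_join:
    print(row)
```

The key property of the left join is that the number of citation rows does not shrink: unmatched rows survive with a missing state instead of being dropped.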



### We do a second join, this time renaming columns to P2_PATENT, P2_POSTATE, P2_COUNTRY.  
We join the CITING column of the previous result (cited_join) with P2_PATENT.  

Now each row has both:  
- The state of the cited patent (PC_POSTATE)  
- The state of the citing patent (P2_POSTATE)  


In [11]:
p2 = patents_states.withColumnRenamed("PATENT", "P2_PATENT") \
                   .withColumnRenamed("POSTATE", "P2_POSTATE") \
                   .withColumnRenamed("COUNTRY", "P2_COUNTRY")

cited_citing_join = cited_join.join(p2, cited_join.CITING == col("P2_PATENT"), how="left")

cited_citing_join.select("CITING", "CITED", "PC_POSTATE", "P2_POSTATE").show(10, truncate=False)


+-------+-------+----------+----------+
|CITING |CITED  |PC_POSTATE|P2_POSTATE|
+-------+-------+----------+----------+
|3858242|1515701|NULL      |MI        |
|3858242|3319261|OH        |MI        |
|3858242|3668705|WI        |MI        |
|3858242|3707004|WI        |MI        |
|3858243|2949611|NULL      |NULL      |
|3858243|3146465|MI        |NULL      |
|3858241|3634889|OH        |MA        |
|3858241|956203 |NULL      |MA        |
|3858241|1324234|NULL      |MA        |
|3858241|3398406|FL        |MA        |
+-------+-------+----------+----------+
only showing top 10 rows



### We clean and filter the data:  
- Remove empty state values. Rows with a NULL state are dropped as well, because a comparison with NULL never evaluates to true.  
- Convert state codes to uppercase (to avoid mismatch like "ca" vs "CA").  
- Keep only rows where PC_POSTATE == P2_POSTATE.  

The result is citations where both patents are from the same U.S. state.  


In [12]:
from pyspark.sql.functions import trim, upper

filtered = cited_citing_join.filter(
    (trim(col("PC_POSTATE")) != "") &
    (trim(col("P2_POSTATE")) != "") &
    (upper(trim(col("PC_POSTATE"))) == upper(trim(col("P2_POSTATE"))))
)

filtered = filtered.cache()

filtered.select("CITING", "CITED", "PC_POSTATE", "P2_POSTATE").show(10, truncate=False)


+-------+-------+----------+----------+
|CITING |CITED  |PC_POSTATE|P2_POSTATE|
+-------+-------+----------+----------+
|4178878|3464385|AK        |AK        |
|3974004|3745074|AL        |AL        |
|3974004|3585090|AL        |AL        |
|3974004|3692600|AL        |AL        |
|3974004|3762972|AL        |AL        |
|4554823|3373564|AL        |AL        |
|4698246|3972467|AL        |AL        |
|4701360|3972467|AL        |AL        |
|5078406|5026073|AL        |AL        |
|5701722|5623808|AL        |AL        |
+-------+-------+----------+----------+
only showing top 10 rows
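
The filter logic can be checked on a few rows with a plain-Python sketch. This is a simplified model, not Spark code: `None` stands in for NULL, and `normalize` and `same_state` are hypothetical helper names. The first four rows come from the outputs above; the last row is made up to exercise the trim and uppercase steps.

```python
# (CITING, CITED, PC_POSTATE, P2_POSTATE)
rows = [
    (3858242, 1515701, None, "MI"),    # cited state missing  -> dropped
    (3858242, 3319261, "OH", "MI"),    # different states     -> dropped
    (3858243, 3146465, "MI", None),    # citing state missing -> dropped
    (4178878, 3464385, "AK", "AK"),    # same state           -> kept
    (1000001, 1000002, " ca", "CA "),  # made-up row: matches after cleaning
]

def normalize(state):
    """Trim and uppercase a state code; missing or blank values become None."""
    if state is None:
        return None
    cleaned = state.strip().upper()
    return cleaned or None

def same_state(cited_state, citing_state):
    """True only when both states are present and equal after cleaning."""
    a, b = normalize(cited_state), normalize(citing_state)
    return a is not None and a == b

filtered = [r for r in rows if same_state(r[2], r[3])]
for row in filtered:
    print(row)
```

In Spark the missing-value case needs no explicit check: a condition that evaluates to NULL is treated as not satisfied, so `filter` drops the row.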



We first count the same-state citations made by each citing patent (the cnt DataFrame), then join cnt with the main patents DataFrame.  
- If a patent doesn’t appear in cnt, we fill its SAME_STATE count with 0 (coalesce).  
- Finally, we sort patents by SAME_STATE in descending order.  

The result shows the patents that make the most citations within their own state.  

In [13]:
from pyspark.sql.functions import count

cnt = filtered.groupBy("CITING").agg(count("*").alias("SAME_STATE"))

cnt.show(10, truncate=False)


+-------+----------+
|CITING |SAME_STATE|
+-------+----------+
|4240165|3         |
|5096364|2         |
|5122917|5         |
|5203482|2         |
|5583013|9         |
|5409826|2         |
|4053654|1         |
|4829378|1         |
|5393360|2         |
|4781565|2         |
+-------+----------+
only showing top 10 rows
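
The count, zero-fill, and sort steps can be modeled in plain Python on a handful of patent ids. This is a sketch under stated assumptions rather than the notebook's method: the ids come from the outputs above, but the toy counts cover only the few rows listed here and do not match the real totals.

```python
from collections import Counter

# CITING ids of a few same-state citation rows (toy subset of the filtered output).
same_state_citing = [4178878, 3974004, 3974004, 3974004, 3974004, 4554823]

# Equivalent of groupBy("CITING") with count("*").
cnt = Counter(same_state_citing)

# A few patent ids; 3070801 has no entry in this toy cnt.
patents = [3070801, 3974004, 4178878, 4554823]

# Equivalent of the left join followed by coalesce(SAME_STATE, 0).
augmented = [(patent, cnt.get(patent, 0)) for patent in patents]

# Equivalent of orderBy(SAME_STATE descending).
ranked = sorted(augmented, key=lambda pair: pair[1], reverse=True)
for patent, same_state in ranked:
    print(patent, same_state)
```

Filling with 0 matters because the left join leaves patents without any same-state citation with a missing count, and a missing value is not the same as a count of zero.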



In [20]:
from pyspark.sql.functions import coalesce, lit

augmented = patents.join(cnt, patents.PATENT == cnt.CITING, how="left").drop("CITING")
augmented = augmented.withColumn("SAME_STATE", coalesce(col("SAME_STATE"), lit(0)))

# Sort by same-state citation count and keep only the ten highest rows
top10 = augmented.orderBy(col("SAME_STATE").desc()).limit(10)
top10.show(10, truncate=False)


+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+----------+
|PATENT |GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|SAME_STATE|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+----------+
|5959466|1999 |14515|1997   |US     |CA     |5310    |2      |NULL  |326   |4  |46    |159  |0       |1.0     |NULL   |0.6186  |NULL    |4.8868  |0.0455  |0.044   |NULL    |NULL    |125       |
|5983822|1999 |14564|1998   |US     |TX     |569900  |2      |NULL  |114   |5  |55    |200  |0       |0.995   |NULL   |0.7201  |NULL    |12.45   |0.0     |0.0     |NULL    |NULL    |103       |
|6008204|1999 |14606|1998   |U