# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark DataFrames
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful, as is [this reference on doing joins with Spark DataFrames](http://www.learnbymarketing.com/1100/pyspark-joins-by-example/).

The [Databricks documentation is one of the better reference manuals for PySpark](https://docs.databricks.com/spark/latest/dataframes-datasets/index.html); it shows how to perform many common data operations, such as joins and aggregations following `groupBy`.

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

The following aggregation functions may be useful; [they can be used to aggregate the results of `groupBy` operations](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html#example-aggregations-using-agg-and-countdistinct). More documentation is in the [PySpark SQL Functions manual](https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#module-pyspark.sql.functions). Feel free to use other functions from that library.
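As a rough illustration of how `count` and `countDistinct` differ after a `groupBy`, here is a plain-Python sketch (not Spark code) on a few hypothetical citation pairs. The helper `count_and_count_distinct` and the data are made up; the sketch assumes there are no null values, which both Spark functions would skip.

```python
from collections import defaultdict

# Hypothetical toy (CITING, CITED) pairs; note the duplicate (1, 11).
toy_citations = [(1, 10), (1, 11), (1, 11), (2, 10)]


def count_and_count_distinct(rows):
    """Per CITING value, return (number of rows, number of distinct CITED values).

    Roughly what groupBy('CITING').agg(count('CITED'), countDistinct('CITED'))
    computes when there are no nulls.
    """
    groups = defaultdict(list)
    for citing, cited in rows:
        groups[citing].append(cited)
    return {citing: (len(cited), len(set(cited))) for citing, cited in groups.items()}


print(count_and_count_distinct(toy_citations))  # {1: (3, 2), 2: (1, 1)}
```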

In [2]:
from pyspark.sql.functions import col, count, countDistinct

Create the Spark session as described in the tutorials.

In [3]:
# Run Spark locally, using all available cores.
spark = SparkSession \
    .builder \
    .appName("Lab4-Dataframe") \
    .master("local[*]") \
    .getOrCreate()

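Read in the citations and patents data and check that the data makes sense. Note that, unlike in the RDD solution, `inferSchema="true"` makes Spark detect the column types automatically, so numeric columns such as `CITING` and `CITED` are read as integers (`IntegerType`) rather than strings.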
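In the RDD version every field had to be converted with `int()` by hand; with `inferSchema`, Spark scans the data and picks the types itself, which costs an extra pass over the file. The plain-Python sketch below illustrates the idea with a deliberately simplified rule (a column is an integer column if every non-empty value parses with `int()`). The helper `infer_int_columns` is hypothetical, and Spark's real inference also recognizes other types, such as doubles.

```python
import csv
import io

# Hypothetical excerpt in the layout of cite75_99.txt: a header row, then integer columns.
SAMPLE = "CITING,CITED\n3858241,956203\n3858241,1324234\n"


def infer_int_columns(text):
    """Classify each CSV column as 'int' or 'string' (simplified rule, not Spark's algorithm).

    A column counts as 'int' if every non-empty value parses with int();
    empty or missing cells are skipped, much as Spark reads empty cells as null.
    """
    reader = csv.DictReader(io.StringIO(text))
    rows = list(reader)
    types = {}
    for name in reader.fieldnames:
        values = [row[name] for row in rows if row[name] not in ("", None)]
        try:
            for value in values:
                int(value)
            types[name] = "int"
        except ValueError:
            types[name] = "string"
    return types


print(infer_int_columns(SAMPLE))  # {'CITING': 'int', 'CITED': 'int'}
```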

In [4]:
citations = spark.read.load('cite75_99.txt.gz',
            format="csv", sep=",", header=True,
            compression="gzip",
            inferSchema="true")

In [5]:
citations.show(5)

+-------+-------+
| CITING|  CITED|
+-------+-------+
|3858241| 956203|
|3858241|1324234|
|3858241|3398406|
|3858241|3557384|
|3858241|3634889|
+-------+-------+
only showing top 5 rows



In [6]:
patents = spark.read.load('apat63_99.txt.gz',
            format="csv", sep=",", header=True,
            compression="gzip",
            inferSchema="true")

In [7]:
patents.show(5)

+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
|3070801| 1963| 1096|   null|     BE|   null|    null|      1|  null|   269|  6|    69| null|       1|    null|    0.0|    null|    null|    null|    null|    null|    null|    null|
|3070802| 1963| 1096|   null|     US|     TX|    null|      1|  null|     2|  6|    63| null|       0|    null|   null|    null|    null|    null|    null|    null|    null|    null|
|3070803| 1963| 1096|   null|     US|     IL|    null|      1|  null|     2|  6|    6

In [8]:
citation_database = citations.alias('citation_database')
patent_database = patents.alias('patent_database')

Give the two DataFrames aliases so that they can be treated like named tables in the joins that follow.

In [None]:
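# Map Reduce Phase - 1
# Attach the citing patent's attributes to every citation (join on PATENT == CITING).
mapper1 = citation_database.join(patent_database, patent_database.PATENT == citation_database.CITING, how="full")
# The full outer join also emits patents that cite nothing (CITING is null); discard those rows.
mapper1 = mapper1.filter(mapper1.CITING.isNotNull())
# Keep only CITING, CITED and the citing patent's state (POSTATE).
reducer1 = mapper1.drop('PATENT', 'GYEAR', 'GDATE', 'APPYEAR', 'COUNTRY', 'ASSIGNEE',
                        'ASSCODE', 'CLAIMS', 'NCLASS', 'CAT', 'SUBCAT', 'CMADE',
                        'CRECEIVE', 'RATIOCIT', 'GENERAL', 'ORIGINAL', 'FWDAPLAG',
                        'BCKGTLAG', 'SELFCTUB', 'SELFCTLB', 'SECDUPBD', 'SECDLWBD')
# Rename the citing patent's state so it does not clash with the cited patent's POSTATE in phase 2.
reducer1 = reducer1.withColumnRenamed("POSTATE", "POSTATE_1")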

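In Map Reduce phase 1, we join the citations with the patent data on the citing patent (`PATENT == CITING`), similar to the join in Lab 3, so that each citation carries the citing patent's state.

After the full outer join, we keep only the rows with a non-null `CITING`; this discards patents that do not cite anything.

We then drop every patent column except `POSTATE`, since the state is the only patent attribute the result needs.

Finally, we rename `POSTATE` to `POSTATE_1` (the citing patent's state) so that it does not collide with the cited patent's `POSTATE` in the next join.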
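The row-level effect of phase 1 can be sketched in plain Python (not Spark code) on hypothetical toy data: a list of `(CITING, CITED)` pairs and a dictionary from patent number to `POSTATE`, which assumes patent numbers are unique. The helper `phase1` is made up for illustration. Because the null filter removes only the rows contributed by non-citing patents, every citation survives exactly once; assuming no citation has a null `CITING`, `how="left"` in the join should therefore produce the same `reducer1`.

```python
# Hypothetical toy data. Patent 9 cites something but is missing from the patent table;
# patent 5 exists but cites nothing.
toy_citations = [(1, 2), (1, 3), (4, 2), (9, 2)]            # (CITING, CITED)
toy_states = {1: "CO", 2: "CO", 3: "TX", 4: None, 5: "CA"}  # PATENT -> POSTATE


def phase1(citations, states):
    """Return (CITING, CITED, POSTATE_1) rows.

    Mimics the full outer join on PATENT == CITING followed by the CITING
    null filter: each citation appears once, with the citing patent's state,
    or None if that patent is unknown or has no state. Non-citing patents
    (such as 5) never appear.
    """
    return [(citing, cited, states.get(citing)) for citing, cited in citations]


for row in phase1(toy_citations, toy_states):
    print(row)
# (1, 2, 'CO')
# (1, 3, 'CO')
# (4, 2, None)
# (9, 2, None)
```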

In [10]:
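# Map Reduce Phase - 2
# Attach the cited patent's attributes to every citation (join on PATENT == CITED).
mapper2 = reducer1.join(patent_database, patent_database.PATENT == reducer1.CITED, how="right")
# The right join also emits patents that are never cited (CITING is null); discard those rows.
mapper2 = mapper2.filter(mapper2.CITING.isNotNull())
# Keep CITING, CITED, POSTATE_1 and the cited patent's state (POSTATE).
reducer2 = mapper2.drop('PATENT', 'GYEAR', 'GDATE', 'APPYEAR', 'COUNTRY', 'ASSIGNEE',
                        'ASSCODE', 'CLAIMS', 'NCLASS', 'CAT', 'SUBCAT', 'CMADE',
                        'CRECEIVE', 'RATIOCIT', 'GENERAL', 'ORIGINAL', 'FWDAPLAG',
                        'BCKGTLAG', 'SELFCTUB', 'SELFCTLB', 'SECDUPBD', 'SECDLWBD')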

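Phase 2 is similar to phase 1: we join the result with the patent data again, this time on the cited patent (`PATENT == CITED`), eliminate the rows with a null `CITING`, and drop the unnecessary columns. After this phase, `POSTATE_1` holds the citing patent's state and `POSTATE` holds the cited patent's state.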
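Phase 2 can be sketched the same way. Here the right join keeps every patent, and the `CITING` null filter then removes the patents nobody cites, so only citations whose cited patent appears in the patent data remain; in effect it is an inner join. The toy rows and the `phase2` helper below are hypothetical.

```python
# Hypothetical phase-1 rows (CITING, CITED, POSTATE_1); cited patent 8 is not in the patent table.
toy_phase1 = [(1, 2, "CO"), (1, 3, "CO"), (4, 2, None), (1, 8, "CO")]
toy_states = {1: "CO", 2: "CO", 3: "TX", 4: None}  # PATENT -> POSTATE


def phase2(rows, states):
    """Return (CITING, CITED, POSTATE_1, POSTATE) rows.

    Mimics the right join on PATENT == CITED followed by the CITING null
    filter, which behaves like an inner join: citations whose cited patent
    is unknown (such as 8) are dropped, and so are patents nobody cites.
    """
    return [(citing, cited, state1, states[cited])
            for citing, cited, state1 in rows
            if cited in states]


for row in phase2(toy_phase1, toy_states):
    print(row)
# (1, 2, 'CO', 'CO')
# (1, 3, 'CO', 'TX')
# (4, 2, None, 'CO')
```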

In [11]:
# Map Reduce Phase - 3
# Keep only citations whose citing (POSTATE_1) and cited (POSTATE) patents are in the same state.
# A comparison involving null is never true, so rows with a missing state are dropped here too.
mapper3 = reducer2.filter(reducer2.POSTATE_1 == reducer2.POSTATE)
# Count the same-state citations of each citing patent and name the count column n.
distinct_columns = mapper3.groupBy('CITING').count().select('CITING', col('count').alias('n'))

In phase 3, we keep only the citations where the citing and cited patents are from the same state, count them for each citing patent, and store the count in a separate column, `n`.
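A plain-Python sketch of phase 3 on hypothetical phase-2 rows: keep the same-state citations, then count them per citing patent with `collections.Counter`. The explicit `None` checks mirror Spark's behaviour, where comparing a null state with anything is never true, so such rows are filtered out. Citing patents with no same-state citations (patents 4 and 5 below) simply do not appear in the counts. The data and the `phase3` helper are made up for illustration.

```python
from collections import Counter

# Hypothetical phase-2 rows: (CITING, CITED, POSTATE_1, POSTATE).
toy_phase2 = [
    (1, 2, "CO", "CO"),
    (1, 3, "CO", "TX"),
    (1, 6, "CO", "CO"),
    (4, 2, None, "CO"),  # missing citing state: never a match
    (5, 3, "CO", "TX"),  # different states: not counted
    (7, 2, "CO", "CO"),
]


def phase3(rows):
    """Count same-state citations per CITING, like
    filter(POSTATE_1 == POSTATE).groupBy('CITING').count()."""
    return Counter(
        citing
        for citing, _cited, state1, state2 in rows
        # Spark's null == x is never true, so rows with a missing state are skipped.
        if state1 is not None and state2 is not None and state1 == state2
    )


print(dict(phase3(toy_phase2)))  # {1: 2, 7: 1}
```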

In [12]:
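# Final reduce phase: attach each patent's same-state citation count n (null if there is none).
reducer3 = patent_database.join(distinct_columns, patent_database.PATENT == distinct_columns.CITING, how="full")
# CITING duplicates PATENT after the join, so drop it; n is now the last column.
final_table = reducer3.drop('CITING')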

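In the final reduce phase, we join the per-patent counts with the original patent data and drop the redundant `CITING` column. Each patent row gains a column `n`, which is null for patents with no same-state citations.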
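The final join can be sketched as a lookup with a default: every patent keeps its row and picks up its count, or `None` when phase 3 produced no count for it. The inputs and the `final_join` helper are hypothetical. If you would rather report 0 than null, PySpark's `DataFrame.fillna(0, subset=['n'])` is one option.

```python
# Hypothetical inputs: patent numbers, and the same-state counts phase 3 produced.
toy_patents = [1, 2, 3, 7]
toy_counts = {1: 2, 7: 1}


def final_join(patents, counts):
    """Return (PATENT, n) rows: every patent is kept, and n is None when
    phase 3 produced no count for it (the null that the full outer join yields).
    Every counted CITING is itself a patent, so no extra rows appear."""
    return [(patent, counts.get(patent)) for patent in patents]


print(final_join(toy_patents, toy_counts))
# [(1, 2), (2, None), (3, None), (7, 1)]
```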

In [13]:
# Spot-check two patents; n is the last column of each row.
assert final_table.filter(final_table.PATENT == 6009541).collect()[0][-1] == 43
assert final_table.filter(final_table.PATENT == 5959466).collect()[0][-1] == 125