Importing Spark Context and Spark Conf from pyspark,
Importing Spark Session from pyspark.sql,
Importing np from numpy, 
Importing pd from pandas and
importing operator

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
import operator

Importing column, count and distinct count from pyspark.sql.functions

In [2]:
from pyspark.sql.functions import col, count, countDistinct

Creating a spark session with builder, application name and master core

In [3]:
spark = SparkSession\
        .builder\
        .appName("Lab4-DataFrame")\
        .master("local[*]")\
        .getOrCreate()

Read in the citations and patents data and check that the data makes sense

In [4]:
citations = spark.read.load ('cite75_99.txt.gz', 
                    format = "csv", sep = ",", header = True,
                             compression = "gzip", inferSchema = "true"
                            )

In [5]:
citations.show(5)


+-------+-------+
| CITING|  CITED|
+-------+-------+
|3858241| 956203|
|3858241|1324234|
|3858241|3398406|
|3858241|3557384|
|3858241|3634889|
+-------+-------+
only showing top 5 rows



In [6]:
patents = spark.read.load('apat63_99.txt.gz',
            format="csv", sep=",", header=True,
            compression="gzip",
            inferSchema="true")

In [7]:
patents.show(5)

+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
|3070801| 1963| 1096|   null|     BE|   null|    null|      1|  null|   269|  6|    69| null|       1|    null|    0.0|    null|    null|    null|    null|    null|    null|    null|
|3070802| 1963| 1096|   null|     US|     TX|    null|      1|  null|     2|  6|    63| null|       0|    null|   null|    null|    null|    null|    null|    null|    null|    null|
|3070803| 1963| 1096|   null|     US|     IL|    null|      1|  null|     2|  6|    6

Filter out the elements from patent database within the state sub-column having null value

In [8]:
patent_nnull = patents.filter("POSTATE is not NULL").select("PATENT","POSTATE")

Selecting patent and state from patent database

In [9]:
patent1 = patent_nnull.select("PATENT","POSTATE")

Selecting Cited and Citing from the citations database

In [10]:
citations1 = citations.select('CITED','CITING')

Join citation1 and patent1 database from the one selected above on basis of cited and patent in ascending order of cited and save in variable group1

In [11]:
group1 = citations1.join(patent1, patent1.PATENT == citations1.CITED).orderBy('CITED',ascending = True).select('CITED','POSTATE','CITING')

Rename Cited as Cited1, Postate as Postate1 and Citing as Citing1  

In [12]:
rename1 = group1.selectExpr("CITED as CITED1", "POSTATE as POSTATE1", "CITING as CITING1")

Join citation1 and patent1 database from the one selected above on basis of citing and patent in ascending order of cited and save in variable group2

In [13]:
group2 = citations1.join(patent1, patent1.PATENT == citations1.CITING).select('CITING','POSTATE','PATENT','CITED')

Create an intermediate table from rename1 datasets and patent1 datasets and arrange it in ascending order of Cited1.

In [14]:
inter = rename1.join(patent1,patent1.PATENT == rename1.CITING1).orderBy('CITED1',ascending = True).select('CITED1','POSTATE1','CITING1','POSTATE')

In [15]:
inter.filter(inter.POSTATE1 == inter.POSTATE).orderBy('CITED1',ascending = True).show(5)

+-------+--------+-------+-------+
| CITED1|POSTATE1|CITING1|POSTATE|
+-------+--------+-------+-------+
|3070804|      OH|4694843|     OH|
|3070808|      IA|4731077|     IA|
|3070811|      CA|4407027|     CA|
|3070811|      CA|5533214|     CA|
|3070811|      CA|4385407|     CA|
+-------+--------+-------+-------+
only showing top 5 rows



Now we iterate the count when state matches for each patent and print it out and store it in the count  i.e. count stands for same state citations.

In [32]:
count = inter.filter(inter.POSTATE1 == inter.POSTATE).groupBy('CITED1').count().orderBy('CITED1',ascending = True).select('CITED1','count')

Now, we join the patents table with the count table and arrange it in descending order of the count and show top 10 count of the same state citations

In [33]:
patents.join(count, count.CITED1 == patents.PATENT).orderBy('count', ascending = False).select('PATENT','GYEAR','GDATE','APPYEAR','COUNTRY','POSTATE','ASSIGNEE','ASSCODE','CLAIMS','NCLASS','CAT','SUBCAT','CMADE','CRECEIVE','RATIOCIT','GENERAL','ORIGINAL','FWDAPLAG','BCKGTLAG','SELFCTUB','SELFCTLB','SECDUPBD','SECDLWBD','count').show(10)  

+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-----+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|count|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-----+
|3845770| 1974| 5422|   1972|     US|     CA|   22715|      2|  null|   424|  3|    31| null|     339|    null| 0.5152|    null| 17.0118|    null|    null|    null|  0.7818|  0.7611|  268|
|5111638| 1992|11820|   1991|     US|     IL|  251075|      2|    16|    53|  6|    68|   39|     254|  0.4872| 0.6286|  0.7867|  4.3307| 32.0256|  0.1818|  0.0513|  0.1633|  0.1614|  247|
|4733521| 1988|10315|   1986|     US|     IL|  251075| 

Now, we join the patents table with the count table and arrange it in ascending order of the patents

In [25]:
patents.join(count, count.CITED1 == patents.PATENT).orderBy('PATENT', ascending = True).select('PATENT','GYEAR','GDATE','APPYEAR','COUNTRY','POSTATE','ASSIGNEE','ASSCODE','CLAIMS','NCLASS','CAT','SUBCAT','CMADE','CRECEIVE','RATIOCIT','GENERAL','ORIGINAL','FWDAPLAG','BCKGTLAG','SELFCTUB','SELFCTLB','SECDUPBD','SECDLWBD','count').show(10)  

+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-----+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|count|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-----+
|3070804| 1963| 1096|   null|     US|     OH|    null|      1|  null|     2|  6|    63| null|       3|    null| 0.6667|    null|    null|    null|    null|    null|    null|    null|    1|
|3070808| 1963| 1096|   null|     US|     IA|    null|      1|  null|   623|  3|    39| null|       4|    null|  0.375|    null|    null|    null|    null|    null|    null|    null|    1|
|3070811| 1963| 1096|   null|     US|     CA|    null| 