# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

These two RDDs (`rddCitations` and `rddPatents` below) hold the raw, unparsed lines, because you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns each line of the file as a string. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python `bytes` before, look it up. You can convert a value `x` of type `bytes` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read all the lines as UTF-8 strings like this:
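```python
import gzip

# Read the compressed file as UTF-8 text on the driver and strip the trailing
# newline from each line (textFile strips it for you).
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    rddCitations = sc.parallelize([line.rstrip('\n') for line in f])
```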
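This is less efficient than using `textFile`, because `textFile` reads the file through the underlying file system (HDFS or similar) on the worker nodes, whereas `gzip.open()...readlines()` reads all of the data on the front end (the driver) and then distributes it to the worker nodes. Note that a gzip file cannot be split, so `textFile` loads each `.gz` file as a single partition; calling `repartition()` afterwards spreads later stages across the workers.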
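To see the `bytes` versus `str` distinction without Spark, here is a small standard-library sketch. It writes a throwaway gzip file (the name `sample.txt.gz` and its two lines are made up for the demo) and reads it back in binary and in text mode. Note that `readlines()` and line iteration both keep the trailing newline, which `textFile` would strip.

```python
import gzip
import os
import tempfile

with tempfile.TemporaryDirectory() as tmpdir:
    # Write a tiny gzip file with two made-up citation lines.
    path = os.path.join(tmpdir, "sample.txt.gz")
    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write("3858245,2072303\n3858250,3324482\n")

    # Binary mode: each line is a bytes object that still ends in b"\n".
    with gzip.open(path, "rb") as f:
        raw_lines = f.readlines()

    # Text mode ("rt") decodes to str for you; the newline is still there, so strip it.
    with gzip.open(path, "rt", encoding="utf-8") as f:
        text_lines = [line.rstrip("\n") for line in f]

# Decoding the bytes by hand and stripping newlines gives the same strings.
decoded = [line.decode("utf-8").rstrip("\n") for line in raw_lines]

print(raw_lines[0])           # b'3858245,2072303\n'
print(decoded == text_lines)  # True
```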

In [3]:
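# For quicker experiments, uncomment these two lines to work on a random ~5% sample (without replacement).
# rddCitations = sc.textFile("cite75_99.txt.gz").sample(False, 0.05)
# rddPatents = sc.textFile("apat63_99.txt.gz").sample(False, 0.05)

# Full data sets: each RDD element is one line of the file, as a string.
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")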

The data looks like the following.

In [33]:
rddCitations.take(5)

['3858245,2072303',
 '3858250,3324482',
 '3858250,3331084',
 '3858251,1106732',
 '3858255,3284812']

In [34]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,',
 '3070852,1963,1096,,"US","KY",,2,,49,5,59,,7,,0.2449,,,,,,,',
 '3070906,1963,1096,,"US","NY",,2,,434,6,69,,6,,0.6667,,,,,,,',
 '3070939,1963,1096,,"US","MN",,1,,56,6,61,,0,,,,,,,,,']

In other words, each element is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll also need to `filter` out the header line, since there's no easy way to extract all the lines except the first.
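One alternative to comparing every line against `first()` is to keep only lines whose first field is a bare number, since a header line would start with a quoted column name such as `"PATENT"`. This is a minimal sketch, assuming patent numbers are plain digits as in the samples shown here; `is_data_line` and `to_int_pair` are hypothetical helper names, not part of the lab.

```python
def is_data_line(line):
    """Return True if the first CSV field is a bare integer (a patent number).

    A header line starts with a quoted column name such as "PATENT",
    so it fails this test, while a line like '3858245,2072303' passes.
    """
    first_field = line.split(",", 1)[0]
    return first_field.isdigit()


def to_int_pair(line):
    """Parse a citation line 'citing,cited' into an (int, int) pair."""
    citing, cited = line.split(",")[:2]
    return int(citing), int(cited)


print(is_data_line('"PATENT","GYEAR","GDATE"'))  # False
print(is_data_line("3858245,2072303"))           # True
print(to_int_pair("3858245,2072303"))            # (3858245, 2072303)
```

With Spark these plug in directly, e.g. `rddCitations.filter(is_data_line).map(to_int_pair)`. If you convert keys to `int` on one side of a join, convert them on the other side too, since `'3858250'` and `3858250` are different keys.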

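Removing the header from citations. Judging by the `take(5)` output above, this copy of the citations file has no header line, so `first()` returns the citation `3858245,2072303` and the filter below drops it.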

In [4]:
citationsHeader = rddCitations.first() #extract header
citationWoHeader = rddCitations.filter(lambda row: row != citationsHeader)   #filter out header

In [36]:
citationWoHeader.take(5)

['3858250,3324482',
 '3858250,3331084',
 '3858251,1106732',
 '3858255,3284812',
 '3858260,3090973']

Removing header from patents

In [5]:
patentsHeader = rddPatents.first() #extract header
patentsWoHeader = rddPatents.filter(lambda row: row != patentsHeader)   #filter out header

In [38]:
patentsWoHeader.take(5)

['3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,',
 '3070852,1963,1096,,"US","KY",,2,,49,5,59,,7,,0.2449,,,,,,,',
 '3070906,1963,1096,,"US","NY",,2,,434,6,69,,6,,0.6667,,,,,,,',
 '3070939,1963,1096,,"US","MN",,1,,56,6,61,,0,,,,,,,,,',
 '3070950,1963,1096,,"US","SC",,2,,57,6,63,,4,,0,,,,,,,']

Removing rows with an empty state (`POSTATE`, field 5) from patents

In [6]:
patentsWoNull = patentsWoHeader.filter(lambda row: row.split(',')[5] != '""')  # split first: field 5 is POSTATE

In [41]:
patentsWoNull.take(5)

['3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,',
 '3070852,1963,1096,,"US","KY",,2,,49,5,59,,7,,0.2449,,,,,,,',
 '3070906,1963,1096,,"US","NY",,2,,434,6,69,,6,,0.6667,,,,,,,',
 '3070939,1963,1096,,"US","MN",,1,,56,6,61,,0,,,,,,,,,',
 '3070950,1963,1096,,"US","SC",,2,,57,6,63,,4,,0,,,,,,,']
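Because `filter` sees each unsplit line, indexing it with `[5]` returns the sixth character rather than the sixth CSV field. The sketch below uses the first patent line from the output above plus one made-up line with an empty state to show the difference; `has_state` is a hypothetical helper name.

```python
us_line = '3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,'
# Hypothetical non-US line with an empty POSTATE field, for illustration only.
foreign_line = '3070999,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,'


def has_state(line):
    """True if field 5 (POSTATE) is not an empty quoted string."""
    return line.split(",")[5] != '""'


# Indexing the unsplit string picks one character, so this test is always True.
print(us_line[5], foreign_line[5] != '""')          # 2 True
# Splitting first inspects the actual POSTATE field.
print(has_state(us_line), has_state(foreign_line))  # True False
```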

Creating (key, value) pairs for the patents and citations tables

In [7]:
# patentsData: (patent, state); citationData: (citing, cited). Both are reused in joins, so cache them.
patentsData = patentsWoNull.map(lambda row : row.split(',')).map(lambda row: (row[0], row[5])).cache()
citationData = citationWoHeader.map(lambda row : row.split(',')).map(lambda row: (row[0], row[1])).cache()

In [49]:
patentsData.take(5)

[('3070825', '"IL"'),
 ('3070852', '"KY"'),
 ('3070906', '"NY"'),
 ('3070939', '"MN"'),
 ('3070950', '"SC"')]

In [50]:
citationData.take(5)

[('3858250', '3324482'),
 ('3858250', '3331084'),
 ('3858251', '1106732'),
 ('3858255', '3284812'),
 ('3858260', '3090973')]
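The `split(',')` approach keeps the double quotes around the state (`'"IL"'`), which is why later cells compare against `'""'`. If you would rather strip them, Python's `csv` module can parse a single line. This is a sketch, not what the solution above does: it is slower per line in Python, and every later comparison against `'""'` would have to become `''`. `patent_state_pair` is a hypothetical name.

```python
import csv


def patent_state_pair(line):
    """Parse one patents line into (patent, state) with the quotes removed.

    csv.reader handles the quoted fields, so '"IL"' becomes 'IL' and an
    empty quoted field '""' becomes ''.
    """
    fields = next(csv.reader([line]))
    return fields[0], fields[5]


print(patent_state_pair('3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,'))
# ('3070825', 'IL')
```

On the RDD this would be used as e.g. `patentsWoHeader.map(patent_state_pair).filter(lambda kv: kv[1] != '')`.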

Joining the citing patent of each citation with the patents table (to get the citing state)

In [8]:
join1 = citationData.join(patentsData).filter(lambda row: row[1][1] != '""').cache()

In [57]:
join1.take(5)

[('3858358', ('2290979', '"TX"')),
 ('3858582', ('2589426', '"CA"')),
 ('3858582', ('3090383', '"CA"')),
 ('3859113', ('2412138', '"OH"')),
 ('3859165', ('3247068', '"PA"'))]
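`join` on RDDs of `(K, V)` and `(K, W)` pairs is an inner join that yields `(K, (V, W))` for each matching pair of values, so in `join1` the citing state sits at `row[1][1]`. The plain-Python model below (a hypothetical `inner_join` helper, not Spark's implementation) reproduces that shape on two rows from the output above, plus one made-up citation with no matching patent, which is dropped.

```python
from collections import defaultdict


def inner_join(left, right):
    """Plain-Python model of RDD.join on lists of (K, V) and (K, W) pairs.

    Returns (K, (V, W)) for every matching pair of values. Spark makes no
    ordering promise; this model simply follows the order of `left`.
    """
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]


citations = [("3858582", "2589426"), ("3858582", "3090383"), ("9999999", "1234567")]
patents = [("3858582", '"CA"')]
joined = inner_join(citations, patents)
print(joined)
# [('3858582', ('2589426', '"CA"')), ('3858582', ('3090383', '"CA"'))]
```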

Rearranging the (key, value) pairs so that the cited patent is the key

In [9]:
join1Data = join1.map(lambda row: (row[1][0], (row[0],row[1][1]))).cache()

In [62]:
join1Data.take(5)

[('2290979', ('3858358', '"TX"')),
 ('2589426', ('3858582', '"CA"')),
 ('3090383', ('3858582', '"CA"')),
 ('2412138', ('3859113', '"OH"')),
 ('3247068', ('3859165', '"PA"'))]
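Positional indices like `row[1][0]` are easy to get wrong. Python 3 lambdas cannot unpack nested tuples in their parameters (tuple-parameter unpacking was removed by PEP 3113), but a small named function can. A sketch with a hypothetical `rekey_by_cited` helper, checked against the first row shown above:

```python
def rekey_by_cited(row):
    """(citing, (cited, citing_state)) -> (cited, (citing, citing_state))."""
    citing, (cited, citing_state) = row
    return cited, (citing, citing_state)


print(rekey_by_cited(("3858358", ("2290979", '"TX"'))))
# ('2290979', ('3858358', '"TX"'))
```

`join1.map(rekey_by_cited)` would then produce the same RDD as `join1Data`.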

Joining the cited patent with the patents table (to get the cited state)

In [10]:
join2 = join1Data.join(patentsData).filter(lambda row: row[1][1] != '""').cache()

In [64]:
join2.take(5)

[('3754884', (('3875766', '"NY"'), '"TN"')),
 ('3405757', (('3908746', '"MI"'), '"CA"')),
 ('3779851', (('3915781', '"CT"'), '"CA"')),
 ('3511549', (('3933402', '"WI"'), '"NJ"')),
 ('3823753', (('3939886', '"WI"'), '"IL"'))]

Rearranging the columns to ((Citing, Citing_State), (Cited, Cited_State))

In [11]:
join2Data = join2.map(lambda row: (row[1][0],(row[0],row[1][1]))).cache()

In [72]:
join2Data.take(5)

[(('3875766', '"NY"'), ('3754884', '"TN"')),
 (('3908746', '"MI"'), ('3405757', '"CA"')),
 (('3915781', '"CT"'), ('3779851', '"CA"')),
 (('3933402', '"WI"'), ('3511549', '"NJ"')),
 (('3939886', '"WI"'), ('3823753', '"IL"'))]
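To check the chain of joins and the same-state filter on data small enough to verify by hand, here is a plain-Python model that uses dictionary lookups in place of the two joins. The patents `P1` to `P5` and their states are invented toy data, and `same_state_counts` is a hypothetical helper. `P1` cites `P2` and `P4` in the same state, its citation of `P3` crosses states, `P3` citing `P2` crosses states, and `P4` citing `P5` is dropped because `P5` has no state entry, as an inner join would drop it.

```python
from collections import Counter

# Hypothetical toy data. States use the same quoted form as patentsData;
# patents with an empty state are simply absent, as after the '""' filter.
states = {"P1": '"CA"', "P2": '"CA"', "P3": '"TX"', "P4": '"CA"'}
citations = [("P1", "P2"), ("P1", "P3"), ("P1", "P4"), ("P3", "P2"), ("P4", "P5")]


def same_state_counts(citations, states):
    """Count, per (citing, citing_state), the cited patents in the same state.

    Mirrors join1 -> join1Data -> join2 -> join2Data -> filter1 -> countByKey,
    with dict lookups standing in for the two joins.
    """
    counts = Counter()
    for citing, cited in citations:
        citing_state = states.get(citing)
        cited_state = states.get(cited)
        # Inner-join semantics: drop the pair if either side has no state.
        if citing_state is None or cited_state is None:
            continue
        if citing_state == cited_state:
            counts[(citing, citing_state)] += 1
    return counts


print(same_state_counts(citations, states))  # Counter({('P1', '"CA"'): 2})
```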

Keeping only rows where the citing and cited patents are in the same state, and assigning a count of 1 to each row (keyed by citing patent and state)

In [12]:
filter1 = join2Data.filter(lambda row: row[0][1] == row[1][1]).map(lambda row: (row[0],1))

In [91]:
filter1.take(5)

[(('5211683', '"MN"'), 1),
 (('5908029', '"MN"'), 1),
 (('5755682', '"MN"'), 1),
 (('5941908', '"MN"'), 1),
 (('4909979', '"MN"'), 1)]

Counting the rows for each (citing patent, state) key and sorting in descending order of count

In [13]:
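# countByKey() returns a Python dict on the driver; sort its (key, count) items by count, highest first.
count = sorted(filter1.countByKey().items(), key=lambda x: x[1], reverse=True)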

In [14]:
countAns = sc.parallelize(count)

In [16]:
countAns.take(5)

[(('5959466', '"CA"'), 125),
 (('5983822', '"TX"'), 103),
 (('6008204', '"CA"'), 100),
 (('5952345', '"CA"'), 98),
 (('5998655', '"CA"'), 96)]
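`countByKey()` returns an ordinary Python dict on the driver, so every distinct key is shipped back before sorting. For `(key, 1)` pairs, counting rows per key gives the same numbers as summing the values, which is what `reduceByKey(operator.add)` computes on the executors. The sketch below checks that equivalence in plain Python on made-up keys:

```python
from collections import Counter
import operator

# Made-up (key, 1) pairs in the same shape as filter1's elements.
pairs = [(("A", '"CA"'), 1), (("B", '"TX"'), 1), (("A", '"CA"'), 1)]

# What countByKey() computes: the number of rows per key.
by_count = Counter(key for key, _ in pairs)

# What reduceByKey(operator.add) computes: the sum of the values per key.
by_sum = {}
for key, value in pairs:
    by_sum[key] = operator.add(by_sum.get(key, 0), value)

# Sort by count, highest first, as in the cell above.
top = sorted(by_sum.items(), key=lambda kv: kv[1], reverse=True)
print(top)  # [(('A', '"CA"'), 2), (('B', '"TX"'), 1)]
```

On the RDD, a distributed version would be `filter1.reduceByKey(operator.add).sortBy(lambda kv: kv[1], ascending=False)`, or `filter1.reduceByKey(operator.add).takeOrdered(10, key=lambda kv: -kv[1])` if only the top ten are needed; both keep the per-key counts on the executors instead of building a dict on the driver.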

The code below joins the counts back to the full patent rows to display the output as shown in the lab documentation

In [15]:
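# Key each patent row by (patent, state) so it can be joined with countAns; keep the remaining fields as the value.
patentMatch = (patentsWoNull.map(lambda row: row.split(','))
               .map(lambda row: ((row[0], row[5]), row[1:]))
               .filter(lambda x: x[0][1] != '""')
               .cache())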

In [18]:
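# Join on (patent, state) and emit (patent, fields, same-state citation count), highest count first.
finalAns = (patentMatch.join(countAns)
            .map(lambda x: (x[0][0], x[1][0], x[1][1]))
            .sortBy(lambda x: x[2], ascending=False)
            .cache())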

In [21]:
for line in finalAns.take(10):
    print(line)

('5959466', ['1999', '14515', '1997', '"US"', '"CA"', '5310', '2', '', '326', '4', '46', '159', '0', '1', '', '0.6186', '', '4.8868', '0.0455', '0.044', '', ''], 125)
('5983822', ['1999', '14564', '1998', '"US"', '"TX"', '569900', '2', '', '114', '5', '55', '200', '0', '0.995', '', '0.7201', '', '12.45', '0', '0', '', ''], 103)
('6008204', ['1999', '14606', '1998', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '121', '0', '1', '', '0.7415', '', '5', '0.0085', '0.0083', '', ''], 100)
('5952345', ['1999', '14501', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '118', '0', '1', '', '0.7442', '', '5.1102', '0', '0', '', ''], 98)
('5998655', ['1999', '14585', '1998', '"US"', '"CA"', '', '1', '', '560', '1', '14', '114', '0', '1', '', '0.7387', '', '5.1667', '', '', '', ''], 96)
('5958954', ['1999', '14515', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '116', '0', '1', '', '0.7397', '', '5.181', '0', '0', '', ''], 96)
('5936426', ['1999', '14466', '1997',