# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow. The functions you pass to RDD operations are executed in Python worker processes, and this is much less efficient than the Java-based (JVM) execution behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins so they are not recomputed every time you run another action on them. You may want to start with a reduced set of data before running the full task; the `sample()` method extracts a random sample of an RDD.
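For a sense of what `sample()` does when called without replacement, e.g. `rddCitations.sample(False, 0.01, seed=42)`: each element is kept independently with probability `fraction`, so the result holds roughly, not exactly, 1% of the lines. The plain-Python sketch below imitates that Bernoulli sampling on made-up lines so it runs without Spark; `bernoulli_sample` is a hypothetical helper, not Spark's implementation.

```python
import random


def bernoulli_sample(items, fraction, seed=None):
    """Keep each item independently with probability `fraction` (hypothetical helper)."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]


# Made-up citation lines standing in for rddCitations.
lines = [f"{3858241 + i},{956203 + i}" for i in range(10_000)]

small = bernoulli_sample(lines, 0.01, seed=42)
print(len(small))  # close to 100, but usually not exactly 100
```

One caveat for this lab, assuming you sample both inputs: a citation only survives the joins if its citing and cited patents were also kept, so sampling both files at 1% leaves very few matches. Sampling only `rddCitations` and keeping all of `rddPatents` preserves more of the join.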

These two RDDs, `rddCitations` and `rddPatents`, hold the raw input lines; you will probably want to process them further (e.g. split the fields and convert them to integer types).

`sc.textFile` returns an RDD of strings, one per line of the file (it decompresses `.gz` files automatically). This should work fine for this lab.

Other methods you use might return data as Python `bytes` objects. If you haven't used the `bytes` type before, look it up in the Python documentation. You can convert a value `x` of type `bytes` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` module to read all the lines as UTF-8 strings, like this:
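```
import gzip
# 'rt' opens the compressed file in text mode, so lines come back as str, not bytes.
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    # Strip the trailing newline so each line matches what textFile() returns.
    rddCitations = sc.parallelize([line.rstrip('\n') for line in f])
```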
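This is less efficient than using `textFile`, because `textFile` reads the file through the underlying HDFS or other file system on the worker nodes, while the `gzip.open()` approach reads all the data in the frontend (the driver) and then distributes it to the worker nodes. Note that a single `.gz` file is not splittable, so `textFile` reads it into one partition; you can call `repartition()` afterwards to spread the records across the workers.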
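To see the `bytes` versus `str` difference concretely, the sketch below writes a tiny gzip file that mimics the start of `cite75_99.txt.gz` (the file name `toy_cite.txt.gz` and its contents are made up) and reads it back in binary mode, decoding each line, and in text mode. It also shows that `readlines()` and line iteration keep the trailing `'\n'`, which `textFile` strips, so you may want to remove it before calling `sc.parallelize`.

```python
import gzip
import os
import tempfile

# Build a tiny gzip file resembling the start of cite75_99.txt.gz (toy data).
path = os.path.join(tempfile.mkdtemp(), "toy_cite.txt.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write('"CITING","CITED"\n3858241,956203\n3858241,1324234\n')

# Binary mode ('rb') yields bytes; each line must be decoded explicitly.
with gzip.open(path, "rb") as f:
    raw = f.readlines()
decoded = [b.decode("utf-8").rstrip("\n") for b in raw]

# Text mode ('rt') decodes for you; only the trailing newline remains to strip.
with gzip.open(path, "rt", encoding="utf-8") as f:
    text_lines = [line.rstrip("\n") for line in f]

print(type(raw[0]).__name__)  # bytes
print(decoded == text_lines)  # True
```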

In [42]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [43]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [44]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string of comma-separated values. You will need to convert these to (K, V) pairs and probably convert the keys to `int`. You'll also need to `filter` out the header line, since RDDs offer no easy way to extract all the lines except the first.
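One way to drop the header and convert the types in a single pass is `flatMap` with a parsing function that returns an empty list for any line it should discard. The helpers `parse_citation` and `parse_patent` below are hypothetical names; they assume the column layout shown above (POSTATE at index 5) and keep the quoted state strings exactly as the cells below do. They are plain Python, so you can test them on sample lines before running anything on Spark.

```python
def parse_citation(line):
    """Return [(CITING, CITED)] for a data line, or [] for the header or a malformed line."""
    fields = line.split(",")
    if len(fields) != 2 or fields[0] == '"CITING"':
        return []
    try:
        return [(int(fields[0]), int(fields[1]))]
    except ValueError:
        return []


def parse_patent(line):
    """Return [(PATENT, POSTATE)] for a data line, or [] for the header or a short line.

    POSTATE is the sixth column (index 5) and is kept with its quotes, e.g. '"TX"'.
    """
    fields = line.split(",")
    if len(fields) <= 5 or fields[0] == '"PATENT"':
        return []
    return [(int(fields[0]), fields[5])]


print(parse_citation('3858241,956203'))  # [(3858241, 956203)]
print(parse_patent('3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,'))  # [(3070802, '"TX"')]
```

With Spark these would be applied as `rddCitations.flatMap(parse_citation)` and `rddPatents.flatMap(parse_patent)`; because each call returns a list of zero or one pair, `flatMap` discards the header and flattens the rest in one step.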

In [47]:
citingParts = rddCitations.map(lambda l: l.split(",")).filter(lambda x:x[0] != '"CITING"').map(lambda x: (int(x[0]), int(x[1]))) # (CITING, CITED)
patentsParts = rddPatents.map(lambda l: l.split(",")).filter(lambda x: x[0] != '"PATENT"').map(lambda x: (int(x[0]), x[5])) #(PATENT, POSTATE)

In [48]:
citingParts.take(5)

[(3858241, 956203),
 (3858241, 1324234),
 (3858241, 3398406),
 (3858241, 3557384),
 (3858241, 3634889)]

In [49]:
patentsParts.take(5)

[(3070801, '""'),
 (3070802, '"TX"'),
 (3070803, '"IL"'),
 (3070804, '"OH"'),
 (3070805, '"CA"')]

In [52]:
# Key = CITING patent: (CITING, (CITED, CITING_STATE))
joined1 = citingParts.join(patentsParts).cache()

In [53]:
joined1.take(5)

[(3886430, (2463685, '""')),
 (3886430, (2521058, '""')),
 (3886430, (3249886, '""')),
 (3886430, (3569805, '""')),
 (3886430, (3611086, '""'))]

In [54]:
joined1_rekeyed = joined1.map(
    lambda x: (x[1][0], (x[0], x[1][1]))   # re-key by CITED: (CITED, (CITING, CITING_STATE))
)

In [55]:
joined1_rekeyed.take(5)

[(2463685, (3886430, '""')),
 (2521058, (3886430, '""')),
 (3249886, (3886430, '""')),
 (3569805, (3886430, '""')),
 (3611086, (3886430, '""'))]

In [56]:
# Key = CITED patent: (CITED, ((CITING, CITING_STATE), CITED_STATE))
joined2 = joined1_rekeyed.join(patentsParts).cache()

In [57]:
joined2.take(5)

[(3276012, ((3876826, '""'), '"CA"')),
 (3276012, ((4162839, '""'), '"CA"')),
 (3276012, ((3940673, '"CA"'), '"CA"')),
 (3276012, ((4064515, '""'), '"CA"')),
 (3868170, ((3957342, '""'), '"NY"'))]

In [58]:
filtered = joined2.filter(
    # keep citations where both patents have a known (non-empty) state and the states match
    lambda x:
        x[1][0][1] != '""' and x[1][1] != '""' and x[1][0][1] == x[1][1]
)

In [59]:
filtered.take(5)

[(3276012, ((3940673, '"CA"'), '"CA"')),
 (3868170, ((4046545, '"NY"'), '"NY"')),
 (3868170, ((4784465, '"NY"'), '"NY"')),
 (3868170, ((5641333, '"NY"'), '"NY"')),
 (3868170, ((4304583, '"NY"'), '"NY"'))]
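`RDD.join` is an inner join on keys: for every key present in both RDDs it emits one record per pair of values, and keys found on only one side are dropped. To check the shapes produced by the two joins and the filter on a handful of records, here is a plain-Python model using made-up toy data. `inner_join` is a hypothetical helper that mimics the join's output, not Spark's shuffle-based implementation, and the filter drops the notebook's second emptiness check because the equality test already implies it.

```python
from collections import defaultdict


def inner_join(left, right):
    """Mimic RDD.join: emit (k, (v_left, v_right)) for every pair of values sharing key k."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]


# Toy data (made up): (CITING, CITED) pairs and (PATENT, POSTATE) pairs.
# Patent 4 is missing from `patents`, so the inner join drops its citation.
citations = [(1, 10), (1, 11), (2, 10), (3, 12), (4, 10)]
patents = [(1, '"CA"'), (2, '"NY"'), (3, '""'), (10, '"CA"'), (11, '"NY"'), (12, '""')]

joined1 = inner_join(citations, patents)                              # (CITING, (CITED, CITING_STATE))
rekeyed = [(cited, (citing, st)) for citing, (cited, st) in joined1]  # (CITED, (CITING, CITING_STATE))
joined2 = inner_join(rekeyed, patents)                                # (CITED, ((CITING, CITING_STATE), CITED_STATE))
same_state = [r for r in joined2 if r[1][0][1] != '""' and r[1][0][1] == r[1][1]]

print(same_state)  # [(10, ((1, '"CA"'), '"CA"'))]
```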

In [60]:
counts = filtered.map(
    lambda x: (x[1][0][0], 1)   # (CITING_PATENT, 1)
).reduceByKey(lambda a, b: a + b)

In [61]:
counts.take(5)

[(4304583, 9), (4292220, 3), (3919146, 2), (4101606, 1), (4115637, 1)]

In [62]:
top30 = counts.takeOrdered(
    30,
    key=lambda x: -x[1]
)
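The last two steps can be modelled in plain Python as well: `reduceByKey(lambda a, b: a + b)` sums the `1`s per citing patent, and `takeOrdered(n, key=lambda x: -x[1])` returns the `n` elements with the smallest key, i.e. the largest counts. The sketch below uses made-up patent IDs, with `collections.Counter` and `heapq.nsmallest` standing in for the Spark calls. Since the order of equal counts is not specified (note the several patents tied at 90 above), it adds the patent number as a secondary key so ties come out in a fixed order; that tie-breaking is an assumption, not part of the original solution.

```python
import heapq
from collections import Counter

# Toy (CITING_PATENT, 1) pairs, as produced by the map step before reduceByKey (made-up IDs).
pairs = [(101, 1), (102, 1), (101, 1), (103, 1), (102, 1), (104, 1)]

# Equivalent of reduceByKey(lambda a, b: a + b): sum the values for each key.
counts = Counter()
for patent, one in pairs:
    counts[patent] += one

# Equivalent of takeOrdered(3, key=...): the 3 smallest by (-count, patent),
# i.e. the highest counts, with ties broken by the smaller patent number.
top3 = heapq.nsmallest(3, counts.items(), key=lambda x: (-x[1], x[0]))
print(top3)  # [(101, 2), (102, 2), (103, 1)]
```

In the notebook the same tie-breaking would be `counts.takeOrdered(30, key=lambda x: (-x[1], x[0]))`.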

In [63]:
for patent, count in top30:
    print(patent, count)

5959466 125
5983822 103
6008204 100
5952345 98
5958954 96
5998655 96
5936426 94
5925042 90
5913855 90
5951547 90
5739256 90
5978329 90
5980517 90
5618907 89
5978331 89
5689485 89
5928229 89
5917082 89
5602226 88
5847160 87
5969079 86
5998471 85
5808083 85
5921954 84
5817836 84
5705574 83
5856490 81
5999972 80
5987245 79
5919970 79
