# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [2]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [3]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the JVM-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data.
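
As a minimal sketch of the sampling idea, the helper below wraps `RDD.sample(withReplacement, fraction, seed)` and caches the result. The name `dev_subset` and the default `fraction` and `seed` values are illustrative assumptions, not part of the lab handout.

```python
def dev_subset(rdd, fraction=0.01, seed=42):
    """Return a cached random sample of an RDD for quick experiments.

    `dev_subset`, `fraction` and `seed` are hypothetical choices made
    for this sketch.
    """
    # sample(withReplacement, fraction, seed): each element is kept
    # independently with probability `fraction`, so the sample size is
    # only approximately fraction * rdd.count().
    return rdd.sample(False, fraction, seed).cache()
```

For this lab it probably makes sense to sample only the citation lines, e.g. `dev_subset(rddCitations, 0.01)`, and keep the patent data whole: joining two independent samples would leave very few matching keys.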

The two RDDs loaded below, `rddCitations` and `rddPatents`, hold the raw lines; you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns each line as a string. This should work fine for this lab.

Other methods you use might return data as type `bytes`. If you haven't used Python `bytes` objects before, google it. You can convert a value `x` of type `bytes` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile` because `textFile` would use the underlying HDFS or other file system to read the file across the worker nodes, while `gzip.open()...readlines()` reads all the data in the driver (the frontend) and then distributes it to the worker nodes.
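
The difference between `bytes` and `str` mentioned above can be tried without any files. The sketch below builds a tiny gzip payload in memory (two lines copied from the start of the citation file) and reads it back in binary mode and in text mode; the variable names are illustrative.

```python
import gzip
import io

# Build a small gzip payload in memory so the example needs no files.
raw = gzip.compress(b'"CITING","CITED"\n3858241,956203\n')

# Binary mode ('rb') yields bytes objects.
with gzip.open(io.BytesIO(raw), 'rb') as f:
    byte_lines = f.readlines()

# bytes -> str: decode with the file's encoding, then drop the newline.
text_lines = [b.decode('utf-8').rstrip('\n') for b in byte_lines]

# Text mode ('rt') performs the decoding while reading.
with gzip.open(io.BytesIO(raw), 'rt', encoding='utf-8') as f:
    text_lines_rt = [line.rstrip('\n') for line in f]
```

Both approaches end up with the same list of strings; text mode simply saves the explicit `decode` call.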

In [4]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [5]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [6]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each record is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll also need to `filter` out the header line, since there's no easy way to extract all the lines except the first.
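
One possible way to do the header filtering and type conversion is sketched below with plain Python functions that could be passed to `filter` and `map`. The helper names (`is_header`, `parse_citation`, `parse_patent_state`) are hypothetical, and the header test assumes that only the header line starts with a double quote, which holds for the sample rows shown above.

```python
def is_header(line):
    """True for the header row, whose first field is a quoted column name."""
    return line.startswith('"')

def parse_citation(line):
    """'3858241,956203' -> (3858241, 956203)"""
    citing, cited = line.split(',')
    return (int(citing), int(cited))

def parse_patent_state(line):
    """Return (patent number, state) with the quotes stripped from the state."""
    fields = line.split(',')
    # POSTATE is the sixth column (index 5) in the header shown above.
    return (int(fields[0]), fields[5].strip('"'))

# Try the helpers on a few lines copied from the output above.
citation_lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']
citations = [parse_citation(l) for l in citation_lines if not is_header(l)]

patent_line = '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,'
patent = parse_patent_state(patent_line)
```

With Spark this would look like `rddCitations.filter(lambda l: not is_header(l)).map(parse_citation)`. The solution cells that follow take a simpler route and keep every field as a string, including the quotes around the state.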

In [7]:
# First, we need to extract the fields from the Patent and Citation RDDs by splitting each line on commas (,).

def patentSplit(line):
    line = line.split(',')
    return (line[0], line[5])

def citationSplit(line):  
    line = line.split(',')
    return (line[0], line[1].split('\n')[0])

patentData = rddPatents.map(patentSplit)
citationData = rddCitations.map(citationSplit)

In [8]:
patentData.take(5)

[('"PATENT"', '"POSTATE"'),
 ('3070801', '""'),
 ('3070802', '"TX"'),
 ('3070803', '"IL"'),
 ('3070804', '"OH"')]

In [9]:
citationData.take(5)

[('"CITING"', '"CITED"'),
 ('3858241', '956203'),
 ('3858241', '1324234'),
 ('3858241', '3398406'),
 ('3858241', '3557384')]

In [10]:
# Then, we need to join the patentData and citationData to get each Citing State.

firstStep = citationData.join(patentData).cache()
firstStep.take(5)

[('3860575', ('3109843', '""')),
 ('3973741', ('533868', '"CA"')),
 ('3973741', ('2848778', '"CA"')),
 ('3973741', ('3026059', '"CA"')),
 ('3973741', ('3506213', '"CA"'))]

In [11]:
# Now, we exchange Citing and Cited as key, 
# then join again to get each Cited State, 
# and swap back to the original order with Citing as the key.

def swapData(x):
    citing, (cited, citing_state) = x
    return (cited, (citing, citing_state))

def dataSwap(x):
    cited, ((citing, citing_state), cited_state) = x
    return (citing, (citing_state, cited, cited_state))

dataState = firstStep.map(swapData)
tempStep = dataState.join(patentData).cache()
stateData = tempStep.map(dataSwap)

In [12]:
dataState.take(5)

[('3109843', ('3860575', '""')),
 ('533868', ('3973741', '"CA"')),
 ('2848778', ('3973741', '"CA"')),
 ('3026059', ('3973741', '"CA"')),
 ('3506213', ('3973741', '"CA"'))]

In [13]:
tempStep.take(5)

[('4994411', (('5377077', '"TX"'), '""')),
 ('4994411', (('5352632', '""'), '""')),
 ('4994411', (('5984699', '""'), '""')),
 ('4994411', (('5789803', '"ID"'), '""')),
 ('4994411', (('5550711', '"TX"'), '""'))]

In [14]:
stateData.take(5)

[('5377077', ('"TX"', '4994411', '""')),
 ('5352632', ('""', '4994411', '""')),
 ('5984699', ('""', '4994411', '""')),
 ('5789803', ('"ID"', '4994411', '""')),
 ('5550711', ('"TX"', '4994411', '""'))]
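
The tuple shapes produced by the join, swap, join, swap sequence can be traced on a toy example. The sketch below mimics an inner join on plain lists of `(key, value)` pairs; `inner_join` is a hypothetical stand-in for `RDD.join`, and the patent numbers and states are made up for illustration.

```python
def inner_join(left, right):
    """Mimic RDD.join on lists of pairs: one output row per matching key pair."""
    lookup = {}
    for key, value in right:
        lookup.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in lookup.get(key, [])]

# Made-up data: (patent, state) and (citing, cited).
patents = [('101', 'CA'), ('102', 'CA'), ('103', 'TX')]
citations = [('101', '102'), ('101', '103'), ('103', '102')]

# Join on the citing patent: (citing, (cited, citing_state)).
first = inner_join(citations, patents)

# Re-key on the cited patent: (cited, (citing, citing_state)).
swapped = [(cited, (citing, cs)) for citing, (cited, cs) in first]

# Join again: (cited, ((citing, citing_state), cited_state)).
second = inner_join(swapped, patents)

# Back to citing as key: (citing, (citing_state, cited, cited_state)).
final = [(citing, (cs, cited, ds)) for cited, ((citing, cs), ds) in second]
```

Here `final` holds one row per citation with both states attached, which is the shape of `stateData` above. Unlike this list version, Spark does not guarantee any particular row order after a join.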

In [15]:
# Now that we have the desired info in the needed order, we filter out empty state values (""),
# keep only the entries where the citing and cited states match,
# then map each record to (citing, 1) and sum per key to get the same-state citation count.

def filters(x):
    citing, (citing_state, cited, cited_state) = x
    return citing_state != '""' and cited_state != '""' and citing_state == cited_state

def counter(x):
    citing, (citing_state, cited, cited_state) = x
    return (citing, 1)

citingCount = stateData.filter(filters)
Citing_Count = citingCount.map(counter).reduceByKey(lambda acc, val: acc + val)
citingData = Citing_Count.sortBy(lambda x: x[1], ascending = False)

citingData.take(10)

[('5959466', 125),
 ('5983822', 103),
 ('6008204', 100),
 ('5952345', 98),
 ('5998655', 96),
 ('5958954', 96),
 ('5936426', 94),
 ('5978329', 90),
 ('5739256', 90),
 ('5913855', 90)]
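
The filter-then-count logic can be checked on a handful of made-up records with `collections.Counter`, which plays the role of `map` plus `reduceByKey` here. The records and the name `same_state` are illustrative assumptions; the empty state is written as `'""'` to match the unstripped strings used in this notebook.

```python
from collections import Counter

def same_state(record):
    """Keep records whose citing and cited states are equal and non-empty."""
    citing, (citing_state, cited, cited_state) = record
    # If the states are equal and one is non-empty, so is the other,
    # so a single emptiness check is enough.
    return citing_state != '""' and citing_state == cited_state

# Made-up records in the (citing, (citing_state, cited, cited_state)) shape.
records = [
    ('10', ('"CA"', '1', '"CA"')),
    ('10', ('"CA"', '2', '"TX"')),
    ('10', ('"CA"', '3', '"CA"')),
    ('11', ('""', '1', '""')),
    ('12', ('"TX"', '4', '"TX"')),
]

# Count same-state citations per citing patent, largest first.
counts = Counter(citing for citing, _ in filter(same_state, records))
ranked = counts.most_common()
```

Patent `'11'` is dropped even though its two states are equal, because both are empty; this is the case the explicit `!= '""'` check guards against.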

In [16]:
# We now prepare for the final step by converting the original Patent data to key-value pairs,
# with the Patent number as key and the rest of the line as the value,
# and using a left outer join to ensure entries with a count of 0 are preserved.

def patentKV(line):
    line_split = line.split(',')
    return (line_split[0], ",".join(line_split[1: ]))

def emptyData(x):
    (key, (rest, count)) = x
    if(count is None):
        count = 0
    return (key, (rest, count))

keyValue = rddPatents.map(patentKV)
tempStep = keyValue.leftOuterJoin(citingData).cache()
finalData = tempStep.map(emptyData)

finalData.take(10)

[('3071120', ('1963,1096,,"US","OH",,2,,123,5,53,,0,,,,,,,,,', 0)),
 ('3071392', ('1963,1096,,"US","OH",,1,,280,5,55,,0,,,,,,,,,', 0)),
 ('3071527', ('1963,1096,,"US","NY",,6,,376,4,44,,3,,0,,,,,,,', 0)),
 ('3071546', ('1963,1096,,"US","OH",,2,,508,5,59,,2,,0.5,,,,,,,', 0)),
 ('3071759', ('1963,1096,,"US","PA",,2,,340,2,21,,0,,,,,,,,,', 0)),
 ('3072304', ('1963,1103,,"PK","",,1,,222,1,19,,2,,0,,,,,,,', 0)),
 ('3072336', ('1963,1103,,"CH","",,3,,236,6,69,,0,,,,,,,,,', 0)),
 ('3073391', ('1963,1110,,"US","LA",,2,,166,6,64,,1,,0,,,,,,,', 0)),
 ('3073396', ('1963,1110,,"US","OH",,2,,173,5,51,,1,,0,,,,,,,', 0)),
 ('3073463', ('1963,1110,,"US","MN",,1,,414,5,51,,1,,0,,,,,,,', 0))]
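
The reason for the left outer join is that patents with no same-state citations have no row in the counts, so an inner join would drop them. The sketch below mimics that behaviour on plain lists; `left_outer_join` and `fill_zero` are hypothetical stand-ins for `RDD.leftOuterJoin` and the `emptyData` function above, and the rows are made up.

```python
def left_outer_join(left, right):
    """Mimic RDD.leftOuterJoin: unmatched left keys are paired with None."""
    lookup = {}
    for key, value in right:
        lookup.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in lookup.get(key, [None])]

def fill_zero(row):
    """Replace a missing count (None) with 0."""
    key, (rest, count) = row
    return (key, (rest, 0 if count is None else count))

# Made-up rows: patent '2' has no same-state citations.
patent_rows = [('1', 'row-1-fields'), ('2', 'row-2-fields')]
count_rows = [('1', 3)]

joined = left_outer_join(patent_rows, count_rows)
filled = [fill_zero(r) for r in joined]
```

After `fill_zero`, every patent row carries an integer count, so the final sort never has to compare `None` with a number.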

In [17]:
# Finally, we sort finalData to get the final output, ordered by same-state citations count, and in descending order.

def sortOrder(x):
    (key, (rest, count)) = x
    return count

finalOutput = finalData.sortBy(sortOrder, ascending = False)
finalOutput.take(10)

[('5959466',
  ('1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,',
   125)),
 ('5983822',
  ('1999,14564,1998,"US","TX",569900,2,,114,5,55,200,0,0.995,,0.7201,,12.45,0,0,,',
   103)),
 ('6008204',
  ('1999,14606,1998,"US","CA",749584,2,,514,3,31,121,0,1,,0.7415,,5,0.0085,0.0083,,',
   100)),
 ('5952345',
  ('1999,14501,1997,"US","CA",749584,2,,514,3,31,118,0,1,,0.7442,,5.1102,0,0,,',
   98)),
 ('5958954',
  ('1999,14515,1997,"US","CA",749584,2,,514,3,31,116,0,1,,0.7397,,5.181,0,0,,',
   96)),
 ('5998655',
  ('1999,14585,1998,"US","CA",,1,,560,1,14,114,0,1,,0.7387,,5.1667,,,,', 96)),
 ('5936426',
  ('1999,14466,1997,"US","CA",5310,2,,326,4,46,178,0,1,,0.58,,11.2303,0.0765,0.073,,',
   94)),
 ('5913855',
  ('1999,14417,1997,"US","CA",733846,2,,606,3,32,242,0,1,,0.7403,,8.3595,0,0,,',
   90)),
 ('5739256',
  ('1998,13983,1995,"US","CA",70060,2,15,528,1,15,453,0,1,,0.8232,,15.1104,0.1124,0.1082,,',
   90)),
 ('5925042',
  ('1999,14445,1997,"US","CA",733846