# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark with RDDs on the https://coding.csel.io machines is slow: most of the code runs in Python worker processes, which is much less efficient than the JVM-based execution used by PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins you reuse. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

These two RDDs are referred to as "rawCitations" and "rawPatents" because you will probably want to process them further (e.g. convert fields to integer types); in the code below they are named `rddCitations` and `rddPatents`.

The `textFile` function returns an RDD of strings, one per line of the file, and decompresses `.gz` files automatically. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python `bytes` objects before, look them up. You can convert a value `x` of type `bytes` into, e.g., a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read all the lines as UTF-8 strings, like this:
```python
import gzip

# 'rt' opens the compressed file in text mode, so each line is a str, not bytes
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    rddCitations = sc.parallelize(f.readlines())
```
This is less efficient than using `textFile`: `textFile` reads the file on the worker nodes through the underlying file system (HDFS or otherwise), while `gzip.open(...).readlines()` reads all the data into the frontend (the driver) and then distributes it to the worker nodes. (A single `.gz` file cannot be split, so `textFile` decompresses it in one task, but the data still does not have to pass through the driver.)
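To make the `bytes` vs. `str` distinction concrete without Spark, here is a small plain-Python sketch. It writes a hypothetical two-line gzip file (the first two lines of the citation data shown later) to a temporary directory and reads it back in binary and in text mode.

```python
import gzip
import os
import tempfile

# Hypothetical stand-in for cite75_99.txt.gz: its first two lines.
sample = '"CITING","CITED"\n3858241,956203\n'.encode("utf-8")
path = os.path.join(tempfile.mkdtemp(), "sample.txt.gz")
with gzip.open(path, "wb") as f:
    f.write(sample)

# Binary mode ('rb'): each line is a bytes object that must be decoded.
with gzip.open(path, "rb") as f:
    raw_lines = f.readlines()
decoded = [x.decode("utf-8") for x in raw_lines]

# Text mode ('rt'): lines come back already decoded to str.
with gzip.open(path, "rt", encoding="utf-8") as f:
    text_lines = f.readlines()

# Both keep the trailing '\n'; sc.textFile() strips it, so do the same here
# before comparing lines (for example against the header).
clean_lines = [line.rstrip("\n") for line in text_lines]
print(clean_lines)
```

If you build an RDD from `readlines()` output, remember that the newline is still attached, so an exact comparison with the header string would fail until it is stripped.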

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [4]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string containing comma-separated values. You will need to convert these to (K,V) pairs, probably converting the keys to `int`, and so on. You'll also need to `filter` out the header line, since there's no easy way to extract all the lines except the first.
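A plain-Python sketch of the header filter and the (K,V) conversion, using a list in place of the RDD so it runs without Spark. The `parse_citation` helper is hypothetical; it mirrors the `split`/`strip`/`int` steps of the lambdas in the `In [6]` cell, and a function like it could be passed to `.map()` as-is.

```python
# Lines copied from rddCitations.take(5); a list stands in for the RDD.
raw_citations = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']
header = raw_citations[0]          # what rddCitations.first() returns

def parse_citation(line):
    """Turn '3858241,956203' into the (CITING, CITED) pair (3858241, 956203)."""
    citing, cited = line.split(",")[:2]
    return int(citing.strip().strip('"')), int(cited.strip().strip('"'))

# Equivalent of .filter(lambda l: l != header).map(parse_citation)
pairs = [parse_citation(line) for line in raw_citations if line != header]
print(pairs)
```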

The block of code below cleans up the raw patent and citation data so it’s easier to work with. For the citations file, the first line is just a header, so we remove it and then split each row into two numbers: the citing patent and the cited patent. We strip extra spaces and quotes, ending up with pairs like (citing_id, cited_id). For the patents file, we again drop the header, split each row by commas, and keep only rows with at least six columns (enough to reach the COUNTRY and POSTATE fields). From this cleaned data, we create two views: `patents_full`, which keeps the patent ID as the key and all the other columns (still raw strings, quotes included) as the value, and `patents_state`, which keeps only U.S. patents that actually list a state, stored as (patent_id, state). Finally, a few sample rows are printed to confirm everything looks right.
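The two patent views can be sketched the same way in plain Python. The input lines are copied from `rddPatents.take(5)`; the helpers `clean`, `to_full`, `is_us_with_state`, and `to_state` are hypothetical names for the lambdas used in the `In [6]` cell. In the header row, column 4 is COUNTRY and column 5 is POSTATE.

```python
# Two raw patent lines copied from rddPatents.take(5).
lines = [
    '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
    '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
]

def clean(field):
    """Remove surrounding whitespace and double quotes, e.g. '"TX"' -> 'TX'."""
    return field.strip().strip('"')

def to_full(cols):
    # (PATENT, remaining raw columns); quotes in the other columns are kept.
    return int(clean(cols[0])), cols[1:]

def is_us_with_state(cols):
    # COUNTRY is column 4, POSTATE is column 5.
    return clean(cols[4]) == "US" and clean(cols[5]) != ""

def to_state(cols):
    return int(clean(cols[0])), clean(cols[5])

rows = [line.split(",") for line in lines]
rows = [cols for cols in rows if len(cols) >= 6]

patents_full = [to_full(cols) for cols in rows]
patents_state = [to_state(cols) for cols in rows if is_us_with_state(cols)]
print(patents_state)
```

Naming the steps this way also makes it easy to unit-test the parsing on a handful of lines before running it over the full file.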

In [6]:
#Citations 
citations_header = rddCitations.first()
rddCitations = (rddCitations
                .filter(lambda line: line != citations_header)
                .map(lambda line: line.split(","))
                .filter(lambda x: len(x) >= 2)
                .map(lambda x: (int(x[0].strip().strip('"')),
                                int(x[1].strip().strip('"')))))   # (CITING, CITED)

#Patents
patents_header = rddPatents.first()
rddPatents = (rddPatents
              .filter(lambda line: line != patents_header)
              .map(lambda line: line.split(","))
              .filter(lambda cols: len(cols) >= 6))

#two views:
# 1) (PATENT, remaining columns): the full row keyed by patent ID
patents_full = rddPatents.map(lambda cols: (int(cols[0].strip().strip('"')), cols[1:]))
# 2) only (PATENT, STATE) for US + non-empty state
patents_state = (rddPatents
                 .filter(lambda cols: cols[4].strip('"') == "US" and cols[5].strip('"') != "")
                 .map(lambda cols: (int(cols[0].strip().strip('"')),
                                    cols[5].strip('"'))))

print("Citations parsed:", rddCitations.take(3))
print("Patents_full parsed:", patents_full.take(2))
print("Patents_state parsed:", patents_state.take(3))


Citations parsed: [(3858241, 956203), (3858241, 1324234), (3858241, 3398406)]
Patents_full parsed: [(3070801, ['1963', '1096', '', '"BE"', '""', '', '1', '', '269', '6', '69', '', '1', '', '0', '', '', '', '', '', '', '']), (3070802, ['1963', '1096', '', '"US"', '"TX"', '', '1', '', '2', '6', '63', '', '0', '', '', '', '', '', '', '', '', ''])]
Patents_state parsed: [(3070802, 'TX'), (3070803, 'IL'), (3070804, 'OH')]


This step connects the citations data with the patent state information. First, it flips each citation pair so that the record is keyed by the cited patent ID instead of the citing one. It then performs a left outer join with `patents_state`, attaching the cited patent's state whenever it is available (if the state is missing, it is `None`). After the join, the data is re-keyed so that the citing patent ID is the key and the value is a pair containing the cited patent ID and the cited patent's state. Finally, a few rows are printed to check that the join worked as expected.
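The semantics of `leftOuterJoin` can be modeled in plain Python to make the `(key, (left, right-or-None))` shape concrete. The `left_outer_join` function is a hypothetical illustration, not Spark's implementation: it preserves input order (Spark does not guarantee any order), and the tiny `patents_state` list is made up to reproduce two rows of the `#1 Join sample` output.

```python
from collections import defaultdict

def left_outer_join(left, right):
    """Model of RDD.leftOuterJoin on lists of (key, value) pairs.

    Every left pair is kept; it is paired with each matching right value,
    or with None when the key has no match on the right.
    """
    right_index = defaultdict(list)
    for key, value in right:
        right_index[key].append(value)
    joined = []
    for key, value in left:
        matches = right_index.get(key)
        if matches:
            joined.extend((key, (value, other)) for other in matches)
        else:
            joined.append((key, (value, None)))
    return joined

citations = [(3859474, 3377434), (3859505, 1694570)]   # (CITING, CITED)
patents_state = [(3377434, "NJ")]                      # hypothetical subset

flipped = [(cited, citing) for citing, cited in citations]        # (CITED, CITING)
joined = left_outer_join(flipped, patents_state)                  # (CITED, (CITING, state|None))
cited_states = [(citing, (cited, state)) for cited, (citing, state) in joined]
print(cited_states)
```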

In [7]:
# Re-key by CITED then join with patents_state
cited_states = (rddCitations
                .map(lambda t: (t[1], t[0]))                 # (CITED, CITING)
                .leftOuterJoin(patents_state)                # (CITED, (CITING, cited_state|None))
                .map(lambda x: (x[1][0], (x[0], x[1][1]))))  # (CITING, (CITED, cited_state))

print("#1 Join sample:", cited_states.take(5))


#1 Join sample: [(3859474, (3377434, 'NJ')), (3859505, (1694570, None)), (3983361, (1694570, None)), (3859539, (3504978, None)), (4057734, (3504978, None))]


Now we perform the second join. Starting from `cited_states`, which already pairs each citing patent with the cited patent and the cited patent's state, we perform a `leftOuterJoin` with `patents_state` keyed on the citing patent ID. This adds the state of the citing patent (or `None` if it has none) to each record.
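Assuming each patent ID appears at most once in `patents_state` (one row per patent in the patents file), this second left outer join behaves like a per-key dictionary lookup that returns `None` on a miss. The plain-Python sketch below shows that equivalence on two records in the `cited_states` shape; the `patents_state` contents are a hypothetical subset chosen to reproduce rows of the `#2 Join sample` output.

```python
# Records in the cited_states shape: (CITING, (CITED, cited_state)).
cited_states = [
    (4034670, (2868118, None)),
    (4034670, (3106154, "FL")),
]
patents_state = [(4034670, "IL"), (3106154, "FL")]   # hypothetical subset

# With unique keys on the right, a left outer join reduces to dict.get(key),
# which yields the matching value or None.
state_by_patent = dict(patents_state)
citing_cited = [
    (citing, ((cited, cited_state), state_by_patent.get(citing)))
    for citing, (cited, cited_state) in cited_states
]
print(citing_cited)
```

The same lookup view is the idea behind a broadcast (map-side) join: when the smaller side fits in memory, a dictionary shipped with `sc.broadcast(...)` can replace a shuffle-based join. Whether that is faster here depends on the size of `patents_state` and the memory available on the workers.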

In [8]:
# (CITING, ((CITED, cited_state), citing_state))
citing_cited = cited_states.leftOuterJoin(patents_state)

print("#2 Join sample:", citing_cited.take(5))


#2 Join sample: [(4034670, ((2868118, None), 'IL')), (4034670, ((2995084, None), 'IL')), (4034670, ((3106154, 'FL'), 'IL')), (4034670, ((3690254, None), 'IL')), (4034670, ((2291046, None), 'IL'))]


Next, we calculate how many citations each patent makes to other patents in the same state. First, we filter out any citations where either the citing or the cited patent doesn’t have a state, keeping only those where both states exist and match. Then, for each remaining citation, we create a pair of the citing patent and the number 1, representing one same-state citation. Finally, we sum these counts for each citing patent using `reduceByKey`, giving the total number of same-state citations per patent. The result is an RDD of (patent ID, same-state citation count) pairs.
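Python 3 lambdas cannot unpack tuples in their parameter list, which is why the filter in the `In [9]` cell indexes `x[1][0][1]`. A hypothetical named predicate with tuple unpacking expresses the same test more readably and could be passed to `.filter()` directly. Here it runs on a plain list, with `collections.Counter` standing in for the `map` to `(patent, 1)` followed by `reduceByKey`. The first two records follow the `#2 Join sample` output; the last two are made up to show the remaining cases.

```python
from collections import Counter

# (CITING, ((CITED, cited_state), citing_state)) records.
citing_cited = [
    (4034670, ((2868118, None), "IL")),    # cited state unknown -> excluded
    (4034670, ((3106154, "FL"), "IL")),    # different states   -> excluded
    (4034670, ((3999999, "IL"), "IL")),    # hypothetical same-state citation
    (5000001, ((3106154, "FL"), None)),    # hypothetical citing patent without a state
]

def is_same_state(record):
    _citing, ((_cited, cited_state), citing_state) = record
    # If cited_state is set and equal to citing_state, citing_state is set too.
    return cited_state is not None and cited_state == citing_state

# Counter plays the role of .map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b).
same_state_counts = Counter(citing for citing, _ in filter(is_same_state, citing_cited))
print(dict(same_state_counts))
```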

In [9]:
same_state_counts = (citing_cited
    .filter(lambda x: x[1][0][1] is not None and x[1][1] is not None and x[1][0][1] == x[1][1])
    .map(lambda x: (x[0], 1))
    .reduceByKey(lambda a,b: a+b))

print("same_state_counts sample:", same_state_counts.take(5))


same_state_counts sample: [(4871703, 19), (5984940, 1), (5608869, 2), (5886693, 8), (3858459, 3)]


In this part, we attach the same-state citation counts back to the full patent information. We perform a left outer join so that every patent keeps its full data, and if a patent doesn’t have any same-state citations, it gets a count of 0. We then combine each patent’s full row with its count as a new final column. After that, we sort the patents in descending order of same-state citations, breaking ties by patent ID in ascending order. Finally, we take and print the top 15 patents with the most same-state citations.
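The attach-and-rank step can be checked in plain Python as well. Because `reduceByKey` leaves one count per patent, the left outer join against `same_state_counts` is again a lookup, here falling back to 0. The patent rows are truncated to their first two columns for illustration; the counts come from the top-15 output, including the tie at 96 between 5958954 and 5998655.

```python
# patents_full rows truncated to two columns: (PATENT, [GYEAR, GDATE]).
patents_full = [
    (3070801, ["1963", "1096"]),
    (5998655, ["1999", "14585"]),
    (5959466, ["1999", "14515"]),
    (5958954, ["1999", "14515"]),
]
same_state_counts = {5959466: 125, 5958954: 96, 5998655: 96}

# Append the count (0 when the patent has no same-state citations) as the last column.
patents_with_counts = [
    (patent, cols + [same_state_counts.get(patent, 0)]) for patent, cols in patents_full
]

# Sort by count descending (via negation), then patent ID ascending to break ties.
ranked = sorted(patents_with_counts, key=lambda t: (-t[1][-1], t[0]))
print([(patent, cols[-1]) for patent, cols in ranked])
```

On the RDD itself, `takeOrdered(15, key=lambda t: (-t[1][-1], t[0]))` accepts the same key and returns the top 15 without sorting the whole dataset, which may be cheaper than `sortBy(...).take(15)`.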


In [10]:
patents_with_counts = (patents_full
    .leftOuterJoin(same_state_counts)   # (PATENT, ([...], COUNT|None))
    .map(lambda x: (x[0], x[1][0] + [x[1][1] if x[1][1] is not None else 0])))

# Sort by count desc, patent ID asc
top15 = (patents_with_counts
         .sortBy(lambda t: (-t[1][-1], t[0]))
         .take(15))

for row in top15:
    print(row)


(5959466, ['1999', '14515', '1997', '"US"', '"CA"', '5310', '2', '', '326', '4', '46', '159', '0', '1', '', '0.6186', '', '4.8868', '0.0455', '0.044', '', '', 125])
(5983822, ['1999', '14564', '1998', '"US"', '"TX"', '569900', '2', '', '114', '5', '55', '200', '0', '0.995', '', '0.7201', '', '12.45', '0', '0', '', '', 103])
(6008204, ['1999', '14606', '1998', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '121', '0', '1', '', '0.7415', '', '5', '0.0085', '0.0083', '', '', 100])
(5952345, ['1999', '14501', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '118', '0', '1', '', '0.7442', '', '5.1102', '0', '0', '', '', 98])
(5958954, ['1999', '14515', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '116', '0', '1', '', '0.7397', '', '5.181', '0', '0', '', '', 96])
(5998655, ['1999', '14585', '1998', '"US"', '"CA"', '', '1', '', '560', '1', '14', '114', '0', '1', '', '0.7387', '', '5.1667', '', '', '', '', 96])
(5936426, ['1999', '14466', '1997', '"US"', '"CA"