# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

```python
from pyspark import SparkContext, SparkConf
import numpy as np
import operator
```

```python
# Run Spark locally, with as many worker threads as there are logical cores.
conf = SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)
```

Using PySpark with RDDs on the https://coding.csel.io machines is slow: the functions you pass to RDD operations run in Python worker processes, which is much less efficient than the JVM-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use

The two RDDs loaded below are called `rddCitations` and `rddPatents`. They hold the raw text lines, so you will probably want to process them further (e.g. convert the fields to integer types).

The `textFile` function returns an RDD of strings, one per line of the file. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python `bytes` objects before, see the Python documentation. You can convert a `bytes` value `x` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read in all the lines as UTF-8 strings like this:
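```python
import gzip

# Text mode ('rt') decodes each line to str. Strip the trailing newline that
# line iteration keeps, so the lines match what textFile would return.
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    rddCitations = sc.parallelize([line.rstrip('\n') for line in f])
```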
This is less efficient than using `textFile`, because `textFile` lets the worker nodes read the file directly from the underlying HDFS (or other) file system, whereas `gzip.open(...)` followed by `parallelize` reads all the data on the driver (the frontend) and then distributes it to the worker nodes.
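To make the `bytes` versus `str` distinction concrete, here is a minimal plain-Python sketch that uses a tiny in-memory gzip buffer (an `io.BytesIO` stand-in, assumed here in place of `cite75_99.txt.gz`). Reading in binary mode gives `bytes` lines that need `.decode('utf-8')`, while text mode (`'rt'`) returns `str` directly; in both cases each line keeps its trailing newline, which `textFile` would not include.

```python
import gzip
import io

# Stand-in for cite75_99.txt.gz: two sample lines compressed into memory.
raw = '"CITING","CITED"\n3858241,956203\n'.encode("utf-8")
buf = io.BytesIO(gzip.compress(raw))

# Binary mode yields bytes objects, which must be decoded explicitly.
with gzip.open(buf, "rb") as f:
    byte_lines = f.readlines()
decoded = [b.decode("utf-8").rstrip("\n") for b in byte_lines]

# Text mode ('rt') decodes to str for us; the trailing '\n' is still there.
buf.seek(0)
with gzip.open(buf, "rt", encoding="utf-8") as f:
    text_lines = [line.rstrip("\n") for line in f]

print(type(byte_lines[0]).__name__, decoded)
print(text_lines)
```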

```python
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")
```

The data looks like the following.

```python
rddCitations.take(5)
```

```
['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']
```

```python
rddPatents.take(5)
```

```
['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']
```

In other words, each element is a single string containing one line of comma-separated values. You will need to convert these to (K,V) pairs, probably converting the keys to `int`, and so on. You'll also need to `filter` out the header line, since RDDs have no simple way to select all the lines except the first.
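One possible alternative to comparing every line against the header is to drop the first line of the first partition with `mapPartitionsWithIndex`. The sketch below assumes the header is the first line of partition 0, which should hold for a single `.gz` file because gzip is not splittable and `textFile` therefore reads it as one partition. The function name `skip_first_line` is hypothetical, and the demo calls it on a plain iterator so it runs without Spark.

```python
from itertools import islice

def skip_first_line(partition_index, lines):
    """Drop the first line of partition 0 (assumed to be the CSV header).

    Hypothetical helper meant for rdd.mapPartitionsWithIndex(...); it is only
    correct if the header is the first line of the first partition.
    """
    if partition_index == 0:
        return islice(lines, 1, None)
    return lines

sample = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']
print(list(skip_first_line(0, iter(sample))))  # header removed
print(list(skip_first_line(1, iter(sample))))  # other partitions untouched
```

In the notebook it would be applied as `rddCitations.mapPartitionsWithIndex(skip_first_line)`. It avoids the extra job that `rdd.first()` triggers, at the cost of relying on the partition-layout assumption above.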

### 1. Imports & Simple Helpers for Cleaning and Caching CSV Data in Spark
- `parse_csv_line`: parse one CSV line safely, handling quoted fields.
- `drop_header`: remove the header line.
- `to_int_or_none`: convert a field to `int`, or `None` if it is not a valid integer.
- `StorageLevel` & `persist`: cache processed data in Spark so it can be reused quickly.

### 2. Parsing and Cleaning Citations & Patents Data in Spark
- Citations: keep only who cites whom, as `(citing, cited)` integer pairs.
- Patents: keep only the ID, country, and state, as `(patent, country, state)`.
- Both are cleaned and persisted so Spark can reuse them quickly.

```python
import csv
from io import StringIO
from pyspark.storagelevel import StorageLevel

def parse_csv_line(line: str):
    # csv.reader strips the quotes around fields such as "US", unlike str.split(",").
    return next(csv.reader(StringIO(line)))

def drop_header(rdd):
    # The header is the first line of the file; filter out every line equal to it.
    header = rdd.first()
    return rdd.filter(lambda x: x != header)

def to_int_or_none(s: str):
    # Return None for empty or non-numeric fields instead of raising.
    try:
        return int(s)
    except (TypeError, ValueError):
        return None

# (citing, cited) pairs as ints; rows whose fields are not integers are dropped.
citations_data = drop_header(rddCitations).map(parse_csv_line)
citations = (
    citations_data
    .map(lambda f: (to_int_or_none(f[0]), to_int_or_none(f[1])))
    .filter(lambda t: t[0] is not None and t[1] is not None)
    .persist(StorageLevel.MEMORY_AND_DISK)
)

# (PATENT, COUNTRY, POSTATE): fields 0, 4 and 5 of the patent CSV.
patents_data = drop_header(rddPatents).map(parse_csv_line)
patents_min = (
    patents_data
    .map(lambda f: (to_int_or_none(f[0]),
                    f[4] if len(f) > 4 else "",
                    f[5] if len(f) > 5 else ""))
    .filter(lambda t: t[0] is not None)
    .persist(StorageLevel.MEMORY_AND_DISK)
)
```
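The reason for using `csv.reader` rather than `str.split(',')` shows up on the patent rows, where the country and state fields are quoted. This plain-Python check restates the two helpers from the cell above (with the `except` clause narrowed to `TypeError` and `ValueError`) and runs them on the `3070802` row shown earlier.

```python
import csv
from io import StringIO

def parse_csv_line(line: str):
    # csv.reader removes the double quotes around quoted fields.
    return next(csv.reader(StringIO(line)))

def to_int_or_none(s: str):
    # Empty or non-numeric fields become None instead of raising.
    try:
        return int(s)
    except (TypeError, ValueError):
        return None

row = '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,'
fields = parse_csv_line(row)
print(fields[0], fields[4], fields[5])        # quotes stripped by csv.reader
print(row.split(",")[4], row.split(",")[5])   # naive split keeps the quotes
print(to_int_or_none(fields[0]), to_int_or_none(fields[3]))  # APPYEAR is empty
```

The header line also maps to `None` (`int("CITING")` fails), so the `is not None` filters would catch it even without `drop_header`.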

### 3. Creating a clean lookup table that maps each U.S. patent to its state.

```python
state_by_patent = (
    patents_min
    .filter(lambda t: t[1] == "US" and t[2] != "")
    .map(lambda t: (t[0], t[2]))  # (patent_id, state)
    .persist(StorageLevel.MEMORY_AND_DISK)
)
```
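As a quick sanity check of the filter above, here is the same predicate applied in plain Python to `(PATENT, COUNTRY, POSTATE)` tuples built by hand from the four sample patent rows shown earlier; the `toy_` names are hypothetical. The Belgian patent `3070801` has no U.S. state and drops out.

```python
# Hand-built stand-in for patents_min, from the sample rows printed earlier.
toy_patents_min = [
    (3070801, "BE", ""),
    (3070802, "US", "TX"),
    (3070803, "US", "IL"),
    (3070804, "US", "OH"),
]

# Same predicate as the Spark filter: U.S. patents with a non-empty state.
toy_state_by_patent = {
    pid: state
    for pid, country, state in toy_patents_min
    if country == "US" and state != ""
}
print(toy_state_by_patent)
```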

### 4. Attaching the U.S. state information to both the citing and cited patents in each citation.

```python
by_cited = citations.map(lambda t: (t[1], t[0]))  # (cited, citing)
# (cited, (citing, cited_state)); cited_state is None if the cited patent has no U.S. state
with_cited_state = (
    by_cited.leftOuterJoin(state_by_patent)
    .persist(StorageLevel.MEMORY_AND_DISK)
)
```

```python
step1 = with_cited_state.map(lambda x: (x[1][0], (x[0], x[1][1])))  # (citing, (cited, cited_state))
# (citing, ((cited, cited_state), citing_state))
with_both = (
    step1.leftOuterJoin(state_by_patent)
    .persist(StorageLevel.MEMORY_AND_DISK)
)
```
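The two joins are easiest to follow on a toy example. The sketch below is a plain-Python model of `leftOuterJoin` (not Spark's implementation): every left pair is kept once per matching right value, and a key with no match gets `None`. The patent IDs `101` to `104` and `999` and their states are made up, and the `toy_` names are hypothetical so they do not shadow the RDDs. Spark produces the same pairs, but in no guaranteed order.

```python
def left_outer_join(left, right):
    """Plain-Python model of RDD.leftOuterJoin on lists of (key, value) pairs."""
    right_by_key = {}
    for k, w in right:
        right_by_key.setdefault(k, []).append(w)
    out = []
    for k, v in left:
        # Unmatched keys pair with a single None, as in a SQL left outer join.
        for w in right_by_key.get(k, [None]):
            out.append((k, (v, w)))
    return out

toy_states = [(101, "CO"), (102, "CO"), (103, "CA")]                 # (patent, state)
toy_citations = [(101, 102), (101, 103), (101, 999), (104, 102)]    # (citing, cited)

toy_by_cited = [(cited, citing) for citing, cited in toy_citations]
toy_with_cited_state = left_outer_join(toy_by_cited, toy_states)
toy_step1 = [(citing, (cited, cs)) for cited, (citing, cs) in toy_with_cited_state]
toy_with_both = left_outer_join(toy_step1, toy_states)
for rec in toy_with_both:
    print(rec)  # (citing, ((cited, cited_state), citing_state))
```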

### 5. Reshaping the citation data into a simple table (Cited, Cited State, Citing, Citing State) and printing 20 rows to check that it looks right.

```python
intermediate_rdd = with_both.map(lambda kv: (
    kv[1][0][0],  # Cited
    kv[1][0][1],  # Cited_State (None if unknown)
    kv[0],        # Citing
    kv[1][1],     # Citing_State (None if unknown)
))

print("Cited\tState\tCiting\tState")
for row in intermediate_rdd.take(20):
    print(f"{row[0]}\t{row[1]}\t{row[2]}\t{row[3]}")
```

```
Cited	State	Citing	State
2085758	None	4138233	None
1959374	None	4138233	None
3443362	None	4138233	None
1976214	None	4138233	None
3915672	PA	4138233	None
2682313	None	4138233	None
3984215	FL	4138233	None
3350849	None	4138233	None
2019485	None	4138233	None
3443361	MD	4138233	None
2181767	None	4138233	None
4052177	None	4138233	None
2195431	None	4138233	None
2519618	None	5415732	None
4334962	FL	5415732	None
3974022	None	5415732	None
3634128	UT	5415732	None
3702807	None	5415732	None
1006197	None	5415732	None
1717927	None	5415732	None
```
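Python 3 lambdas cannot unpack nested tuples in their parameters, which is why the cell above indexes `kv[1][0][0]` and so on. A named function with tuple unpacking is one arguably more readable alternative; `flatten_citation` is a hypothetical name, and the sample record below is the `3915672`/`4138233` row from the output above.

```python
def flatten_citation(kv):
    """Turn (citing, ((cited, cited_state), citing_state)) into a flat 4-tuple."""
    citing, ((cited, cited_state), citing_state) = kv
    return (cited, cited_state, citing, citing_state)

# Shape of one element of with_both, taken from the printed output.
record = (4138233, ((3915672, "PA"), None))
print(flatten_citation(record))
```

In the notebook, `with_both.map(flatten_citation)` should yield the same rows as `intermediate_rdd`.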


### 6. Counting, for each citing patent, how many of its citations point to patents in the same state.

```python
same_state_counts = (
    intermediate_rdd
    .filter(lambda r: r[1] is not None and r[3] is not None and r[1] == r[3])
    .map(lambda r: (r[2], 1))                # (Citing, 1)
    .reduceByKey(operator.add)               # (Citing, same_state_cites)
    .persist(StorageLevel.MEMORY_AND_DISK)
)
```
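A plain-Python model of the filter and `reduceByKey` above, using `collections.Counter` on made-up rows shaped like `intermediate_rdd` (the `toy_` names are hypothetical). Note that a citing patent with no same-state citations (here `104`) does not appear in the result at all, rather than appearing with a count of 0; that is what the left outer join in step 7 makes up for.

```python
from collections import Counter

# Made-up rows shaped like intermediate_rdd: (Cited, Cited_State, Citing, Citing_State).
toy_rows = [
    (102, "CO", 101, "CO"),
    (103, "CA", 101, "CO"),
    (105, "CO", 101, "CO"),
    (999, None, 101, "CO"),
    (102, "CO", 104, None),
]

# Count citations whose cited state is known and equals the citing state.
toy_same_state = Counter(
    citing
    for cited, cited_state, citing, citing_state in toy_rows
    if cited_state is not None and cited_state == citing_state
)
print(dict(toy_same_state))
```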

### 7. Creating a final table in which every patent has its state and a count of same-state citations, with 0 for patents that have none.

```python
patent_to_state = patents_min.map(lambda t: (t[0], t[2]))  # (PATENT, POSTATE)
augmented_min = (
    patent_to_state.leftOuterJoin(same_state_counts)  # (PATENT, (POSTATE, count or None))
    .map(lambda kv: (kv[0], kv[1][0], 0 if kv[1][1] is None else kv[1][1]))
)
```

### 8. Listing the top 10 patents that cite the most other patents from the same state.

```python
# Descending same-state count; ties broken by ascending patent number.
top10 = augmented_min.takeOrdered(10, key=lambda x: (-x[2], x[0]))
print("\nTop 10 patents(PATENT, POSTATE, same_state_cites):")
for rec in top10:
    print(rec)
```


```
Top 10 patents(PATENT, POSTATE, same_state_cites):
(5959466, 'CA', 125)
(5983822, 'TX', 103)
(6008204, 'CA', 100)
(5952345, 'CA', 98)
(5958954, 'CA', 96)
(5998655, 'CA', 96)
(5936426, 'CA', 94)
(5739256, 'CA', 90)
(5913855, 'CA', 90)
(5925042, 'CA', 90)
```
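`takeOrdered(n, key=...)` returns the `n` smallest elements according to `key`, so negating the count puts the largest counts first, and the patent number breaks ties in ascending order (as with `5958954` and `5998655` above). On a local list, `heapq.nsmallest` with the same key gives the same ordering; the records below are made up.

```python
import heapq

# Made-up (PATENT, POSTATE, same_state_cites) records.
toy_augmented = [
    (5, "CA", 3),
    (2, "TX", 7),
    (9, "CA", 7),
    (4, "", 0),
    (1, "CO", 3),
]

# Same key as the Spark call: most same-state citations first, ties by patent id.
top3 = heapq.nsmallest(3, toy_augmented, key=lambda x: (-x[2], x[0]))
print(top3)
```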
