# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

These two RDDs, `rddCitations` and `rddPatents` (created below), hold the raw text lines, so you will probably want to process them further (e.g. convert fields to integer types). 

The `textFile` function returns each line of the file as a string. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python `bytes` objects before, look them up. You can convert a `bytes` value `x` into, e.g., a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read in all the lines as UTF-8 strings like this:
```
import gzip
# 'rt' opens the compressed file in text mode, so each line is a str rather than bytes
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    rddCitations = sc.parallelize(f.readlines())
```
This is less efficient than using `textFile`, because `textFile` reads the file through the underlying HDFS or other file system on the worker nodes, whereas `gzip.open()...readlines()` reads all the data in the driver and then distributes it to the worker nodes. (Gzip files are not splittable, so `textFile` reads each `.gz` file in a single task, but that read still happens on a worker rather than in the driver.)
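To see the `bytes` versus `str` distinction without Spark, here is a minimal standard-library sketch. It writes a small gzip file (the file name `demo_cite.txt.gz` and its contents are made up for illustration, mimicking the first lines of `cite75_99.txt.gz`) and reads it back in binary mode and in text mode.

```python
import gzip
import os
import tempfile

# Hypothetical demo file that mimics the first lines of cite75_99.txt.gz.
sample = '"CITING","CITED"\n3858241,956203\n3858241,1324234\n'
path = os.path.join(tempfile.mkdtemp(), "demo_cite.txt.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write(sample)

# Binary mode ("rb") yields bytes; each line must be decoded explicitly.
with gzip.open(path, "rb") as f:
    byte_lines = f.readlines()
decoded = [x.decode("utf-8") for x in byte_lines]

# Text mode ("rt") decodes for you and yields str lines directly.
with gzip.open(path, "rt", encoding="utf-8") as f:
    text_lines = f.readlines()

print(type(byte_lines[0]).__name__, type(text_lines[0]).__name__)  # bytes str
print(decoded == text_lines)  # True
```

Either route ends with the same list of strings; the difference for Spark is only where the reading happens.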

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [4]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string of comma-separated values. You will need to convert these lines to (K,V) pairs, probably converting the keys to `int`, and so on. You'll also need to `filter` out the header line, since RDDs offer no easy way to keep every line except the first.
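The header filter and (K,V) conversion described above can be sketched in plain Python on the five citation lines shown earlier. Here list comprehensions stand in for `filter` and `map`; on the real RDD the same logic would be passed to `rddCitations.filter(...)` and `.map(...)`. Treating "starts with a digit" as the test for a data line is an assumption that holds for these two files, because both header lines begin with a quote character.

```python
# Lines copied from the rddCitations.take(5) output above.
lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234',
         '3858241,3398406', '3858241,3557384']

# Drop the header: it is the only line that does not start with a digit.
data_lines = [ln for ln in lines if ln and ln[0].isdigit()]

# Convert each "CITING,CITED" string into an (int, int) key/value pair.
pairs = [tuple(int(v) for v in ln.split(",")) for ln in data_lines]
print(pairs[:2])  # [(3858241, 956203), (3858241, 1324234)]
```

Swapping the tuple order, as the Step 1 helper does, re-keys the pairs by `CITED` instead of `CITING`.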

In [36]:
# --- Step 1: RDD dev mode (as suggested in the README) ---
# RDD joins are slow, so I develop/debug on a 5% sample first.
# The same logic applies to the full dataset: simply remove .sample(False, 0.05, seed=42).

rddPatents_dev = rddPatents.sample(False, 0.05, seed=42)
rddCitations_dev = rddCitations.sample(False, 0.05, seed=42)

import csv

# Debugging fix: header rows can slip through in RDD text files.
# The safest filter is: keep only rows that begin with a digit.
def is_data_row(line):
    return len(line) > 0 and line[0].isdigit()

def parse_patent(line):
    row = next(csv.reader([line]))
    patent = int(row[0])
    state = row[5] if row[5] != "" else None
    return (patent, state)

def parse_citation(line):
    parts = line.split(",")
    citing = int(parts[0])
    cited = int(parts[1])
    return (cited, citing)  # key=CITED for the first join

### Step 1: Set up a development sample + parsing helpers

For the RDD solution, I used a 5% sample of the patents and citations data while developing and debugging, as suggested in the README. RDD joins are much slower than DataFrame joins, so working on a smaller subset helps confirm the logic without waiting a long time for each join step.
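A rough way to see what a 5% sample does to the joins: `sample(False, fraction)` keeps each element independently with probability `fraction` (Bernoulli sampling). If citations and patents are sampled independently, a citation only yields a state comparison when it and both of its endpoint patents survive, which is roughly `0.05 ** 3` of all citations. The toy universe below (20,000 hypothetical patents, each citing its successor) is an assumption made purely for illustration.

```python
import random

# Hypothetical toy universe (NOT the lab data): 20,000 patents, each citing the next.
patents = list(range(20_000))
citations = [(p, p + 1) for p in range(len(patents) - 1)]   # (CITING, CITED)

rng = random.Random(42)
frac = 0.05

# sample(False, frac) semantics: keep each element independently with probability frac.
kept_patents = {p for p in patents if rng.random() < frac}
kept_cites = [c for c in citations if rng.random() < frac]

# A kept citation is only comparable if both endpoint patents are also in the sample.
usable = [(a, b) for a, b in kept_cites if a in kept_patents and b in kept_patents]

print(len(kept_patents), len(kept_cites), len(usable))
print("expected usable citations:", len(citations) * frac ** 3)   # about 2.5
```

This is one plausible reason a sampled run shows mostly `None` states and very small SAME_STATE counts. Since the sample is a subset of the full data, each patent's count on the full dataset can only be equal or larger.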

In this chunk, I also define small helper functions for parsing. Because the input files are raw text with a header row, I filter out non-data rows by keeping only lines that begin with a digit. This prevents type conversion errors when casting the patent ID and citation IDs to integers.

**Debugging note:**  
While parsing the patent RDD, an initial header filter based on string matching allowed a header row to pass through, causing a type conversion error when casting the patent ID to an integer. This was resolved by filtering input rows to only those that begin with a digit, which reliably distinguishes data rows from headers in the raw text file. This approach is more robust for RDD-based parsing.
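The reason for `csv.reader` rather than `str.split(",")` shows up on two patent lines from the `take(5)` output: `split` keeps the surrounding quotes, so a missing state arrives as the two-character string `'""'` and would slip past the `!= ""` check. This sketch repeats the notebook's `parse_patent` logic in plain Python.

```python
import csv

# Two data lines copied from the rddPatents.take(5) output above.
lines = [
    '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
    '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
]

def parse_patent(line):
    # Same logic as the notebook helper: csv.reader removes the quotes around fields.
    row = next(csv.reader([line]))
    return (int(row[0]), row[5] if row[5] != "" else None)

naive = [ln.split(",")[5] for ln in lines]
parsed = [parse_patent(ln) for ln in lines]
print(naive)    # ['""', '"TX"']  (quotes kept, so a missing state is not "")
print(parsed)   # [(3070801, None), (3070802, 'TX')]
```

With the naive split, two patents with missing states would compare as equal, which would inflate SAME_STATE.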

In [39]:
# --- Step 2: Build patent->state mapping + attach states to citations ---
# Goal: produce an intermediate RDD where each citation has both states available
# (CITING, ((CITED, CITED_STATE), CITING_STATE)).

pat_states = (
    rddPatents_dev
    .filter(is_data_row)
    .map(parse_patent)
    .filter(lambda x: x[1] is not None)     # keep only patents with a real state
    .cache()
)

cit_by_cited = (
    rddCitations_dev
    .filter(is_data_row)
    .map(parse_citation)                   # (CITED, CITING)
    .cache()
)

# Join 1: attach CITED_STATE
cited_join = (
    cit_by_cited
    .leftOuterJoin(pat_states)             # (CITED, (CITING, CITED_STATE))
    .cache()
)
cited_join.count()  # force caching so we don't recompute the join later

# Re-key by CITING so we can attach CITING_STATE
by_citing = (
    cited_join
    .map(lambda x: (x[1][0], (x[0], x[1][1])))  # (CITING, (CITED, CITED_STATE))
    .cache()
)

# Join 2: attach CITING_STATE
both_join = (
    by_citing
    .leftOuterJoin(pat_states)                 # (CITING, ((CITED, CITED_STATE), CITING_STATE))
    .cache()
)
both_join.count()  # force caching again (RDD joins are slow)

both_join.take(10)

[(3858264, ((3203002, None), None)),
 (4922568, ((3203002, None), None)),
 (3858291, ((3768550, None), None)),
 (3858324, ((2423786, None), None)),
 (3858348, ((1384972, None), None)),
 (3858402, ((3720066, None), None)),
 (3858429, ((3051216, None), None)),
 (3858444, ((3710874, None), None)),
 (3858444, ((3613853, None), None)),
 (3858480, ((3435722, None), None))]

### Step 2: Build patent → state mapping and attach states onto citations (two joins)

In this chunk, I construct the intermediate dataset needed to count same-state citations. First, I extract a clean mapping of `(PATENT, POSTATE)` from the patents file, keeping only rows that have valid state information. This ensures later state comparisons are meaningful.

Next, I join state information onto the citations data in two steps. I key citations by `CITED` to attach the **cited patent’s state** (`CITED_STATE`) using a left outer join, since some cited patents are missing from the patents table or have no state. Then I re-key by `CITING` and join again to attach the **citing patent’s state** (`CITING_STATE`). I cache and materialize the join outputs using `count()` so Spark does not recompute these expensive joins later.
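The two-join pipeline can be mimicked on toy data to check the tuple shapes at each step. This is a plain-Python sketch, not Spark: the patent IDs, states, and the `left_outer_join` helper are hypothetical, and it assumes at most one state per patent.

```python
# Toy stand-ins for the two RDDs (hypothetical IDs and states).
# Like pat_states, the lookup table only holds patents with a non-empty state.
pat_states = {1: "CO", 2: "CO", 3: "TX"}
citations = [(1, 2), (1, 3), (2, 9), (4, 1)]   # (CITING, CITED); 4 and 9 have no state

def left_outer_join(pairs, table):
    # Mimics RDD.leftOuterJoin: every left record survives, a missing key gives None.
    # Unlike Spark, this keeps the input order.
    return [(k, (v, table.get(k))) for k, v in pairs]

# Join 1, keyed by CITED: (CITED, (CITING, CITED_STATE))
cited_join = left_outer_join([(cited, citing) for citing, cited in citations], pat_states)

# Re-key by CITING: (CITING, (CITED, CITED_STATE))
by_citing = [(citing, (cited, cited_state)) for cited, (citing, cited_state) in cited_join]

# Join 2: (CITING, ((CITED, CITED_STATE), CITING_STATE))
both_join = left_outer_join(by_citing, pat_states)
for row in both_join:
    print(row)
```

Because Step 3 discards every row in which either state is `None`, replacing both `leftOuterJoin` calls with an inner `join` would leave the SAME_STATE counts unchanged while producing smaller intermediate RDDs; the outer join mainly helps when inspecting which citations lack state information.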

In [38]:
# --- Step 3: Count same-state citations + report results ---
# SAME_STATE = count of citations where cited and citing states are both present and match.

same_state_counts = (
    both_join
    .filter(lambda x: x[1][0][1] is not None and x[1][1] is not None)  # has CITED_STATE and CITING_STATE
    .filter(lambda x: x[1][0][1] == x[1][1])                           # states match
    .map(lambda x: (x[0], 1))                                          # (CITING, 1)
    .reduceByKey(lambda a, b: a + b)                                   # (CITING, SAME_STATE)
    .cache()
)

top10 = same_state_counts.takeOrdered(10, key=lambda x: -x[1])

print("PATENT\tSAME_STATE")
for pid, cnt in top10:
    print(f"{pid}\t{cnt}")

# --- Optional: match the “(PATENT, [row..., SAME_STATE])” reporting style ---
# This is just for display so the output looks like the reference/example.

def parse_full_patent(line):
    row = next(csv.reader([line]))
    patent = int(row[0])
    return (patent, row[1:])  # everything except PATENT

patents_full = (
    rddPatents_dev
    .filter(is_data_row)
    .map(parse_full_patent)
    .cache()
)

top10_rdd = sc.parallelize(top10)  # (PATENT, SAME_STATE)

final_join = (
    top10_rdd
    .join(patents_full)                          # (PATENT, (SAME_STATE, full_row))
    .map(lambda x: (x[0], x[1][1] + [x[1][0]]))   # append SAME_STATE at end
)

for row in final_join.take(10):
    print(row)

PATENT	SAME_STATE
5069283	2
5544893	2
4578858	1
4000494	1
5601846	1
4257009	1
4431513	1
4434258	1
4439634	1
4456845	1
(5069283, ['1991', '11659', '1989', 'US', 'OK', '624265', '2', '26', '166', '6', '64', '38', '7', '1', '0.4082', '0.6274', '4.4286', '10.5526', '0.0286', '0.0263', '0.1429', '0.1429', 2])
(4578858, ['1986', '9587', '1984', 'US', 'TX', '601765', '2', '22', '29', '5', '52', '15', '1', '1', '0', '0.6933', '7', '8', '0.9091', '0.6667', '0', '0', 1])
(4000494, ['1976', '6206', '1975', 'US', 'PA', '256580', '2', '9', '346', '4', '49', '8', '3', '0.75', '0', '0', '3.3333', '9.25', '0.1667', '0.125', '1', '1', 1])
(5601846, ['1997', '13556', '1995', 'US', 'NY', '709931', '2', '22', '424', '3', '31', '110', '21', '0.9364', '0.4943', '0.8031', '1.381', '14.1636', '0', '0', '0.9524', '0.9524', 1])
(4257009, ['1981', '7746', '1979', 'US', 'NJ', '466175', '2', '2', '330', '4', '41', '5', '9', '1', '0.6914', '0.8', '13.4444', '12.2', '1', '0.6', '0', '0', 1])
(4431513, ['1984', '8810

### Step 3: Count SAME_STATE citations and report top results

In this chunk, I compute the final value required by the assignment: the number of same-state citations per citing patent. A same-state citation is defined as a citation where both the citing and cited patents have state information and the states match.

Using the joined RDD from the previous chunk, I filter out records with missing states, filter again to keep only matching-state pairs, and then map each remaining row to `(CITING, 1)`. I use `reduceByKey` to sum these values and produce `(CITING, SAME_STATE)` counts.
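The filter, map, and `reduceByKey` chain can be sketched without Spark on records shaped like `both_join`. In this sketch `collections.Counter` plays the role of `map(...).reduceByKey(add)` and `heapq.nsmallest` with a negated key plays the role of `takeOrdered(10, key=lambda x: -x[1])`; the records themselves are hypothetical.

```python
from collections import Counter
import heapq

# Toy joined records in the notebook's shape: (CITING, ((CITED, CITED_STATE), CITING_STATE)).
both_join = [
    (1, ((2, "CO"), "CO")),
    (1, ((3, "TX"), "CO")),
    (1, ((5, "CO"), "CO")),
    (2, ((9, None), "CO")),
    (4, ((1, "CO"), None)),
    (6, ((7, "NY"), "NY")),
]

# filter (both states present and equal) + map to (CITING, 1) + reduceByKey(add)
same_state_counts = Counter()
for citing, ((_cited, cited_state), citing_state) in both_join:
    if cited_state is not None and citing_state is not None and cited_state == citing_state:
        same_state_counts[citing] += 1

# Equivalent of takeOrdered(10, key=lambda x: -x[1]): the 10 largest counts.
top10 = heapq.nsmallest(10, same_state_counts.items(), key=lambda x: -x[1])
print(top10)  # [(1, 2), (6, 1)]
```

Patents whose citations never match (2 and 4 here) simply do not appear, just as they would be absent from `same_state_counts` in the RDD version.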

Finally, I report the top 10 patents by SAME_STATE. For display purposes, I also join the top-10 results back to the full patent rows so the output can be printed in the same “(PATENT, [row…, SAME_STATE])” style as the example/reference output. Because an RDD `join` does not preserve the order of `top10`, these rows are not necessarily printed in ranked order.
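A driver-side sketch of the display step, assuming the top-10 list is small enough to handle locally: iterating over `top10` and looking up each full row keeps the ranked order while still behaving like an inner join. The patent IDs and truncated rows below are hypothetical.

```python
# Toy data (hypothetical): ranked (PATENT, SAME_STATE) pairs and truncated full rows.
top10 = [(101, 5), (102, 3), (103, 1)]
patents_full = {
    101: ["1991", "US", "OK"],
    102: ["1986", "US", "TX"],
    103: ["1976", "US", "PA"],
    104: ["1980", "US", "CO"],   # not in top10, so it is not reported
}

# Inner-join semantics: keep only patents present in both inputs, then append
# SAME_STATE to the end of the row. Iterating over top10 preserves the ranking.
report = [(pid, patents_full[pid] + [cnt]) for pid, cnt in top10 if pid in patents_full]
for row in report:
    print(row)
```

In the notebook itself, one option (not what the code above does) would be to sort the small joined result in the driver, e.g. `sorted(final_join.collect(), key=lambda x: -x[1][-1])`.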