# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind the PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

These two RDDs are called "rawCitations" and "rawPatents" because you probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used the Python `bytes` type before, look it up in the Python documentation. You can convert a value `x` of type `bytes` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` method of the gzip library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile` because `textFile` uses the underlying HDFS or other file system to read the file on the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend and then distributes it to the worker nodes.
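The difference between `bytes` and `str` can be checked without Spark. The sketch below writes a tiny gzip file (a hypothetical stand-in for `cite75_99.txt.gz`, created in a temporary directory) and reads it back in binary and in text mode.

```python
import gzip
import os
import tempfile

# A bytes value, as some readers return it, and its decoded str form.
raw = b"3858241,956203"
text = raw.decode("utf-8")

# Write a tiny gzip file; the file name and contents are made up for this sketch.
sample = '"CITING","CITED"\n3858241,956203\n'
path = os.path.join(tempfile.mkdtemp(), "tiny_cite.txt.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write(sample)

# Binary mode yields bytes objects.
with gzip.open(path, "rb") as f:
    bytes_lines = f.readlines()

# Text mode with an explicit encoding yields str objects.
with gzip.open(path, "rt", encoding="utf-8") as f:
    str_lines = f.readlines()

print(type(bytes_lines[0]).__name__, type(str_lines[0]).__name__)
```

Lines read in binary mode need `.decode('utf-8')` before string operations such as `split(",")` behave as expected; lines read in text mode do not.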

In [3]:
# sc (the SparkContext) creates the RDDs
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [4]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each record is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll need to `filter` out the header string as well, since there's no easy way to extract all the lines except the first.
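As a minimal sketch of that conversion, the plain-Python example below parses the citation lines shown above. The helper names `is_header` and `to_pair` are hypothetical and are not part of the lab code.

```python
# The five lines returned by rddCitations.take(5) above.
lines = [
    '"CITING","CITED"',
    '3858241,956203',
    '3858241,1324234',
    '3858241,3398406',
    '3858241,3557384',
]

def is_header(line):
    # Treat a line as the header when its first field is not numeric.
    return not line.split(",")[0].strip('"').isdigit()

def to_pair(line):
    # Split one data line into a (CITING, CITED) pair of ints.
    citing, cited = line.split(",")
    return (int(citing), int(cited))

pairs = [to_pair(l) for l in lines if not is_header(l)]
print(pairs[:2])  # [(3858241, 956203), (3858241, 1324234)]
```

On an RDD, the same two functions could be passed to `filter` and `map` respectively.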

### Approach:
1) Parse the csv file from rddPatents into two RDDs.
- The first one `patents_all` includes all columns of rddPatents, stored as a Python list per record. This RDD will be joined with the co-state counts at the end of the code so that the resulting database includes all of the columns from patents (p.*) along with the co-state citation counts.
- The second one `patents_filtered` subsets `patents_all` to those records that are not missing the state variable (`POSTATE`). This is necessary because we cannot produce the co-state citation count when this variable is missing. This RDD is also limited to only the columns we need in order to make joining and filtering more efficient.
- The parsed RDDs do not contain the header row.
- The outputted patent numbers are now integers instead of strings, which produces consistent numeric keys.
- When creating the RDD, partition into 200 chunks using `partitionBy(numP)` where `numP=200`, so that the operations can run in parallel. Specifically, `.partitionBy(numP)` will hash-partition the key–value pairs so that all records with the same key end up in the same partition. This enables efficient joins by minimizing data shuffles.
- `.persist(StorageLevel.MEMORY_AND_DISK)` caches the RDD in memory, spilling to disk if it does not fit, so that it does not need to be recomputed when it is used later in the processing.

2) Parse the rddCitations csv file into one new RDD.
- The citations RDD is parsed the same way as the patents above with regard to removing the header row and caching. Partitioning is applied when the pairs are re-keyed in step 3.
- Remove any rows with missing CITED or CITING patent numbers.
- Output CITING and CITED as integers instead of strings.

3) Map the variables on citations to swap them so that we can join on CITED. Use partitioning.
4) Join with `patents_filtered` with key = CITED to add the cited patent's state to each citation record.
5) Re-key by CITING so that the next join can look up the citing patent's state.
6) Join with `patents_filtered` with key = CITING to add the citing patent's state to each record.
7) Filter to records where the cited state is equal to the citing state.
8) Use `reduceByKey` to count co-state citations per citing patent.
9) Finally, complete a left join where `patents_all` (left) is joined with the co-state citation counts.
10) Construct the final output rows by creating a single list per record that includes all columns plus the count.
11) Create the table:
- Use `.takeOrdered` to retrieve the top 10 patents by co-state citation count.
- Get the header from `rddPatents`.
- Organize the data into a table using a function `show_table`.
- Show the table
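The join logic in steps 3 to 9 can be traced on a handful of records without Spark. The sketch below uses plain dictionaries and made-up toy data (the patent numbers and states are hypothetical, not taken from the real files) to show which citations survive each step.

```python
from collections import Counter

# Toy data, invented for illustration only.
patent_state = {1: "CA", 2: "CA", 3: "NY", 4: "", 5: "CA"}    # PATENT -> POSTATE
citations = [(1, 2), (1, 3), (1, 5), (2, 5), (3, 4), (5, 1)]  # (CITING, CITED)

# Step 1: keep only patents with a known state.
with_state = {p: s for p, s in patent_state.items() if s != ""}

# Steps 3-6: attach the cited and citing states. As with an inner join,
# citations where either patent has no state are dropped.
both = [
    (citing, with_state[cited], with_state[citing])
    for citing, cited in citations
    if cited in with_state and citing in with_state
]

# Steps 7-8: keep same-state pairs and count them per citing patent.
co_state_counts = Counter(
    citing for citing, cited_state, citing_state in both
    if cited_state == citing_state
)

# Step 9: left join onto all patents, filling missing counts with 0.
result = {p: co_state_counts.get(p, 0) for p in patent_state}
print(result)  # {1: 2, 2: 1, 3: 0, 4: 0, 5: 1}
```

Patent 4 has no state, so the citation (3, 4) is dropped by the first lookup, and patents 3 and 4 still appear in the result with a count of 0 because of the left join.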

In [6]:
import csv
from operator import add
from pyspark import StorageLevel

def clean(s):
    return "" if s is None else s.strip()

def parse_patent_full(line):
    row = next(csv.reader([line]))
    if row and row[0].strip().upper().replace('"','') == "PATENT":
        return None
    if len(row) <= 5:
        return None
    patent_str = row[0].strip().replace('"','')
    if patent_str == "":
        return None
    return (int(patent_str), row)  # (PATENT:int, full_row_list)

def parse_citation_csv(line):
    row = next(csv.reader([line]))
    if row and row[0].strip().upper().replace('"','') in ("CITING", "CITPAT"):
        return None
    if len(row) < 2:
        return None
    citing_str = row[0].strip().replace('"','')
    cited_str  = row[1].strip().replace('"','')
    if citing_str == "" or cited_str == "":
        return None
    return (int(citing_str), int(cited_str))  # (CITING:int, CITED:int)

numP = 200

# Elements of patents_all are (patent_id, full_row_list)
# x[0] = PATENT (int)
# x[1] = row (a Python list of all CSV fields)
# where inside the row list is:
# x[1][0] = "PATENT" value
# x[1][1] = "GYEAR"
# ...
# x[1][5] = "POSTATE"
patents_all = (rddPatents.map(parse_patent_full)
               .filter(lambda x: x is not None)
               .partitionBy(numP)
               .persist(StorageLevel.MEMORY_AND_DISK))


# For joins: only patents with non-empty POSTATE 
# Elements of patents_filtered are (patent_id, postate)
# kv[0] = PATENT
# kv[1] = POSTATE (trimmed)
patents_filtered = (patents_all
                    .mapValues(lambda row: clean(row[5]))
                    .filter(lambda kv: kv[1] != "")
                    .persist(StorageLevel.MEMORY_AND_DISK))


# Elements of citations are (citing_patent, cited_patent)
# x[0] = CITING
# x[1] = CITED
citations = (rddCitations.map(parse_citation_csv)
             .filter(lambda x: x is not None)
             .persist(StorageLevel.MEMORY_AND_DISK))


# CITED -> cited_state
# citations_by_cited swaps them so we can join on CITED.
# Elements of citations_by_cited are (CITED, CITING)
# x[0] = CITED
# x[1] = CITING
citations_by_cited = citations.map(lambda x: (x[1], x[0])).partitionBy(numP)


# Join on key = CITED:
# left side: (CITED, CITING)
# right side: (CITED, cited_state)
# After join, each element is (CITED, (CITING, cited_state))
# x[0] = CITED
# x[1] = (CITING, cited_state)
# x[1][0] = CITING
# x[1][1] = cited_state
with_cited_state = citations_by_cited.join(patents_filtered)

# CITING -> citing_state
# Re-key by CITING to join to get the citing patent’s state.
# by_citing elements are (CITING, cited_state)
# x[0] = CITING
# x[1] = cited_state
by_citing = with_cited_state.map(lambda x: (x[1][0], x[1][1])).partitionBy(numP)

# Join on key = CITING:
# left: (CITING, cited_state)
# right: (CITING, citing_state)
# After join, each element is (CITING, (cited_state, citing_state))
# x[0] = CITING
# x[1] = (cited_state, citing_state)
# x[1][0] = cited_state
# x[1][1] = citing_state
with_both_states = by_citing.join(patents_filtered)

# State filter
filtered = with_both_states.filter(lambda x: x[1][0] == x[1][1])

# Use reduceByKey to count co-state citations per citing patent
# Each element of co_state_counts is (CITING, CO_CITED_COUNT)
# x[0] = PATENT (citing patent id)
# x[1] = count of co-state citations
co_state_counts = (filtered
                   .map(lambda x: (x[0], 1))
                   .reduceByKey(add, numPartitions=numP)
                   .persist(StorageLevel.MEMORY_AND_DISK))

# LEFT JOIN onto all patents and fill missing with 0
# Join key = PATENT:
# left: (PATENT, full_row_list)
# right: (PATENT, count)
# Result: (PATENT, (full_row_list, count))
# x[0] = PATENT
# x[1] = (full_row_list, maybe_count)
# x[1][0] = full patent row list
# x[1][1] = count or None if no match
joined = patents_all.leftOuterJoin(co_state_counts)

# single list per record that includes all columns plus the count
rows_with_count = joined.map(
    lambda x: x[1][0] + [str(x[1][1] if x[1][1] is not None else 0)]
)

# Top 10 patents by co-state citation count
# (takeOrdered returns a Python list on the driver, not an RDD)
result_rdd_final = rows_with_count.takeOrdered(10, key=lambda row: -int(row[-1]))

# Get the header and add SAME_STATE
header = next(csv.reader([rddPatents.first()]))
header = [h.replace('"', '') for h in header]   # remove quotes
header.append("SAME_STATE")

# Organize the data into a table
def show_table(header, rows, n=10, max_col_width=10):
    # replace "" with "null" like Spark
    def norm(x):
        x = "" if x is None else str(x)
        x = "null" if x.strip() == "" else x
        return x

    rows = rows[:n]
    rows = [[norm(x) for x in r] for r in rows]

    # compute widths
    widths = []
    for j, h in enumerate(header):
        col_vals = [h] + [r[j] if j < len(r) else "" for r in rows]
        w = min(max(len(v) for v in col_vals), max_col_width)
        widths.append(w)

    def fmt_row(r):
        cells = []
        for j, w in enumerate(widths):
            v = r[j] if j < len(r) else ""
            v = v if len(v) <= w else v[:w-3] + "..."
            cells.append(v.ljust(w))
        return "| " + " | ".join(cells) + " |"

    border = "+-" + "-+-".join("-"*w for w in widths) + "-+"

    print(border)
    print(fmt_row(header))
    print(border)
    for r in rows:
        print(fmt_row(r))
    print(border)
    print(f"only showing top {len(rows)} rows")

# Show the table
show_table(header, result_rdd_final, n=10)

+---------+-------+-------+---------+---------+---------+----------+---------+--------+--------+-----+--------+-------+----------+----------+---------+----------+----------+----------+----------+----------+----------+----------+------------+
| PATENT  | GYEAR | GDATE | APPYEAR | COUNTRY | POSTATE | ASSIGNEE | ASSCODE | CLAIMS | NCLASS | CAT | SUBCAT | CMADE | CRECEIVE | RATIOCIT | GENERAL | ORIGINAL | FWDAPLAG | BCKGTLAG | SELFCTUB | SELFCTLB | SECDUPBD | SECDLWBD | SAME_STATE |
+---------+-------+-------+---------+---------+---------+----------+---------+--------+--------+-----+--------+-------+----------+----------+---------+----------+----------+----------+----------+----------+----------+----------+------------+
| 5959466 | 1999  | 14515 | 1997    | US      | CA      | 5310     | 2       | null   | 326    | 4   | 46     | 159   | 0        | 1        | null    | 0.6186   | null     | 4.8868   | 0.0455   | 0.044    | null     | null     | 125        |
| 5983822 | 1999  | 14564 | 1998