# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark dataframes. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task; the `sample()` method extracts just a sample of the data.
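
When planning a reduced run, it helps to remember that `sample()` without replacement keeps each element independently with probability `fraction`, so the sample size is only approximately `fraction` times the input size. The plain-Python sketch below illustrates that idea; `bernoulli_sample` is a hypothetical helper written for this note, not Spark's implementation.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction` (illustrative helper)."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]

# Made-up input lines standing in for the records of an RDD.
lines = [f"{i},{i + 1}" for i in range(10_000)]

subset = bernoulli_sample(lines, 0.25, seed=42)
again = bernoulli_sample(lines, 0.25, seed=42)

print(len(subset))       # close to 2500, but usually not exactly 2500
print(subset == again)   # the same seed reproduces the same sample
```

Fixing the seed, as the solution below does with `seed=42`, makes a sampled run repeatable.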

The two RDDs loaded below, `rddCitations` and `rddPatents`, hold the raw text of each file, so you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used the Python `bytes` type before, look it up. You can convert a value `x` of type `bytes` into e.g. a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` method of the gzip library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile`, because `textFile` uses the underlying HDFS or other file system to read the file on the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend and then distributes it to the worker nodes.
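
The difference between `bytes` and `str` can be seen without Spark. The sketch below writes a tiny gzip file (the file name `tiny.txt.gz` and its contents are made up for illustration) and reads it back in binary mode and in text mode.

```python
import gzip
import os
import tempfile

raw = b'3858241,956203'        # a bytes value, as a binary read would return it
text = raw.decode('utf-8')     # bytes -> str
print(type(raw).__name__, type(text).__name__)

# Write a small gzip file so the two read modes can be compared.
path = os.path.join(tempfile.mkdtemp(), 'tiny.txt.gz')
with gzip.open(path, 'wb') as f:
    f.write(b'"CITING","CITED"\n3858241,956203\n')

with gzip.open(path, 'rb') as f:                     # binary mode yields bytes
    first_binary = f.readline()
with gzip.open(path, 'rt', encoding='utf-8') as f:   # text mode yields str
    first_text = f.readline()

print(first_binary)
print(first_text == first_binary.decode('utf-8'))
```

Text mode (`'rt'`) does the decoding for you, which is why the `gzip.open` snippet above can hand its lines straight to `sc.parallelize`.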

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [4]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each record is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll also need to `filter` out the header string, since there's no easy way to extract all the lines except the first.
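
As a rough illustration of that conversion, the sketch below parses a few of the lines shown above in plain Python. The patent lines are shortened to their first six columns for brevity, and `split_csv` and `is_header` are hypothetical helper names; the same functions could be passed to an RDD's `filter` and `map`.

```python
import csv

citation_lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']
patent_lines = [
    '"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE"',
    '3070801,1963,1096,,"BE",""',
    '3070802,1963,1096,,"US","TX"',
]

def split_csv(line):
    """Split one CSV record; csv.reader removes the quotes around fields."""
    return next(csv.reader([line]))

def is_header(line):
    """Header rows start with a quoted column name rather than a digit."""
    return not line[:1].isdigit()

# (citing, cited) pairs with integer keys
citation_pairs = [
    (int(citing), int(cited))
    for citing, cited in (split_csv(l) for l in citation_lines if not is_header(l))
]

# (patent, state) pairs; the state is the sixth column (POSTATE)
patent_pairs = [
    (int(fields[0]), fields[5])
    for fields in (split_csv(l) for l in patent_lines if not is_header(l))
]

print(citation_pairs)
print(patent_pairs)
```

Testing for a leading digit removes the header without needing to know the header text in advance.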

In [10]:
# Helper functions for parsing CSV data
def parse_citation_line(line):
    """Parse citation line and return (citing, cited) tuple"""
    parts = line.split(',')
    if len(parts) >= 2:
        try:
            citing = int(parts[0].strip('"'))
            cited = int(parts[1].strip('"'))
            return (citing, cited)
        except ValueError:
            return None
    return None

def parse_patent_line(line):
    """Parse patent line and return (patent_id, (state, full_data)) tuple"""
    parts = line.split(',')
    if len(parts) >= 6:
        try:
            patent_id = int(parts[0].strip('"'))
            country = parts[4].strip('"')
            state = parts[5].strip('"')
            
            # Only include US patents with state information
            if country == 'US' and state and state != '':
                return (patent_id, (state, parts))
        except (ValueError, IndexError):
            pass
    return None

In [18]:
# Cell: Process the data with sampling option
def solve_patent_problem(use_sample=True, sample_fraction=0.25):
    """
    Solve the patent citation problem using RDD API
    
    Args:
        use_sample: If True, use a sample of the data for faster processing
        sample_fraction: Fraction of data to sample (default 0.25 = 25%)
        
    Returns:
        tuple: (result_dict, top_10_list, full_citation_data_rdd) for verification
    """
    
    # Load and optionally sample the data
    if use_sample:
        print(f"Using {sample_fraction*100}% sample of the data...")
        citations_rdd = rddCitations.sample(False, sample_fraction, seed=42)
        patents_rdd = rddPatents.sample(False, sample_fraction, seed=42)
    else:
        print("Using full dataset...")
        citations_rdd = rddCitations
        patents_rdd = rddPatents
    
    # Remove headers and parse citations
    # (read the header from the unsampled RDD; a sample may not contain it)
    citations_header = rddCitations.first()
    citations_parsed = (citations_rdd
                       .filter(lambda line: line != citations_header)
                       .map(parse_citation_line)
                       .filter(lambda x: x is not None))
    
    citations_parsed.cache()  # cache first so the count() below fills the cache
    print(f"Parsed citations: {citations_parsed.count()}")
    
    # Remove headers and parse patents (only US patents with states)
    patents_header = rddPatents.first()  # header of the unsampled RDD; a sample may not contain it
    patents_parsed = (patents_rdd
                     .filter(lambda line: line != patents_header)
                     .map(parse_patent_line)
                     .filter(lambda x: x is not None))
    
    patents_parsed.cache()  # cache first so the count() below fills the cache
    print(f"US patents with state info: {patents_parsed.count()}")
    
    # Create lookup for patent states: (patent_id, state)
    patent_states = patents_parsed.mapValues(lambda x: x[0])  # Extract just the state
    patent_states.cache()
    
    print("Sample patent states:")
    print(patent_states.take(5))
    
    # Join citations with cited patent states
    # Transform citations to (cited_patent, citing_patent) for join
    citations_for_cited_join = citations_parsed.map(lambda x: (x[1], x[0]))
    
    # Join with patent states to get cited patent state
    citations_with_cited_state = citations_for_cited_join.join(patent_states)
    # Result: (cited_patent, (citing_patent, cited_state))
    
    citations_with_cited_state.cache()  # cache first so the count() below fills the cache
    print(f"Citations with cited state: {citations_with_cited_state.count()}")
    
    # Transform to (citing_patent, (cited_patent, cited_state)) for next join
    citations_citing_key = citations_with_cited_state.map(
        lambda x: (x[1][0], (x[0], x[1][1]))
    )
    
    # Join with patent states to get citing patent state
    full_citation_data = citations_citing_key.join(patent_states)
    # Result: (citing_patent, ((cited_patent, cited_state), citing_state))
    
    full_citation_data.cache()  # cache first so the count() below fills the cache
    print(f"Full citation data: {full_citation_data.count()}")
    
    print("Sample full citation data:")
    sample_data = full_citation_data.take(5)
    for item in sample_data:
        citing = item[0]
        cited = item[1][0][0]
        cited_state = item[1][0][1]
        citing_state = item[1][1]
        print(f"Citing: {citing} ({citing_state}) -> Cited: {cited} ({cited_state})")
    
    # Filter for same-state citations and count per patent
    same_state_citations = full_citation_data.filter(
        lambda x: x[1][0][1] == x[1][1]  # cited_state == citing_state
    )
    
    print(f"Same-state citations: {same_state_citations.count()}")
    
    # Count same-state citations per citing patent
    same_state_counts = (same_state_citations
                        .map(lambda x: (x[0], 1))  # (citing_patent, 1)
                        .reduceByKey(operator.add))  # Sum up counts
    
    same_state_counts.cache()  # cache first so the count() below fills the cache
    print(f"Patents with same-state citations: {same_state_counts.count()}")
    
    print("Sample same-state counts:")
    print(same_state_counts.take(10))
    
    # Join counts back with full patent data
    # patents_parsed structure: (patent_id, (state, full_data))
    patents_with_counts = patents_parsed.leftOuterJoin(same_state_counts)
    # Result: (patent_id, ((state, full_data), count_or_None))
    
    # Create final result dictionary
    def create_result_tuple(item):
        patent_id = item[0]
        state = item[1][0][0]
        full_data = item[1][0][1]
        count = item[1][1] if item[1][1] is not None else 0
        
        # Return (patent_id, (state, count, full_data))
        return (patent_id, (state, count, full_data))
    
    result_rdd = patents_with_counts.map(create_result_tuple)
    
    # Convert to dictionary and get top 10
    # First, filter for patents with same-state citations > 0
    patents_with_same_state = result_rdd.filter(lambda x: x[1][1] > 0)
    
    # Sort by count (descending) and take top 10
    top_10 = (patents_with_same_state
             .map(lambda x: (x[1][1], x))  # (count, full_record) for sorting
             .sortByKey(False)  # Sort by count descending
             .map(lambda x: x[1])  # Extract back the full record
             .take(10))
    
    # Create result dictionary
    result_dict = {}
    for patent_id, (state, count, full_data) in top_10:
        result_dict[patent_id] = (state, count, full_data)
    
    return result_dict, top_10, full_citation_data
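
To check the join logic on data small enough to trace by hand, the same computation can be sketched in plain Python. The patents, states, and citations below are made up for illustration, and dictionaries stand in for the RDD joins.

```python
from collections import Counter

# Toy inputs: patent -> state, and (citing, cited) pairs.
patent_state = {1: "CA", 2: "CA", 3: "TX", 4: "CA", 5: "TX"}
citations = [(4, 1), (4, 2), (4, 3), (5, 3), (5, 1), (2, 1), (6, 1)]

# Attach the state of both patents to each citation. Citations whose citing or
# cited patent has no state (patent 6 here) drop out, as in an inner join.
full_citation_data = [
    (citing, cited, patent_state[citing], patent_state[cited])
    for citing, cited in citations
    if citing in patent_state and cited in patent_state
]

# Keep same-state pairs and count them per citing patent.
same_state_counts = Counter(
    citing
    for citing, _, citing_state, cited_state in full_citation_data
    if citing_state == cited_state
)

# Patents without same-state citations get a count of 0, like the leftOuterJoin step.
result = {
    pid: (state, same_state_counts.get(pid, 0))
    for pid, state in patent_state.items()
}

# Top 2 patents by same-state citation count.
top = sorted(result.items(), key=lambda kv: kv[1][1], reverse=True)[:2]
print(top)
```

Patent 4 cites two CA patents and one TX patent, so it ends up with a same-state count of 2, which is the kind of spot check the verification cell below performs on the real data.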


In [20]:
# Run the solution with sample data first
print("=" * 50)
print("RUNNING WITH SAMPLE DATA")
print("=" * 50)

sample_result_dict, sample_top_10, sample_full_citation_data = solve_patent_problem(use_sample=True)

print(f"\nTop 10 patents with most same-state citations (SAMPLE):")
print("Patent ID | State | Same-State Citations")
print("-" * 45)
for patent_id, (state, count, full_data) in sample_top_10:
    print(f"{patent_id:8} | {state:5} | {count:16}")

RUNNING WITH SAMPLE DATA
Using 25.0% sample of the data...
Parsed citations: 4131964
US patents with state info: 445835
Sample patent states:
[(3070803, 'IL'), (3070804, 'OH'), (3070807, 'OH'), (3070810, 'IL'), (3070814, 'MN')]
Citations with cited state: 577741
Full citation data: 107682
Sample full citation data:
Citing: 4485657 (MI) -> Cited: 3818736 (IL)
Citing: 4485657 (MI) -> Cited: 3982415 (MI)
Citing: 4455558 (PA) -> Cited: 3550148 (PA)
Citing: 4487154 (WA) -> Cited: 3489998 (AL)
Citing: 5746431 (SC) -> Cited: 3492000 (MD)
Same-state citations: 23142
Patents with same-state citations: 20257
Sample same-state counts:
[(4485657, 1), (4455558, 1), (4013517, 1), (3911052, 1), (4536240, 1), (4487550, 2), (4482462, 1), (5009295, 2), (5058462, 1), (4134966, 2)]

Top 10 patents with most same-state citations (SAMPLE):
Patent ID | State | Same-State Citations
---------------------------------------------
 5936426 | CA    |               10
 5672153 | CA    |                9
 5879349 | 

In [21]:
# Verify one result from sample
if sample_top_10:
    print(f"\nVerification for top patent from sample: {sample_top_10[0][0]}")
    top_patent_id = sample_top_10[0][0]
    
    # Count all citations by this patent using the returned RDD
    all_citations_by_top = sample_full_citation_data.filter(lambda x: x[0] == top_patent_id)
    total_citations = all_citations_by_top.count()
    
    # Count same-state citations
    same_state_by_top = all_citations_by_top.filter(
        lambda x: x[1][0][1] == x[1][1]
    )
    same_state_count = same_state_by_top.count()
    
    print(f"Total citations by patent {top_patent_id}: {total_citations}")
    print(f"Same-state citations: {same_state_count}")



Verification for top patent from sample: 5936426
Total citations by patent 5936426: 13
Same-state citations: 10


In [22]:
# Run with full data

print("RUNNING WITH FULL DATA")
print("=" * 50)

full_result_dict, full_top_10, full_citation_data_rdd = solve_patent_problem(use_sample=False)

print(f"\nTop 10 patents with most same-state citations (FULL DATA):")
print("Patent ID | State | Same-State Citations")
print("-" * 45)
for patent_id, (state, count, full_data) in full_top_10:
    print(f"{patent_id:8} | {state:5} | {count:16}")

# Display the result dictionary structure
print(f"\nResult dictionary contains {len(full_result_dict)} entries")
print("Sample entries from result dictionary:")
for i, (patent_id, (state, count, full_data)) in enumerate(list(full_result_dict.items())[:3]):
    print(f"Patent {patent_id}: State={state}, Count={count}")
    print(f"  Full data preview: {full_data[:5]}...")
    print()

RUNNING WITH FULL DATA
Using full dataset...
Parsed citations: 16522438
US patents with state info: 1784989
Sample patent states:
[(3070802, 'TX'), (3070803, 'IL'), (3070804, 'OH'), (3070805, 'CA'), (3070806, 'PA')]
Citations with cited state: 9259246
Full citation data: 6920796
Sample full citation data:
Citing: 4325814 (NJ) -> Cited: 3364136 (NJ)
Citing: 4325814 (NJ) -> Cited: 3894934 (NJ)
Citing: 4325814 (NJ) -> Cited: 3808121 (NJ)
Citing: 5905860 (UT) -> Cited: 5103476 (VA)
Citing: 5905860 (UT) -> Cited: 4870568 (MA)
Same-state citations: 1488330
Patents with same-state citations: 571919
Sample same-state counts:
[(4325814, 3), (5905860, 3), (4798725, 3), (4209855, 8), (5969823, 2), (4980702, 2), (5783433, 1), (5201118, 2), (5455989, 4), (4962447, 1)]
\nTop 10 patents with most same-state citations (FULL DATA):
Patent ID | State | Same-State Citations
---------------------------------------------
 5959466 | CA    |              125
 5983822 | TX    |              103
 6008204 | CA 
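
The join counts in the sample run are far below 25% of the full-data counts. A plausible explanation is that the citations and the patents are sampled separately, so a joined row survives only if every input row it depends on was sampled. The sketch below checks that reading against the counts printed above; it assumes the samples behave independently, and the exponent assigned to each stage is that assumption spelled out.

```python
fraction = 0.25

# stage: (full-data count, sample count, number of sampled inputs each row depends on)
stages = {
    "Parsed citations":           (16522438, 4131964, 1),
    "US patents with state info": (1784989, 445835, 1),
    "Citations with cited state": (9259246, 577741, 2),   # citation + cited patent
    "Full citation data":         (6920796, 107682, 3),   # citation + both patents
    "Same-state citations":       (1488330, 23142, 3),
}

for name, (full, sampled, k) in stages.items():
    expected = full * fraction ** k
    print(f"{name:28} expected ~{expected:>10.0f}   observed {sampled:>8}")
```

Under this reading, a 25% sample keeps only about 1.6% of the fully joined citations, so per-patent counts from a sampled run are not comparable to the full-data counts.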

In [None]:
# Clean up cached RDDs
def cleanup_cache():
    """Unpersist the cached RDDs that are still reachable from the notebook.

    The intermediate RDDs cached inside solve_patent_problem() are local to
    that function, so only the RDDs it returned can be unpersisted here.
    """
    for name in ("sample_full_citation_data", "full_citation_data_rdd"):
        rdd = globals().get(name)
        if rdd is not None:
            rdd.unpersist()
            print(f"Unpersisted {name}")
        else:
            print(f"{name} is not defined; nothing to unpersist")

print("\nSolution completed! Call cleanup_cache() to free memory.")