# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [3]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [6]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/06 15:34:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/06 15:34:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the JVM-based code behind PySpark DataFrames. Be patient and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

These two RDDs are referred to as "rawCitations" and "rawPatents" (named `rddCitations` and `rddPatents` in the code below) because you probably want to process them further (e.g. convert fields to integer types). 

The `textFile` method returns each line of the file as a string. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python `bytes` values before, google it. You can convert a value `x` of type `bytes` into, e.g., a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    # strip the trailing newline so each element matches what textFile returns
    rddCitations = sc.parallelize([line.rstrip('\n') for line in f])
```
This is less efficient than using `textFile`: `textFile` reads the file through the underlying HDFS (or other) file system on the worker nodes, while reading it with `gzip.open()` pulls all of the data into the frontend (driver) and then distributes it to the worker nodes. (A single `.gz` file is not splittable, so `textFile` still reads it in one task, but that task runs on a worker rather than on the driver.)
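A small pure-Python sketch of the two ideas above: decoding a `bytes` value before using string methods on it, and reading a gzip file line by line with the trailing newline removed so each element has the same shape `sc.textFile` produces. The helper `read_gzip_lines` and the temporary file are hypothetical, built from a few lines copied from the citation data; in the notebook the returned list would be handed to `sc.parallelize`.

```python
import gzip
import os
import tempfile


def read_gzip_lines(path):
    """Read a gzip-compressed text file as UTF-8 strings, one per line,
    with the trailing newline removed (the same shape sc.textFile yields)."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return [line.rstrip("\n") for line in f]


# A bytes value has to be decoded before str methods such as split() apply.
raw = b"3858241,956203"
decoded = raw.decode("utf-8")   # now a str
fields = decoded.split(",")     # ['3858241', '956203']

# Round trip through a small temporary gzip file (toy data, not the full dataset).
tmp_dir = tempfile.mkdtemp()
sample_path = os.path.join(tmp_dir, "sample.txt.gz")
with gzip.open(sample_path, "wt", encoding="utf-8") as f:
    f.write('"CITING","CITED"\n3858241,956203\n3858241,1324234\n')

lines = read_gzip_lines(sample_path)
```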

In [131]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [132]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [133]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably converting the keys to `int` and so on. You'll also need to `filter` out the header line, since there's no easy way to extract all the lines except the first.
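One way to do both steps, as a pure-Python sketch: a hypothetical `is_header` predicate, which relies on the assumption that only the header line starts with a double quote (every data line in the samples starts with a patent number), and a hypothetical `parse_citation` that turns a line into an `(int, int)` pair. Because they are plain functions they can be tested locally and then passed to Spark, e.g. `rddCitations.filter(lambda x: not is_header(x)).map(parse_citation)`.

```python
def is_header(line):
    """Assumption: only the header line starts with a double quote."""
    return line.startswith('"')


def parse_citation(line):
    """'3858241,956203' -> (3858241, 956203), i.e. (CITING, CITED)."""
    citing, cited = line.split(",")
    return int(citing), int(cited)


lines = ['"CITING","CITED"', "3858241,956203", "3858241,1324234"]
pairs = [parse_citation(line) for line in lines if not is_header(line)]
```

Filtering with a predicate like this also avoids depending on a separately stored header value.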

In [134]:
patentHeader = rddPatents.first()
citationHeader = rddCitations.first()

In [135]:
rddPatentsNoHeader = rddPatents.filter(lambda x: x != patentHeader).cache()

In [136]:
rddPatentsNoHeader.take(5)

['3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,',
 '3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,']

In [137]:
patentHeader = patentHeader.split(",")
patentHeader = [x.strip("\"") for x in patentHeader]

citationHeader = citationHeader.split(",")
citationHeader = [x.strip("\"") for x in citationHeader]

In [139]:
rddPatentsSplit = rddPatentsNoHeader.map(lambda x: (int(x.split(",")[0].strip()), {patentHeader[i+1]: int(val.strip()) if val.strip().isdigit() else val.strip("\"") for i, val in enumerate(x.split(",")[1:])}))
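The one-line lambda above is hard to read and debug. Below is a sketch of a roughly equivalent pair of named functions (`clean_field` and `parse_patent_line` are hypothetical names) that can be tested on a single line in plain Python before being used as `rddPatentsNoHeader.map(lambda x: parse_patent_line(x, patentHeader))`. One small difference: it uses `zip`, so a line with more fields than column names is truncated instead of raising `IndexError`.

```python
def clean_field(val):
    """Strip whitespace and surrounding quotes; turn all-digit strings into int.

    As in the lambda, values such as "0.3704", "" or '"US"' stay strings.
    """
    val = val.strip()
    return int(val) if val.isdigit() else val.strip('"')


def parse_patent_line(line, header):
    """Turn one CSV line into (PATENT, {column name: value}).

    Assumes no field contains an embedded comma, as in the sample lines.
    """
    fields = line.split(",")
    key = int(fields[0].strip())
    # zip pairs column names with values and stops at the shorter list
    record = {name: clean_field(val) for name, val in zip(header[1:], fields[1:])}
    return key, record


# Only the first six column names, to keep the example short.
header = [h.strip('"') for h in '"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE"'.split(",")]
key, record = parse_patent_line('3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,', header)
```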

In [140]:
rddPatentState = rddPatentsSplit.map(lambda x: (x[0], x[1]['POSTATE']))

In [144]:
rddPatentState = rddPatentState.filter(lambda x: x[1] != "").cache()

In [145]:
rddPatentState.take(5)

[(3070802, 'TX'),
 (3070803, 'IL'),
 (3070804, 'OH'),
 (3070805, 'CA'),
 (3070806, 'PA')]

In [146]:
# citationHeader is a list of column names at this point, so compare the parsed fields
rCitationsNoHeader = rddCitations.filter(lambda x: [v.strip("\"") for v in x.split(",")] != citationHeader)
rCitationsSplit = rCitationsNoHeader.map(lambda x: {citationHeader[i]: int(val.strip()) if val.strip().isdigit() else val.strip("\"") for i, val in enumerate(x.split(","))}).cache()

rCitationsDict = rCitationsSplit.map(lambda x: {key: (None if value == "" else value) for key, value in x.items()}).cache()
rCitationsDict.take(4)

[{'CITING': 3858241, 'CITED': 956203},
 {'CITING': 3858241, 'CITED': 1324234},
 {'CITING': 3858241, 'CITED': 3398406},
 {'CITING': 3858241, 'CITED': 3557384}]

In [147]:
rddCiting = rCitationsSplit.map(lambda x: (x['CITING'], x)).cache()
rddCited = rCitationsSplit.map(lambda x: (x['CITED'], x)).cache()

In [148]:
rddCitationsState = rddCiting.leftOuterJoin(rddPatentState).cache()

In [149]:
rddCitingWithState = rddCitationsState.map(lambda x: (
    x[0],  # CITING
    (x[1][0]['CITED'], x[1][1])  # (CITED, CITING_STATE)
)).cache()

In [150]:
rddCitedState = rddCited.leftOuterJoin(rddPatentState).cache()

In [151]:
rddCitedStateKV = rddCitedState.map(lambda x: (
    x[1][0]['CITED'],  # CITED from citation dict
    x[1][1]  # CITED_STATE
)).cache()

In [152]:
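# Join against rddPatentState (one row per patent) rather than rddCitedStateKV, which repeats
# each CITED key once per citation and therefore multiplies the number of joined rows.
rddWithBothStates = rddCitingWithState.map(lambda x: (x[1][0], (x[0], x[1][1]))).join(rddPatentState).cache()
# (CITED, ((CITING, CITING_STATE), CITED_STATE))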
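To see why the right-hand side of the join needs unique keys, here is a pure-Python sketch (no Spark needed) of what `join` does when that side repeats a key. `rdd_style_join` and the toy patent numbers are hypothetical. A per-citation table such as `rddCitedStateKV` holds one `(CITED, CITED_STATE)` pair per citation, so a patent cited n times yields n² joined rows; a per-patent table such as `rddPatentState` yields exactly one row per citation.

```python
from collections import defaultdict


def rdd_style_join(left, right):
    """Pure-Python stand-in for RDD.join on lists of (key, value) pairs:
    emits one (key, (left_value, right_value)) row per matching pair."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


citations = [(101, 7), (102, 7), (103, 7)]            # (CITING, CITED): three patents cite patent 7
patent_state = [(7, "CO"), (101, "CO"), (102, "TX")]  # (PATENT, POSTATE); 103 has no state
state_of = dict(patent_state)

# Keyed by CITED and carrying (CITING, CITING_STATE); a missing state becomes None.
by_cited = [(cited, (citing, state_of.get(citing))) for citing, cited in citations]

# One (CITED, CITED_STATE) pair per citation row, so key 7 appears three times.
cited_state_per_citation = [(cited, state_of.get(cited)) for _, cited in citations]

blown_up = rdd_style_join(by_cited, cited_state_per_citation)  # 3 x 3 = 9 rows
correct = rdd_style_join(by_cited, patent_state)               # 3 rows, one per citation
```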

In [153]:
rddWithBothStates.take(5)

In [110]:
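# keep citations where CITED_STATE == CITING_STATE, skipping rows whose state is unknown (None)
rddWithBothStatesSame = rddWithBothStates.filter(lambda x: x[1][1] is not None and x[1][1] == x[1][0][1]).cache()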

In [None]:
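sameStateCounts = rddWithBothStatesSame.map(lambda x: (x[1][0][0], 1)).reduceByKey(operator.add)  # (CITING, number of same-state citations)
sameStateCounts.takeOrdered(20, key=lambda kv: -kv[1])  # patents with the most same-state citations first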
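The counting step can be checked locally with a pure-Python sketch of `filter`, `map`, `reduceByKey` and `takeOrdered` on a few hypothetical rows shaped like `(CITED, ((CITING, CITING_STATE), CITED_STATE))`. `reduce_by_key` and `same_state_counts` are illustrative helpers, not PySpark functions. The `None` row shows why the filter needs an explicit `is not None` check: `None == None` is `True` in Python, so two unknown states would otherwise count as a match.

```python
import heapq
import operator


def reduce_by_key(pairs, func):
    """Pure-Python stand-in for RDD.reduceByKey: fold the values of each key with func."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return list(acc.items())


def same_state_counts(joined):
    """Count same-state citations per CITING patent.

    Rows have the shape (CITED, ((CITING, CITING_STATE), CITED_STATE)).
    """
    same = [row for row in joined
            if row[1][1] is not None and row[1][1] == row[1][0][1]]
    return reduce_by_key(((row[1][0][0], 1) for row in same), operator.add)


# Hypothetical toy rows in the joined shape.
joined = [
    (7, ((101, "CO"), "CO")),
    (8, ((101, "CO"), "CO")),
    (7, ((102, "TX"), "CO")),
    (9, ((103, None), None)),  # both states unknown: must not count as "same state"
]
counts = same_state_counts(joined)
# RDD.takeOrdered(n, key=lambda kv: -kv[1]) returns the n pairs with the largest counts;
# heapq.nsmallest with the same key does the same on a local list.
top = heapq.nsmallest(20, counts, key=lambda kv: -kv[1])
```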