# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [94]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [95]:
conf=SparkConf().setAppName("Lab4-rddd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark RDDs on the https://coding.csel.io machines is very slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task.

To that end, we've included code to just extract the last 200,000 lines of each file below using the Python "slice" notation. Using that subset of the data your "new patent" table should look like:

![Top partial 10 RDD self-state citations](top-subsample-rdd.png)

When you're ready to run the whole thing, just include all the data and run it again (...and wait...).
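To illustrate the slice notation used for the subsample, here is a minimal sketch. It builds a tiny gzip stream in memory (an assumption, so that it runs without the lab data files) and keeps only the last three lines, the same way the cells below keep the tail of each file.

```python
import gzip
import io

# Hypothetical stand-in for a lab file: ten short "citing,cited" lines, gzipped in memory.
raw = b"".join(b"%d,%d\n" % (i, i + 100) for i in range(10))
buf = io.BytesIO(gzip.compress(raw))

# Mode 'r' reads binary data, so every line comes back as a bytes object.
with gzip.open(buf, 'r') as f:
    lines = f.readlines()

# A negative start index counts from the end of the list.
last_three = lines[-3:]
print(last_three)

# Asking for more lines than exist simply returns the whole list.
everything = lines[-20:]
print(len(everything))
```

Because an oversized negative slice returns the whole list, a sufficiently large slice size is equivalent to loading the full file.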

These two RDDs are called `rddCitations` and `rddPatents` in the cells below. They hold raw byte strings, so you will probably want to process them further (e.g. convert fields to integer types). If you haven't used the Python `bytes` type before, look it up in the Python documentation. You can convert a bytes value `x` into a UTF-8 string using `x.decode('utf-8')`.
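As a minimal sketch of that kind of processing, the hypothetical helper `parse_patent_line` below (not part of the lab code) decodes one raw patent line, converts the patent number to an integer, and strips the quotes from the state field. It assumes the state is the sixth comma-separated field, as in the sample patent rows shown later in this notebook.

```python
def parse_patent_line(line):
    """Return (patent_number, state) from one raw bytes line (hypothetical helper)."""
    # Decode bytes to str, drop the trailing newline, then split on commas.
    fields = line.decode('utf-8').rstrip('\n').split(',')
    patent = int(fields[0])          # patent number as an integer
    state = fields[5].strip('"')     # '"CA"' becomes 'CA'; '""' becomes ''
    return patent, state

# Sample row copied from the patent data shown in this notebook's output.
sample = b'5959466,1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,\n'
parsed = parse_patent_line(sample)
print(parsed)
```

A function like this could be passed to `rddPatents.map(...)` in place of a bytes-based parser. The cells below keep the fields as bytes instead, which also works as long as the comparisons use bytes literals consistently.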

In [96]:
import gzip
with gzip.open('cite75_99.txt.gz', 'r') as f:
    rddCitations = sc.parallelize( f.readlines()[-800000:] )

In [119]:
with gzip.open('apat63_99.txt.gz', 'r') as f:
    rddPatents = sc.parallelize( f.readlines()[-800000:] )

In [98]:
def getData(line):
    # Keep (patent number, state); the state is the sixth comma-separated field.
    line = line.split(b',')
    return (line[0], line[5])
patentOut = rddPatents.map(getData)

In [99]:
def getCitationData(line):
    # Return (citing, cited); drop the trailing newline from the cited field.
    line = line.split(b',')
    return (line[0], line[1].split(b'\n')[0])
citationsOut = rddCitations.map(getCitationData)

In [100]:
patentOut.take(5)
citationsOut.take(5)

[(b'5935430', b'5453769'),
 (b'5935430', b'5500071'),
 (b'5935430', b'5567868'),
 (b'5935430', b'5571410'),
 (b'5935430', b'5595650')]

In [101]:
input_join = citationsOut.join(patentOut)

In [102]:
input_join.take(5)

[(b'5935437', (b'4001117', b'"TX"')),
 (b'5935437', (b'5104375', b'"TX"')),
 (b'5935437', (b'5219328', b'"TX"')),
 (b'5935437', (b'5284244', b'"TX"')),
 (b'5935444', (b'2531427', b'"LA"'))]

In [103]:
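def swapToCitedKey(x):
    # Re-key each record by the cited patent so it can be joined with patentOut again.
    # (Renamed so it no longer shadows the earlier getCitationData.)
    citing, (cited, citing_state) = x
    return (cited, (citing, citing_state))
swapped_out = input_join.map(swapToCitedKey)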

In [104]:
swapped_out.take(5)

[(b'4001117', (b'5935437', b'"TX"')),
 (b'5104375', (b'5935437', b'"TX"')),
 (b'5219328', (b'5935437', b'"TX"')),
 (b'5284244', (b'5935437', b'"TX"')),
 (b'2531427', (b'5935444', b'"LA"'))]

In [105]:
intermediate_output = swapped_out.join(patentOut)

In [106]:
def getIntermediateOutput(x):
      cited, ((citing, citing_state), cited_state) = x
      return (citing, (citing_state, cited, cited_state))
intermediate_output = intermediate_output.map(getIntermediateOutput)

In [107]:
intermediate_output.take(5)

[(b'5935536', (b'"FL"', b'5583166', b'""')),
 (b'5935560', (b'"IL"', b'5270036', b'"MI"')),
 (b'5935599', (b'"IL"', b'5478575', b'""')),
 (b'5935600', (b'"VA"', b'5622719', b'"VA"')),
 (b'5965162', (b'"VA"', b'5622719', b'"VA"'))]

In [108]:
def filterPerStateMatch(x):
    # Keep citations where the citing state is non-empty and matches the cited state.
    citing, (citing_state, cited, cited_state) = x
    return citing_state != b'""' and citing_state == cited_state
count_per_citing = intermediate_output.filter(filterPerStateMatch)

In [109]:
def mapKeyToCounter(x):
      citing, (citing_state, cited, cited_state) = x
      return (citing,1)
count_per_citing = count_per_citing.map(mapKeyToCounter).reduceByKey(lambda accumulator, val: accumulator + val)

In [122]:
count_per_citing = count_per_citing.sortBy(lambda x: x[1], ascending=False)
count_per_citing.take(10)

[(b'5959466', 94),
 (b'6008204', 80),
 (b'5952345', 78),
 (b'5999972', 77),
 (b'5998655', 76),
 (b'5958954', 76),
 (b'5987245', 76),
 (b'5980517', 73),
 (b'5951547', 73),
 (b'5998471', 65)]

In [120]:
def getFormattedPatentData(line):
      line_split = line.split(b',')
      return (line_split[0],line)
rddPatents = rddPatents.map(getFormattedPatentData)

In [123]:
rddPatents = rddPatents.join(count_per_citing)

In [125]:
def findCountKey(x):
    # Extract the same-state citation count from (key, (patent_line, count)).
    (key, (rest, count)) = x
    return count
rddPatents.sortBy(findCountKey, ascending=False).take(10)

[(b'5959466',
  (b'5959466,1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,\n',
   94)),
 (b'6008204',
  (b'6008204,1999,14606,1998,"US","CA",749584,2,,514,3,31,121,0,1,,0.7415,,5,0.0085,0.0083,,\n',
   80)),
 (b'5952345',
  (b'5952345,1999,14501,1997,"US","CA",749584,2,,514,3,31,118,0,1,,0.7442,,5.1102,0,0,,\n',
   78)),
 (b'5999972',
  (b'5999972,1999,14585,1996,"US","CA",551495,2,,709,2,22,352,0,1,,0.8714,,4.0398,0.0117,0.0114,,\n',
   77)),
 (b'5998655',
  (b'5998655,1999,14585,1998,"US","CA",,1,,560,1,14,114,0,1,,0.7387,,5.1667,,,,\n',
   76)),
 (b'5958954',
  (b'5958954,1999,14515,1997,"US","CA",749584,2,,514,3,31,116,0,1,,0.7397,,5.181,0,0,,\n',
   76)),
 (b'5987245',
  (b'5987245,1999,14564,1996,"US","CA",551495,2,,709,2,22,341,0,1,,0.8737,,4.0587,0.0121,0.0117,,\n',
   76)),
 (b'5980517',
  (b'5980517,1999,14557,1998,"US","CA",733846,2,,606,3,32,241,0,1,,0.7394,,8.3776,0,0,,\n',
   73)),
 (b'5951547',
  (b'5951547,1999,14501,1997,"US","CA",7338

In [None]:
#sc.stop()