# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

### Solution using RDD: Shreyas Gopalakrishna

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
# Local Spark configuration using all available cores.
conf = SparkConf().setAppName("Lab4-rddd").setMaster("local[*]")
# getOrCreate reuses an already-running SparkContext, so re-running this cell
# does not fail. PySpark allows only one active SparkContext per process.
# Note: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is very slow -- most of the code is executed in Python, which is much less efficient than the Java-based code used by PySpark DataFrames. Be patient and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task.

To that end, we've included code below that extracts only the last N lines of each file (the cells below currently use N = 800,000; the reference table shown here was produced with the last 200,000 lines). Using the 200,000-line subset, your "new patent" table should look like:

![Top partial 10 RDD self-state citations](top-subsample-rdd.png)

When you're ready to run the whole thing, just include all the data and run it again (...and wait...).

These two RDDs are called `rddCitations` and `rddPatents` and hold the raw lines, because you probably want to process them further (e.g. convert them to integer types, etc.). If you haven't used Python "bytes" types before, look them up. You can convert a bytes variable `x` into e.g. a UTF-8 string using `x.decode('utf-8')`.

In [3]:
import gzip
from collections import deque

# Stream the gzipped citation file and keep only its last 800,000 raw byte
# lines. A bounded deque holds at most that many lines at once, whereas
# readlines() would first materialize the entire decompressed file in memory.
with gzip.open('cite75_99.txt.gz', 'r') as f:
    rddCitations = sc.parallelize(list(deque(f, maxlen=800000)))

In [4]:
from collections import deque

# Keep only the last 800,000 raw byte lines of the patent file (same subsample
# size as the citations cell). The bounded deque avoids loading the whole
# decompressed file into memory just to slice off its tail.
with gzip.open('apat63_99.txt.gz', 'r') as f:
    rddPatents = sc.parallelize(list(deque(f, maxlen=800000)))

#### Decoding the raw byte lines into UTF-8 strings

In [5]:
# Peek at the first raw citation line, decoded and with its newline removed.
first_raw_line = rddCitations.take(1)[0]
first_raw_line.decode("utf-8").replace('\n','')

'5935430,5453769'

In [6]:
# Peek at the first raw (still bytes) patent line.
rddPatents.take(num=1)

[b'5200108,1993,12149,1989,"JP","",265595,3,6,252,1,19,8,12,1,0.4028,0.6875,4.4167,3.125,0.875,0.875,0.3333,0.3333\n']

In [7]:
def _decode_line(raw_line):
    """Decode one raw byte line as UTF-8 and drop its newline characters."""
    return raw_line.decode("utf-8").replace('\n','')

# NOTE: these assignments rebind the RDD names, so re-running this cell on its
# own would try to decode already-decoded strings; re-run the loading cells first.
rddCitations = rddCitations.map(_decode_line)
rddPatents = rddPatents.map(_decode_line)

In [8]:
# First ten decoded "citing,cited" lines.
rddCitations.take(num=10)

['5935430,5453769',
 '5935430,5500071',
 '5935430,5567868',
 '5935430,5571410',
 '5935430,5595650',
 '5935430,5792943',
 '5935430,5888390',
 '5935431,4141830',
 '5935431,4156652',
 '5935431,4179616']

In [9]:
# First ten decoded patent lines.
rddPatents.take(num=10)

['5200108,1993,12149,1989,"JP","",265595,3,6,252,1,19,8,12,1,0.4028,0.6875,4.4167,3.125,0.875,0.875,0.3333,0.3333',
 '5200109,1993,12149,1992,"JP","",87490,3,201,252,1,19,2,9,1,0,0.5,3.1111,6,1,0.5,0.4444,0.4444',
 '5200110,1993,12149,1992,"JP","",503380,3,20,252,1,19,5,1,1,0,0.32,3,4,0.4,0.4,0,0',
 '5200111,1993,12149,1989,"JP","",39150,3,2,252,1,19,3,0,1,,0.6667,,5,0,0,,',
 '5200112,1993,12149,1991,"US","NY",280070,2,19,252,1,19,1,13,1,0.8521,0,4,2,0,0,0.1667,0.1538',
 '5200113,1993,12149,1989,"DE","",104245,2,25,252,1,19,8,1,1,0,0.6563,6,10.5,0.8571,0.75,0,0',
 '5200114,1993,12149,1991,"US","PA",350970,2,7,510,1,19,15,3,0.8,0.6667,0.7778,3.3333,15.0667,0,0,0,0',
 '5200115,1993,12149,1991,"DE","",247990,3,12,510,1,19,10,4,1,0.375,0.18,4.25,4.7,0.5,0.5,0.25,0.25',
 '5200116,1993,12149,1990,"GB","",450155,2,25,252,1,19,16,29,1,0.3615,0.75,4.6552,10.9375,0.2,0.1875,0.1724,0.1724',
 '5200117,1993,12149,1992,"US","TX",381140,2,16,510,1,19,18,7,0.9444,0.7347,0.7889,4,13.6111,0,0,0,0']

#### Data formatting: store each record as a key-value pair

In [10]:
def _is_data_row(line):
    """Return True for lines that start with a digit (a patent number).

    NOTE(review): the full data files presumably begin with a quoted header
    row, which the tail subsample does not include. int() would raise on such
    a row, and on blank lines; both are dropped here. TODO confirm against
    the raw files.
    """
    return line[:1].isdigit()


def _parse_citation(line):
    """Parse a "citing,cited" line into an (int citing, int cited) pair."""
    fields = line.split(',')
    return (int(fields[0]), int(fields[1]))


def _parse_patent(line):
    """Parse a patent line into (int patent number, rest of the CSV line as a string)."""
    patent, _, rest = line.partition(',')
    return (int(patent), rest)


# Citations keyed by citing patent; patents keyed by their own number.
xxx = rddCitations.filter(_is_data_row).map(_parse_citation)
# yyy is joined twice below (on citing and on cited number), so cache it.
yyy = rddPatents.filter(_is_data_row).map(_parse_patent).cache()
yyy.take(10)

[(5200108,
  '1993,12149,1989,"JP","",265595,3,6,252,1,19,8,12,1,0.4028,0.6875,4.4167,3.125,0.875,0.875,0.3333,0.3333'),
 (5200109,
  '1993,12149,1992,"JP","",87490,3,201,252,1,19,2,9,1,0,0.5,3.1111,6,1,0.5,0.4444,0.4444'),
 (5200110,
  '1993,12149,1992,"JP","",503380,3,20,252,1,19,5,1,1,0,0.32,3,4,0.4,0.4,0,0'),
 (5200111,
  '1993,12149,1989,"JP","",39150,3,2,252,1,19,3,0,1,,0.6667,,5,0,0,,'),
 (5200112,
  '1993,12149,1991,"US","NY",280070,2,19,252,1,19,1,13,1,0.8521,0,4,2,0,0,0.1667,0.1538'),
 (5200113,
  '1993,12149,1989,"DE","",104245,2,25,252,1,19,8,1,1,0,0.6563,6,10.5,0.8571,0.75,0,0'),
 (5200114,
  '1993,12149,1991,"US","PA",350970,2,7,510,1,19,15,3,0.8,0.6667,0.7778,3.3333,15.0667,0,0,0,0'),
 (5200115,
  '1993,12149,1991,"DE","",247990,3,12,510,1,19,10,4,1,0.375,0.18,4.25,4.7,0.5,0.5,0.25,0.25'),
 (5200116,
  '1993,12149,1990,"GB","",450155,2,25,252,1,19,16,29,1,0.3615,0.75,4.6552,10.9375,0.2,0.1875,0.1724,0.1724'),
 (5200117,
  '1993,12149,1992,"US","TX",381140,2,16,510,1,19,1

#### Joining Citations data with Patent data such that the citing number matches the patent info

In [11]:
# (citing, cited) joined with patent info on the citing number:
# -> (citing, (cited, citing_info))
rdd3 = xxx.join(other=yyy)
rdd3.take(num=1)

[(5935432,
  (3064647,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,'))]

In [12]:
# First ten joined (citing, (cited, citing_info)) records.
rdd3.take(num=10)

[(5935432,
  (3064647,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (3078847,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (3799342,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (3908893,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (3911918,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (3932277,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (4086924,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (4141887,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (4300717,
   '1999,14466,1997,"DK","",76070,2,,210,1,19,31,0,0.9677,,0.7889,,14.9677,0,0,,')),
 (5935432,
  (4359049,
   '1

#### Rearranging data as (cited,(citing,citing State))

In [13]:
def _to_cited_key(record):
    """(citing, (cited, citing_info)) -> (cited, (citing, citing_state)).

    The state is column 4 of the comma-separated info string.
    """
    citing, (cited, citing_info) = record
    citing_state = citing_info.split(',')[4]
    return (cited, (citing, citing_state))

aaa = rdd3.map(_to_cited_key)
aaa.take(2)

[(3064647, (5935432, '""')), (3078847, (5935432, '""'))]

#### Joining the table with Patent data such that the cited number matches the patent info

In [14]:
# Attach the cited patent's info:
# -> (cited, ((citing, citing_state), cited_info))
bbb = aaa.join(other=yyy)
bbb.take(num=10)

[(5304348,
  ((5935432, '""'),
   '1994,12527,1992,"US","CA",540,2,15,422,1,19,18,7,1,0.6122,0.5926,4.4286,7.1667,0.0833,0.0556,0.2857,0.2857')),
 (5304348,
  ((6002475, '"OH"'),
   '1994,12527,1992,"US","CA",540,2,15,422,1,19,18,7,1,0.6122,0.5926,4.4286,7.1667,0.0833,0.0556,0.2857,0.2857')),
 (5304348,
  ((5958253, '""'),
   '1994,12527,1992,"US","CA",540,2,15,422,1,19,18,7,1,0.6122,0.5926,4.4286,7.1667,0.0833,0.0556,0.2857,0.2857')),
 (5540244,
  ((5935536, '"FL"'),
   '1996,13360,1993,"US","AR",7701,2,7,134,6,69,19,2,0.9474,0,0.8519,4.5,16.6316,0.0714,0.0526,0,0')),
 (5413788,
  ((5935608, '""'),
   '1995,12912,1989,"GB","",296280,3,8,424,3,31,7,2,0.7143,0,0.32,7,24.1429,0.6667,0.2857,0,0')),
 (5366736,
  ((5935624, '"WI"'),
   '1994,12744,1993,"US","GA",599530,2,20,424,3,31,1,4,1,0,0,3.75,3,0,0,0,0')),
 (5366736,
  ((5968565, '"OK"'),
   '1994,12744,1993,"US","GA",599530,2,20,424,3,31,1,4,1,0,0,3.75,3,0,0,0,0')),
 (5500668,
  ((5935688, '""'),
   '1996,13227,1994,"CA","",635240,2,2

#### Rearranging data so that it is (citing, citing state, cited, cited state)

In [15]:
def _to_citing_key(record):
    """(cited, ((citing, citing_state), cited_info)) -> (citing, (citing_state, cited, cited_state))."""
    cited, ((citing, citing_state), cited_info) = record
    cited_state = cited_info.split(',')[4]
    return (citing, (citing_state, cited, cited_state))

ccc = bbb.map(_to_citing_key)
ccc.take(1)

[(5935432, ('""', 5304348, '"CA"'))]

In [16]:
# First ten (citing, (citing_state, cited, cited_state)) records.
ccc.take(num=10)

[(5935432, ('""', 5304348, '"CA"')),
 (6002475, ('"OH"', 5304348, '"CA"')),
 (5958253, ('""', 5304348, '"CA"')),
 (5935536, ('"FL"', 5540244, '"AR"')),
 (5935608, ('""', 5413788, '""')),
 (5935624, ('"WI"', 5366736, '"GA"')),
 (5968565, ('"OK"', 5366736, '"GA"')),
 (5935688, ('""', 5500668, '""')),
 (5989650, ('""', 5500668, '""')),
 (5935696, ('"NY"', 5366796, '""'))]

In [17]:
def _flatten(row):
    """(citing, (citing_state, cited, cited_state)) -> flat 4-tuple."""
    citing, (citing_state, cited, cited_state) = row
    return (citing, citing_state, cited, cited_state)

ddd = ccc.map(_flatten)
ddd.take(10)

[(5935432, '""', 5304348, '"CA"'),
 (6002475, '"OH"', 5304348, '"CA"'),
 (5958253, '""', 5304348, '"CA"'),
 (5935536, '"FL"', 5540244, '"AR"'),
 (5935608, '""', 5413788, '""'),
 (5935624, '"WI"', 5366736, '"GA"'),
 (5968565, '"OK"', 5366736, '"GA"'),
 (5935688, '""', 5500668, '""'),
 (5989650, '""', 5500668, '""'),
 (5935696, '"NY"', 5366796, '""')]

#### Removing rows with an empty state and keeping only same-state citations using filter

In [18]:
def _same_known_state(row):
    """True when the citing and cited states are equal and not the empty '""'."""
    citing_state, cited_state = row[1], row[3]
    return citing_state == cited_state != '""'

eee = ddd.filter(_same_known_state)

In [19]:
# First ten same-state citations.
eee.take(num=10)

[(5935712, '"NY"', 5269740, '"NY"'),
 (5998033, '"NY"', 5269740, '"NY"'),
 (5960245, '"NY"', 5269740, '"NY"'),
 (5935712, '"NY"', 5480724, '"NY"'),
 (5998033, '"NY"', 5480724, '"NY"'),
 (5960245, '"NY"', 5480724, '"NY"'),
 (5935784, '"IA"', 5550024, '"IA"'),
 (5939264, '"IA"', 5550024, '"IA"'),
 (5935960, '"CT"', 5359068, '"CT"'),
 (5936224, '"WA"', 5553084, '"WA"')]

In [20]:
def _key_by_citing(row):
    """Flat (citing, citing_state, cited, cited_state) -> (citing, (citing_state, cited, cited_state))."""
    citing, citing_state, cited, cited_state = row
    return (citing, (citing_state, cited, cited_state))

# keeping it in (k,v) format
fff = eee.map(_key_by_citing)
fff.take(10)

[(5935712, ('"NY"', 5269740, '"NY"')),
 (5998033, ('"NY"', 5269740, '"NY"')),
 (5960245, ('"NY"', 5269740, '"NY"')),
 (5935712, ('"NY"', 5480724, '"NY"')),
 (5998033, ('"NY"', 5480724, '"NY"')),
 (5960245, ('"NY"', 5480724, '"NY"')),
 (5935784, ('"IA"', 5550024, '"IA"')),
 (5939264, ('"IA"', 5550024, '"IA"')),
 (5935960, ('"CT"', 5359068, '"CT"')),
 (5936224, ('"WA"', 5553084, '"WA"'))]

#### Counting, for each citing patent, how many of its citations are same-state citations

In [21]:
# Count same-state citations per citing patent, highest count first.
# reduceByKey sums partial counts within each partition before shuffling;
# groupByKey().mapValues(len) would shuffle every value just to count them.
ggg = (
    fff.mapValues(lambda _value: 1)
    .reduceByKey(operator.add)
    .sortBy(lambda pair: pair[1], ascending=False)
)

In [22]:
# Top ten (citing patent, same-state count) pairs.
ggg.take(num=10)

[(5959466, 94),
 (6008204, 80),
 (5952345, 78),
 (5999972, 77),
 (5987245, 76),
 (5958954, 76),
 (5998655, 76),
 (5951547, 73),
 (5980517, 73),
 (5998471, 65)]

#### Joining with the patent info to add the same-state citation count as a column

In [23]:
# Keep every patent; the count is None for patents with no same-state citations.
# -> (patent, (patent_info, count_or_None))
hhh = yyy.leftOuterJoin(other=ggg)
hhh.take(num=10)

[(5200112,
  ('1993,12149,1991,"US","NY",280070,2,19,252,1,19,1,13,1,0.8521,0,4,2,0,0,0.1667,0.1538',
   None)),
 (5200128,
  ('1993,12149,1992,"IT","",690995,3,4,264,5,51,5,1,0.8,0,0.375,2,19,0,0,0,0',
   None)),
 (5200144,
  ('1993,12149,1992,"US","ID",597195,6,4,376,4,44,5,1,1,0,0.32,2,13.6,0.4,0.4,0,0',
   None)),
 (5200160,
  ('1993,12149,1991,"US","PA",154240,2,18,423,1,19,9,6,1,0.5,0.1975,4,5.4444,0,0,0.8333,0.8333',
   None)),
 (5200176,
  ('1993,12149,1989,"US","CA",218000,2,21,424,3,31,4,9,1,0.6173,0.375,6.3333,3.5,0,0,0,0',
   None)),
 (5200192,
  ('1993,12149,1988,"DE","",,1,23,424,3,31,5,2,1,0,0.32,6.5,8,,,,', None)),
 (5200208,
  ('1993,12149,1988,"US","OH",601635,2,18,425,5,51,9,4,0.8889,0.375,0.2188,8.5,16.8889,0,0,0,0',
   None)),
 (5200224,
  ('1993,12149,1991,"US","PA",416145,2,10,426,6,61,12,1,0.9167,0,0.5455,6,16,0.2,0.0833,0,0',
   None)),
 (5200240,
  ('1993,12149,1991,"CA","",,1,11,428,6,69,5,6,1,0.6667,0.48,4.3333,12,,,,',
   None)),
 (5200256,
  ('1993,12149,1

In [29]:
# handling None type
def none1(a):
    """Replace a missing same-state citation count with 0.

    ``a`` is a (patent, (patent_info, count)) pair from the leftOuterJoin
    above. ``count`` is None for patents with no same-state citations.
    Returns the pair with None replaced by 0; any other pair is returned unchanged.
    """
    if a[1][1] is None:
        return (a[0], (a[1][0], 0))
    return a
# Fill missing same-state counts with 0.
iii = hhh.map(f=none1)

In [30]:
# First ten (patent, (patent_info, count)) records after filling None.
iii.take(num=10)

[(5200112,
  ('1993,12149,1991,"US","NY",280070,2,19,252,1,19,1,13,1,0.8521,0,4,2,0,0,0.1667,0.1538',
   0)),
 (5200128,
  ('1993,12149,1992,"IT","",690995,3,4,264,5,51,5,1,0.8,0,0.375,2,19,0,0,0,0',
   0)),
 (5200144,
  ('1993,12149,1992,"US","ID",597195,6,4,376,4,44,5,1,1,0,0.32,2,13.6,0.4,0.4,0,0',
   0)),
 (5200160,
  ('1993,12149,1991,"US","PA",154240,2,18,423,1,19,9,6,1,0.5,0.1975,4,5.4444,0,0,0.8333,0.8333',
   0)),
 (5200176,
  ('1993,12149,1989,"US","CA",218000,2,21,424,3,31,4,9,1,0.6173,0.375,6.3333,3.5,0,0,0,0',
   0)),
 (5200192,
  ('1993,12149,1988,"DE","",,1,23,424,3,31,5,2,1,0,0.32,6.5,8,,,,', 0)),
 (5200208,
  ('1993,12149,1988,"US","OH",601635,2,18,425,5,51,9,4,0.8889,0.375,0.2188,8.5,16.8889,0,0,0,0',
   0)),
 (5200224,
  ('1993,12149,1991,"US","PA",416145,2,10,426,6,61,12,1,0.9167,0,0.5455,6,16,0.2,0.0833,0,0',
   0)),
 (5200240,
  ('1993,12149,1991,"CA","",,1,11,428,6,69,5,6,1,0.6667,0.48,4.3333,12,,,,',
   0)),
 (5200256,
  ('1993,12149,1989,"US","LA",,1,16,428,6,6

#### Final output sorted by same-state citation count (descending)

In [31]:
def _same_state_count(row):
    """Sort key: the same-state citation count at row[1][1]."""
    return row[1][1]

output = iii.sortBy(_same_state_count, ascending=False)
output.take(10)

[(5959466,
  ('1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,',
   94)),
 (6008204,
  ('1999,14606,1998,"US","CA",749584,2,,514,3,31,121,0,1,,0.7415,,5,0.0085,0.0083,,',
   80)),
 (5952345,
  ('1999,14501,1997,"US","CA",749584,2,,514,3,31,118,0,1,,0.7442,,5.1102,0,0,,',
   78)),
 (5999972,
  ('1999,14585,1996,"US","CA",551495,2,,709,2,22,352,0,1,,0.8714,,4.0398,0.0117,0.0114,,',
   77)),
 (5958954,
  ('1999,14515,1997,"US","CA",749584,2,,514,3,31,116,0,1,,0.7397,,5.181,0,0,,',
   76)),
 (5987245,
  ('1999,14564,1996,"US","CA",551495,2,,709,2,22,341,0,1,,0.8737,,4.0587,0.0121,0.0117,,',
   76)),
 (5998655,
  ('1999,14585,1998,"US","CA",,1,,560,1,14,114,0,1,,0.7387,,5.1667,,,,', 76)),
 (5980517,
  ('1999,14557,1998,"US","CA",733846,2,,606,3,32,241,0,1,,0.7394,,8.3776,0,0,,',
   73)),
 (5951547,
  ('1999,14501,1997,"US","CA",733846,2,,606,3,32,242,0,1,,0.7382,,8.3471,0,0,,',
   73)),
 (5998471,
  ('1999,14585,1998,"US","CA",749584,2,,514,3,31,103,0,1,,0.

In [32]:
# Total number of patents in the final table.
total_patents = output.count()
total_patents

800000