# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rddd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is very slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task.

To that end, we've included code to just extract the last 200,000 lines of each file below using the Python "slice" notation. Using that subset of the data your "new patent" table should look like:

![Top partial 10 RDD self-state citations](top-subsample-rdd.png)

When you're ready to run the whole thing, just include all the data and run it again (...and wait...).

These two RDDs, `rddCitations` and `rddPatents` in the cells below, hold raw lines of bytes, so you probably want to process them further (e.g. decode them and convert fields to integer types). If you haven't used the Python `bytes` type before, look it up. You can convert a bytes value `x` into a UTF-8 string using `x.decode('utf-8')`.
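As a minimal sketch of that conversion, the snippet below decodes one bytes line and turns both fields into integers. The sample line is illustrative and assumes the citation file stores `citing,cited` on each line.

```python
# A gzip file opened in binary mode yields bytes lines such as this one.
raw_line = b'5935430,5453769\n'

# decode() turns bytes into str; rstrip() drops the trailing newline.
text_line = raw_line.decode('utf-8').rstrip('\n')

# Split the CSV line and convert both patent numbers to int.
citing, cited = (int(field) for field in text_line.split(','))

print(type(raw_line).__name__, type(text_line).__name__)
print(citing, cited)
```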

#### Extracting 800,000 lines from Citations and Patents data

In [3]:
import gzip
with gzip.open('cite75_99.txt.gz', 'r') as f:
    rddCitations = sc.parallelize( f.readlines()[-800000:] )

In [4]:
with gzip.open('apat63_99.txt.gz', 'r') as f:
    rddPatents = sc.parallelize( f.readlines()[-800000:] )
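Note that `f.readlines()` loads the whole decompressed file into memory before slicing. One possible alternative, sketched below, streams the file through a bounded `collections.deque` so that only the last `n` lines are ever held. The helper name `tail_lines` and the temporary demo file are hypothetical and not part of the lab.

```python
import gzip
import os
import tempfile
from collections import deque

def tail_lines(path, n):
    """Return the last n lines of a gzip file as a list of bytes objects."""
    with gzip.open(path, 'rb') as f:
        # A deque with maxlen discards the oldest line once n lines are held.
        return list(deque(f, maxlen=n))

# Demo on a small temporary file (made-up data, not the lab files).
tmp_dir = tempfile.mkdtemp()
demo_path = os.path.join(tmp_dir, 'demo.txt.gz')
with gzip.open(demo_path, 'wb') as f:
    for i in range(10):
        f.write(f'{i},{i * 2}\n'.encode('utf-8'))

last_three = tail_lines(demo_path, 3)
print(last_three)
```

Under these assumptions, `sc.parallelize(tail_lines('cite75_99.txt.gz', 800000))` should yield the same lines as the cell above.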

#### Converting data from bytes format to string format

In [5]:
# decode each bytes line to a str and drop the trailing newline
rddCitations1 = rddCitations.map(lambda x: x.decode('utf-8').rstrip('\n'))

In [6]:
# decode each bytes line to a str and drop the trailing newline
rddPatents1 = rddPatents.map(lambda x: x.decode('utf-8').rstrip('\n'))

#### Subsetting the patent data on columns "PATENT" and "POSTATE" <br/> where "PATENT" is the key, table "**patents_sub**"
#### This table is used to join with Citation data

In [7]:
# keep two columns: PATENT (index 0) and POSTATE (index 5)
patents_sub = rddPatents1.map(lambda x: x.split(","))\
           .map(lambda x: (x[0],x[5]))

In [8]:
patents_sub.take(4)

[('5200108', '""'), ('5200109', '""'), ('5200110', '""'), ('5200111', '""')]
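The state values above still carry the CSV quote characters (an empty state shows up as `'""'`). A small parsing helper can strip them, as in the sketch below. The function name `parse_patent` is hypothetical, and the sample line is rebuilt from one row of this notebook's final output.

```python
def parse_patent(line):
    """Return (patent, state) from one CSV line, with quotes removed from the state."""
    fields = line.split(',')
    return fields[0], fields[5].strip('"')

# One patent row, reconstructed from the printed results further down.
sample = ('5959466,1999,14515,1997,"US","CA",5310,2,,326,4,46,159,'
          '0,1,,0.6186,,4.8868,0.0455,0.044,,')
print(parse_patent(sample))
```

It could be applied as `rddPatents1.map(parse_patent)`. If the quotes are stripped this way, any later comparison against `'""'` would need to compare against `''` instead.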

#### Creating table "**citing_as_key**" with "citing" as both the key and the value

In [9]:
citing_as_key = rddCitations1.map(lambda x: x.split(","))\
                             .map(lambda x: (x[0],x[0]))\
                             .distinct()

In [10]:
citing_as_key.take(4)

[('5935430', '5935430'),
 ('5935432', '5935432'),
 ('5935438', '5935438'),
 ('5935451', '5935451')]

#### Creating table "**cited_as_key**" to make cited as key and citing as value pair

In [11]:
cited_as_key = rddCitations1.map(lambda x: x.split(","))\
                         .map(lambda x: (x[1],x[0]))

In `cited_as_key`, the first element is the key "cited" and the value is "citing".

In [12]:
cited_as_key.take(4)

[('5453769', '5935430'),
 ('5500071', '5935430'),
 ('5567868', '5935430'),
 ('5571410', '5935430')]

#### Joined "**cited_as_key**" and "**patents_sub**" based on "**cited**" as key <br/> resulting in **Table1** 

In [13]:
# join on the cited patent, then re-key by the citing patent
table1 = cited_as_key.join(patents_sub)\
                     .map(lambda x: (x[1][0],(x[0],x[1][1])))

In [14]:
table1.take(4)

[('5935430', ('5453769', '"CA"')),
 ('5988786', ('5453769', '"CA"')),
 ('5997708', ('5453769', '"CA"')),
 ('6003977', ('5453769', '"CA"'))]
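To make the shape of this result easier to follow, here is a plain-Python stand-in for the inner join and the re-keying step. It is only an illustration of the semantics on lists of pairs, not how Spark executes a join. The `inner_join` helper and the patent number `'1111111'` are made up for the example.

```python
from collections import defaultdict

def inner_join(left, right):
    """Mimic an inner join on (key, value) pairs: one output per matching pair."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]

# (cited, citing) pairs; '1111111' has no patent record and is dropped.
cited_pairs = [('5453769', '5935430'), ('5453769', '5988786'), ('1111111', '5935430')]
patent_states = [('5453769', '"CA"')]

joined = inner_join(cited_pairs, patent_states)
# Re-key by the citing patent, as the map after the join does.
rekeyed = [(citing, (cited, state)) for cited, (citing, state) in joined]
print(rekeyed)
```

Because the join is an inner join, citations whose cited patent is missing from the patent subset do not appear in the result.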

#### Joined "**citing_as_key**" and "**patents_sub**" based on "**citing**" as key <br/> resulting in **Table2** 

In [15]:
table2 = citing_as_key.join(patents_sub)

In [16]:
table2 = table2.map(lambda x: (x[0],x[1][1]))

In [17]:
table2.take(4)

[('5935437', '"TX"'),
 ('5935444', '"LA"'),
 ('5935445', '""'),
 ('5935459', '""')]

#### Join table1 and table2 to get an intermediate table of citing, citing_state, cited and cited_state

In [18]:
table3 = table2.join(table1)

In [19]:
table3.take(4)

[('5935444', ('"LA"', ('5567318', '"IL"'))),
 ('5935467', ('"NY"', ('5353292', '""'))),
 ('5935467', ('"NY"', ('5325229', '"OK"'))),
 ('5935467', ('"NY"', ('5363391', '"CA"')))]

In [20]:
table4 = table3.map(lambda x:((x[0],x[1][0]),(x[1][1][0],x[1][1][1])))\
               .keyBy(lambda x:x[0][0])

In [21]:
table4.take(4)

[('5935444', (('5935444', '"LA"'), ('5567318', '"IL"'))),
 ('5935467', (('5935467', '"NY"'), ('5353292', '""'))),
 ('5935467', (('5935467', '"NY"'), ('5325229', '"OK"'))),
 ('5935467', (('5935467', '"NY"'), ('5363391', '"CA"')))]

#### Count the number of same-state citations using a groupByKey operation and store the result in table5

In [22]:
table5 = table4.filter(lambda x: x[1][0][1]!='""')\
      .filter(lambda x: x[1][0][1]==x[1][1][1])\
      .groupByKey()\
      .map(lambda x:(x[0],[len(list(x[1]))]))

#### Same-state citation count for each citing 

In [23]:
table5.take(4)

[('5935536', [1]), ('5935600', [6]), ('5935640', [1]), ('5935662', [1])]
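The counting logic can be checked on a handful of rows in plain Python, as sketched below with `collections.Counter`. The rows are illustrative, not taken from the data. In PySpark, a comparable count could probably be obtained by mapping each surviving row to `(citing, 1)` and calling `reduceByKey(operator.add)`, which combines partial counts instead of collecting every group into a list.

```python
from collections import Counter

# (citing, citing_state, cited, cited_state) rows; values are illustrative.
rows = [
    ('5935600', '"CA"', '5000001', '"CA"'),
    ('5935600', '"CA"', '5000002', '"CA"'),
    ('5935600', '"CA"', '5000003', '"NY"'),
    ('5935536', '"TX"', '5000004', '"TX"'),
    ('5935445', '""', '5000005', '""'),
]

# Keep rows with a known citing state that matches the cited state,
# then count how many such rows each citing patent has.
same_state = Counter(
    citing
    for citing, citing_state, _cited, cited_state in rows
    if citing_state != '""' and citing_state == cited_state
)
print(sorted(same_state.items()))
```

The last row is excluded even though both states are equal, because an empty state means the state is unknown.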

In [24]:
new_patent_table = rddPatents1.map(lambda x:x.split(","))\
                              .map(lambda x: (x[0],x[1:]))

#### Join the count of same-state citations to the original patent table

In [25]:
new_patent_table1 = new_patent_table.join(table5)\
                        .map(lambda x:(x[0],x[1][0] + x[1][1]))

In [26]:
new_patent_table1_sorted = new_patent_table1.map(lambda x:(x[0],x[1][0:22],x[1][22]))\
                                            .sortBy(lambda x:x[2],ascending=False)

#### Sorted the new table by count in descending order

In [27]:
sorted_patent_table = new_patent_table1_sorted.map(lambda x:(x[0],x[1]+[x[2]]))
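Since only the ten largest counts are printed, a full sort is not strictly required. The plain-Python sketch below shows the same "top k by count" idea with `heapq.nlargest`. The pairs are a reduced, illustrative version of the rows in this notebook. On an RDD, `takeOrdered` with a negated key or `top` with a `key` argument would be the likely counterparts.

```python
import heapq

# (patent, same_state_count) pairs, reduced from the full rows for brevity.
counts = [
    ('5935536', 1),
    ('5959466', 94),
    ('5935600', 6),
    ('6008204', 80),
    ('5952345', 78),
]

# Pick the three pairs with the largest counts, largest first.
top3 = heapq.nlargest(3, counts, key=lambda pair: pair[1])
print(top3)
```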

#### Printing the top 10 same-state counts resulting from 800,000 lines each of the citation and patent data

In [28]:
for i in sorted_patent_table.take(10):
    print(i)

('5959466', ['1999', '14515', '1997', '"US"', '"CA"', '5310', '2', '', '326', '4', '46', '159', '0', '1', '', '0.6186', '', '4.8868', '0.0455', '0.044', '', '', 94])
('6008204', ['1999', '14606', '1998', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '121', '0', '1', '', '0.7415', '', '5', '0.0085', '0.0083', '', '', 80])
('5952345', ['1999', '14501', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '118', '0', '1', '', '0.7442', '', '5.1102', '0', '0', '', '', 78])
('5999972', ['1999', '14585', '1996', '"US"', '"CA"', '551495', '2', '', '709', '2', '22', '352', '0', '1', '', '0.8714', '', '4.0398', '0.0117', '0.0114', '', '', 77])
('5987245', ['1999', '14564', '1996', '"US"', '"CA"', '551495', '2', '', '709', '2', '22', '341', '0', '1', '', '0.8737', '', '4.0587', '0.0121', '0.0117', '', '', 76])
('5998655', ['1999', '14585', '1998', '"US"', '"CA"', '', '1', '', '560', '1', '14', '114', '0', '1', '', '0.7387', '', '5.1667', '', '', '', '', 76])
('5958954', ['1999', '