# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD
<div>
 <h2> CSCI 4283 / 5253 - Vandana Sridhar
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful

In [3]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

In [4]:
conf=SparkConf().setAppName("Lab4-rddd").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

Using PySpark and RDD's on the https://coding.csel.io machines is very slow -- most of the code is executed in Python and this is much less efficient than the java-based code using the PySpark dataframes. Be patient and trying using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task.

To that end, we've included code to just extract the last 200,000 lines of each file below using the Python "slice" notation. Using that subset of the data your "new patent" table should look like:

![Top partial 10 RDD self-state citations](top-subsample-rdd.png)

When you're ready to run the whole thing, just include all the data and run it again (...and wait...).

These two RDD's are called "rawCitations" and "rawPatents" because you probably want to process them futher (e.g. convert them to integer types, etc). If you haven't used Python "byte" types before, google it. You can convert a byte variable `x` into e.g. a UTF8 string using `x.decode('uft-8')`.

> Taken the last 800,000 lines from both the citation and patent file

In [5]:
import gzip
with gzip.open('cite75_99.txt.gz', 'r') as f:
    rddCitations = sc.parallelize( f.readlines()[-800000:] )

In [6]:
with gzip.open('apat63_99.txt.gz', 'r') as f:
    rddPatents = sc.parallelize( f.readlines()[-800000:] )

> Both the raw citation and patent files are decoded to the utf-8 format and split based on commas. Each comma separated value is converted into an integer. However for the patent file, the patent state is left as a string. I've written lambda functions for decoding,splitting and for converting them to integers.

In [7]:
c_rdd = rddCitations.map(lambda x: x.decode('utf-8').split(',')).\
    map(lambda y: (int(y[0]),int(y[1])))

In [8]:
p_rdd = rddPatents.map(lambda x: x.decode('utf-8').split(',')).\
    map(lambda y: (int(y[0]),y[5]))

> Now I have performed an inner join between the transformed citations and patents file to obtain the citing number, cited number and citing state

In [9]:
citing_rdd = c_rdd.join(p_rdd)  # citing , cited , citing state

> To obtain the cited state, I have interchanged the columns of citing number with the cited number. This makes the cited number the key

In [10]:
cited = citing_rdd.map(lambda y : (y[1][0],(y[0],y[1][1]))) # cited, citing, citing state

> Made a left outer join with the cited number as the key alongside the patent table, to get the cited state. Thus this results in the intermediate table of cited number, citing number, cited and citing state

In [11]:
intermediate_table = cited.leftOuterJoin(p_rdd)

In [12]:
#intermediate_table.take(30)

> Modified the intermediate table by making the citing number the key. ie - repositioning the fields

In [13]:
final_intermediate = intermediate_table.map(lambda y: (y[1][0][0],(y[0],y[1][0][1],y[1][1])))

> Now the above final_intermediate RDD is a key- value based RDD with 3 partitions. To reduce the number of partitions to just 1, I've converted the rdd using coalesce function and I've provided the partition value as 1. The lambda function also maps the RDD based on the schema specified. Hence convert_list is the new RDD

In [14]:
convert_list = final_intermediate.map(lambda y: (y[0],y[1][0],y[1][1],y[1][2])).coalesce(1)

> To calculate the number of same state citations, I've written a function that checks if the cited state equals the citing state and the count gets incremented. The functions finally returns the citing number and the count

In [15]:
def same_state(y):
    count = 0
    if y[2] == y[3]:
        count = count + 1
    return(y[0],count)

In [16]:
calculate_count = convert_list.map(same_state)

In [17]:
#calculate_count.take(10)

> I've reconverted the list of elements into a key value pair

In [18]:
convert_new = calculate_count.map(lambda y: (y[0],(y[1])))

> The key is citing number and the groupByKey() function is used. This groups the values of a specific key( citing number). The group by key results in the key along with the iterable object. The iterable object contains all the values of a specific key.

In [19]:
convert_new2 = convert_new.groupByKey()

> I've used the mapValues() function to obtain the values from the iterable object and the sum function is used to calculate the sum of the values for a given key. This is mapped as count, citing number and I've sorted the values based on the descending order of count.

In [20]:
final_counts = convert_new2.mapValues(sum).map(lambda y :(y[1],y[0])).sortByKey(False)

> Counts are displayed

In [28]:
final_counts.take(10)

[(94, 5959466),
 (80, 6008204),
 (78, 5952345),
 (77, 5999972),
 (76, 5987245),
 (76, 5958954),
 (76, 5998655),
 (73, 5951547),
 (73, 5980517),
 (65, 5998471)]

The counts are joined with the patent file. To do this, I've processed the raw patent file again, but this time I've taken the patent value as the key and the rest of the columns as its value. I've made a left outer join between the counts and the patent file

In [22]:
# to join with patent:
modified_finalcount = final_counts.map(lambda y: (y[1],y[0]))

In [23]:
# to append with patent, process patent again
p_rdd_modified = rddPatents.map(lambda x: x.decode('utf-8').split(',')).\
    map(lambda y: (int(y[0]),y[1:22]))

In [34]:
# join count with patent
final_output = modified_finalcount.leftOuterJoin(p_rdd_modified)

> The top 10 same state citations are displayed in descending order. I used the map function to reposition the values and sort them based on count

In [35]:
final = final_output.map(lambda y : (y[0],y[1][1],y[1][0])).sortBy(lambda y: y[1][0])

In [38]:
final.take(10)

[(5952345,
  ['1999',
   '14501',
   '1997',
   '"US"',
   '"CA"',
   '749584',
   '2',
   '',
   '514',
   '3',
   '31',
   '118',
   '0',
   '1',
   '',
   '0.7442',
   '',
   '5.1102',
   '0',
   '0',
   ''],
  78),
 (5987245,
  ['1999',
   '14564',
   '1996',
   '"US"',
   '"CA"',
   '551495',
   '2',
   '',
   '709',
   '2',
   '22',
   '341',
   '0',
   '1',
   '',
   '0.8737',
   '',
   '4.0587',
   '0.0121',
   '0.0117',
   ''],
  76),
 (5998655,
  ['1999',
   '14585',
   '1998',
   '"US"',
   '"CA"',
   '',
   '1',
   '',
   '560',
   '1',
   '14',
   '114',
   '0',
   '1',
   '',
   '0.7387',
   '',
   '5.1667',
   '',
   '',
   ''],
  76),
 (5985740,
  ['1999',
   '14564',
   '1997',
   '"JP"',
   '""',
   '504585',
   '3',
   '',
   '438',
   '4',
   '46',
   '56',
   '0',
   '1',
   '',
   '0.5045',
   '',
   '2.1429',
   '0.9464',
   '0.9464',
   ''],
  55),
 (6003285,
  ['1999',
   '14599',
   '1998',
   '"US"',
   '"IL"',
   '720823',
   '2',
   '',
   '53',
   '6',
   