# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task; the `sample()` method extracts just a sample of the data.

The two RDDs loaded below, `rddCitations` and `rddPatents`, hold the raw text lines, and you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data as Python `bytes`. If you haven't used the `bytes` type before, look it up in the Python documentation. You can convert a `bytes` value `x` into a string using `x.decode('utf-8')`. Alternatively, you can use the `open` method of the gzip library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile`, because `textFile` uses the underlying HDFS or other file system to read the file across all the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend and then distributes it to the worker nodes.
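As a small illustration of the `bytes` versus `str` distinction described above, the following sketch writes a tiny gzip file and reads it back in both binary and text mode. It is plain Python with no Spark involved; the file name `sample.txt.gz` and the two sample lines (including the header text) are made up for the example.

```python
import gzip
import os
import tempfile

# Hypothetical sample in a "CITING,CITED" layout; not taken from the real file.
sample_text = '"CITING","CITED"\n3858241,956203\n'

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "sample.txt.gz")

    # Write the sample as gzip-compressed UTF-8 text.
    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write(sample_text)

    # Binary mode ('rb') yields bytes, so each line needs an explicit decode.
    with gzip.open(path, "rb") as f:
        byte_lines = f.readlines()
    decoded_lines = [b.decode("utf-8").rstrip("\r\n") for b in byte_lines]

    # Text mode ('rt') decodes while reading and yields str directly.
    with gzip.open(path, "rt", encoding="utf-8") as f:
        text_lines = [line.rstrip("\n") for line in f]

print(type(byte_lines[0]).__name__, type(text_lines[0]).__name__)
print(decoded_lines == text_lines)
```

Both routes end with the same list of strings; text mode simply saves the manual `decode` call.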

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")



`newrddCitations` is the same as `rddCitations` except that the header line has been removed. Dropping the header avoids errors later, when a column is converted to an integer.

In [4]:
from itertools import islice

newrddCitations = rddCitations.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
).cache()
newrddCitations.take(5)

['3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384',
 '3858241,3634889']
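The header-skipping idiom above relies on the header being the first line of partition 0. A minimal plain-Python sketch of the same logic, with the partitions simulated as lists, may make that clearer. The helper name `drop_header` and the sample partitions (including the header text) are assumptions for illustration only.

```python
from itertools import islice


def drop_header(idx, it):
    """Skip the first element of partition 0; pass other partitions through."""
    return islice(it, 1, None) if idx == 0 else it


# Hypothetical partitions: only partition 0 starts with the header line.
partitions = [
    ['"CITING","CITED"', "3858241,956203"],
    ["3858241,1324234", "3858241,3398406"],
]

# Apply the function to each (index, iterator) pair, as
# mapPartitionsWithIndex would, and collect the surviving lines.
cleaned = [
    line
    for idx, part in enumerate(partitions)
    for line in drop_header(idx, iter(part))
]
print(cleaned)
```

Only the first line of the first partition is removed; every other line passes through unchanged.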

`newrddPatents` is the same as `rddPatents` except that the header line has been removed, for the same reason as above.

In [5]:
newrddPatents = rddPatents.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
).cache()
newrddPatents.take(5)

['3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,',
 '3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,']

The lines of `newrddCitations` and `newrddPatents` are split on "," and the patent numbers are converted to integers for later operations. For the patent data, only the patent number and the state field (index 5) are kept.

In [8]:
integerCitations = newrddCitations.map(lambda x: x.split(',')).\
    map(lambda y: (int(y[0]),int(y[1]))).cache()
integerCitations.take(5)

[(3858241, 956203),
 (3858241, 1324234),
 (3858241, 3398406),
 (3858241, 3557384),
 (3858241, 3634889)]

In [9]:
integerPatents = newrddPatents.map(lambda x: x.split(',')).\
    map(lambda y: (int(y[0]),y[5])).cache()
integerPatents.take(5)

[(3070801, '""'),
 (3070802, '"TX"'),
 (3070803, '"IL"'),
 (3070804, '"OH"'),
 (3070805, '"CA"')]
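Note that the state values above still carry their surrounding double quotes, so an empty state appears as `'""'`. The following sketch shows one possible way to parse a patent line and strip the quotes; it is not what the notebook does, and the helper names `parse_patent` and `parse_patent_csv` are hypothetical. The naive `split(',')` assumes no quoted field contains a comma, while the standard `csv` module handles quoting itself.

```python
import csv


def parse_patent(line):
    """Return (patent number, state) using a plain split, quotes stripped."""
    fields = line.split(",")
    return (int(fields[0]), fields[5].strip('"'))


def parse_patent_csv(line):
    """Same result via csv.reader, which removes the quoting for us."""
    fields = next(csv.reader([line]))
    return (int(fields[0]), fields[5])


# Two lines copied from the sample output of newrddPatents.take(5).
texas = '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,'
belgium = '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,'

print(parse_patent(texas), parse_patent_csv(texas))
print(parse_patent(belgium), parse_patent_csv(belgium))
```

With the quotes stripped, a missing state becomes the empty string `''` rather than `'""'`, so any later comparison against `'""'` would need to change accordingly.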

An inner join of the two RDDs on the citing patent number yields records of the form (CITING, (CITED, CITING_STATE)).

In [10]:
joinOne = integerCitations.join(integerPatents).cache() 
joinOne.take(5)

[(3858474, (3757618, '"OH"')),
 (3858474, (3768101, '"OH"')),
 (3858862, (197405, '"MA"')),
 (3858862, (1299890, '"MA"')),
 (3858862, (2059498, '"MA"'))]
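To make the shape of the joined records concrete, here is a plain-Python sketch of an inner join on key/value pairs. It only mimics the output layout of an RDD `join` on a tiny list; the function `inner_join` and the sample data are illustrative assumptions, not Spark code.

```python
from collections import defaultdict


def inner_join(left, right):
    """Return (key, (left_value, right_value)) for every matching key pair."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


# Hypothetical (CITING, CITED) pairs and (PATENT, STATE) pairs.
citations = [(3858474, 3757618), (3858474, 3768101), (9999999, 3757618)]
patents = [(3858474, '"OH"'), (3757618, '"CA"')]

joined = inner_join(citations, patents)
print(joined)
```

The citation whose citing patent (9999999) has no patent record is dropped, which is the defining behaviour of an inner join.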

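A left outer join with `integerPatents` then attaches the state of the cited patent, giving records of the form (CITED, ((CITING, CITING_STATE), CITED_STATE)). The join matches on the key, and the output below is keyed by CITED, so `joinOne` needs to be keyed as (CITED, (CITING, CITING_STATE)) when this cell runs. CITED_STATE is `None` when the cited patent has no entry in the patent data. This represents outputOne.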

In [12]:
outputOne = joinOne.leftOuterJoin(integerPatents).cache()
outputOne.take(5)

[(1053093, ((4442770, '"OH"'), None)),
 (1053093, ((5348319, '"SC"'), None)),
 (3787839, ((5853846, '"MN"'), '"MN"')),
 (3787839, ((5917326, '"MN"'), '"MN"')),
 (3787839, ((5764060, '"MN"'), '"MN"'))]
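The output above is keyed by the cited patent, and unmatched cited patents get `None`. A plain-Python sketch of that re-keying followed by a left outer join is given below. The helpers `rekey_by_cited` and `left_outer_join` are hypothetical names used only for this illustration; the sample pairs reuse patent numbers from the output above.

```python
def rekey_by_cited(record):
    """Turn (CITING, (CITED, CITING_STATE)) into (CITED, (CITING, CITING_STATE))."""
    citing, (cited, citing_state) = record
    return (cited, (citing, citing_state))


def left_outer_join(left, right):
    """Keep every left record; use None when the key is missing on the right."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    out = []
    for key, left_value in left:
        matches = right_by_key.get(key)
        if matches:
            out.extend((key, (left_value, rv)) for rv in matches)
        else:
            out.append((key, (left_value, None)))
    return out


join_one = [(4442770, (1053093, '"OH"')), (5853846, (3787839, '"MN"'))]
patents = [(3787839, '"MN"'), (4442770, '"OH"'), (5853846, '"MN"')]

by_cited = [rekey_by_cited(r) for r in join_one]
output_one = left_outer_join(by_cited, patents)
print(output_one)
```

Patent 1053093 does not appear in the sample patent list, so its cited state is `None`, matching the pattern seen in the notebook output.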

The records are re-ordered so that the citing patent becomes the key again, giving the format (CITING, (CITED, CITING_STATE, CITED_STATE)).

In [13]:
outputOneFinal = outputOne.map(lambda y: (y[1][0][0],(y[0],y[1][0][1],y[1][1]))).cache()
outputOneFinal.take(5)

[(4442770, (1053093, '"OH"', None)),
 (5348319, (1053093, '"SC"', None)),
 (5853846, (3787839, '"MN"', '"MN"')),
 (5917326, (3787839, '"MN"', '"MN"')),
 (5764060, (3787839, '"MN"', '"MN"'))]

The nested records are flattened into plain 4-tuples with `map()`; `coalesce(1)` then merges the result into a single partition. The flat layout is easier to operate on later.

In [14]:
flattenOutputOne = outputOneFinal.map(lambda y: (y[0],y[1][0],y[1][1],y[1][2])).coalesce(1).cache()
flattenOutputOne.take(5)

[(4442770, 1053093, '"OH"', None),
 (5348319, 1053093, '"SC"', None),
 (5853846, 3787839, '"MN"', '"MN"'),
 (5917326, 3787839, '"MN"', '"MN"'),
 (5764060, 3787839, '"MN"', '"MN"')]

The function below computes a same-state flag for each citation: 1 when both states are non-empty and equal, 0 otherwise. A named function is used because the multi-part condition is easier to read than an inline lambda.

In [15]:
def SameStateCount(y):
    Count = 0
    # Count the citation only if both states are non-empty and identical
    if y[2]!='""' and y[3]!='""' and y[2] == y[3]:
        Count += 1
    # Return the citing patent number along with the 0/1 flag
    return(y[0],Count)

The function defined above is applied to `flattenOutputOne` to get a same-state flag for each citation.

In [16]:
sameStateCount = flattenOutputOne.map(SameStateCount).cache()
sameStateCount.take(5)

[(4442770, 0), (5348319, 0), (5853846, 1), (5917326, 1), (5764060, 1)]
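For comparison, the same check can be written with tuple unpacking and named fields, which some readers find easier to follow. This is only a sketch: `same_state_flag` is a hypothetical name, and unlike the notebook's function it also treats a `None` citing state as missing.

```python
def same_state_flag(record):
    """Return (CITING, 1) for a same-state citation, otherwise (CITING, 0)."""
    citing, _cited, citing_state, cited_state = record
    is_same = (
        citing_state not in (None, '""')  # citing state must be present
        and citing_state == cited_state   # and equal to the cited state
    )
    return (citing, int(is_same))


# Two rows copied from flattenOutputOne.take(5), plus one hypothetical row
# where both states are empty.
rows = [
    (4442770, 1053093, '"OH"', None),
    (5853846, 3787839, '"MN"', '"MN"'),
    (1, 2, '""', '""'),
]
flags = [same_state_flag(r) for r in rows]
print(flags)
```

Because the flag is a single boolean expression, it could equally be passed to `map` as a lambda; the named function just keeps the intent visible.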

outputTwo is obtained by grouping the 0/1 flags by key, i.e. by citing patent number.

In [18]:
outputTwo = sameStateCount.groupByKey().cache()
outputTwo.take(5)

[(5158150, <pyspark.resultiterable.ResultIterable at 0x7f9d995d13a0>),
 (4912837, <pyspark.resultiterable.ResultIterable at 0x7f9d995d1a90>),
 (4171616, <pyspark.resultiterable.ResultIterable at 0x7f9d995d1bb0>),
 (5764946, <pyspark.resultiterable.ResultIterable at 0x7f9d995d1df0>),
 (5710752, <pyspark.resultiterable.ResultIterable at 0x7f9d995dfdf0>)]

The grouped flags are summed to give the total number of same-state citations for each key. The pairs are then swapped to (count, patent) so that `sortByKey(False)` sorts them by count in descending order.

In [19]:
requiredCount = outputTwo.mapValues(sum).map(lambda y :(y[1],y[0])).sortByKey(False).cache()
requiredCount.take(5)

[(125, 5959466), (103, 5983822), (100, 6008204), (98, 5952345), (96, 5998655)]
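Grouping all values per key and then summing them gives the same totals as keeping a running sum per key, which is the idea behind the RDD method `reduceByKey` (for example `reduceByKey(operator.add)`). The plain-Python sketch below compares the two approaches on hypothetical flags; it does not use Spark, and the variable names are illustrative.

```python
import operator
from collections import defaultdict

# Hypothetical (CITING, flag) pairs.
flags = [(5853846, 1), (5917326, 1), (5853846, 0), (5853846, 1)]

# Approach 1: collect every value per key, then sum each group
# (analogous to groupByKey followed by mapValues(sum)).
groups = defaultdict(list)
for key, value in flags:
    groups[key].append(value)
grouped_totals = {key: sum(values) for key, values in groups.items()}

# Approach 2: keep one running total per key
# (analogous to reduceByKey with operator.add).
reduced_totals = {}
for key, value in flags:
    if key in reduced_totals:
        reduced_totals[key] = operator.add(reduced_totals[key], value)
    else:
        reduced_totals[key] = value

print(grouped_totals)
print(reduced_totals)
```

The second approach never materialises the full list of values for a key, which is why a reduce-style aggregation is usually preferred for simple sums on large data.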

The pairs are swapped back so that the patent number is the key and the same-state citation count is the value.

In [20]:
requiredCountModified = requiredCount.map(lambda y: (int(y[1]),int(y[0]))).cache()
requiredCountModified.take(5)

[(5959466, 125), (5983822, 103), (6008204, 100), (5952345, 98), (5998655, 96)]
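Swapping key and value, sorting, and swapping back is one way to sort by count. An alternative is to sort with a key function, which is the idea behind the RDD methods `sortBy` and `takeOrdered`. The sketch below shows the same idea in plain Python on four (patent, count) pairs taken from the output above, deliberately shuffled.

```python
import heapq

# (PATENT, SAME_STATE_COUNT) pairs from the output above, in shuffled order.
counts = [(5983822, 103), (5959466, 125), (5952345, 98), (6008204, 100)]

# Sort by the count (second element) in descending order, no swapping needed.
by_count_desc = sorted(counts, key=lambda kv: kv[1], reverse=True)

# When only the top N records are needed, a heap avoids a full sort.
top_two = heapq.nlargest(2, counts, key=lambda kv: kv[1])

print(by_count_desc)
print(top_two)
```

Sorting with a key function keeps the records in their final (patent, count) layout throughout.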

The original `newrddPatents` lines are split on ',' so that the patent number becomes the key and the fields that follow it (`y[1:22]`) become the value.

In [21]:
newrddPatentsModified = newrddPatents.map(lambda x: x.split(',')).map(lambda y: (int(y[0]),y[1:22])).cache()
newrddPatentsModified.take(5)

[(3070801,
  ['1963',
   '1096',
   '',
   '"BE"',
   '""',
   '',
   '1',
   '',
   '269',
   '6',
   '69',
   '',
   '1',
   '',
   '0',
   '',
   '',
   '',
   '',
   '',
   '']),
 (3070802,
  ['1963',
   '1096',
   '',
   '"US"',
   '"TX"',
   '',
   '1',
   '',
   '2',
   '6',
   '63',
   '',
   '0',
   '',
   '',
   '',
   '',
   '',
   '',
   '',
   '']),
 (3070803,
  ['1963',
   '1096',
   '',
   '"US"',
   '"IL"',
   '',
   '1',
   '',
   '2',
   '6',
   '63',
   '',
   '9',
   '',
   '0.3704',
   '',
   '',
   '',
   '',
   '',
   '']),
 (3070804,
  ['1963',
   '1096',
   '',
   '"US"',
   '"OH"',
   '',
   '1',
   '',
   '2',
   '6',
   '63',
   '',
   '3',
   '',
   '0.6667',
   '',
   '',
   '',
   '',
   '',
   '']),
 (3070805,
  ['1963',
   '1096',
   '',
   '"US"',
   '"CA"',
   '',
   '1',
   '',
   '2',
   '6',
   '63',
   '',
   '1',
   '',
   '0',
   '',
   '',
   '',
   '',
   '',
   ''])]

outputThree is obtained by a left outer join of `requiredCountModified` with `newrddPatentsModified`, giving records of the form (PATENT, (SAME_STATE_COUNT, patent fields)).

In [22]:
outputThree = requiredCountModified.leftOuterJoin(newrddPatentsModified).cache()
outputThree.take(5)

[(5276906,
  (9,
   ['1994',
    '12422',
    '1993',
    '"US"',
    '"IL"',
    '386735',
    '2',
    '14',
    '455',
    '2',
    '21',
    '15',
    '16',
    '1',
    '0.3047',
    '0.1244',
    '1.5625',
    '10.6667',
    '0.5714',
    '0.5333',
    '0.3125'])),
 (5983548,
  (5,
   ['1999',
    '14564',
    '1998',
    '"US"',
    '"CA"',
    '760952',
    '2',
    '',
    '42',
    '5',
    '59',
    '23',
    '0',
    '0.8696',
    '',
    '0.6',
    '',
    '24.1304',
    '0',
    '0',
    ''])),
 (5230956,
  (5,
   ['1993',
    '12261',
    '1990',
    '"US"',
    '"IL"',
    '27895',
    '2',
    '9',
    '428',
    '6',
    '69',
    '8',
    '14',
    '1',
    '0.7245',
    '0.7188',
    '4.8571',
    '9',
    '0.625',
    '0.625',
    '0'])),
 (5695412,
  (4,
   ['1997',
    '13857',
    '1996',
    '"US"',
    '"TX"',
    '',
    '4',
    '7',
    '473',
    '6',
    '62',
    '22',
    '0',
    '0.9091',
    '',
    '0',
    '',
    '18.0909',
    '',
    '',
    '']

The same-state citation count, converted to an integer, is moved to the first position so that the records can be sorted by it. The patent number and the patent fields follow.

In [50]:
outputThreeModified = outputThree.map(lambda y : (int(y[1][0]),y[0],y[1][1])).coalesce(1).cache()
outputThreeModified.take(5)

[(35,
  5374608,
  ['1994',
   '12772',
   '1992',
   '"US"',
   '"CA"',
   '595450',
   '2',
   '34',
   '504',
   '1',
   '11',
   '61',
   '0',
   '0.8852',
   '',
   '0.9005',
   '',
   '14.8361',
   '0.6226',
   '0.541',
   '']),
 (24,
  5962118,
  ['1999',
   '14522',
   '1997',
   '"US"',
   '"PA"',
   '',
   '1',
   '',
   '428',
   '6',
   '69',
   '53',
   '0',
   '0.9811',
   '',
   '0.7581',
   '',
   '13.9057',
   '',
   '',
   '']),
 (21,
  5365004,
  ['1994',
   '12737',
   '1993',
   '"US"',
   '"NJ"',
   '381140',
   '2',
   '31',
   '585',
   '1',
   '19',
   '29',
   '8',
   '1',
   '0.375',
   '0.6659',
   '1.625',
   '12.6552',
   '0.75',
   '0.7241',
   '1']),
 (16,
  5816024,
  ['1998',
   '14158',
   '1996',
   '"US"',
   '"IL"',
   '746193',
   '2',
   '17',
   '53',
   '6',
   '68',
   '94',
   '1',
   '0.7766',
   '0',
   '0.5123',
   '1',
   '26.1383',
   '0',
   '0',
   '1']),
 (12,
  5728176,
  ['1998',
   '13955',
   '1995',
   '"US"',
   '"CA"',
   '2004

The final output is obtained by sorting on the count in descending order and rearranging each record into the required format: (PATENT, patent fields, SAME_STATE_COUNT). The top 10 records are printed below as required.

In [58]:
finalOutput=outputThreeModified.sortByKey(False).map(lambda x: (x[1],x[2],x[0])).cache()
finalOutput.take(10)

[(5959466,
  ['1999',
   '14515',
   '1997',
   '"US"',
   '"CA"',
   '5310',
   '2',
   '',
   '326',
   '4',
   '46',
   '159',
   '0',
   '1',
   '',
   '0.6186',
   '',
   '4.8868',
   '0.0455',
   '0.044',
   ''],
  125),
 (5983822,
  ['1999',
   '14564',
   '1998',
   '"US"',
   '"TX"',
   '569900',
   '2',
   '',
   '114',
   '5',
   '55',
   '200',
   '0',
   '0.995',
   '',
   '0.7201',
   '',
   '12.45',
   '0',
   '0',
   ''],
  103),
 (6008204,
  ['1999',
   '14606',
   '1998',
   '"US"',
   '"CA"',
   '749584',
   '2',
   '',
   '514',
   '3',
   '31',
   '121',
   '0',
   '1',
   '',
   '0.7415',
   '',
   '5',
   '0.0085',
   '0.0083',
   ''],
  100),
 (5952345,
  ['1999',
   '14501',
   '1997',
   '"US"',
   '"CA"',
   '749584',
   '2',
   '',
   '514',
   '3',
   '31',
   '118',
   '0',
   '1',
   '',
   '0.7442',
   '',
   '5.1102',
   '0',
   '0',
   ''],
  98),
 (5958954,
  ['1999',
   '14515',
   '1997',
   '"US"',
   '"CA"',
   '749584',
   '2',
   '',
   '514',
 