# PySpark Instalation

The action and functions are:
1. Installing PySpark
2. Importing nltk for stopwords
3. importing string for punctuation
4. Storage Mount
5. Creating Spark session,Spark Context and Spart app('RAKE')
6. Reading csv data and converting to RDD
7. Identification of delimiters
8. creating function for extracting candidate phrases from text
9. creating function for calculating deg(w) and feq(w)

In [90]:
!pip install pyspark



In [91]:
#creation of Spark session and Spark context
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RAKE").getOrCreate()
sc = spark.sparkContext

# Downloading language packages

In [92]:
#importing stopwords and punctuation from string and nltk
import string
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
stopwords = stopwords.words('english')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Pouplating delimiters : Stopwords from 'nltk' and punctuations from 'string' have been treated as delimiters. In addition to delimiters declared in ntlk and string, I have added a few stopwords that are quite frequently used in the documents. I have also used the delimiter functionality to remove information that is does not qualify as candidate phrases like 
1. page numbers
2. 'et' used for citations
3. 'nan' identifiers for a filename with no relevent text

In [93]:
#declare delimiters used for extracting candidate phrases
punctuationDelimiter = ['\\'+i for i in string.punctuation]
stopwordDelimiter = ['\\b'+i+'\\b' for i in stopwords]
stopwordDelimiter.append('\\bshall\\b')
stopwordDelimiter.append('\\bpage \d*\\b')
stopwordDelimiter.append('\\bnan\\b')
stopwordDelimiter.append('\\bet\\b')
stopwordDelimiter += punctuationDelimiter

# Mounting Storage

In [94]:
#mounting storage
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [95]:
!ls /content/drive/MyDrive/Colab\ Notebooks/data
#check for data
#files are uploaded into the same folder as the jupytr file

 Anti_assignment_CIC_g3.csv
 Governing_Law.csv
'Label Report - Anti-assignment, CIC (Group 3).xlsx'
'Label Report - Governing Law.xlsx'


# Reading data from csv files

Function **toRDD** : this function focuses on extracting data from the csv files (government_law.csv and Anti_assignment_cic.csv) to RDDs that have the exact structure as the csv file. Hence if government_law.csv has two columns then the corresponding RDD will have only two columns, because Anti_assignment_cic.csv has 3 columns then so will the corresponding RDD. The header row has been removed from the RDD that is returned. Identification of data based on header is done manually using column numbers

In [96]:
import csv
#function for reading csv files
def toRDD(filename):
  csv_file = sc.textFile(filename).map(lambda line:line)
  header = csv_file.first()
  csv_file = csv_file.filter(lambda line: line != header)
  csvRDDreader = csv.reader(csv_file.collect())
  csvRDD = sc.parallelize(csvRDDreader)
  return csvRDD

In [97]:
inputGovernmentLaw = 'file:///content/drive/MyDrive/Colab\ Notebooks/data/Governing_Law.csv'
inputAntiAssignment = 'file:///content/drive/MyDrive/Colab\ Notebooks/data/Anti_assignment_CIC_g3.csv'

In [98]:
#convert government law csv to rdd
rddGovernmentLaw = toRDD(inputGovernmentLaw)
rddGovernmentLaw.take(5)

[['CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'This Agreement is accepted by Company in the State of Nevada and shall be governed by and construed in accordance with the laws thereof, which laws shall prevail in the event of any conflict. (Page 13)'],
 ['EuromediaHoldingsCorp_20070215_10SB12G_EX-10.B(01)_525118_EX-10.B(01)_Content License Agreement.pdf',
  'This Agreement shall be governed by laws of the Province of Ontario and the federal laws of Canada applicable therein. (Page 9)  This Agreement is subject to all laws, regulations, license conditions and decisions of the Canadian Radio-television and Telecommunications Commission (‚ÄúCRTC‚Äù) municipal, provincial and federal governments or other authorities which are applicable to Rogers and/or Licensor, and which are now in force or hereafter adopted (‚ÄúApplicable Law‚Äù). (Page 11)'],
 ['FulucaiProductionsLtd_20131223_10-Q_EX-10.9_8368347_EX-10.9_Content License Agreement.pdf',
  'All 

In [99]:
#convert Anti Assignment cic csv to rdd 
rddAntiAssignment = toRDD(inputAntiAssignment)
rddAntiAssignment.take(3)

[['CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  '',
  'MA may not assign, sell, lease or otherwise transfer in whole or in party any of the rights granted pursuant to this Agreement without prior written approval of Company. (Page 12)'],
 ['EuromediaHoldingsCorp_20070215_10SB12G_EX-10.B(01)_525118_EX-10.B(01)_Content License Agreement.pdf',
  '',
  'This Agreement may not be assigned, sold or transferred without the prior written consent of the other party. (Page 9)  Notwithstanding the foregoing, Rogers may, without consent, assign its rights and obligations under this Agreement in whole or in part to: (i) a person that directly or indirectly controls, is controlled by or is under common control with Rogers; or (ii) a purchaser of all or substantially all of the assets used in connection with the ROD Service. A change of control of Rogers shall not be considered an assignment of this Agreement. (Page 9)  Any purported assignment, sale, or tran

In [100]:
#extract RDD for Anti Assignment Corpus
aaRDD = rddAntiAssignment.map(lambda x:(x[0],x[2]))
aaRDD.take(3)

[('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'MA may not assign, sell, lease or otherwise transfer in whole or in party any of the rights granted pursuant to this Agreement without prior written approval of Company. (Page 12)'),
 ('EuromediaHoldingsCorp_20070215_10SB12G_EX-10.B(01)_525118_EX-10.B(01)_Content License Agreement.pdf',
  'This Agreement may not be assigned, sold or transferred without the prior written consent of the other party. (Page 9)  Notwithstanding the foregoing, Rogers may, without consent, assign its rights and obligations under this Agreement in whole or in part to: (i) a person that directly or indirectly controls, is controlled by or is under common control with Rogers; or (ii) a purchaser of all or substantially all of the assets used in connection with the ROD Service. A change of control of Rogers shall not be considered an assignment of this Agreement. (Page 9)  Any purported assignment, sale, or transfer in cont

In [101]:
#extract RDD for change of control Corpus
ccRDD = rddAntiAssignment.map(lambda x:(x[0],x[1]))
ccRDD.take(3)

[('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  ''),
 ('EuromediaHoldingsCorp_20070215_10SB12G_EX-10.B(01)_525118_EX-10.B(01)_Content License Agreement.pdf',
  ''),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  "For purposes of the preceding sentence, and without limiting its generality, any merger, consolidation or reorganization involving Licensee (regardless of whether Licensee is a surviving or disappearing entity) will be deemed to be a transfer of rights, obligations or performance under this Agreement for which Licensor's prior written consent is required. (Page 15)")]

# Common functions used for both workloads

Function **splitPhrase**: This function succesfully extracts and returns candidate phrases for input text.It returns an array of phrases after filtering out the following return values that may tamper with final results: 
1. phrases with word count more than 4
2. null strings
3. excess padding

In [102]:
from re import split,sub
#function for extracting candidate phrases
def splitPhrase(string):
    pattern = r'|'.join(stopwordDelimiter)
    return list(filter(lambda x: x and len(x.split()) < 5,[sub('\s+',' ',s.strip()) for s in split(pattern, string)]))

function **keywordMap** : this function created to calculate the degree and frequency of every word that has occured in any of the candidate phrases. It takes the candidate phrase as input and returns a list of words with the value the the phrase adds to the words degree(w) and frequency(w). Each element of the array returned is in the format (word,(degree(w),frequency(w))) and the length of the array contains one element for every word that used in the candidate phrase. when reduce is applied on the resultant array then we acheive the final score of each word that can be used to calculate the score of the phrases.

In [103]:
#function for generating degree and frequency of each word in candidate phrases
def keywordMap(phrase):
  words = phrase.split(" ")
  if len(words) == 1:
    return [(w,(1,1)) for w in words if w!='']
  else:
    return [(w,(len(words),1)) for w in words if w!='']

# Workload One - Candidate Phrase Identification

Three code blocks below, 
1. filters out all null and empty values in the RDD
2. then converts all test to lower case
3. then split the text into array of candidate phrases using flat map

In [104]:
#function 1
#extracting candidate phrases for Government law corpus
govLawDocList = rddGovernmentLaw.filter(lambda x:x[1])\
                            .map(lambda x:(x[0],x[1].lower()))\
                            .flatMapValues(splitPhrase)
govLawDocList.take(10)

[('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'agreement'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'accepted'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'company'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'state'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'nevada'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'governed'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'construed'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'accordance'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'laws thereof'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate

In [105]:
#extracting candidate phrases for Anti assignment corpus
aaDocList = aaRDD.filter(lambda x:x[1])\
                      .map(lambda x:(x[0],x[1].lower()))\
                      .flatMapValues(splitPhrase)
aaDocList.take(10)

[('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'may'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'assign'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'sell'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'lease'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'otherwise transfer'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'whole'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'party'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'rights granted pursuant'),
 ('CybergyHoldingsInc_20140520_10-Q_EX-10.27_8605784_EX-10.27_Affiliate Agreement.pdf',
  'company'),
 ('EuromediaHoldingsCorp_20070215_10SB12G_EX-10.B(01)_525118_EX-10.B(

In [106]:
#extracting candidate phrases for Change of control corpus
ccDocList = ccRDD.filter(lambda x:x[1])\
                    .map(lambda x:(x[0],x[1].lower()))\
                    .flatMapValues(splitPhrase)
ccDocList.take(10)

[('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'purposes'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'preceding sentence'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'without limiting'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'generality'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'merger'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'consolidation'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'reorganization involving licensee'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'regardless'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1_Content License Agreement.pdf',
  'whether licensee'),
 ('GopageCorp_20140221_10-K_EX-10.1_8432966_EX-10.1

In [107]:
#group candidate phrases by filename
docList = govLawDocList.groupByKey()
docList.take(3)

[('FulucaiProductionsLtd_20131223_10-Q_EX-10.9_8368347_EX-10.9_Content License Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd744fd0>),
 ('IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content License Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd7c6290>),
 ('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bec23a90>)]

In [108]:
#group candidate phrases by filename
docListAa = aaDocList.groupByKey()
docListAa.take(3)

[('IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content License Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd716e10>),
 ('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd71ecd0>),
 ('ConformisInc_20191101_10-Q_EX-10.6_11861402_EX-10.6_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bec29cd0>)]

In [109]:
#group candidate phrases by filename
docListCc = ccDocList.groupByKey()
docListCc.take(3)

[('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd764290>),
 ('ConformisInc_20191101_10-Q_EX-10.6_11861402_EX-10.6_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bf107110>),
 ('EtonPharmaceuticalsInc_20191114_10-Q_EX-10.1_11893941_EX-10.1_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bec09d10>)]

# Workload One - Word Score Calculation

The 3 code blocks below,
1. calculate deg(w) and feq(w) for all words used in candidate phrases
2. rearrange elements of tuple from (file,(word,(deg,feq))) to ((file,word),(deg,feq))
3. use reduce function to calculate final deg(w) and feq(w) for all words for each file
4. rearrange elements of tuple from ((file,word),(deg,feq)) to (file,(word,(deg,feq)))
5. calculate deg(w)/feq(w) for all words for each file name
6. group based on filename

In [110]:
#group word score by filename
keywords = govLawDocList.flatMapValues(keywordMap)\
                  .map(lambda x:((x[0],x[1][0]),x[1][1]))\
                  .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
                  .map(lambda x:(x[0][0],(x[0][1],x[1])))\
                  .map(lambda x:(x[0],(x[1][0],x[1][1][0]/x[1][1][0]))).groupByKey()
keywords.take(50)

[('FulucaiProductionsLtd_20131223_10-Q_EX-10.9_8368347_EX-10.9_Content License Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bec0db90>),
 ('IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content License Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bf105e50>),
 ('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bf1073d0>),
 ('ConformisInc_20191101_10-Q_EX-10.6_11861402_EX-10.6_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd7546d0>),
 ('EtonPharmaceuticalsInc_20191114_10-Q_EX-10.1_11893941_EX-10.1_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bec28f50>),
 ('FuelcellEnergyInc_20191106_8-K_EX-10.1_11868007_EX-10.1_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bec14050>),
 ('ImineCorp_20180725_S-1_EX-10.5_11275970_EX-10.5_Distributor Agreement.pdf'

In [111]:
#group word score by filename
AAkeywords = aaDocList.flatMapValues(keywordMap)\
                  .map(lambda x:((x[0],x[1][0]),x[1][1]))\
                  .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
                  .map(lambda x:(x[0][0],(x[0][1],x[1])))\
                  .map(lambda x:(x[0],(x[1][0],x[1][1][0]/x[1][1][0]))).groupByKey()
AAkeywords.take(5)

[('IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content License Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bec07850>),
 ('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd716b90>),
 ('ConformisInc_20191101_10-Q_EX-10.6_11861402_EX-10.6_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd4cc090>),
 ('EtonPharmaceuticalsInc_20191114_10-Q_EX-10.1_11893941_EX-10.1_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bf107f10>),
 ('FuelcellEnergyInc_20191106_8-K_EX-10.1_11868007_EX-10.1_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd4c9c50>)]

In [112]:
#group word score by filename
CCkeywords = ccDocList.flatMapValues(keywordMap)\
                  .map(lambda x:((x[0],x[1][0]),x[1][1]))\
                  .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
                  .map(lambda x:(x[0][0],(x[0][1],x[1])))\
                  .map(lambda x:(x[0],(x[1][0],x[1][1][0]/x[1][1][0]))).groupByKey()
CCkeywords.take(5)

[('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd4eaf50>),
 ('ConformisInc_20191101_10-Q_EX-10.6_11861402_EX-10.6_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bec25b10>),
 ('EtonPharmaceuticalsInc_20191114_10-Q_EX-10.1_11893941_EX-10.1_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd4ec350>),
 ('FuelcellEnergyInc_20191106_8-K_EX-10.1_11868007_EX-10.1_Development Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd4f6090>),
 ('WaterNowInc_20191120_10-Q_EX-10.12_11900227_EX-10.12_Distributor Agreement.pdf',
  <pyspark.resultiterable.ResultIterable at 0x7ff6bd504b10>)]

# Workload One - Candidate Phrase Score Calculation

In [113]:
#(doc,(phrase[],map[keyword,score]))
#join phrases group to keyword map group for calculating phrase score
govLawJoin = docList.join(keywords)
govLawJoin.take(5)

[('FulucaiProductionsLtd_20131223_10-Q_EX-10.9_8368347_EX-10.9_Content License Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd4ff590>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd4ff7d0>)),
 ('IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content License Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd4ca910>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd4ffd10>)),
 ('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd6a7a50>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd6ab050>)),
 ('ConformisInc_20191101_10-Q_EX-10.6_11861402_EX-10.6_Development Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd4ff950>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd4fb3d0>)),
 ('EtonPharmaceuticalsInc_20191114_10-Q_EX-10.1_11893941_EX-10.1_Development Agreement.pdf',
  (<pyspark.resultiterable.Resul

In [114]:
#join phrases group to keyword map group for calculating phrase score
aaJoin = docListAa.join(AAkeywords)
aaJoin.take(5)

[('IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content License Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd6b2590>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd4ca750>)),
 ('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd6b2710>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd7809d0>)),
 ('ConformisInc_20191101_10-Q_EX-10.6_11861402_EX-10.6_Development Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bec25550>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd7b0ad0>)),
 ('EtonPharmaceuticalsInc_20191114_10-Q_EX-10.1_11893941_EX-10.1_Development Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd6b2290>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd25be10>)),
 ('FuelcellEnergyInc_20191106_8-K_EX-10.1_11868007_EX-10.1_Development Agreement.pdf',
  (<pyspark.resultiterable.ResultIterabl

In [115]:
#join phrases group to keyword map group for calculating phrase score
ccJoin = docListCc.join(CCkeywords)
ccJoin.take(5)

[('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd6c3790>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd4ec310>)),
 ('ConformisInc_20191101_10-Q_EX-10.6_11861402_EX-10.6_Development Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd6c3d10>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd267050>)),
 ('EtonPharmaceuticalsInc_20191114_10-Q_EX-10.1_11893941_EX-10.1_Development Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd268310>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd2688d0>)),
 ('FuelcellEnergyInc_20191106_8-K_EX-10.1_11868007_EX-10.1_Development Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 0x7ff6bd6c3750>,
   <pyspark.resultiterable.ResultIterable at 0x7ff6bd269c90>)),
 ('WaterNowInc_20191120_10-Q_EX-10.12_11900227_EX-10.12_Distributor Agreement.pdf',
  (<pyspark.resultiterable.ResultIterable at 

The below three code blocks are trial and error blocks used to test code that id used to create the function generateCandidateList().The functionalities tested are

1. converting resultIterable to list
2. converting resultIterable to Map
3. calculating scire for each phrase returned in block one to return a RDD with tuples containing phrase and score as a key-value pair

In [116]:
#extracting phrases list from first RDD element
l = list(govLawJoin.collect()[0][1][0])
rddForList = sc.parallelize(l)
pList = rddForList.collect()
pList

['questions',
 'respect',
 'construction',
 'agreement',
 'rights',
 'liabilities',
 'parties hereto',
 'governed',
 'laws',
 'state',
 'florida']

In [117]:
#extracting keyword map from first RDD element
t = list(govLawJoin.collect()[0][1][1])
rddForTuplelist = sc.parallelize(t)
kMap = rddForTuplelist.collectAsMap()
kMap

{'agreement': 1.0,
 'construction': 1.0,
 'florida': 1.0,
 'governed': 1.0,
 'hereto': 1.0,
 'laws': 1.0,
 'liabilities': 1.0,
 'parties': 1.0,
 'questions': 1.0,
 'respect': 1.0,
 'rights': 1.0,
 'state': 1.0}

In [118]:
#calculating candidate scores for first RDD element
scores = []
score = 0
for p in pList:
  score = 0
  words = p.split(" ")
  for w in words:
    score = score + kMap[w]
  scores.append((p,score))
scoreRDD = sc.parallelize(scores).sortBy(lambda x:-x[1])

function **generateCandidateList()**:the function takes the key-value pair with a phrase list ResultIterable as key and the word scores ResultIterable as value.The return value for this function is a list of tuples containg the candidate phrases and their repective scores sorted in the desending order of the score.

In [119]:
#calculating candidate scores for each document
def generateCandidateList(pair):
  ph_list,kw_map = pair[0],pair[1]
  pList = list(ph_list) 
  kMap = {x[0]:x[1] for x in list(kw_map)}
  scores = {}
  score = 0
  for p in pList:
    score = 0
    words = list(filter(lambda x:x,p.split(" ")))
    for w in words:
        score = score + kMap[w]
    scores[p] = score
    scoreArr = [(x[0],x[1]) for x in list(scores.items())]
    scoreArr.sort(key=lambda x:-x[1])
  return scoreArr


# Workload One - Keyword Identification

function **assignEdfRdf()** : this function created to calculate the *edf* and *rdf* of every candidate phrase that constitutes the corpus. It takes the candidate phrase scores map as input and returns a list of tuples with the key element as the phrase. The rdf value is set to 1 for each phrase and the edf value is set as 1 if index value of the phrase in the sorted phrase list is less than 4 to signify that the candidate phrase is selected as a extracted phrase for having a score in the top 4 of the phrases that were selected as cadidate phrases for the document.

This function is applied as a map functions on the RDD that is grouped by filename hence the edf and rdf values are assigned seperately for each document in the corpus. When reduce is applied on these values the final rdf and edf is calculated for the entire corpus.

In [120]:
#calculating rdf and edf for all candidate phrases
def assignRdfEdf(doc):
  k_words = doc
  for i in range(len(k_words)):
    if i < 4 :
      k_words[i] = (k_words[i][0],1,1)
    else:
      k_words[i] = (k_words[i][0],1,0)
  return k_words

In [121]:
govLawJoin.mapValues(generateCandidateList).take(3)

[('FulucaiProductionsLtd_20131223_10-Q_EX-10.9_8368347_EX-10.9_Content License Agreement.pdf',
  [('parties hereto', 2.0),
   ('questions', 1.0),
   ('respect', 1.0),
   ('construction', 1.0),
   ('agreement', 1.0),
   ('rights', 1.0),
   ('liabilities', 1.0),
   ('governed', 1.0),
   ('laws', 1.0),
   ('state', 1.0),
   ('florida', 1.0)]),
 ('IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content License Agreement.pdf',
  [('without giving effect', 3.0),
   ('new york', 2.0),
   ('laws principles', 2.0),
   ('agreement', 1.0),
   ('governed', 1.0),
   ('construed', 1.0),
   ('respects', 1.0),
   ('accordance', 1.0),
   ('laws', 1.0),
   ('state', 1.0),
   ('conflicts', 1.0)]),
 ('DeltathreeInc_19991102_S-1A_EX-10.19_6227850_EX-10.19_Co-Branding Agreement_ Service Agreement.pdf',
  [('without giving effect', 3.0),
   ('laws principles thereof', 3.0),
   ('new york', 2.0),
   ('agreement', 1.0),
   ('governed', 1.0),
   ('construed', 1.0),
   ('accordance', 1.0),
   ('laws', 1.0)

# Workload One - Essentiality Score Calculation and Final Results

The three blocks below return the final result of method 1
the actions performed are
1. generating candidate score based on the values given by the RDD join
2. assign values of rdf and edf to each phrases
3. As we dont need the filename any more, the tuple format is changed from (filename,(phrase,rdf,edf)) to (phrase,rdf,edf)
4. In order to apply reduce on the value part of the tuple the tuple structure is changed to (phrase,(rdf,edf))
5. reduce is applied to the rdf and edf values to calculate final values for each phrase
6. ess is calculated for each phrase
7. RDD os sorted based on ess calculated
8. Top 20 phrases are shown along with their respective rdf,edf,ess values

**FINAL RESULTS ARE DISPLAYED IN THE FORMAT: (Candidate Phrase,(rdf,edf),ess))**

In [122]:
#top 20 keywords for Government Law corpus
CorpusMapGl = govLawJoin.mapValues(generateCandidateList)\
                  .mapValues(assignRdfEdf)\
                  .flatMap(lambda x:(x[1]))\
                  .map(lambda x:(x[0],(x[1],x[2])))\
                  .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
                  .mapValues(lambda x:(x,(x[1]*x[1])/x[0])).sortBy(lambda x:-x[1][1])
CorpusMapGl.take(20)

[('agreement', ((398, 244), 149.58793969849245)),
 ('governed', ((359, 141), 55.37883008356546)),
 ('new york', ((76, 61), 48.96052631578947)),
 ('without regard', ((69, 53), 40.710144927536234)),
 ('without giving effect', ((38, 38), 38.0)),
 ('construed', ((258, 77), 22.98062015503876)),
 ('laws principles', ((34, 27), 21.441176470588236)),
 ('without reference', ((25, 23), 21.16)),
 ('united states', ((23, 20), 17.391304347826086)),
 ('law principles', ((40, 26), 16.9)),
 ('substantive laws', ((20, 18), 16.2)),
 ('internal laws', ((23, 19), 15.695652173913043)),
 ('laws provisions', ((17, 16), 15.058823529411764)),
 ('law provisions', ((19, 16), 13.473684210526315)),
 ('parties hereto', ((13, 12), 11.076923076923077)),
 ('law rules', ((22, 15), 10.227272727272727)),
 ('new york without regard', ((10, 10), 10.0)),
 ('performed entirely within', ((9, 9), 9.0)),
 ('laws provisions thereof', ((8, 8), 8.0)),
 ('performed wholly within', ((8, 8), 8.0))]

**FINAL RESULTS ARE DISPLAYED IN THE FORMAT: (Candidate Phrase,(rdf,edf),ess))**

In [123]:
#top 20 keywords for Anti-Assignment corpus
CorpusMapAA = aaJoin.mapValues(generateCandidateList).mapValues(assignRdfEdf).flatMap(lambda x:(x[1])).map(lambda x:(x[0],(x[1],x[2]))).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:(x,(x[1]*x[1])/x[0])).sortBy(lambda x:-x[1][1])
CorpusMapAA.take(20)

[('prior written consent', ((236, 217), 199.52966101694915)),
 ('neither party may assign', ((58, 58), 58.0)),
 ('either party without', ((38, 36), 34.10526315789474)),
 ('either party may assign', ((26, 26), 26.0)),
 ('agreement may', ((50, 36), 25.92)),
 ('neither party', ((51, 36), 25.41176470588235)),
 ('agreement without', ((73, 36), 17.753424657534246)),
 ('obligations hereunder without', ((20, 18), 16.2)),
 ('party may assign', ((18, 17), 16.055555555555557)),
 ('third party without', ((17, 16), 15.058823529411764)),
 ('written consent', ((39, 23), 13.564102564102564)),
 ('express prior written consent', ((13, 13), 13.0)),
 ('binding upon', ((26, 18), 12.461538461538462)),
 ('prior written approval', ((14, 13), 12.071428571428571)),
 ('express written consent', ((13, 12), 11.076923076923077)),
 ('parties hereto', ((14, 11), 8.642857142857142)),
 ('unreasonably withheld', ((87, 25), 7.183908045977011)),
 ('otherwise transfer', ((33, 15), 6.818181818181818)),
 ('respective success

**FINAL RESULTS ARE DISPLAYED IN THE FORMAT: (Candidate Phrase,(rdf,edf),ess))**

In [124]:
#top 20 keywords for change of control corpus
CorpusMapCC = ccJoin.mapValues(generateCandidateList).mapValues(assignRdfEdf).flatMap(lambda x:(x[1])).map(lambda x:(x[0],(x[1],x[2]))).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:(x,(x[1]*x[1])/x[0])).sortBy(lambda x:-x[1][1])
CorpusMapCC.take(20)

[('prior written consent', ((17, 14), 11.529411764705882)),
 ('control occurs', ((3, 3), 3.0)),
 ('licensor may terminate', ((3, 3), 3.0)),
 ('provide written notice', ((4, 3), 2.25)),
 ('days written notice', ((4, 3), 2.25)),
 ('surviving entity acquires', ((2, 2), 2.0)),
 ('agreement upon written notice', ((2, 2), 2.0)),
 ('providing written notice', ((2, 2), 2.0)),
 ('substantial change', ((2, 2), 2.0)),
 ('automatically terminate upon', ((2, 2), 2.0)),
 ('material change', ((2, 2), 2.0)),
 ('without limiting', ((2, 2), 2.0)),
 ('terminated upon', ((2, 2), 2.0)),
 ('authority granted hereunder', ((2, 2), 2.0)),
 ('received notice', ((2, 2), 2.0)),
 ('party may assign', ((2, 2), 2.0)),
 ('interest expressly assumes', ((2, 2), 2.0)),
 ('party may terminate', ((2, 2), 2.0)),
 ('either party may', ((2, 2), 2.0)),
 ('either party may terminate', ((2, 2), 2.0))]

# Workload Two - Candidate Phrase Identification & word score calculation

The three cells below extract the text values from each RDD because all phrases in one column are treated as a single document.

In [125]:
#function 2
#extracting data for Government law document
governmentLawDoc = rddGovernmentLaw.map(lambda x:x[1])
governmentLawDoc.take(3)

['This Agreement is accepted by Company in the State of Nevada and shall be governed by and construed in accordance with the laws thereof, which laws shall prevail in the event of any conflict. (Page 13)',
 'This Agreement shall be governed by laws of the Province of Ontario and the federal laws of Canada applicable therein. (Page 9)  This Agreement is subject to all laws, regulations, license conditions and decisions of the Canadian Radio-television and Telecommunications Commission (‚ÄúCRTC‚Äù) municipal, provincial and federal governments or other authorities which are applicable to Rogers and/or Licensor, and which are now in force or hereafter adopted (‚ÄúApplicable Law‚Äù). (Page 11)',
 'All questions with respect to the construction of this Agreement, and the rights and liabilities of the Parties hereto, shall be governed by the laws of the State of Florida. (Page 6)']

In [126]:
#function 2
#extracting data for Anti assignment document
antiAssignmentDoc = rddAntiAssignment.map(lambda row:row[2])
antiAssignmentDoc.take(3)

['MA may not assign, sell, lease or otherwise transfer in whole or in party any of the rights granted pursuant to this Agreement without prior written approval of Company. (Page 12)',
 'This Agreement may not be assigned, sold or transferred without the prior written consent of the other party. (Page 9)  Notwithstanding the foregoing, Rogers may, without consent, assign its rights and obligations under this Agreement in whole or in part to: (i) a person that directly or indirectly controls, is controlled by or is under common control with Rogers; or (ii) a purchaser of all or substantially all of the assets used in connection with the ROD Service. A change of control of Rogers shall not be considered an assignment of this Agreement. (Page 9)  Any purported assignment, sale, or transfer in contravention of this Section shall be null and void. (Page 9)',
 "Licensee shall not assign or otherwise transfer any of its rights, or delegate or otherwise transfer any of its obligations or perfor

In [127]:
#function 2
#extracting data for Change of Control document
changeOfControlDoc = rddAntiAssignment.map(lambda row:row[1])
changeOfControlDoc.take(3)

['',
 '',
 "For purposes of the preceding sentence, and without limiting its generality, any merger, consolidation or reorganization involving Licensee (regardless of whether Licensee is a surviving or disappearing entity) will be deemed to be a transfer of rights, obligations or performance under this Agreement for which Licensor's prior written consent is required. (Page 15)"]

The below 2 blocks complete implementation for the government Law document.
The first block does the following actions:
1. changing all text to lower case
2. extract all candidate phrases from the text
3. calculate deg(w) and feq(w) based on the candidate phrases
4. reduce function for calculating final deg(w) and feq(w) for each word
5. calculate final deg(w)/feq(w)
6. sort deg(w)/feq(w) in desending order




In [128]:
#function 2
#extracting phrrases and keyword scores 
glPhrases = governmentLawDoc.map(lambda x:x.lower()).flatMap(splitPhrase)
GovLawKeywordMapRDD = glPhrases.flatMap(keywordMap)\
                      .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
                      .map(lambda word:(word[0],word[1][0]/word[1][1]))\
                      .sortBy(lambda x:-x[1])

In [129]:
glPhrases.take(25)

['agreement',
 'accepted',
 'company',
 'state',
 'nevada',
 'governed',
 'construed',
 'accordance',
 'laws thereof',
 'laws',
 'prevail',
 'event',
 'conflict',
 'agreement',
 'governed',
 'laws',
 'province',
 'ontario',
 'federal laws',
 'canada applicable therein',
 'agreement',
 'subject',
 'laws',
 'regulations',
 'license conditions']

In [130]:
GovLawKeywordMapRDD.take(10)

[('payment', 4.0),
 ('systems', 4.0),
 ('following', 4.0),
 ('attorns', 4.0),
 ('practices', 4.0),
 ('independently', 4.0),
 ('private', 4.0),
 ('oftexas', 4.0),
 ('initiate', 4.0),
 ('data', 4.0)]

In [131]:
#extracting phrrases and keyword scores 
ccPhrases = changeOfControlDoc.filter(lambda x:x).map(lambda x:x.lower()).flatMap(splitPhrase)
changeControlKeywordMapRDD = ccPhrases.flatMap(keywordMap)\
                      .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
                      .map(lambda word:(word[0],word[1][0]/word[1][1]))\
                      .sortBy(lambda x:-x[1])

In [132]:
ccPhrases.take(25)

['purposes',
 'preceding sentence',
 'without limiting',
 'generality',
 'merger',
 'consolidation',
 'reorganization involving licensee',
 'regardless',
 'whether licensee',
 'surviving',
 'disappearing entity',
 'deemed',
 'transfer',
 'rights',
 'obligations',
 'performance',
 'agreement',
 'licensor',
 'prior written consent',
 'required',
 'term',
 'agreement',
 'effective',
 'date first stated',
 'continue']

In [133]:
changeControlKeywordMapRDD.take(10)

[('performed', 4.0),
 ('accruing', 4.0),
 ('breaching', 4.0),
 ('reynolds', 4.0),
 ('aside', 4.0),
 ('ncm�', 4.0),
 ('detail', 4.0),
 ('therefor', 4.0),
 ('exclusive', 4.0),
 ('call', 4.0)]

In [134]:
#extracting phrrases and keyword scores 
aaPhrases = antiAssignmentDoc.filter(lambda x:x).map(lambda x:x.lower()).flatMap(splitPhrase)
antiAssignmentKeywordMapRDD = aaPhrases.flatMap(keywordMap)\
                      .reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))\
                      .map(lambda word:(word[0],word[1][0]/word[1][1]))\
                      .sortBy(lambda x:-x[1])

In [135]:
aaPhrases.take(25)

['may',
 'assign',
 'sell',
 'lease',
 'otherwise transfer',
 'whole',
 'party',
 'rights granted pursuant',
 'company',
 'agreement may',
 'assigned',
 'sold',
 'transferred without',
 'prior written consent',
 'party',
 'notwithstanding',
 'foregoing',
 'rogers may',
 'without consent',
 'assign',
 'rights',
 'obligations',
 'agreement',
 'whole',
 'part']

In [136]:
antiAssignmentKeywordMapRDD.take(10)

[('limiting', 4.0),
 ('restrictions', 4.0),
 ('reynolds', 4.0),
 ('ncm�', 4.0),
 ('nfla', 4.0),
 ('whose', 4.0),
 ('primary', 4.0),
 ('entities', 4.0),
 ('thewritten', 4.0),
 ('moody', 4.0)]

# Workload Two - Candidate Keyword Score Calculation and Final Results

The below blocks does the following actions:
1. combines phrase List and the keyword scores into a tuple as a key value pair
2. generate scores for all candidate phrases
3. sort the result in decreasing order of scores
4. print the top 20 candidate phrases from the final list


**FINAL RESULTS ARE DISPLAYED IN THE FORMAT: (Candidate Phrase,score)**

In [137]:
#top 20 keywords extracted from Government Law document

#join
GL_DOC = (glPhrases.collect(),GovLawKeywordMapRDD.collect())

#calculating score for candidate phrases
GL_SCORE = sc.parallelize(generateCandidateList(GL_DOC)).sortBy(lambda x:-x[1])
GL_SCORE.take(20)

[('1 covering competition following', 14.5),
 ('payment data systems hereunder', 14.4),
 ('either party herein initiate', 13.45),
 ('met independently without reference', 13.244457013574662),
 ('1980 united nations convention', 12.412280701754387),
 ('intellectual property right applies', 11.952380952380953),
 ('general international business practices', 11.64102564102564),
 ('united states trademark act', 11.352756892230575),
 ('maryland without giving effect', 11.129508229963905),
 ('parties hereto expressly attorns', 11.002816901408451),
 ('pennsylvania without giving effect', 10.82068470055214),
 ('agreement takes effect upon', 10.720484026822055),
 ('german private international law', 10.685741088180112),
 ('massachusetts without giving effect', 10.59379394424962),
 ('intellectual property laws relevant', 10.58523884989812),
 ('new york without recourse', 10.581361775479422),
 ('delaware without giving effect', 10.532569454453702),
 ('nevada without giving effect', 10.479508229963

**FINAL RESULTS ARE DISPLAYED IN THE FORMAT: (Candidate Phrase,score)**

In [138]:
#top 20 keywords extracted from Change of Control document

#join
CC_DOC = (ccPhrases.collect(),changeControlKeywordMapRDD.collect())

#calculating score for candidate phrases
CC_SCORE = sc.parallelize(generateCandidateList(CC_DOC)).sortBy(lambda x:-x[1])
CC_SCORE.take(20)

[('vs key leadership position', 14.5),
 ('management team successfully completes', 13.833333333333332),
 ('ehave companion solution within', 12.475),
 ('required local advertising expenditures', 12.357142857142858),
 ('without thereby becoming liable', 12.3),
 ('reasonable detail based upon', 12.200980392156863),
 ('advertising agency representing tda', 12.133333333333335),
 ('providing ebix written notice', 12.073275862068964),
 ('set aside within ninety', 11.975),
 ('upon sending written notice', 11.899256254225829),
 ('provide prompt written notice', 11.83958020989505),
 ('dova hereunder whether accruing', 11.515151515151516),
 ('shareholder owning fifty percent', 11.505882352941178),
 ('total voting power represented', 11.502415458937199),
 ('04 exxonmobil may terminate', 11.469019274376416),
 ('reynolds group holdings limited', 11.411111111111111),
 ('providing prior written notice', 11.400198938992041),
 ('admit additional general partners', 11.333333333333334),
 ('without limiti

**FINAL RESULTS ARE DISPLAYED IN THE FORMAT: (Candidate Phrase,score)**

In [139]:
#top 20 keywords extracted from Anti-Assignment document

#join
AA_DOC = (aaPhrases.collect(),antiAssignmentKeywordMapRDD.collect())

#calculating score for candidate phrases
AA_SCORE = sc.parallelize(generateCandidateList(AA_DOC)).sortBy(lambda x:-x[1])
AA_SCORE.take(20)

[('minimum net worth equal', 14.5),
 ('indirect loss thus caused', 14.444444444444445),
 ('restrictions set forth herein', 13.759090909090908),
 ('managing group may decide', 13.522942643391522),
 ('express prior written authorization', 13.344629439188262),
 ('expressly set forth herein', 12.605244755244755),
 ('exxon mobil corporation owns', 12.583333333333334),
 ('reynolds group holdings limited', 12.45),
 ('without limiting article viii', 12.374066040009641),
 ('nfla prior written consent', 12.360207051540067),
 ('otherwise create derivative works', 12.235135135135135),
 ('forty niners sc without', 12.218510484454086),
 ('taken together would constitute', 12.083333333333332),
 ('without first obtaining ginkgo�', 11.935177151120751),
 ('without first obtaining bp�', 11.935177151120751),
 ('take relevant breaching liability', 11.9),
 ('prior express written approval', 11.869629439188262),
 ('express prior written approval', 11.869629439188262),
 ('60 days� written notice', 11.80194295