In [4]:
import json
import pandas as pd 
import glob
import os
import re
from bs4 import BeautifulSoup 
from IPython.display import display, HTML
import pickle

In [5]:
## Read in consolidated annotations
#annotations={}
#annofiles=glob.glob("Data/opp115-parsed-annotation-1.0/*.json")
#print len(annofiles)
policyFiles=glob.glob("/share/pub/OPP-115/sanitized_policies/*.html")
print(len(policyFiles))

115


###  Read in the annotations and original policies

In [6]:
def readAnno(filelist):
  """Load annotation JSON files into a dict keyed by website name.

  Parameters
  ----------
  filelist : iterable of str
      Paths to annotation files named ``<website>.json``.

  Returns
  -------
  dict
      Maps each file's base name, with a trailing ``.json`` removed, to the
      parsed JSON content of that file.
  """
  annotations={}
  for filename in filelist:
    # Remove only a literal, trailing ".json". The previous pattern ".json"
    # treated the dot as "any character" and was not anchored, so a name such
    # as "myjson.org.json" was mangled to "m.org".
    website=re.sub(r"\.json$", '', os.path.basename(filename))
    # The with-block closes the file; no explicit close() is needed.
    with open(filename, "r") as f:
      annotations[website]=json.load(f)
  return annotations

def readPolicies(filelist):
  """Parse sanitized policy HTML files into BeautifulSoup objects.

  Parameters
  ----------
  filelist : iterable of str
      Paths to policy files. The website name is taken from the part of the
      base name between the first and second underscore, so names are
      expected to look like ``<prefix>_<website>.html``.
      NOTE(review): a website name that itself contains "_" would be
      truncated by this split -- confirm against the data set.

  Returns
  -------
  dict
      Maps website name to the parsed ``BeautifulSoup`` document.
  """
  soups={}
  for filename in filelist:
    base=os.path.basename(filename).split("_")[1]
    # Strip only a literal trailing ".html" (the old pattern ".html" was an
    # unanchored regex in which "." matched any character).
    website=re.sub(r"\.html$", '', base)
    # Read inside a with-block so the file handle is closed deterministically
    # instead of being leaked until garbage collection.
    with open(filename, "r") as f:
      soups[website]=BeautifulSoup(f.read(), 'html.parser')
  return soups

In [7]:
# Load the pre-parsed annotations from a pickle. The cells below index the
# result as annotations[website][category][topic].
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load pickles that were produced by this project.
with open("Data/parsed-annotation-0.5.pk", 'rb') as f:
  annotations=pickle.load(f)

In [8]:
annotations["playstation.com"]["Data Retention"]['Retention Purpose']

Unnamed: 0,endIndexInSegment,section,selectedText,startIndexInSegment,value
0,153,18,promotional purpose through one of our websites,106,Marketing
1,201,18,or to make a purchase from the PlayStation Shop,154,Perform service
2,226,22,Email addresses collected from consumers durin...,0,Perform service
3,200,24,so that we may assist these customers with cur...,124,Perform service
4,159,40,necessary to fulfill the purposes outlined in ...,102,Other


In [9]:
### read in sanitized policy texts
policySoups=readPolicies(policyFiles)

In [10]:
## make sure that the website names are the same in both dictionaries
test1=filter(lambda website: website in annotations.keys(), policySoups.keys())
test2=filter(lambda website: website in policySoups.keys(), annotations.keys())
print(len(list(test1)))
print(len(list(test2)))

115
115


## Get a list of tokens from all of the policies, with frequencies

In [11]:
## Get the texts from beautifulsoup objects

def extractTexts(soups):
  """Map every website to the plain text of its parsed policy.

  ``soups`` is a dict of website -> object exposing ``get_text()``; the
  result is a new dict of website -> the string returned by ``get_text()``.
  """
  return {site: soup.get_text() for site, soup in soups.items()}

In [12]:
## get texts from soup objects
policyTexts=extractTexts(policySoups)

In [13]:
## get the training and validation set 
import random
# random.sample() needs a sequence: passing the dict keys view directly is
# deprecated since Python 3.9 and raises TypeError from Python 3.11 on.
# list() keeps the dictionary's own key order, so for a given seed the sample
# matches what the implicit conversion used to produce.
websites=list(policyTexts.keys())
seed=124
random.seed(seed)
# 105 policies go to training; the remaining ones form the validation set.
trainWebsites=random.sample(websites, 105)
valWebstes=[web for web in websites if web not in trainWebsites]

trainTexts={web:policyTexts[web] for web in trainWebsites}

In [14]:
from nltk.corpus import stopwords
from nltk import word_tokenize
from nltk.tokenize import sent_tokenize
import re
import itertools
import string
engstop=set(stopwords.words("english"))


In [15]:
print (string.punctuation)

!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~


In [16]:
from collections import Counter
import itertools


def cleantext(rawtext, remove):
  """Lower-case ``rawtext`` and delete every match of ``remove``.

  ``remove`` is a compiled regular expression (the callers pass a
  punctuation character class); each match is replaced by the empty string.
  """
  lowered=rawtext.lower()
  return remove.sub("", lowered)


def getTokens(rawtext, length, stopwords):
  """Return the distinct n-grams found in ``rawtext``.

  ``length`` is the number of words per token (1 = unigram, 2 = bigram, ...).

  * ``length == 1``: the whole text is lower-cased, stripped of punctuation
    and word-tokenised; tokens contained in ``stopwords`` are dropped. The
    result is a list of distinct strings.
  * ``length >= 2``: the text is first split into sentences, so an n-gram
    never spans a sentence boundary. Stop words are NOT removed in this
    branch. The result is a list of distinct tuples of ``length`` words.
  * any other ``length`` falls through and returns ``None``.

  The list is built from a set, so its order is arbitrary.
  """
  punctuations = re.compile('[%s]' % re.escape(string.punctuation))

  if length==1:
    words=word_tokenize(cleantext(rawtext, punctuations))
    return list(set(word for word in words if word not in stopwords))

  if length>=2:
    ngrams=set()
    for sentence in sent_tokenize(rawtext):
      sentWords=word_tokenize(cleantext(sentence, punctuations))
      # zip over `length` shifted copies of the word list yields every run of
      # `length` consecutive words in the sentence.
      ngrams.update(zip(*[sentWords[offset:] for offset in range(length)]))
    return list(ngrams)

In [17]:
def getAllTokens(textDict, ngram, stopwords):
  """Concatenate the n-gram lists of every document in ``textDict``.

  ``getTokens`` is called once per document (dict value) with the given
  n-gram size and stop-word collection. Its result is distinct only within
  one document, so an n-gram appears in the returned list once for every
  document that contains it.
  """
  return [
    token
    for text in textDict.values()
    for token in getTokens(text, ngram, stopwords)
  ]

#### Generate Training data

In [18]:
%%time
trainUnigrams=getAllTokens(trainTexts, 1, engstop)
trainBigrams=getAllTokens(trainTexts, 2, engstop)
trainTrigrams=getAllTokens(trainTexts, 3, engstop)

CPU times: user 7.79 s, sys: 88 ms, total: 7.88 s
Wall time: 7.82 s


In [19]:
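## Number of n-grams, summed over documents (each n-gram is unique within a document, but is counted once per document it appears in)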
print(len(trainUnigrams))
print(len(trainBigrams))
print(len(trainTrigrams))

49721
154100
188901


In [20]:
## Get annotation of the training documents
trainAnno={key:annotations[key] for key in trainTexts.keys()}

In [21]:
hondaSoup=policySoups["honda.com"]
hondaText=hondaSoup.get_text()
hondaText.split("|||")[52][250:360]

u" name, email address, and your friend's email address. This information is collected and used only in a manner"

In [22]:
psSoup=policySoups['playstation.com']

In [23]:
annotations['playstation.com']['First Party Collection/Use']["Collection Mode"].keys()

Index([u'endIndexInSegment', u'section', u'selectedText',
       u'startIndexInSegment', u'value'],
      dtype='object')

In [24]:
#print(annotations['playstation.com']['First Party Collection/Use']["Personal Information Type"].keys())
#print annotations['playstation.com']['First Party Collection/Use']["Personal Information Type"]["selectedText"]

# Look the annotation frame up once instead of repeating the full key path.
pitFrame=annotations['playstation.com']['First Party Collection/Use']["Personal Information Type"]

print (pitFrame["startIndexInSegment"][0])

# Build a frame from the four columns of interest, then put them in display order.
textinfo=pd.DataFrame({column: pitFrame[column] for column in
                       ["selectedText", "startIndexInSegment", "endIndexInSegment", "value"]})
textinfo=textinfo[["selectedText", "value", "startIndexInSegment", "endIndexInSegment"]]

344


In [25]:
psSections=psSoup.get_text().split("|||")

In [26]:
psSections[6][41:680]

u'ionNetwork   In parts of North America and South America, Sony Network Entertainment America Inc. ("SNEA") operates Sony Online Services, a network of online games, movies, music, other media and content and communication services. PlayStation Network ("PSN") is one of these Sony Online Services. With a Sony Online Services or Sony Entertainment Network account, users can purchase goods and services from SNEA through Sony Online Services and may have the opportunity to participate in various network community activities. Users can register for and log into a Sony Entertainment Network account via us.playstation.com. Collection and '

## Determine Relevance of each sentence in training set

### 1. Get the index of each sentences in the policy texts

In [27]:
## Label the sentences in a policy as relevant or irrelevant
## 
# Topic: "Personal Information Type"

In [28]:
def getSentIdx(raw):
  """Locate every sentence of a policy inside its section.

  Parameters
  ----------
  raw : str
      Full policy text whose sections are separated by ``"|||"``.

  Returns
  -------
  dict
      Maps the section number (position in ``raw.split("|||")``) to a list of
      ``(sentence, section number, start, end)`` tuples, where ``start`` and
      ``end`` are character offsets of the sentence within the section text
      (``end`` exclusive).

  Raises
  ------
  ValueError
      If a tokenised sentence cannot be found verbatim in its section.
  """
  allIdx={}
  for i, secText in enumerate(raw.split("|||")):
    secIdxLists=[]
    # Resume each search after the previous sentence. Searching from the
    # start of the section every time (as re.search did) gives a sentence
    # that occurs twice in a section the offsets of its first occurrence.
    cursor=0
    for sent in sent_tokenize(secText):
      start=secText.find(sent, cursor)
      if start==-1:
        # Fall back to a whole-section search before giving up.
        start=secText.find(sent)
      if start==-1:
        raise ValueError("sentence not found in section {}: {!r}".format(i, sent))
      end=start+len(sent)
      secIdxLists.append((sent, i, start, end))
      cursor=max(cursor, end)
    allIdx[i]=secIdxLists
  return allIdx

def getAllIdx(textDict):
  """Run ``getSentIdx`` on every policy text.

  Returns a dict of website -> sentence index (the ``getSentIdx`` result).
  If ``getSentIdx`` raises ``IndexError`` for a website, the website name is
  printed and that website is left out of the result.
  """
  results={}
  for website in textDict:
    try:
      results[website]=getSentIdx(textDict[website])
    except IndexError:
      print(website)
  return results


In [29]:
%%time
trainSentIdx=getAllIdx(trainTexts)

CPU times: user 8.25 s, sys: 8 ms, total: 8.26 s
Wall time: 8.25 s


In [27]:
### test to see if the indices are correct
trainAnno["playstation.com"]['First Party Collection/Use']['Personal Information Type'].loc[2]

endIndexInSegment                          912
section                                      8
selectedText           credit card information
startIndexInSegment                        889
value                                Financial
Name: 2, dtype: object

In [28]:
trainSentIdx["playstation.com"][8][3]

(u'Collection of personal information required to access certain website services may include the collection of date of birth, name, mailing address, email address or credit card information.',
 8,
 709,
 897)

In [29]:
print( re.search("credit card information", trainSentIdx["playstation.com"][8][3][0]).start())
print (re.search("credit card information", trainSentIdx["playstation.com"][8][3][0]).end())

print ("start index in text", 709+164)
print ("end index in text", 709+187)

print ("Length of sentence in Annotation, and raw text", 912-889, 896-873)

## for playstation.com, annotation indices are 16 characters ahead of corpus indices (889 vs 873)

164
187
('start index in text', 873)
('end index in text', 896)
('Length of sentence in Annotation, and raw text', 23, 23)


In [30]:
trainSentIdx["playstation.com"][8]

[(u'WHAT WE COLLECT:   Collection of Personal Information through our Websites   We do not require that website visitors reveal any personally identifying information in order to gain general access to our websites.',
  8,
  0,
  211),
 (u'However, visitors who do not wish to, or are not allowed by law to share personally identifying information, may not be able to access certain areas of our websites, participate in certain activities, or make a purchase from the PlayStationShop.',
  8,
  212,
  457),
 (u'Although personally identifying information may be required to participate in certain promotions or features offered through our websites or to make a purchase from the PlayStationShop, participants provide this information on a voluntary basis only.',
  8,
  458,
  708),
 (u'Collection of personal information required to access certain website services may include the collection of date of birth, name, mailing address, email address or credit card information.',
  8,
  709,
  897)]

In [None]:
## Another example:
exampleSite="honda.com"
annoEg=trainAnno[exampleSite]['First Party Collection/Use']['Personal Information Type'].loc[6]
print (annoEg)
selecTextAnno=annoEg["selectedText"]

print (selecTextAnno)

print ("\n")
# Index by exampleSite (not a second hard-coded site name) so changing the
# example only requires editing one line.
corpusText=trainSentIdx[exampleSite][annoEg['section']]
print ("Text in corpus : ")
display(corpusText)

print ("\n")
sentenceNumber=0
# The selected text is literal text, not a pattern: escape it so characters
# such as "(", ")", "?" or "." are matched verbatim. Search once and reuse
# the match object.
annoMatch=re.search(re.escape(selecTextAnno), corpusText[sentenceNumber][0])
if annoMatch is None:
  print ("selected text not found in sentence", sentenceNumber)
else:
  corpusStart=annoMatch.start()
  corpusEnd=annoMatch.end()
  # corpusText[sentenceNumber][2] is the sentence's start offset in the section.
  print ("start index in corpus section :", corpusText[sentenceNumber][2]+corpusStart)
  print ("end index in corpus section :", corpusText[sentenceNumber][2]+corpusEnd)



In [None]:
##a third example

In [None]:
## Another example:
exampleSite="uptodate.com"
annoEg=trainAnno[exampleSite]['First Party Collection/Use']['Personal Information Type'].loc[4]
print (annoEg)
selecTextAnno=annoEg["selectedText"]

print (selecTextAnno)

print ("\n")
corpusText=trainSentIdx[exampleSite][annoEg['section']]
# Inspect the first sentence of the section.
sentenceNumber=0

print ("Text in corpus : ")
display(corpusText)

print ("\n")

# The selected text is literal text, not a pattern: escape it so regex
# metacharacters are matched verbatim, and search only once.
annoMatch=re.search(re.escape(selecTextAnno), corpusText[sentenceNumber][0])
if annoMatch is None:
  print ("selected text not found in sentence", sentenceNumber)
else:
  # Convert the sentence-relative match to section-relative offsets by adding
  # the sentence's start offset (corpusText[sentenceNumber][2]).
  corpusStart=corpusText[sentenceNumber][2]+annoMatch.start()
  corpusEnd=corpusText[sentenceNumber][2]+annoMatch.end()

  print ("start index in corpus section :", corpusStart)
  print ("end index in corpus section :", corpusEnd)

  # Compare like with like: both values are offsets within the section.
  # Previously the sentence-relative offset was subtracted from the
  # segment-relative annotation index.
  print ("Annotation indices is ahead of corpus indices by {} characters ".format(annoEg["startIndexInSegment"]-corpusStart))


In [None]:
trainAnno[exampleSite]['First Party Collection/Use']['Personal Information Type'].loc[4]["startIndexInSegment"]

### Gather the sentences and put them in a convenient structure

In [31]:
%%time
# for each selectedText in annotation, search for the corresponding sentence(s) in the corpus
# NOTE: an annotation that is more than one sentence long is NOT discarded by labelRel below;
# every sentence it matches receives the annotation's label
def labelRel(annotations, sentIdx):
  """Attach annotation labels to the corpus sentences they refer to.

  Parameters
  ----------
  annotations : dict
      ``annotations[website][category][topic]`` is a pandas DataFrame with
      (at least) the columns ``section``, ``startIndexInSegment``,
      ``endIndexInSegment`` and ``value``.
  sentIdx : dict
      ``sentIdx[website][section]`` is a list of
      ``(sentence, section, start, end)`` tuples.

  Returns
  -------
  dict
      ``result[website][section]`` is a list of
      ``[sentence, section, start, end, labels]`` lists, where ``labels``
      holds one ``(category, topic, value)`` tuple per matching annotation.

  Notes
  -----
  Rows whose ``startIndexInSegment`` is -1 or whose ``value`` is
  ``"Unspecified"`` are skipped.

  A sentence matches an annotation when their character ranges overlap, or
  when both ends lie within 20 characters of each other. The earlier code
  assigned ``endIndexInSegment`` to ``anno_start`` and
  ``startIndexInSegment`` to ``anno_end``; with the names swapped the
  "annotation contains sentence" branch could never fire and the first
  branch was in fact an overlap test. The overlap behaviour is kept -- it
  tolerates the offset between annotation and corpus indices -- but is now
  written with correctly named bounds. Overlap already covers both
  containment cases.
  """
  sentlabels={}
  for website, siteanno in annotations.items():
    # One mutable record per corpus sentence, with an empty label list appended.
    sentlabels[website]={
      section:[list(sentTuple)+[[]] for sentTuple in sentList]
      for section, sentList in sentIdx[website].items()
    }

    for category in siteanno:
      for topic in siteanno[category]:
        topicFrame=siteanno[category][topic]
        for idx in topicFrame.index:
          # Fetch the row once; .loc is expensive inside this hot loop.
          entry=topicFrame.loc[idx]
          if entry["startIndexInSegment"]==-1 or entry["value"]=="Unspecified":
            continue
          anno_start=entry["startIndexInSegment"]
          anno_end=entry["endIndexInSegment"]
          label=(category, topic, entry["value"])
          for sent in sentlabels[website][entry["section"]]:
            corpus_start=sent[2]
            corpus_end=sent[3]
            overlaps=corpus_start<=anno_end and corpus_end>=anno_start
            nearlySame=abs(corpus_start-anno_start)<20 and abs(corpus_end-anno_end)<20
            if overlaps or nearlySame:
              sent[4].append(label)

  return sentlabels

labeledTrainSents=labelRel(trainAnno, trainSentIdx)

CPU times: user 48.6 s, sys: 208 ms, total: 48.8 s
Wall time: 48.5 s


In [32]:
print(labeledTrainSents["playstation.com"][2][2])
egAnno=trainAnno["playstation.com"]['Data Security']['Security Measure']
print(egAnno.loc[egAnno["section"]==2, "selectedText"].values)

[u'As part of the privacy program, we are subject to frequent audits of our sites and other enforcement and accountability mechanisms administered independently by ESRB.', 2, 467, 633, [('Data Security', 'Security Measure', 'Privacy/Security program'), ('Other', 'Other Type', 'Practice not covered')]]
[ 'To protect your privacy to the maximum extent possible, we have undertaken this privacy initiative and our websites have been reviewed and certified by ESRB Privacy Online to meet established online information collection and use practices. As part of the privacy program, we are subject to frequent audits of our sites and other enforcement and accountability mechanisms administered independently by ESRB.']


In [33]:
### Collect all the sentences , along with their topics, for ease of processing later
### Leave out the value for now

def gatherSentsTopics(labeldIdxSet):
  """Flatten the labelled corpus into a topic set and a sentence list.

  ``labeldIdxSet[website][section]`` is a list of sentence records whose
  element 0 is the sentence text and element 4 is a list of
  ``(category, topic, value)`` labels.

  Returns ``(topics, sentences)``:

  * ``topics`` -- set of every ``(category, topic)`` pair seen, including
    pairs whose category is ``"Other"``.
  * ``sentences`` -- one ``[text, pairs]`` entry per sentence, where
    ``pairs`` lists the sentence's ``(category, topic)`` pairs (duplicates
    kept) with category ``"Other"`` left out.
  """
  topics=set()
  sentences=[]
  for corpus in labeldIdxSet.values():
    for sentLists in corpus.values():
      for sent in sentLists:
        pairs=[(label[0], label[1]) for label in sent[4]]
        topics.update(pairs)
        sentences.append([sent[0], [pair for pair in pairs if pair[0]!="Other"]])
  return topics, sentences

alltopics, allSentences=gatherSentsTopics(labeledTrainSents)

In [34]:
allSentences[4]

[u'For example, Bing uses a cookie with a unique identifier known as the Search ID to operate the service and enable certain search features.',
 [('First Party Collection/Use', 'Collection Mode'),
  ('First Party Collection/Use', 'Identifiability'),
  ('First Party Collection/Use', 'Personal Information Type'),
  ('First Party Collection/Use', 'Personal Information Type'),
  ('First Party Collection/Use', 'Personal Information Type'),
  ('First Party Collection/Use', 'Purpose'),
  ('First Party Collection/Use', 'Purpose')]]

In [35]:
## for each topic, we get a list of related sentences and a list of unrelated sentences

def getTopicRelevanceList(labeledSentences, topiclist):
  """Split the sentences into related / unrelated groups for every topic.

  ``labeledSentences`` holds ``[text, labels]`` entries. For each topic in
  ``topiclist`` a sentence is "Related" when the topic is among its labels
  and "Unrelated" otherwise.

  Returns ``{topic: {"Related": [...], "Unrelated": [...]}}`` with duplicate
  sentence texts removed inside each list (list order is arbitrary).
  """
  buckets={topic:{"Related":set(), "Unrelated":set()} for topic in topiclist}
  for entry in labeledSentences:
    labelset=set(entry[1])
    for topic in topiclist:
      relevance="Related" if topic in labelset else "Unrelated"
      buckets[topic][relevance].add(entry[0])

  return {
    topic:{"Related":list(buckets[topic]["Related"]),
           "Unrelated":list(buckets[topic]["Unrelated"])}
    for topic in topiclist
  }


relevantSetences=getTopicRelevanceList(allSentences, alltopics)

In [36]:
print(len(relevantSetences[('First Party Collection/Use', 'Does/Does Not')]["Related"]))
print(len(set(relevantSetences[('First Party Collection/Use', 'Does/Does Not')]["Related"])))

1424
1424


In [37]:
relevantSetences[('User Choice/Control', 'Purpose')]["Related"][2]

u'For more information about Flash cookies and how to remove them from your computer, please see the paragraph below entitled "SPECIAL NOTE - Flash Cookies."'

In [38]:
# Calculate -2log(lambda) scores , version 1
# n is the length of ngram, 
# get O11, O12, O21, and O22 counts, see Lin and Hovy paper
from scipy.stats import binom
def getCounts(relevantSents, ngramList, n):
  """Count the contingency-table cells O11/O12/O21/O22 per topic and n-gram (version 1).

  Parameters
  ----------
  relevantSents : dict
      ``{topic: {"Related": [sentence, ...], "Unrelated": [sentence, ...]}}``.
  ngramList : iterable
      N-grams to count. NOTE(review): the loops below run once per element,
      so an n-gram that appears k times in ``ngramList`` has its counts
      incremented k times per sentence -- pass a de-duplicated list unless
      that scaling is intended.
  n : int
      N-gram length handed to ``getTokens``.

  Returns
  -------
  dict
      ``result[topic][ngram]`` is a ``Counter`` in which
      O11 = related sentences containing the n-gram,
      O21 = related sentences not containing it,
      O12 = unrelated sentences containing it,
      O22 = unrelated sentences not containing it.
      Because ``Counter +=`` discards entries that are not positive, cells
      that are still zero are absent from the stored Counter (reading them
      still yields 0).
  """
  #break each sentence to the type of ngrams
  ngramCounts={}
  topiclist=list(relevantSents.keys())
  sentNgram={topic:{"Related":[], "Unrelated":[]} for topic in topiclist}
  for topic, sentMap in relevantSents.items():
    print("Setting Default for topic {}".format(str(topic)))
    ngramCounts.setdefault(topic, {})

    # Tokenise every sentence once, grouped by topic and relevance.
    for relevance, sentlist in sentMap.items():
      for sent in sentlist:
        sentNgram[topic][relevance].append(getTokens(sent, length=n,  stopwords=engstop))
    #for ngram in ngramList:
      #ngramCounts[topic].setdefault(ngram, Counter({"O11":0, "O12":0, "O21":0, "O22":0}))
    
  for topic, releNgrams in sentNgram.items():
    print("Calculating Signature Scores for topic {}".format(str(topic)))
    # Every (sentence, n-gram) pair is visited, and `ngram in relSent` is a
    # list membership test, so this is the slow part of the function.
    for relSent in releNgrams["Related"]:
      for ngram in ngramList:
        ngramCounts[topic].setdefault(ngram, Counter({"O11":0, "O12":0, "O21":0, "O22":0}))
        if ngram in relSent:
          ngramCounts[topic][ngram]+=Counter({"O11":1})
        else:
          ngramCounts[topic][ngram]+=Counter({"O21":1})
          
    for unrelSent in releNgrams["Unrelated"]:
      for ngram in ngramList:
        ngramCounts[topic].setdefault(ngram, Counter({"O11":0, "O12":0, "O21":0, "O22":0}))
        if ngram in unrelSent:
          ngramCounts[topic][ngram]+=Counter({"O12":1})
        else:
          ngramCounts[topic][ngram]+=Counter({"O22":1})
  
  return ngramCounts

In [None]:
%%time
# Get the scores for Unigrams, Bigrams and Trigrams
unigramCounts=getCounts(relevantSetences, trainUnigrams, 1)
#bigramCounts=getCounts(relevantSetences, trainBigrams, 2)
#bigramCounts=getCounts(relevantSetences, trainTrigrams, 3)

In [None]:
# Calculate -2log(lambda) scores, version 2
# n is the length of ngram, 
# get O11, O12, O21, and O22 counts, see Lin and Hovy paper
from scipy.stats import binom
def getCountsTwo(relevantSents, ngramList, n):
  """Count the contingency-table cells O11/O12/O21/O22 per topic and n-gram (version 2).

  Parameters
  ----------
  relevantSents : dict
      ``{topic: {"Related": [sentence, ...], "Unrelated": [sentence, ...]}}``.
  ngramList : iterable
      N-grams to count; duplicates are counted once.
  n : int
      N-gram length handed to ``getTokens``.

  Returns
  -------
  dict
      ``result[topic][ngram]`` is a ``Counter`` with
      O11 / O21 = related sentences containing / not containing the n-gram,
      O12 / O22 = unrelated sentences containing / not containing it.
      ``Counter +=`` discards non-positive entries, so cells that stay zero
      are absent from the stored Counter (reading them still yields 0).

  Changes from the earlier implementation:

  * n-grams that occur in a sentence but not in ``ngramList`` are ignored,
    as in ``getCounts``; previously they raised ``KeyError``.
  * each distinct n-gram is counted once per sentence; previously duplicates
    in ``ngramList`` inflated O21/O22 but not O11/O12.
  * membership is tested against a set instead of a list.
  """
  ngramCounts={}
  sentNgram={topic:{"Related":[], "Unrelated":[]} for topic in relevantSents}

  for topic, sentMap in relevantSents.items():
    print("Setting Default for topic {}".format(str(topic)))
    ngramCounts.setdefault(topic, {})
    for relevance, sentlist in sentMap.items():
      for sent in sentlist:
        sentNgram[topic][relevance].append(getTokens(sent, length=n,  stopwords=engstop))

    for ngram in ngramList:
      ngramCounts[topic].setdefault(ngram, Counter({"O11":0, "O12":0, "O21":0, "O22":0}))

  # (relevance group, cell when the n-gram is present, cell when it is absent)
  cells=(("Related", "O11", "O21"), ("Unrelated", "O12", "O22"))

  for topic, releNgrams in sentNgram.items():
    print("Calculating Signature Scores for topic {}".format(str(topic)))
    topicCounts=ngramCounts[topic]
    for relevance, present, absent in cells:
      for sentTokens in releNgrams[relevance]:
        tokenSet=set(sentTokens)
        # Only existing keys are re-assigned, so iterating the dict is safe.
        for ngram in topicCounts:
          cell=present if ngram in tokenSet else absent
          topicCounts[ngram]+=Counter({cell:1})

  return ngramCounts

In [None]:
%%time
unigramCounts=getCountsTwo(relevantSetences, trainUnigrams, 1)
#bigramCounts=getCounts(relevantSetences, trainBigrams, 2)
#bigramCounts=getCounts(relevantSetences, trainTrigrams, 3)

In [None]:
a=set(range(20))
b=a.difference(set(range(10)))
print(b)

In [None]:
# Calculate -2log(lambda) scores , version 3
# n is the length of ngram, 
# get O11, O12, O21, and O22 counts, see Lin and Hovy paper
from scipy.stats import binom
def getCountsV3(relevantSents, ngramList, n):
  """Count the contingency-table cells O11/O12/O21/O22 per topic (version 3).

  Parameters
  ----------
  relevantSents : dict
      ``{topic: {"Related": [sentence, ...], "Unrelated": [sentence, ...]}}``.
  ngramList : iterable
      Vocabulary of n-grams; duplicates are ignored.
  n : int
      N-gram length handed to ``getTokens``.

  Returns
  -------
  dict
      ``result[topic]`` has the four keys "O11", "O12", "O21", "O22", each a
      ``Counter`` keyed by n-gram:
      O11 / O12 = related / unrelated sentences containing the n-gram,
      O21 / O22 = related / unrelated sentences NOT containing it, for
      n-grams in the vocabulary only. Zero counts are not stored.

  The results equal those of the earlier implementation, which added a
  vocabulary-sized Counter for every sentence. Here the "absent" cells are
  derived as ``number of sentences - present count``. The four Counters are
  now created even when ``ngramList`` is empty; before, that case raised
  ``KeyError`` as soon as a sentence was processed.
  """
  ngramCounts={}
  ngramSet=set(ngramList)

  # (cell when present, cell when absent, relevance group)
  cells=(("O11", "O21", "Related"), ("O12", "O22", "Unrelated"))

  for topic, sentMap in relevantSents.items():
    tokensByRelevance={"Related":[], "Unrelated":[]}
    for relevance, sentlist in sentMap.items():
      for sent in sentlist:
        tokensByRelevance[relevance].append(getTokens(sent, length=n,  stopwords=engstop))

    topicCounts=ngramCounts.setdefault(topic, {})
    for present, absent, relevance in cells:
      presentCounts=Counter()
      for tokens in tokensByRelevance[relevance]:
        # set(): an n-gram counts at most once per sentence.
        presentCounts.update(set(tokens))
      nSents=len(tokensByRelevance[relevance])
      topicCounts[present]=presentCounts
      # A vocabulary n-gram is absent from every sentence that does not
      # contain it; reading a missing Counter key yields 0 without inserting.
      topicCounts[absent]=Counter({
        ngram:nSents-presentCounts[ngram]
        for ngram in ngramSet
        if presentCounts[ngram]<nSents
      })

  return ngramCounts



In [None]:
%%time
## calculate the counts for unigrams
#unigramCounts=getCountsV3(relevantSetences, trainUnigrams, 1)
# Time : 42 mins

### Taking too long, Use Spark to calculate the counts

In [1]:
import pyspark 
from pyspark.sql import SparkSession
# Create (or reuse) a Spark session named "Lambda Score" on the cluster master
# given below, and keep its SparkContext in `sc` for the RDD work that follows.
# NOTE(review): the master URL is hard-coded; the notebook only runs where
# that host is reachable.
spark = SparkSession.builder.appName("Lambda Score").master("spark://server1.capstonemly.com:7077").getOrCreate()
sc  = spark.sparkContext

In [3]:
import pickle
with open("../Results/relevantSetences.pk",'rb') as f:
  relevantSetences=pickle.load(f)

In [30]:
# use the object relevantSentences
# Define a function to put the topic and relevance with the sentences
# Output: list of sublist with each sublist being [single topic, sentence, relevance ]
def embedLabels(relSent):
  """Flatten ``{topic: {relevance: [sentence, ...]}}`` into tuples.

  Returns a list with one ``(topic, sentence, relevance)`` tuple per
  sentence, in the iteration order of the nested dicts and lists.
  """
  return [
    (topic, sentence, relevance)
    for topic, releSents in relSent.items()
    for relevance, labeledSents in releSents.items()
    for sentence in labeledSents
  ]

relSentTuples=embedLabels(relevantSetences)

In [31]:
relSentTuplesRDD=sc.parallelize(relSentTuples)
relSentTuplesRDD.first()

(('Other', 'Other Type'),
 u' How Does Disinformation Use My Information?',
 'Unrelated')

In [35]:
#Get bigram counts and scores
#define a function to release from each sentence a stream of ngram with topics and whether the token is relevant or not 
# Each item in the stream is (topic, token, Observation{O11, O12, O21, O22}, 1)
# From each sentence that are related, release the ngrams they have with the label "O11". 
# Also release all the other ngrams with the label "O21"

# From each sentence that are unrelated to the topic, release the ngram they have with the label "O12"
# Also release all the other ngrams not in the sentence with the label "O22"

# Define a function applied to each element of the relSentTuples
trainUnigramSet=set(trainUnigrams)
trainBigramSet=set(trainBigrams)
trainTrigramSet=set(trainTrigrams)


def streamNgrams(sentTuple, n, ngramSet):
  """Emit one ``((topic, ngram, cell), 1)`` record per vocabulary n-gram.

  ``sentTuple`` is ``(topic, sentence, relevance)`` with relevance
  "Related" or "Unrelated"; ``ngramSet`` is the set of all n-grams.

  * n-grams found in the sentence get cell O11 (related) or O12 (unrelated);
  * every n-gram of ``ngramSet`` that is not in the sentence gets cell O21
    (related) or O22 (unrelated).

  The returned list therefore has at least ``len(ngramSet)`` records for
  every sentence.
  """
  # Lookup table replacing an if/else on (presence, relevance).
  cellFor={"In":{"Related":"O11", "Unrelated":"O12"},"Out":{"Related":"O21", "Unrelated":"O22"}}
  relevance=sentTuple[2]
  topic=sentTuple[0]
  present=getTokens(sentTuple[1], length=n, stopwords=engstop)

  records=[((topic, ngram, cellFor["In"][relevance]), 1) for ngram in present]
  absent=ngramSet.difference(set(present))
  records.extend(((topic, ngram, cellFor["Out"][relevance]), 1) for ngram in absent)
  return records
  
  


#### Do a small test

In [73]:
## Test using one topic
# Keep only the (topic, sentence, relevance) tuples of a single topic.
relSentTuplesTest=[x for x in relSentTuples if x[0]==('Third Party Sharing/Collection', 'Does/Does Not')]
# Not the last expression of the cell, so this value is not displayed when the cell runs.
len(relSentTuplesTest)

# Expand each sentence into ((topic, bigram, cell), 1) records, sum the ones
# per key and bring the totals back to the driver.
relSentTuplesTestRDD=sc.parallelize(relSentTuplesTest)
testStream=relSentTuplesTestRDD.flatMap(lambda sentTuple: streamNgrams(sentTuple, 2, trainBigramSet))
testCounts=testStream.reduceByKey(lambda a, b:a+b)
testresult=testCounts.collect()

9282

### Run on whole dataset

In [36]:
## count for all bigrams
bigramStream=relSentTuplesRDD.flatMap(lambda sentTuple: streamNgrams(sentTuple, 2, trainBigramSet)).partitionBy(8000)


In [37]:
# Sum the 1s emitted for every (topic, ngram, cell) key.
from operator import add
# operator.add(a, b) is a + b, so the imported function is the reducer.
bigramCounts=bigramStream.reduceByKey(add)

In [38]:
bigramCounts.first()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job 2 cancelled 
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1375)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [82]:
bigramCountsResult=bigramCounts.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job 13 cancelled 
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1375)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
