In [1]:
import json
import nltk
from nltk.tokenize import RegexpTokenizer
import string
import math
import time
import pandas as pd
import pyspark
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.classification import SVMWithSGD, SVMModel
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import random
from sklearn.metrics import coverage_error

In [2]:
IS_SAMPLE = False

In [3]:
STOP_WORDS = nltk.corpus.stopwords.words('english')
NUMBER_INDICATOR = "number_inidicator"
CURRENCY_INDICATOR = "currency_inidicator"
CHEMICAL_INDICATOR = "chemical_inidicator"
MIN_SIZE = 3
MIN_DOCUMENTS = 5
TOP_N_FEATURES = 10000

TEST_SET_PERCENTAGE = 0.2
VALIDATION_IN_TRAINING_PERCENTAGE = 0.2
MIN_DOCUMENTS_FOR_TEST = 1
MIN_DOCUMENTS_FOR_VALIDATION = 1

MIN_DOCUMENTS_FOR_TRAINING_SAMPLE = 10
MIN_DOCUMENTS_FOR_TEST_SAMPLE = 1
MIN_DOCUMENTS_FOR_VALIDATION_SAMPLE = 1

SVM_ITERATIONS = 1000
SVM_CONVERGENCE = 0.1
SVM_REG = 0.01

BM25_K = 1.5  # controls power of tf component
BM25_b = 0.75  # controls the BM25 length normalization

RANDOM_SEED = 10000

stemmer = nltk.stem.porter.PorterStemmer().stem

### Text Manipulation functions

In [4]:
def stemtokenizer(text, doc_id):
    """ MAIN FUNCTION to get clean stems out of a text. A list of clean stems are returned """
    tokenizer = RegexpTokenizer(r'\s+', gaps=True)
    tokens = tokenizer.tokenize(text)
    stems = []  # result
    previous_unigram = None
    for token in tokens:
        stem = token.lower()
        stem = stem.strip(string.punctuation)
        if stem:
            if is_number(stem):
                stem = NUMBER_INDICATOR
            elif is_currency(stem):
                stem = CURRENCY_INDICATOR
            elif is_chemical(stem):
                stem = CHEMICAL_INDICATOR
            elif is_stopword(stem):
                stem = None
            else:
                stem = stemmer(token)
                stem = stem.strip(string.punctuation)
            if stem and len(stem) >= MIN_SIZE:
                # extract uni-grams
                stems.append((stem,{doc_id: 1}))
                # extract bi-grams
                if previous_unigram: stems.append((previous_unigram + " " + stem,{doc_id: 1}))
                previous_unigram = stem
    del tokens
    return stems

def is_stopword(word):
  return word in STOP_WORDS

def is_number(str):
    """ Returns true if given string is a number (float or int)"""
    try:
        float(str.replace(",", ""))
        return True
    except ValueError:
        return False

def is_currency(str):
    return str[0] == "$"

def is_chemical(str):
    return str.count("-") > 3

### Training functions

In [5]:
def merge_postings(postings_list1, postings_list2):
    # key could be either a doc id or a term
    for key in postings_list2:
        if postings_list1.get(key):
            postings_list1[key] += postings_list2[key]
        else:
            postings_list1[key] = postings_list2[key]
    return postings_list1

def get_term_dictionary(terms):
    """
    Maps string terms to indexes in an array
    """
    term_dictionary = {}
    term_array = [None] * len(terms)
    def put(key):
        hashvalue = hashfunction(key, len(term_array))
        if term_array[hashvalue] == None:
            term_array[hashvalue] = key
            return hashvalue
        else:
            nextslot = rehash(hashvalue, len(term_array))
            while term_array[nextslot] != None:
                nextslot = rehash(nextslot, len(term_array))
            if term_array[nextslot] == None:
                term_array[nextslot] = key
                return nextslot
    def hashfunction(key, size):
        return hash(key) % size
    def rehash(oldhash, size):
        return (oldhash + 1) % size
    i = 0
    for term in terms:
        corresponding_index = put(term)
        term_dictionary[term] = corresponding_index
        i+=1
        if i%10000 == 0: print "finished " + str(i)
    return term_dictionary

def get_doc_index(term, postings_list, term_dictionary):
    #return [(doc_id, {term: postings_list[doc_id]}) for doc_id in postings_list]
    return [(doc_id, {term_dictionary[term]: postings_list[doc_id]}) for doc_id in postings_list]

def get_classes(ipc_classification):
    sections = []
    classes = []
    subclasses = []
    for classification in ipc_classification:
        # we do the check because some documents have repetitions
        section_name = classification['section']
        class_name = classification['section'] + "-" + classification['class']
        subclass_name = classification['section'] + "-" + classification['class'] + "-" + classification['subclass']
        if section_name not in sections:
            sections.append(section_name)
        if class_name not in classes:
            classes.append(class_name)
        if subclass_name not in subclasses:
            subclasses.append(subclass_name)
    return {"sections": sections, "classes": classes, "subclasses": subclasses}


def get_training_vector_old(classification, term_list, classifications, classification_key_name, number_of_terms):
    clss = 1 if classification in classifications[classification_key_name] else 0
    return LabeledPoint(clss, SparseVector(number_of_terms, term_list))

def get_training_vector(classification, term_list, classifications, number_of_terms):
    clss = 1 if classification in classifications else 0
    return LabeledPoint(clss, SparseVector(number_of_terms, term_list))


def calculate_sublinear_tf(tf):
    # laplace smoothing with +1 in case of term with no documents (useful during testing)
    return math.log10(1 + tf)


def calculate_tf_idf(tf, df, N):
    # laplace smoothing with +1 in case of term with no documents (useful during testing)
    return tf * math.log10((N+1) / (df + 1))


def calculate_bm25(tf, df, N, d_len, d_avg):
    idf = max(0, math.log10((N-df + 0.5)/(df+0.5))) # in rare cases where the df is over 50% of N, this could become -ve, so we guard against that
    tf_comp = float(((BM25_K + 1) * tf)) / ( BM25_K * ((1-BM25_b) + BM25_b*(float(d_len)/d_avg)) + tf)
    return tf_comp * idf


def calculate_rf(df_relevant, df_non_relevant):
    return math.log( (2 + (float(df_relevant)/max(1, df_non_relevant))), 2)


def calculate_tf_rf(tf, df_relevant, df_non_relevant):
    return tf * calculate_rf(df_relevant, df_non_relevant)


def compare_classifications(x,y):
    len_comp = cmp(len(x), len(y))
    if len_comp == 0:
        return cmp(x,y)
    return len_comp


def create_doc_index(term_index, term_dictionary):
    return term_index \
        .flatMap(lambda (term, postings_list): get_doc_index(term, postings_list, term_dictionary)) \
        .reduceByKey(lambda x, y: merge_postings(x, y))


def get_rf_stats(postings, classification):
    a_plus_c = set(postings.keys())
    a_plus_b = set(classifications_index[classification])
    # first intersection is to get (a), second difference is to get (c) (checkout tf-rf paper for reference)
    a = a_plus_c.intersection(a_plus_b)
    c = a_plus_c.difference(a_plus_b)
    size_a = len(a)
    size_c = len(c)
    return size_a, size_c


def get_rf_postings(classification):
    def get_rf_postings_internal(postings):
        size_a, size_c = get_rf_stats(postings, classification)
        return {docId: calculate_rf(size_a, size_c)
                for docId, tf in postings.items()}
    return get_rf_postings_internal


def get_tf_rf_postings(classification):
    def get_tf_rf_postings_internal(postings):
        size_a, size_c = get_rf_stats(postings, classification)
        return {docId: calculate_tf_rf(tf, size_a, size_c)
                for docId, tf in postings.items()}
    return get_tf_rf_postings_internal


def train_level_old(docs_with_classes, classification, classification_label):
    training_vectors = docs_with_classes.map(
        lambda (doc_id, (term_list, classifications)): get_training_vector_old(classification, term_list, classifications,
                                                                           classification_label, number_of_terms))
    svm = SVMWithSGD.train(training_vectors, iterations=SVM_ITERATIONS, convergenceTol=SVM_CONVERGENCE)
    return training_vectors, svm


def train_level(docs_with_classes, classification, number_of_terms):
    training_vectors = docs_with_classes.map(
        lambda (doc_id, (term_list, classifications)): get_training_vector(classification, term_list,
                                                                           classifications, number_of_terms))
    svm = SVMWithSGD.train(training_vectors, iterations=SVM_ITERATIONS, convergenceTol=SVM_CONVERGENCE, regParam=SVM_REG)
    return training_vectors, svm


def get_error(svm, test_vectors):
    labelsAndPreds = test_vectors.map(lambda p: (p.label, svm.predict(p.features)))
    trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(test_vectors.count())
    return trainErr


def train_all(docs_with_classes):
    training_errors = {}
    for section in sections:
        training_vectors, svm = train_level(docs_with_classes, section, "sections")
        train_err = get_error(svm, training_vectors)
        training_errors[section] = train_err
    #
    with open(training_errors_output, 'w') as file:
        file.write(json.dumps(training_errors))
    #
    for clss in classes:
        training_vectors, svm = train_level(docs_with_classes, clss, "classes")
        train_err = get_error(svm, training_vectors)
        training_errors[clss] = train_err
    
    with open(training_errors_output, 'w') as file:
        file.write(json.dumps(training_errors))
    
    for subclass in subclasses:
        training_vectors, svm = train_level(docs_with_classes, subclass, "subclasses")
        train_err = get_error(svm, training_vectors)
        training_errors[subclass] = train_err
    return training_errors


def get_labeled_points_from_doc_index(doc_index, doc_classification_map, number_of_terms):
    docs_with_classes = doc_index.map(lambda (doc_id, terms): (doc_id, (terms, doc_classification_map[doc_id])))
    training_vectors = docs_with_classes.map(
        lambda (doc_id, (term_list, classifications)): get_training_vector(classification, term_list,
                                                                           classifications, number_of_terms))
    return training_vectors


class Evaluator:
    
    def __init__(self, labels, scores, threshold=0.5):
        self.threshold = 0
        self.count = len(self.labels)
        
        self.tp = 0
        self.fp = 0
        self.fn = 0
        self.tn = 0
        
        for (l,s) in zip(labels,scores):
            if self.is_true(l) and self.is_true(s):
                self.tp += 1
            if self.is_true(l) and not self.is_true(s):
                self.fn += 1
            if not self.is_true(l) and self.is_true(s):
                self.fp += 1
            if not self.is_true(l) and not self.is_true(s):
                self.tn += 1
        self.precision = self.get_precision()
        self.recall = self.get_precision()
        self.fa = self.get_f1()
        self.error_rate = self.get_error_rate()
        
    def calculate_contingency(self, label, contingency):
        
        self.tp = 0
        self.fp = 0
        self.fn = 0
        self.tn = 0
        
        for (l,s) in zip(labels,scores):
            if self.is_true(l) and self.is_true(s):
                self.tp += 1
            if self.is_true(l) and not self.is_true(s):
                self.fn += 1
            if not self.is_true(l) and self.is_true(s):
                self.fp += 1
            if not self.is_true(l) and not self.is_true(s):
                self.tn += 1
    
    def is_true(self, label):
        return label > self.threshold
    
    def get_error_rate(self):
        return float(self.tp + self.tn) / len(labels)
    
    def get_precision(self):
        # self.calculate_contingency()
        if self.tp == 0: return 0
        return float(self.tp) / (self.tp + self.fp)
        
    def get_recall(self):
        # self.calculate_contingency()
        if self.tp == 0: return 0
        return float(self.tp) / (self.tp + self.fn)
    
    def get_f1(self):
        return 2 * (self.get_precision() * self.get_recall()) / (self.get_precision() + self.get_recall())

### Input/Output directories

In [6]:
#sc = SparkContext("", "Generate Inverted Index Job")
es_server = "deka.cip.ifi.lmu.de"
es_port = "9200"

save_parent_location = "hdfs://deka.cip.ifi.lmu.de/svm/new/"
if IS_SAMPLE: 
    save_parent_location = save_parent_location + "sample/"

file_name = "sample.json"
test_file_name = "sample.json"
#url = "/media/Work/workspace/thesis/benchmark/output/" + file_name
sample_location = save_parent_location + file_name
sample_test_location = save_parent_location + test_file_name
postings_list_output = save_parent_location + "postings_list_full.json"
postings_list_chi_selected_output = save_parent_location + "postings_list_{}.json"
classification_index_output = save_parent_location + "classification_index.pkl"
doc_classification_map_output = save_parent_location + "doc_classification_map.pkl"
sections_output = save_parent_location + "sections.pkl"
classes_output = save_parent_location + "classes.pkl"
subclasses_output = save_parent_location + "subclasses.pkl"
classifications_output = save_parent_location + "classifications.pkl"
# training, validation and test set lists
training_docs_list_output = save_parent_location + "training_docs_list.pkl"
validation_docs_list_output = save_parent_location + "validation_docs_list.pkl"
test_docs_list_output = save_parent_location + "test_docs_list.pkl"
test_postings_list_output = save_parent_location + "test_postings_list_50000.json"
training_errors_output = save_parent_location + "training_errors.json"
model_output = save_parent_location + "models/" + "iter_" + str(SVM_ITERATIONS) + "_reg_" + str(SVM_REG) + "/"

In [7]:
def get_model_name(method, classification, reg=SVM_REG, no_of_features=TOP_N_FEATURES, iterations=SVM_ITERATIONS):
    return save_parent_location + "models/" + "iter_" + str(iterations) + "_reg_" + str(reg) + "/" + method + "_" + classification + "_model.svm"

### Document RDDs

In [8]:
%%time

read_conf = {
    'es.nodes': es_server,
    'es.port': es_port,
    'es.resource': 'patents3/patent',
    'es.query': '{ "query" : { "match_all" : {} }}',
    'es.scroll.keepalive': '120m',
    'es.scroll.size': '1000',
    'es.http.timeout': '20m'
}
data = sc.newAPIHadoopRDD(
    inputFormatClass = 'org.elasticsearch.hadoop.mr.EsInputFormat',
    keyClass = 'org.apache.hadoop.io.NullWritable', 
    valueClass = 'org.elasticsearch.hadoop.mr.LinkedMapWritable',
    conf = read_conf
)

#data = sc.textFile(sample_location)
#doc_count = data.count()
#doc_objs = data.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
doc_objs = data

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 6.13 s


In [15]:
%%time
### doc_objs = data.map(lambda x: json.loads(x))

doc_class_map = doc_objs.map(lambda (doc_id, doc): (doc_id, get_classes(doc['classification-ipc']))).cache()
doc_classification_map = doc_class_map.map(lambda (doc_id, classification_obj): (doc_id, sorted(reduce(lambda x, lst: x + lst, classification_obj.values(), [])))).collectAsMap()
doc_count = len(doc_classification_map)
# contains [(classification,  list of docs)]
# second list comprehension is to get list of lists [["A", "B"],["A-01","B-03"]] to one list ["A", "B", "A-01","B-03"], we could have also used a reduce as in doc_classifications_map
classifications_index = doc_class_map.flatMap(lambda (doc_id, classifications_obj): [(classification, doc_id) for classification in [classif for cat in classifications_obj.values() for classif in cat]])\
    .groupByKey().map(lambda (classf, classf_docs): (classf, list(set(classf_docs)))).collectAsMap()

sections = sorted(doc_class_map.flatMap(lambda (doc_id, classifications): classifications['sections']).distinct().collect())
classes = sorted(doc_class_map.flatMap(lambda (doc_id, classifications): classifications['classes']).distinct().collect())
subclasses = sorted(doc_class_map.flatMap(lambda (doc_id, classifications): classifications['subclasses']).distinct().collect())
classifications = sorted(classifications_index.keys(), cmp=compare_classifications)
# classifications = sorted(set(reduce(lambda x, lst: x + lst, map(lambda doc_id: classifications_index[doc_id], classifications_index), [])))

CPU times: user 28.3 s, sys: 6.48 s, total: 34.8 s
Wall time: 15min 29s


#### Save classification objects

In [16]:
%%time
sc.parallelize(doc_classification_map.items()).saveAsPickleFile(doc_classification_map_output)
sc.parallelize(classifications_index.items()).saveAsPickleFile(classification_index_output)
sc.parallelize(sections).saveAsPickleFile(sections_output)
sc.parallelize(classes).saveAsPickleFile(classes_output)
sc.parallelize(subclasses).saveAsPickleFile(subclasses_output)
sc.parallelize(classifications).saveAsPickleFile(classifications_output)

CPU times: user 32.2 s, sys: 2.54 s, total: 34.8 s
Wall time: 1min 16s


#### Load Classification Objects

In [9]:
doc_classification_map = dict(sc.pickleFile(doc_classification_map_output).collect())
doc_count = len(doc_classification_map)
classifications_index = dict(sc.pickleFile(classification_index_output).collect())
sections = sc.pickleFile(sections_output).collect()
classes = sc.pickleFile(classes_output).collect()
subclasses = sc.pickleFile(subclasses_output).collect()
classifications = sc.pickleFile(classifications_output).collect()

In [10]:
doc_class_map.take(1)

NameError: name 'doc_class_map' is not defined

In [12]:
doc_count

2009750

In [13]:
classifications_index.items()[0]

(u'G-20-B', [u'07433566', u'07896523', u'06985663', u'07116477', u'07218441'])

In [14]:
doc_classification_map.items()[10]

(u'07007598', [u'B', u'B-30', u'B-30-B'])

In [15]:
sections

[u'A', u'B', u'C', u'D', u'E', u'F', u'G', u'H']

## Creating Training, Validation and Test Splits

In [37]:
# Get min number of documents for any classification
min = 1000
from collections import defaultdict
min_classf = defaultdict(list)
for (classf, documents) in classifications_index.items():
    if len(documents) == 2: 
        min = len(documents)
        min_classf[classf].append(min)
min_classf, min
        

(defaultdict(list,
             {u'A-02-D': [2],
              u'A-04-F': [2],
              u'A-04-J': [2],
              u'A-04-L': [2],
              u'A-04-Q': [2],
              u'A-06-B': [2],
              u'A-07-H': [2],
              u'A-11-B': [2],
              u'A-11-F': [2],
              u'A-11-K': [2],
              u'A-12-N': [2],
              u'A-13': [2],
              u'A-16-L': [2],
              u'A-18': [2],
              u'A-18-B': [2],
              u'A-22-L': [2],
              u'A-25': [2],
              u'A-26-D': [2],
              u'A-31-F': [2],
              u'A-37-D': [2],
              u'A-41-J': [2],
              u'A-42-F': [2],
              u'A-44-D': [2],
              u'A-45-G': [2],
              u'A-45-H': [2],
              u'A-45-K': [2],
              u'A-46-H': [2],
              u'A-51-B': [2],
              u'A-51-F': [2],
              u'A-60-B': [2],
              u'A-60-C': [2],
              u'A-60-F': [2],
              u'A-61-O': [2

In [35]:
len(min_classf)

760

In [20]:
len(classifications_index)

2235

In [None]:
training_documents = set()
validation_documents = set()
test_documents = set()
random.seed(RANDOM_SEED)
for (classf, documents) in classifications_index.items():
    # only worry about subclasses, classes and sections will be already included
    if(classf in sections or classf in classes): pass
    
    # remove any documents that have already been picked before
    docs_set = set(documents)
    docs_set-=training_documents
    docs_set-=validation_documents
    docs_set-=test_documents
    
    base_test_docs_num = int(len(docs_set)* TEST_SET_PERCENTAGE)
    num_test_docs = base_test_docs_num if base_test_docs_num > 0 else MIN_DOCUMENTS_FOR_TEST if MIN_DOCUMENTS_FOR_TEST < len(docs_set) else 0
    print len(docs_set), num_test_docs
    classif_test_docs = random.sample(docs_set, num_test_docs)
    
    remaining_docs = docs_set.difference(set(classif_test_docs))
    base_validation_docs_num = int(len(remaining_docs)* VALIDATION_IN_TRAINING_PERCENTAGE)
    num_validation_docs = base_validation_docs_num if base_validation_docs_num > 0 else MIN_DOCUMENTS_FOR_VALIDATION if MIN_DOCUMENTS_FOR_VALIDATION < len(remaining_docs) else 0
    classif_validation_docs = random.sample(remaining_docs, num_validation_docs)
    
    classif_training_docs = set(remaining_docs).difference(set(classif_validation_docs))
    
    training_documents.update(classif_training_docs)
    validation_documents.update(classif_validation_docs)
    test_documents.update(classif_test_docs)

#### Save the training, validation and test document lists

In [25]:
sc.parallelize(training_documents).saveAsPickleFile(training_docs_list_output)
sc.parallelize(validation_documents).saveAsPickleFile(validation_docs_list_output)
sc.parallelize(test_documents).saveAsPickleFile(test_docs_list_output)

#### Load the training, validation and test document lists

In [10]:
training_documents = sc.pickleFile(training_docs_list_output).collect()
validation_documents = sc.pickleFile(validation_docs_list_output).collect()
test_documents = sc.pickleFile(test_docs_list_output).collect()

In [14]:
len(set(validation_documents))

928631

### Section Distribution

In [None]:
for classif in sorted(classifications_index.keys()):
    if len(classif) == 1:
        print "%s : %d, %.3f" % (classif, len(set(classifications_index[classif])), float(len(classifications_index[classif]))/doc_count)

### Section Overlap

In [None]:
%%time
overlap_df = pd.DataFrame({section: [0]*len(sections) for section in sections} , index=sections, columns=sections)
for doc_id in doc_classification_map:
    for classif in doc_classification_map[doc_id]:
        if len(classif) == 1:
            for classif2 in doc_classification_map[doc_id]:
                if len(classif2) == 1:
                    overlap_df[classif][classif2] += 1
overlap_df

In [None]:
mpl.colors.Normalize(1,3)

In [None]:
overlap_df.values

In [None]:
fig = plt.figure(figsize=(16,8), dpi=120)
#ax = fig.add_subplot(111, frameon=True, xticks=[], yticks=[])
vals = overlap_df.values
normal = mpl.colors.Normalize()
normal = mpl.colors.Normalize(vals.min()-1, vals.max()+vals.max()/2)
formatter = lambda x: "{:,d}".format(int(x))

the_table=plt.table(cellText=np.vectorize(formatter)(vals), rowLabels=overlap_df.index, colLabels=overlap_df.columns, 
                    colWidths = [0.1]*(vals.shape[1]+3), loc='center',
                    cellColours=plt.cm.YlGn(normal(vals)))
the_table.set_fontsize(30)
the_table.scale(2, 4)
plt.axis("off")
plt.show()

### Create Postings List

In [9]:
%%time
# Create Postings List
postings_lists = doc_objs.flatMap(lambda (doc_id, doc): stemtokenizer(doc['description'], doc_id)).reduceByKey(lambda x,y: merge_postings(x,y))
### postings_lists = doc_objs.flatMap(lambda x: stemtokenizer(x['description'], x['id'])).reduceByKey(lambda x,y: merge_postings(x,y))
min_doc_postings_lists = postings_lists.filter(lambda (x,y): len(y) > MIN_DOCUMENTS)
#number_of_terms = min_doc_postings_lists.count()

CPU times: user 20 ms, sys: 4 ms, total: 24 ms
Wall time: 116 ms


### Load Postings List

In [10]:
# Load Postings Lists
#min_doc_postings_lists = sc.textFile(postings_list_output).map(lambda json_postings: json.loads(json_postings))

### Save Postings List

In [None]:
%%time
# Save Postings List
# min_doc_postings_lists.map(lambda (term, postings_list): ",".join([term, json.dumps(postings_list)])).repartition(1).saveAsTextFile(postings_list_output)
min_doc_postings_lists.map(lambda postings: json.dumps(postings)).saveAsTextFile(postings_list_output)

In [None]:
def get_chi_index(term_index, classifications_index, subclasses, number_of_docs):
    return term_index.map(lambda (term, postings_list): (term, calculate_chi_squared(postings_list.keys(), classifications_index, subclasses, number_of_docs)))

def calculate_chi_squared(document_list, classifications_index, subclasses, number_of_docs):
    chi_score = 0
    for subclass in subclasses:
        Nt1 = len(document_list) # actual collection frequency of having the word
        Nt0 = number_of_docs - len(document_list) # actual collection frequency of not having the word
        Pt1 = float(len(document_list))/ number_of_docs
        Pt0 = float(number_of_docs - len(document_list))/ number_of_docs
        Pc1 = float(len(classifications_index[subclass]))/ number_of_docs
        Et1c1 = Pt1 * Pc1 * number_of_docs # expected frequency of docs in subclass with term (assuming independence)
        Et0c1 = Pt0 * Pc1 * number_of_docs # expected frequency of docs in subclass without term (assuming independence)
        chi_score += math.pow( Nt1 - Et1c1, 2) / Et1c1 
        chi_score += math.pow( Nt0 - Et0c1, 2) / Et0c1
    return chi_score

In [None]:
term_accepted_chi_list = get_chi_index(min_doc_postings_lists, classifications_index, subclasses, doc_count).takeOrdered(TOP_N_FEATURES, lambda (term,score): -score)
term_accepted_chi_list = map(lambda (x,y): x, term_accepted_chi_list)

In [None]:
term_accepted_chi_list[:30]

#### Recreate term dictionary with just the accepted terms

In [None]:
# gets a bit slower at the end but finishes eventually 
term_dictionary = get_term_dictionary(term_accepted_chi_list)

In [None]:
min_doc_postings_lists = min_doc_postings_lists.filter(lambda (term, postings): term in term_accepted_chi_list).cache()

In [None]:
number_of_terms = min_doc_postings_lists.count()
number_of_terms

#### Save Reduced Postings List

In [None]:
# Save Postings List
# min_doc_postings_lists.map(lambda (term, postings_list): ",".join([term, json.dumps(postings_list)])).repartition(1).saveAsTextFile(postings_list_output)
min_doc_postings_lists.map(lambda postings: json.dumps(postings)).repartition(1).saveAsTextFile(postings_list_chi_selected_output.format(str(TOP_N_FEATURES)))

### Start creating term weighting postings

#### Create Training Set

In [None]:
%%time
tf_postings = min_doc_postings_lists
tf_doc_index = create_doc_index(tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in training_documents)

sublinear_tf_postings = tf_postings.mapValues(lambda postings: {docId:  calculate_sublinear_tf(tf) for docId, tf in postings.items()}).cache()
sublinear_tf_doc_index = create_doc_index(sublinear_tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in training_documents)

tf_idf_postings = tf_postings.mapValues(lambda postings: {docId:  calculate_tf_idf(tf, len(postings), doc_count) for docId, tf in postings.items()}).cache()
tf_id_doc_index = create_doc_index(tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in training_documents)

# need to collect the document lengths since they are used in the BM25 calculation
doc_lengths_rdd = tf_doc_index.mapValues(lambda postings_dictionary: reduce(lambda x, term: x + postings_dictionary[term], postings_dictionary, 0))
avg_doc_length = doc_lengths_rdd.map(lambda (term, count): count).reduce(lambda count1, count2: count1 + count2) / doc_count
doc_lengths_dict = doc_lengths_rdd.collectAsMap()

bm25_postings = tf_postings.mapValues(lambda postings: {docId: calculate_bm25(tf, len(postings), doc_count, doc_lengths_dict[docId], avg_doc_length) for docId, tf in postings.items()}).cache()
bm25_doc_index = create_doc_index(bm25_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in training_documents)

#### Create Validation Set

In [None]:
%%time
tf_doc_index_val = create_doc_index(tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in validation_documents)
sublinear_tf_doc_index_val = create_doc_index(sublinear_tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in validation_documents)
tf_id_doc_index_val = create_doc_index(tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in validation_documents)
bm25_doc_index_val = create_doc_index(bm25_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in validation_documents)

## Actual Training

In [None]:
training_evaluations = {}
validation_evaluations = {}

i=0
for section in sections:
    classification = section
    i+=1
    training_evaluations[classification] = {}
    validation_evaluations[classification] = {}
    representations_to_test = [("tf", tf_doc_index), ("tf-sublinear", sublinear_tf_doc_index), ("tf-idf", tf_id_doc_index), ("bm25", bm25_doc_index)]
    
    for name, doc_index in representations_to_test:
        docs_with_classes = doc_index.map(lambda (doc_id, terms): (doc_id, (terms, doc_classification_map[doc_id])))
        training_vectors, svm = train_level(docs_with_classes, classification, number_of_terms)
        svm.save(sc, get_model_name(name, classification))
        labels = training_vectors.map(lambda p: p.label).collect()
        predictions = training_vectors.map(lambda p: svm.predict(p.features)).collect()
        training_evaluations[classification][name] = Evaluator(labels, predictions)
        # validation
        validation_vectors = get_labeled_points_from_doc_index(doc_index, doc_classification_map, number_of_terms)
        labels_val = validation_vectors.map(lambda p: p.label).collect()
        predictions_val = validation_vectors.map(lambda p: svm.predict(p.features)).collect()
        validation_evaluations[classification][name] = Evaluator(labels_val, predictions_val)
    
    rf_postings = tf_postings.mapValues(get_rf_postings(classification))
    rf_doc_index = create_doc_index(rf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in training_documents)
    docs_with_classes = rf_doc_index.map(lambda (doc_id, terms): (doc_id, (terms, doc_classification_map[doc_id])))
    training_vectors, svm = train_level(docs_with_classes, classification, number_of_terms)
    svm.save(sc, get_model_name("rf", classification))
    labels = training_vectors.map(lambda p: p.label).collect()
    predictions = training_vectors.map(lambda p: svm.predict(p.features)).collect()
    training_evaluations[classification]["rf"] = Evaluator(labels, predictions)
    # validation
    validation_vectors = get_labeled_points_from_doc_index(doc_index, doc_classification_map, number_of_terms)
    labels_val = validation_vectors.map(lambda p: p.label).collect()
    predictions_val = validation_vectors.map(lambda p: svm.predict(p.features)).collect()
    validation_evaluations[classification][name] = Evaluator(labels_val, predictions_val)
    
    
    tf_rf_postings = tf_postings.mapValues(get_tf_rf_postings(classification))
    tf_rf_doc_index = create_doc_index(tf_rf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in training_documents)
    docs_with_classes = tf_rf_doc_index.map(lambda (doc_id, terms): (doc_id, (terms, doc_classification_map[doc_id])))
    training_vectors, svm = train_level(docs_with_classes, classification, number_of_terms)
    svm.save(sc, get_model_name("tf-rf", classification))
    labels = training_vectors.map(lambda p: p.label).collect()
    predictions = training_vectors.map(lambda p: svm.predict(p.features)).collect()
    training_evaluations[classification]["tf-rf"] = Evaluator(labels, predictions)
    # validation
    validation_vectors = get_labeled_points_from_doc_index(doc_index, doc_classification_map, number_of_terms)
    labels_val = validation_vectors.map(lambda p: p.label).collect()
    predictions_val = validation_vectors.map(lambda p: svm.predict(p.features)).collect()
    validation_evaluations[classification][name] = Evaluator(labels_val, predictions_val)

In [None]:
def get_coverage_error(test_labeled_points, classifications, method):
    test_labeled_points.cache()
    y_score = np.zeros(test_labeled_points.count(), len(classifications))
    y_true = np.zeros(test_labeled_points.count(), len(classifications))
    
    i = 0
    for classification in classifications:
        binarySvm = SVMModel.load(sc, get_model_name(method, classification))
        binarySvm.clearThreshold()
        predictions = test_labeled_points.map(lambda p: binarySvm.predict(p.features))
        labels = test_labeled_points.map(lambda p: p.labels)
        y_score[:][i] = predictions
        y_true[:][i] = labels
    return coverage_error(y_score, y_true)

## Testing

In [None]:
%%time
tf_doc_index_test = create_doc_index(tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in validation_documents)
sublinear_tf_doc_index_test = create_doc_index(sublinear_tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in validation_documents)
tf_id_doc_index_test = create_doc_index(tf_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in validation_documents)
bm25_doc_index_test = create_doc_index(bm25_postings, term_dictionary).filter(lambda (doc_id, postings): doc_id in validation_documents)

In [None]:
method = "bm25"
test_vectors = get_labeled_points_from_doc_index(bm25_doc_index_test, doc_classification_map, number_of_terms)
get_coverage_error(test_vectors, sections, method)