# Title: Search Engine Beta 1.0
<p><b>Abstract:</b> Search engine build using Reuters data set and PySpark</p>
<p><b>Authors:</b> Uriel Antonio & Ernesto Louie Cortez</p>
<p><b>Date:</b>    05/24/2016</p>

In [42]:
from xml.etree import ElementTree
import re
from StringIO import StringIO
from bs4 import BeautifulSoup
import os 
import pandas as pd
from collections import Counter
from bokeh.plotting import output_notebook, show
from bokeh.charts import Scatter
import findspark
import os
findspark.init('/home/ubuntu/workspace/spark-1.6.0-bin-hadoop2.6')
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'

import pyspark
try: 
    print(sc)
except NameError:
    sc = pyspark.SparkContext()
    print(sc)

from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF

<pyspark.context.SparkContext object at 0x7f716ccea390>


## Data Cleaning

<p><b>Abstract: </b>Reuters data is organized in a generalized markup format.  Using BeatifulSoup, articles are scraped for their Title, content, date and place acording to their respective markup tags</p>

<p><b>Challenges: </b>1) We initially made the assumption that all articles would have a date, place, title, and body.  During alpha 1 stage, it was found that titles and content became mismatched approximately 30 articles in.  Conditional statements were created to insert placeholders when content was missing.  2) The cleaned data was also littered with mark up tags for special symbols that also needed to be cleaned.    </p>


<p><b></b></p>

In [43]:
totstring=""

with open("Data/reut2-000.sgm",'r') as inF:
    for line in inF:
        string2=re.sub("&.*?>","",line,flags=re.UNICODE)
        string3=re.sub("\n"," ",string2,flags=re.UNICODE)
        string=re.sub("[^0-9a-zA-Z<>/\s=!-\"\"]+","",string3.lower())
        totstring+=string
    
soup= BeautifulSoup(totstring)

items_date=list()
items_places=list()
items_title=list()
items_body=list()


for a in soup.findAll("reuters"):
    if a.date != None:
        items_date.append(a.date.getText())
    else:
        items_date.append("N/D")
    if a.places != None:
        items_places.append(a.places.getText()) 
    else:
        items_places.append("N/L")
    if a.title != None:
        items_title.append(a.title.getText())  
    else:
        items_title.append("Untitled")
    if a.content != None:
        items_body.append(a.content.getText())
    else:
        items_body.append("No Content.")

## Resilitent Distributed Datasets

<p>Conversion to RDD format by zipping a numbered index list with the list of titles above.</p>

In [44]:
mylist = (x for x in range(1, len(items_title)+1))
rdd = sc.parallelize(zip(mylist, items_title))
print(rdd)

rdd.take(10)

ParallelCollectionRDD[129] at parallelize at PythonRDD.scala:423


[(1, u'bahia cocoa review'),
 (2, u'standard oil  to form financial unit'),
 (3, u'texas commerce bancshares  files plan'),
 (4, u'talking point/bankamerica  equity offer'),
 (5, u'national average prices for farmerowned reserve'),
 (6, u'argentine 1986/87 grain/oilseed registrations'),
 (7, u'red lion inns files plans offering'),
 (8, u'usx  debt dowgraded by moodys'),
 (9, u'champion products  approves stock split'),
 (10, u'computer terminal systems  completes sale')]

## Inverted Index

<p>Created the inverted idex using flatMap() function.</p>

In [45]:
index = rdd.flatMap(lambda row : [(word, row[0]) for word in row[1].split()] )#\
           # .groupByKey() \
           # .map(lambda x : (x[0], list(x[1]))) 
index.take(10)

[(u'bahia', 1),
 (u'cocoa', 1),
 (u'review', 1),
 (u'standard', 2),
 (u'oil', 2),
 (u'to', 2),
 (u'form', 2),
 (u'financial', 2),
 (u'unit', 2),
 (u'texas', 3)]

## TF-IDF

<p><b>Abstract: </b>Using HashingTF() and IDF() functions, the index is transformed into TF-IDF.</p>

<p><b>Challenges: </b> Unresolved issues are implementing querying on TFIDF RDD.</p>

In [58]:
hashingTF = HashingTF()
tf = hashingTF.transform(index)
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

tfidf.take(15)

[SparseVector(1048576, {1: 7.3886, 911486: 8.0818}),
 SparseVector(1048576, {1: 7.3886, 278288: 7.6763}),
 SparseVector(1048576, {1: 7.3886, 650410: 7.6763}),
 SparseVector(1048576, {2: 6.829, 991951: 6.9832}),
 SparseVector(1048576, {2: 6.829, 871875: 5.7304}),
 SparseVector(1048576, {2: 6.829, 725041: 3.4519}),
 SparseVector(1048576, {2: 6.829, 968518: 7.1655}),
 SparseVector(1048576, {2: 6.829, 295556: 6.1359}),
 SparseVector(1048576, {2: 6.829, 98590: 4.8629}),
 SparseVector(1048576, {3: 6.9832, 838206: 7.1655}),
 SparseVector(1048576, {3: 6.9832, 1017185: 8.0818}),
 SparseVector(1048576, {3: 6.9832, 428756: 7.6763}),
 SparseVector(1048576, {3: 6.9832, 751424: 6.6955}),
 SparseVector(1048576, {3: 6.9832, 168917: 6.4723}),
 SparseVector(1048576, {4: 7.1655, 58257: 7.6763})]

In [61]:

def get_results_tfidf(qry, idx_body, n):
    score = Counter()
    for term in qry.collect():
        if term in idx_body:
            i = math.log(float(n)/(1+len(idx_body[term])))
            for doc in idx_body[term]:
                score[doc] += idx_body[term][doc] * i
                
    results=[]
    for x in [[r[0],r[1]] for r in zip(score.keys(), score.values())]:
        if x[1] > 0:
            results.append([x[1],x[0]])
    
    sorted_results = sorted(results, key=lambda t: t[0] * -1 )
    #type(score2)
    #print(score2)
    return sorted_results

results = get_results_tfidf('japan temp talk', tfidf, 10)

print(results)
#print_results(results,10)

AttributeError: 'str' object has no attribute 'collect'