## Input Data
### Data: 
- Reuters-21578 Text Categorization Collection Data Set, from the UCI Machine Learning Repository
- It contains 21,578 Reuters news articles from 1987
- Available at: https://archive.ics.uci.edu/ml/datasets/reuters-21578+text+categorization+collection

### Format:
- News: 22 SGML files (`reut2-000.sgm` to `reut2-021.sgm`)
    - We only use the news text inside the `<BODY>` `</BODY>` tags
- The other files are not needed in this homework


1. <b>Shingling</b>: Convert a document into a set representation (a Boolean vector)

2. <b>Min-Hashing</b>: Convert large sets to short signatures, while preserving similarity

3. <b>Locality-Sensitive Hashing</b>: Focus on pairs of signatures likely to come from similar documents (the candidate pairs)
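The notion of similarity that these three steps try to preserve is the Jaccard similarity between shingle sets. The following is a minimal sketch of that idea in plain Python, independent of Spark; the helper names `char_shingles` and `jaccard` and the two sample sentences are illustrative assumptions, not part of the notebook.

```python
def char_shingles(text, k=5):
    """Return the set of all k-character substrings (shingles) of text."""
    return {text[i:i + k] for i in range(len(text) - k + 1)}


def jaccard(a, b):
    """Jaccard similarity |a & b| / |a | b| of two sets (1.0 if both are empty)."""
    if not a and not b:
        return 1.0
    return len(a & b) / float(len(a | b))


# Two made-up sentences that share most of their shingles.
doc_a = "the bank of england said"
doc_b = "the bank of france said"
sim = jaccard(char_shingles(doc_a), char_shingles(doc_b))
print(sim)
```

Min-hashing and LSH exist to approximate this number for many document pairs without comparing every pair of sets directly.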

The `.sgm` files are parsed into documents with the Python library `sgmllib` (available in Python 2 only; the module was removed in Python 3).

In [1]:
# list the dataset files in my local directory
data_path = "/home/twster/Spark/Projects/datasets/reuters21578"
%ls $data_path

[0m[01;32mall-exchanges-strings.lc.txt[0m*        [01;32mreut2-002.sgm[0m*  [01;32mreut2-013.sgm[0m*
[01;32mall-orgs-strings.lc.txt[0m*             [01;32mreut2-003.sgm[0m*  [01;32mreut2-014.sgm[0m*
[01;32mall-people-strings.lc.txt[0m*           [01;32mreut2-004.sgm[0m*  [01;32mreut2-015.sgm[0m*
[01;32mall-places-strings.lc.txt[0m*           [01;32mreut2-005.sgm[0m*  [01;32mreut2-016.sgm[0m*
[01;32mall-topics-strings.lc.txt[0m*           [01;32mreut2-006.sgm[0m*  [01;32mreut2-017.sgm[0m*
[01;32mcat-descriptions_120396.txt[0m*         [01;32mreut2-007.sgm[0m*  [01;32mreut2-018.sgm[0m*
[01;32mfeldman-cia-worldfactbook-data.txt[0m*  [01;32mreut2-008.sgm[0m*  [01;32mreut2-019.sgm[0m*
[01;32mlewis.dtd[0m*                           [01;32mreut2-009.sgm[0m*  [01;32mreut2-020.sgm[0m*
[01;32mREADME.txt[0m*                          [01;32mreut2-010.sgm[0m*  [01;32mreut2-021.sgm[0m*
[01;32mreut2-000.sgm[0m*                       [

### Parsing sgml files

A parser class for the SGML files, built on the `sgmllib` Python library:

In [2]:
import sgmllib
import fnmatch
import os
import re
import binascii

In [3]:
def _not_in_sphinx():
    return '__file__' in globals()


class ReutersParser(sgmllib.SGMLParser):

    """Utility class to parse a SGML file and yield documents one at a time."""

    def __init__(self, verbose=0):
        sgmllib.SGMLParser.__init__(self, verbose)
        self._reset()

    def _reset(self):
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        self.docs = []
        for chunk in fd:
            self.feed(chunk)
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        self.body = re.sub(r'\s+', r' ', self.body)
        self.docs.append({'title': self.title,
                          'body': self.body,
                          'topics': self.topics})
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""

In [4]:
class ReutersStreamReader():
    def iterdocs(self):
        """Iterate doc by doc, yield a dict."""
        for root, _dirnames, filenames in os.walk(data_path):
            for filename in fnmatch.filter(filenames, '*.sgm'):
                path = os.path.join(root, filename)
                parser = ReutersParser()
                # close each file as soon as it has been parsed
                with open(path) as fd:
                    for doc in parser.parse(fd):
                        yield doc
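Because `sgmllib` is not available in Python 3, the body text can also be pulled out with a regular expression. The following is only a rough sketch under stated assumptions: the name `iter_bodies` is hypothetical, the end-of-text marker is assumed to appear either as the character reference `&#3;` or as a raw `\x03`, and other entity references (such as `&lt;`) are left undecoded. Unlike `ReutersParser`, it yields nothing for articles that have no body element.

```python
import re

# Non-greedy match of everything between <BODY> and </BODY>, across line breaks.
BODY_RE = re.compile(r"<BODY>(.*?)</BODY>", re.DOTALL | re.IGNORECASE)


def iter_bodies(sgml_text):
    """Yield the whitespace-normalized text of every <BODY> element in sgml_text."""
    for match in BODY_RE.finditer(sgml_text):
        body = match.group(1)
        # Assumed forms of the end-of-text marker; drop both.
        body = body.replace("&#3;", "").replace("\x03", "")
        yield re.sub(r"\s+", " ", body).strip()


# Tiny hand-written sample, not taken from the real dataset.
sample = (
    "<REUTERS><TITLE>T1</TITLE><BODY>Hello\n  world\n&#3;</BODY></REUTERS>"
    "<REUTERS><TITLE>T2</TITLE></REUTERS>"
    "<REUTERS><BODY>Second body\x03</BODY></REUTERS>"
)
bodies = list(iter_bodies(sample))
print(bodies)
```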

Call `iterdocs` to get a generator over all documents:

In [5]:
data_streamer = ReutersStreamReader().iterdocs()

### Pyspark libraries

In [6]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

In [7]:
conf = SparkConf().setMaster("spark://sparklab-master:7077").setAppName("HW#3")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext=SQLContext(sc)
sc

Distribute the documents as an RDD with `parallelize`:

In [8]:
data = sc.parallelize(data_streamer)

Show the first 10 records of the dataset; each record has the keys 'title', 'topics', and 'body':

In [9]:
data.take(10)

[{'body': 'Media group John Fairfax Ltd <FFXA.S> said that its flat first half net profit partly reflected the impact of changes in the Australian tax system. Fairfax earlier reported net earnings edged up 2.3 pct to 25.94 mln dlrs in the 26 weeks ended December 28 from 25.35 mln a year earlier although pre-tax profit rose 9.1 pct to 48.30 mln from 44.29 mln. Net would have risen 10.1 pct but for the increase in company tax to 49 pct from 46 and the imposition of the tax on fringe benefits, paid by employers and not the recipients, the company said in a statement. Fairfax also pointed to the cyclical downturn in revenue growth in the television industry as another reason for the flat first half earnings. It said it considered the result satisfactory in view of these factors. Fairfax said its flagship dailies, The Sydney Morning Herald and the Melbourne Age, boosted advertising volume, as did the Australian Financial Review, and posted extremely satisfactory performances. Magazines also

Take the body text of each document:

In [10]:
# RDD of body texts, with the \x03 end-of-text control character removed
DataBody = data.map(lambda x: (x['body']).replace("\x03",""))
# DataFrame with a single column "reuters" holding the body text
df = DataBody.map(lambda x: (x,)).toDF().withColumnRenamed("_1", "reuters")

In [25]:
DataBody.count()

21578

## (1) Given the Reuters-21578 dataset, please calculate all k-shingles and output the set representation of the text dataset as a matrix.

In [11]:
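def getShingles(line):
    """Return the set of CRC32 hashes of all k-character shingles of a body text."""
    k = 5  # shingle length in characters
    BodyText = line[:-1].strip()  # drop the final character, then trim whitespace
    BodyText = re.sub(' +', ' ', BodyText)  # collapse runs of spaces into one
    # hash every k-shingle to a 32-bit integer and collect the distinct hashes
    shingles = set()
    L = len(BodyText)
    for i in xrange(L-k+1):  # xrange: this notebook runs on Python 2
        shingle = BodyText[i:i+k]
        crc = binascii.crc32(shingle) & 0xffffffff  # mask to an unsigned 32-bit value
        shingles.add(crc)
    return shingles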

In [12]:
shinglesBodyRDD = DataBody.map(getShingles)
#Save output to TextFiles
shinglesBodyRDD.saveAsTextFile("output/HW#4/Q1/KShinglesBodyText")
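The question asks for the set representation as a matrix, while the cell above saves one set of hashed shingles per document. As a hedged illustration of how such sets map to a Boolean characteristic matrix (rows are shingles, columns are documents), here is a small sketch in plain Python 3. The names `hashed_shingles` and `characteristic_matrix` are assumptions for this example, and a dense matrix like this is only practical for a handful of documents.

```python
import binascii


def hashed_shingles(text, k=5):
    """Return the CRC32 hashes of all k-character shingles of text."""
    text = " ".join(text.split())  # collapse all whitespace runs
    return {
        binascii.crc32(text[i:i + k].encode("utf-8")) & 0xffffffff
        for i in range(len(text) - k + 1)
    }


def characteristic_matrix(shingle_sets):
    """Build a 0/1 matrix with one row per distinct shingle, one column per document."""
    vocab = sorted(set().union(*shingle_sets))
    row_of = {shingle: row for row, shingle in enumerate(vocab)}
    matrix = [[0] * len(shingle_sets) for _ in vocab]
    for col, shingles in enumerate(shingle_sets):
        for shingle in shingles:
            matrix[row_of[shingle]][col] = 1
    return vocab, matrix


# Made-up sample texts, not taken from the dataset.
sets = [hashed_shingles(t) for t in ["Shr 10 cts vs 12 cts", "Shr 10 cts vs 15 cts"]]
vocab, matrix = characteristic_matrix(sets)
print(len(vocab), "rows x", len(sets), "columns")
```

For the full collection a sparse layout (for example, a list of row indices per document) would be the more realistic choice.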

## (2) Given the set representation, compute the minhash signatures of all documents using MapReduce.

In [13]:
from pyspark.ml.feature import Tokenizer, CountVectorizer

# Tokenizer: lowercase each body and split it on whitespace
tokens = Tokenizer(inputCol="reuters", outputCol="flat_output")
dfWithTokenizer = tokens.transform(df)

In [14]:
# CountVectorizer: turn each token list into a sparse term-count vector
dfCV = CountVectorizer(inputCol="flat_output",outputCol="features")
model = dfCV.fit(dfWithTokenizer)
dfCVector = model.transform(dfWithTokenizer)

In [15]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# fit a MinHash LSH model with 5 hash tables on the count vectors
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfCVector)
dfCVector.show()

+--------------------+--------------------+--------------------+
|             reuters|         flat_output|            features|
+--------------------+--------------------+--------------------+
|Media group John ...|[media, group, jo...|(104121,[0,1,2,3,...|
|The Bank of Franc...|[the, bank, of, f...|(104121,[0,1,2,3,...|
|Shr 6.56p vs 50.3...|[shr, 6.56p, vs, ...|(104121,[0,1,2,3,...|
|                    |                  []| (104121,[96],[1.0])|
|                    |                  []| (104121,[96],[1.0])|
|                    |                  []| (104121,[96],[1.0])|
|                    |                  []| (104121,[96],[1.0])|
|Shr 14.58p vs 7.8...|[shr, 14.58p, vs,...|(104121,[1,3,8,10...|
|                    |                  []| (104121,[96],[1.0])|
|                    |                  []| (104121,[96],[1.0])|
|The Ministry of I...|[the, ministry, o...|(104121,[0,1,2,3,...|
|Clearing bank ste...|[clearing, bank, ...|(104121,[0,1,2,3,...|
|Unemployment in t...|[un

In [16]:
MinHashSignatures = model.transform(dfCVector)
MinHashSignatures.show()

+--------------------+--------------------+--------------------+--------------------+
|             reuters|         flat_output|            features|              hashes|
+--------------------+--------------------+--------------------+--------------------+
|Media group John ...|[media, group, jo...|(104121,[0,1,2,3,...|[[-2.025105913E9]...|
|The Bank of Franc...|[the, bank, of, f...|(104121,[0,1,2,3,...|[[-2.016181972E9]...|
|Shr 6.56p vs 50.3...|[shr, 6.56p, vs, ...|(104121,[0,1,2,3,...|[[-2.024735297E9]...|
|                    |                  []| (104121,[96],[1.0])|[[-3.52343011E8],...|
|                    |                  []| (104121,[96],[1.0])|[[-3.52343011E8],...|
|                    |                  []| (104121,[96],[1.0])|[[-3.52343011E8],...|
|                    |                  []| (104121,[96],[1.0])|[[-3.52343011E8],...|
|Shr 14.58p vs 7.8...|[shr, 14.58p, vs,...|(104121,[1,3,8,10...|[[-1.982054458E9]...|
|                    |                  []| (104121,[9

In [17]:
MinHashSignatures.select("hashes").rdd.flatMap(lambda x: x['hashes']).saveAsTextFile("output/HW#4/Q2/Text")
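To make the signatures above less of a black box, the following sketch computes minhash signatures by hand in plain Python. It uses the common textbook construction with random hash functions of the form (a*x + b) mod p; this is an assumption for illustration and not a description of how `MinHashLSH` is implemented internally. The names `make_hash_funcs`, `minhash_signature`, and `estimate_jaccard` are hypothetical.

```python
import random

# A prime just above 2**32, so every 32-bit shingle id is a valid input.
PRIME = 4294967311


def make_hash_funcs(n, seed=0):
    """Return n random (a, b) coefficient pairs for h(x) = (a*x + b) % PRIME."""
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(0, PRIME)) for _ in range(n)]


def minhash_signature(shingles, hash_funcs):
    """Signature = minimum hash value of the set under each hash function."""
    if not shingles:
        # Assumed convention: an empty set gets a sentinel in every position.
        return [PRIME] * len(hash_funcs)
    return [min((a * x + b) % PRIME for x in shingles) for a, b in hash_funcs]


def estimate_jaccard(sig_a, sig_b):
    """Fraction of positions where two signatures agree."""
    matches = sum(1 for x, y in zip(sig_a, sig_b) if x == y)
    return matches / float(len(sig_a))


hash_funcs = make_hash_funcs(100, seed=42)
set_a = {1, 2, 3, 4, 5, 6}
set_b = {4, 5, 6, 7, 8, 9}
print(estimate_jaccard(minhash_signature(set_a, hash_funcs),
                       minhash_signature(set_b, hash_funcs)))
```

The printed estimate should be near the true Jaccard similarity of the two sets (3 shared elements out of 9), with the error shrinking as the number of hash functions grows.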

## (3) Implement the LSH algorithm by MapReduce and output the resulting candidate pairs of similar documents. 

In [22]:
model.approxSimilarityJoin(dfCVector, dfCVector, 0.6, distCol="JaccardDistance").filter("JaccardDistance != 0").\
    select(col("datasetA").alias("ReutersA"),
           col("datasetB").alias("ReutersB"),
           col("JaccardDistance")).show()

+--------------------+--------------------+-------------------+
|            ReutersA|            ReutersB|    JaccardDistance|
+--------------------+--------------------+-------------------+
|[Shr six cts vs s...|[Qtly div 47-1/2 ...| 0.5555555555555556|
|[3250 tonnes main...|[2650 tonnes main...| 0.5454545454545454|
|[The Bank of Engl...|[The Bank of Engl...| 0.5151515151515151|
|[Shr profit three...|[Shr 11 cts vs th...| 0.5263157894736843|
|[Qtly div 11 cts ...|[Qtly div nine ct...|0.47058823529411764|
|[Zayre Corp said ...|[Best Products Co...| 0.5853658536585367|
|[Shr 32 cts vs 32...|[Shr 25 cts vs 25...|0.47058823529411764|
|[DIst nine cts vs...|[Qtly div three c...| 0.5882352941176471|
|[Qtly div 10 cts ...|[Qtly div 15 cts ...| 0.4117647058823529|
|[Qtly div 10 cts ...|[Qtly div 15 cts ...|                0.4|
|[Qtly div 10 cts ...|[Qtly div 18 cts ...|             0.4375|
|[Qtly div 10 cts ...|[Qtly div 41.5 ct...|             0.4375|
|[Qtly div 10 cts ...|[Qtly div eight c.
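`approxSimilarityJoin` hides how candidate pairs are found. A common textbook way to do it is the banding technique: split each signature into bands and treat two documents as a candidate pair if they agree on every row of at least one band. The sketch below shows that technique in plain Python; it is an illustration under that assumption and may differ from what Spark does internally. The function name `lsh_candidate_pairs` and the toy signatures are made up for the example.

```python
from collections import defaultdict
from itertools import combinations


def lsh_candidate_pairs(signatures, bands, rows):
    """Return the set of doc-id pairs that share a bucket in at least one band.

    signatures: dict mapping doc id -> signature list of length bands * rows.
    """
    candidates = set()
    for band in range(bands):
        start = band * rows
        buckets = defaultdict(list)
        for doc_id, sig in signatures.items():
            # Documents with identical values in this band land in the same bucket.
            buckets[tuple(sig[start:start + rows])].append(doc_id)
        for doc_ids in buckets.values():
            for pair in combinations(sorted(doc_ids), 2):
                candidates.add(pair)
    return candidates


# Toy signatures with 2 bands of 2 rows each.
signatures = {
    "a": [1, 2, 3, 4],
    "b": [1, 2, 9, 9],
    "c": [7, 7, 7, 7],
}
pairs = lsh_candidate_pairs(signatures, bands=2, rows=2)
print(pairs)
```

Candidate pairs would then be checked against the exact Jaccard distance, which is the role the 0.6 threshold plays in the join above.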

## (4) Implement K-nearest neighbor (KNN) search using LSH and compare its performance with linear search.

In [36]:
print("Approximately searching dfCVector for 100 nearest neighbors of the key:")
vocabSize = 104121
key = Vectors.sparse(vocabSize, [1, 3], [1.0, 1.0])
model.approxNearestNeighbors(dfCVector, key, 100).show()

Approximately searching dfCVector for 100 nearest neighbors of the key:
+--------------------+--------------------+--------------------+--------------------+------------------+
|             reuters|         flat_output|            features|              hashes|           distCol|
+--------------------+--------------------+--------------------+--------------------+------------------+
|Thera-Care Inc sa...|[thera-care, inc,...|(104121,[1,2,3,4,...|[[-1.691699495E9]...|0.8947368421052632|
|National Distille...|[national, distil...|(104121,[1,2,3,6,...|[[-1.893870267E9]...|               0.9|
|Key Centurion Ban...|[key, centurion, ...|(104121,[0,1,3,6,...|[[-1.933129776E9]...|               0.9|
|Dataproducts Corp...|[dataproducts, co...|(104121,[1,2,3,6,...|[[-1.253971669E9]...|               0.9|
|Alcide Corp said ...|[alcide, corp, sa...|(104121,[0,1,3,6,...|[[-1.358720322E9]...|0.9047619047619048|
|<TelWatch Inc> sa...|[<telwatch, inc>,...|(104121,[1,3,6,9,...|[[-1.673110381E9]...|0.9

In [35]:
model.approxNearestNeighbors(dfCVector, key, 100).select("reuters","distCol").coalesce(1).write.format("csv").options(header="true").save("output/HW#4/Q4/"+"KNN")
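The cells above run only the approximate search. For the comparison with linear search, a baseline has to compute the exact Jaccard distance from the key to every document and keep the k smallest. The following is a minimal sketch of such a baseline in plain Python, treating each document as the set of its non-zero feature indices; `jaccard_distance`, `linear_knn`, and the toy documents are assumptions for illustration, not the notebook's code.

```python
import heapq


def jaccard_distance(a, b):
    """Jaccard distance 1 - |a & b| / |a | b| (0.0 if both sets are empty)."""
    union = len(a | b)
    if union == 0:
        return 0.0
    return 1.0 - len(a & b) / float(union)


def linear_knn(docs, key, k):
    """Exact k nearest neighbors by scanning every document.

    docs: dict mapping doc id -> set of feature indices.
    Returns a list of (distance, doc id) tuples, nearest first.
    """
    scored = ((jaccard_distance(features, key), doc_id)
              for doc_id, features in docs.items())
    return heapq.nsmallest(k, scored)


# Toy documents; the key mirrors the sparse vector with indices 1 and 3.
docs = {"d1": {1, 3}, "d2": {1, 3, 5, 7}, "d3": {2, 4}}
key = {1, 3}
neighbors = linear_knn(docs, key, k=2)
print(neighbors)
```

Timing this scan against `approxNearestNeighbors` on the same key, for example with `time.time()` around each call, would give the performance comparison the question asks for.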