In [1]:
import sys
import re
import numpy as np

from numpy import dot
from numpy.linalg import norm

In [2]:
from pyspark import SparkContext

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Set the file paths on your local machine
# Change this line later on your python script when you want to run this on the CLOUD (GC or AWS)

wikiPagesFile="WikipediaPagesOneDocPerLine1000LinesSmall.txt"
wikiCategoryFile="wiki-categorylinks-small.csv.bz2"


In [5]:
# Obtain the SparkContext through getOrCreate() so that re-running this cell in a
# live kernel reuses the running context. Constructing SparkContext(...) a second
# time raises "Cannot run multiple SparkContexts at once".
sc = SparkSession.builder.master("local[*]").appName("Assignment2").getOrCreate().sparkContext

In [6]:
# Read two files into RDDs

wikiCategoryLinks=sc.textFile(wikiCategoryFile)

# Each line has the form '"<pageID>","<category>"'. Split on the FIRST comma only,
# so a category name that itself contains a comma is kept whole instead of being
# cut off at its first comma. The surrounding double quotes are then stripped.
wikiCats=wikiCategoryLinks.map(lambda x: x.split(",", 1)).map(lambda x: (x[0].replace('"', ''), x[1].replace('"', '') ))

# Now the wikipages

wikiPages = sc.textFile(wikiPagesFile)

wikiCategoryLinks.take(2)

['"434042","1987_debut_albums"', '"434042","Albums_produced_by_Mike_Varney"']

In [7]:
wikiCats.take(2)

[('434042', '1987_debut_albums'), ('434042', 'Albums_produced_by_Mike_Varney')]

In [8]:
spark = SparkSession.builder.appName('Assignment2').getOrCreate()

In [9]:
# Load the wiki pages file through the DataFrame CSV reader (no schema or options given).
# NOTE(review): `df` is not referenced again in the visible part of this notebook;
# the later cells work on the RDD `wikiPages` instead.
df = spark.read.csv(wikiPagesFile)

# Uncomment this line if you want to take a look inside the file.
# df.take(1)

In [6]:
# wikiPages.take(1)

In [10]:
# Assumption: each document is stored on one line of the text file.
# This count is used later when computing the inverse document frequency.
numberOfDocs = wikiPages.count()

print(numberOfDocs)

# Keep only the lines that contain every marker the parser below looks up with
# str.index(). Testing for the exact markers (rather than the looser 'id' / 'url=')
# guarantees that index() cannot raise ValueError on a line that passed the filter.
validLines = wikiPages.filter(lambda x : 'id="' in x and '" url=' in x and '">' in x)

# Now, we transform it into a set of (docID, text) pairs:
#   docID = the text between 'id="' and '" url='
#   text  = everything after the first '">' minus the last six characters of the line
#           (presumably the closing </doc> tag -- confirm against the data file)
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('">') + 2:][:-6]))

# keyAndText.take(1)

1000


In [27]:
def buildArray(listOfIndices, size=20000):
    """Build a normalized term-frequency vector from dictionary positions.

    Args:
        listOfIndices: iterable of integer dictionary positions, one entry per
            word occurrence (a position repeated n times counts n times).
        size: length of the returned vector. Defaults to 20000, the dictionary
            size used throughout this notebook.

    Returns:
        A float numpy array of length ``size`` whose i-th entry is the number of
        occurrences of position i divided by the total number of occurrences.
        For an empty input the result is all zeros (the plain division would
        otherwise produce 0/0 = NaN in every slot).

    Raises:
        IndexError: if a position lies outside the vector.
    """
    positions = np.fromiter(listOfIndices, dtype=np.intp)
    counts = np.zeros(size)

    # ufunc.at accumulates repeated positions; ``counts[positions] += 1`` would
    # count each distinct position only once.
    np.add.at(counts, positions, 1)

    total = positions.size
    if total == 0:
        return counts

    return counts / total


def build_zero_one_array (listOfIndices):
    """Return a length-20000 presence vector for the given dictionary positions.

    Every position that occurs at least once in ``listOfIndices`` is set to 1;
    all other entries stay 0.
    """
    presence = np.zeros(20000)

    for position in listOfIndices:
        # Writing 1 again for a repeated position changes nothing.
        presence[position] = 1

    return presence


def stringVector(x):
    """Serialize a (key, values) pair as one comma-separated string.

    The result is ``str(x[0])`` followed by ``str()`` of every element of
    ``x[1]``, joined by commas.
    """
    fields = [str(x[0])]
    fields.extend(str(value) for value in x[1])
    return ','.join(fields)



def cousinSim (x,y):
    """Return the cosine similarity of two vectors.

    Args:
        x, y: array-likes of equal length.

    Returns:
        dot(x, y) / (||x|| * ||y||), or 0.0 when either vector has zero norm
        (the plain formula would divide by zero and yield NaN).
    """
    denominator = np.linalg.norm(x) * np.linalg.norm(y)
    if denominator == 0:
        return 0.0
    return np.dot(x, y) / denominator

In [28]:
# Parse every valid line into a (docID, text) pair: the docID sits between 'id="'
# and '" url=', and the text is what follows the first '">' minus the line's
# last six characters.
keyAndText = validLines.map(
    lambda line: (
        line[line.index('id="') + 4 : line.index('" url=')],
        line[line.index('">') + 2:][:-6],
    )
)

# Next, turn each text into a list of lowercase words, giving
# (docID, ["word1", "word2", "word3", ...]).
# Every character that is not an ASCII letter is replaced by a space first, so
# that unusual characters in some documents cannot break the tokenization.
regex = re.compile('[^a-zA-Z]')

keyAndListOfWords = keyAndText.map(
    lambda pair: (str(pair[0]), regex.sub(' ', pair[1]).lower().split())
)
# better solution here is to use NLTK tokenizer

In [63]:
# First step towards the top 20,000 words: flatten
# (docID, ["word1", "word2", "word3", ...]) into one ("word", 1) pair per occurrence.
allWords = keyAndListOfWords.flatMap(lambda doc: [(word, 1) for word in doc[1]])
allWords.take(2)

[('black', 1), ('people', 1)]

In [71]:
# Sum the 1s per word to obtain corpus-wide counts: ("word1", 1433), ("word2", 3423423), ...
allCounts = allWords.reduceByKey(lambda left, right: left + right)
allCounts.take(2)

[('at', 4810), ('two', 1848)]

In [84]:
# Get the top 20,000 words as a local list sorted by descending frequency.
# On a laptop this can take a while.
topWords = allCounts.top(20000, key=lambda x: x[1])

# top() already returns its result in descending order, so the ten most frequent
# words are simply the head of topWords -- no second Spark job is needed.
print("Top Words in Corpus:", topWords[:10])

Top Words in Corpus: [('the', 74530), ('of', 34512), ('and', 28479), ('in', 27758), ('to', 22583), ('a', 21212), ('was', 12160), ('as', 8811), ('for', 8773), ('on', 8435)]


In [85]:
# We'll create an RDD that has a set of (word, dictNum) pairs.
# Start with an RDD of the positions 0 .. len(topWords) - 1. Using len(topWords)
# instead of a fixed 20000 keeps the lookup below in range when the corpus has
# fewer than 20,000 distinct words.
topWordsK = sc.parallelize(range(len(topWords)))

# Now, we transform (0), (1), (2), ... to ("MostCommonWord", 0),
# ("NextMostCommon", 1), ...
# The number is the word's position in the dictionary, i.e. its index in the
# feature vectors.
dictionary = topWordsK.map (lambda x : (topWords[x][0], x))

print("Word Postions in our Feature Matrix. Last 20 words in 20k positions: ", dictionary.top(20, lambda x : x[1]))

Word Postions in our Feature Matrix. Last 20 words in 20k positions:  [('quebecor', 19999), ('poten', 19998), ('kasada', 19997), ('yadnya', 19996), ('drift', 19995), ('iata', 19994), ('satire', 19993), ('expreso', 19992), ('olimpico', 19991), ('auxiliaries', 19990), ('tenses', 19989), ('petherick', 19988), ('stowe', 19987), ('infimum', 19986), ('parramatta', 19985), ('rimpac', 19984), ('hyderabad', 19983), ('cubes', 19982), ('meats', 19981), ('chaat', 19980)]


In [92]:
################### TASK 2  ##################

# Flatten each (docID, ["word1", "word2", "word3", ...]) into one pair per word
# occurrence, keyed by the word: ("word1", docID), ("word2", docID), ...

allWordsWithDocID = keyAndListOfWords.flatMap(lambda doc: [(word, doc[0]) for word in doc[1]])


In [116]:
# Now join and link them, to get a set of ("word1", (dictionaryPos, docID)) pairs
allDictionaryWords = dictionary.join(allWordsWithDocID)

In [117]:
allDictionaryWords.take(3)

[('of', (1, '431949')), ('of', (1, '431949')), ('of', (1, '431949'))]

In [122]:
# Discard the word and swap the remaining (dictionaryPos, docID) value into a
# (docID, dictionaryPos) pair.
justDocAndPos = allDictionaryWords.values().map(lambda posAndDoc: (posAndDoc[1], posAndDoc[0]))
justDocAndPos.take(3)

[('431949', 1), ('431949', 1), ('431949', 1)]

In [124]:
# Collect all dictionary positions of a document under its docID:
# (docID, [dictionaryPos1, dictionaryPos2, dictionaryPos3...])
allDictionaryWordsInEachDoc = justDocAndPos.groupByKey()


# Turn each document's list of positions into its normalized bag-of-words
# numpy array via buildArray, keeping the docID as the key.
allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.map(
    lambda docAndPositions: (docAndPositions[0], buildArray(docAndPositions[1]))
)

print(allDocsAsNumpyArrays.take(3))

[('431971', array([0.08553655, 0.02488336, 0.05132193, ..., 0.        , 0.        ,
       0.        ])), ('431999', array([0.06479482, 0.03239741, 0.03887689, ..., 0.        , 0.        ,
       0.        ])), ('432000', array([0.07192043, 0.04475899, 0.02371844, ..., 0.        , 0.        ,
       0.        ]))]


In [139]:
# Binary version of allDocsAsNumpyArrays: an entry is 1 when the word occurs in
# the document at least once and 0 when it does not occur.

zeroOrOne = allDocsAsNumpyArrays.map(lambda doc: (doc[0], (doc[1] > 0).astype(int)))

# Sum the binary vectors of all documents element-wise. The i-th entry of the
# result is the number of documents that contain the i-th dictionary word.
dfArray = zeroOrOne.values().reduce(lambda total, flags: total + flags)
dfArray

array([919, 920, 869, ...,   1,   1,   1])

In [140]:
# Create an array of 20,000 entries, each entry with the value numberOfDocs (number of docs)
# NOTE(review): `multiplier` is not referenced by any later cell in the visible notebook.
multiplier = np.full(20000, numberOfDocs)

In [149]:
# Inverse document frequency: idfArray[i] = log(numberOfDocs / dfArray[i]) for the
# i-th dictionary word.
# A slot whose document frequency is 0 would give log(n / 0) = inf, and the
# product tf * idf = 0 * inf = NaN would then spoil every similarity score that
# touches it. Such slots get an idf of 0 instead; np.maximum keeps the division
# itself free of zero denominators.
idfArray = np.where(dfArray > 0, np.log(numberOfDocs / np.maximum(dfArray, 1)), 0.0)

# Finally, convert all of the tf vectors in allDocsAsNumpyArrays to tf * idf vectors
allDocsAsNumpyArraysTFidf = allDocsAsNumpyArrays.map(lambda x: (x[0], np.multiply(x[1], idfArray)))

allDocsAsNumpyArraysTFidf.take(2)

# use the buildArray function to build the feature array
# allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.map(lambda x: (x[0], buildArray(x[1])))


# print(allDocsAsNumpyArraysTFidf.take(2))


[('431971',
  array([0.0072252 , 0.00207481, 0.00720622, ..., 0.        , 0.        ,
         0.        ])),
 ('431999',
  array([0.00547316, 0.00270135, 0.00545879, ..., 0.        , 0.        ,
         0.        ]))]

In [150]:
wikiCats.take(1)

[('434042', '1987_debut_albums')]

In [159]:
# Join the (pageID, category) pairs with the (docID, tf-idf vector) pairs on the
# page ID, then keep only the joined value, i.e. (category, tf-idf vector).
# This join can take time on a laptop. Since wikiCats covers all Wikipedia
# categories, the join could also be done once and its result stored.

joinedByPage = wikiCats.join(allDocsAsNumpyArraysTFidf)
featuresRDD = joinedByPage.values()

# Cache this data set: the kNN prediction scans it for every query.
featuresRDD.cache()
featuresRDD.take(10)

[('Asteroid_spectral_classes',
  array([0.00674469, 0.00348744, 0.00373721, ..., 0.        , 0.        ,
         0.        ])),
 ('S-type_asteroids',
  array([0.00674469, 0.00348744, 0.00373721, ..., 0.        , 0.        ,
         0.        ])),
 ('All_stub_articles',
  array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
         0.        ])),
 ('Military_communications_of_the_United_States',
  array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
         0.        ])),
 ('United_States_Department_of_Defense_agencies',
  array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
         0.        ])),
 ('United_States_military_stubs',
  array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
         0.        ])),
 ('2004_European_Parliament_election',
  array([0.00711319, 0.0026331 , 0.00394139, ..., 0.        , 0.        ,
         0.        ])),
 ('All_articles_that_may_contain_original_research',
  array([0.0

In [161]:
# Let us count and see how large is this data set. 

#wikiAndCatsJoind.count()
featuresRDD.count()

#*** do you mean to check the size of featuresRDD instead of wikiAndCatsJoind?

13780

In [204]:
featuresRDD.take(5)

[('Asteroid_spectral_classes',
  array([0.00674469, 0.00348744, 0.00373721, ..., 0.        , 0.        ,
         0.        ])),
 ('S-type_asteroids',
  array([0.00674469, 0.00348744, 0.00373721, ..., 0.        , 0.        ,
         0.        ])),
 ('All_stub_articles',
  array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
         0.        ])),
 ('Military_communications_of_the_United_States',
  array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
         0.        ])),
 ('United_States_Department_of_Defense_agencies',
  array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
         0.        ]))]

In [164]:
# Finally, we have a function that returns the prediction for the label of a string, using a kNN algorithm
def getPrediction (textInput, k):
    """Predict Wikipedia categories for a text with a k-nearest-neighbour vote.

    The text is converted to a tf-idf vector over the notebook's dictionary and
    scored against every (category, tf-idf vector) entry of ``featuresRDD`` by
    dot product. The ``k`` best-scoring entries each cast one vote for their
    category.

    Args:
        textInput: the query string.
        k: number of nearest entries that vote.

    Returns:
        A list of (category, votes) pairs ordered by descending votes, with at
        most ``k`` entries. The list is empty when no word of ``textInput`` is
        in the dictionary.
    """
    # Create a one-element RDD holding the query text. (Passing the tuple
    # ('', textInput) made Spark treat the empty string as a second record.)
    myDoc = sc.parallelize ([textInput])

    # Flat map the text to (word, 1) pair for each word in the doc
    wordsInThatDoc = myDoc.flatMap (lambda x : ((j, 1) for j in regex.sub(' ', x).lower().split()))

    # Join with the dictionary to get (word, (dictionaryPos, 1)), then regroup all
    # positions under the single key 1: (1, [dictionaryPos1, dictionaryPos2, ...])
    allDictionaryWordsInThatDoc = dictionary.join (wordsInThatDoc).map (lambda x: (x[1][1], x[1][0])).groupByKey ()

    # No query word is in the dictionary: there is nothing to compare against,
    # so report no prediction instead of failing on an empty result.
    groupedPositions = allDictionaryWordsInThatDoc.take (1)
    if not groupedPositions:
        return []

    # Get tf array for the input string
    myArray = buildArray (groupedPositions[0][1])

    # Get the tf * idf array for the input string
    myArray = np.multiply (myArray, idfArray)

    # Score every database entry against the input text. This is the plain dot
    # product of the tf-idf vectors (not length-normalized); a larger value
    # means a closer match.
    distances = featuresRDD.map (lambda x : (x[0], np.dot (x[1], myArray)))
    # distances = allDocsAsNumpyArraysTFidf.map (lambda x : (x[0], cousinSim (x[1],myArray)))
    # get the k entries with the highest score
    topK = distances.top (k, lambda x : x[1])

    # and transform them into a set of (category, 1) pairs
    docIDRepresented = sc.parallelize(topK).map (lambda x : (x[0], 1))

    # now, for each category, count how many times it appeared in the top k
    numTimes = docIDRepresented.reduceByKey(lambda x,y: x+y)

    # Return the categories ordered by vote count.
    # Ask yourself: Why we are using twice top() operation here?
    return numTimes.top(k, lambda x: x[1])

In [21]:
print(getPrediction('Sport Basketball Volleyball Soccer', 10))

[('1931_births', 1), ('All_disambiguation_pages', 1), ('2015_deaths', 1), ('Disambiguation_pages_with_short_description', 1), ('Bullfighters', 1), ("Air_Force_Falcons_men's_basketball_coaches", 1), ('Human_name_disambiguation_pages', 1), ('All_article_disambiguation_pages', 1), ('Lists_of_sportspeople_by_sport', 1), ('All_articles_with_dead_external_links', 1)]


In [166]:
print(getPrediction('What is the capital city of Australia?', 10))

[('All_Wikipedia_articles_written_in_Australian_English', 2), ('All_set_index_articles', 2), ('Articles_with_short_description', 2), ('Royal_Australian_Navy_ship_names', 1), ('Set_indices_on_ships', 1), ('Use_Australian_English_from_April_2018', 1), ('Use_dmy_dates_from_April_2018', 1)]


In [167]:
print(getPrediction('How many goals Vancouver score last year?', 10))

[('All_stub_articles', 2), ('CBC_Television_shows', 1), ('1979_births', 1), ('1990s_Canadian_teen_drama_television_series', 1), ('1991_Canadian_television_series_debuts', 1), ('1994_Canadian_television_series_endings', 1), ('Canadian_television_program_stubs', 1), ('Television_shows_set_in_Vancouver', 1), ('Ak_Bars_Kazan_players', 1)]


In [168]:
# Congratulations, you have implemented a prediction system based on Wikipedia data. 
# You can use this system to generate automated Tags or Categories for any kind of text 
# that you put in your query.
# This data model can predict categories for any input text. 

## Task 3: Using Dataframes

### Task 3.1

In [177]:
df2 = spark.read.csv(wikiCategoryFile)

In [183]:
df2.columns

['_c0', '_c1']

In [194]:
# number of categories used for each wikipage
df2.groupBy('_c0').count().show()

+------+-----+
|   _c0|count|
+------+-----+
|455070|   20|
|434061|    8|
|455037|    4|
|455790|    9|
|456150|   14|
|429120|    3|
|429107|   15|
|433861|   15|
|434000|   17|
|429031|   34|
|418448|    6|
|456178|    4|
|429164|   40|
|455361|   12|
|433488|   32|
|455962|   10|
|433664|   23|
|432147|   48|
|429110|   14|
|432184|   10|
+------+-----+
only showing top 20 rows



In [195]:
df2.groupBy('_c0').count().select("count").describe().show()

+-------+------------------+
|summary|             count|
+-------+------------------+
|  count|               999|
|   mean|13.793793793793794|
| stddev| 11.06201871439371|
|    min|                 1|
|    max|                76|
+-------+------------------+



### Task 3.2

In [203]:
# 10 mostly used wikipedia categories
df2.groupBy('_c1').count().sort("count", ascending = False).show(10)

+--------------------+-----+
|                 _c1|count|
+--------------------+-----+
|Articles_with_sho...|  273|
|All_articles_with...|  222|
|Wikipedia_article...|  212|
|Wikipedia_article...|  212|
|Wikipedia_article...|  196|
|Commons_category_...|  178|
|Wikipedia_article...|  157|
|Wikipedia_article...|  155|
|All_articles_need...|  154|
|Webarchive_templa...|  140|
+--------------------+-----+
only showing top 10 rows



## Task 4: Removing Stop Words, do Stemming and redo the task 2 

### Task 4.1 - Remove Stop Words

In general, removing the stop words could change the prediction results, but I don't think the results will change heavily. Stop words do appear more frequently in a document, and at first we did measure how frequently each word appears in a document. However, we eventually convert the counts to 0 and 1: a zero means that the word does not occur, and a one means that it does. Therefore, even if a stop word appears many times in a particular document, we count it only once, meaning that it appears in the document. It has the same weight as a word that appears only one time in the document. Therefore, removing the stop words does not have a huge impact in this case.

### Task 4.2 - Do English word stemming

Yes, I think stemming will change the results heavily. For example, suppose the input text contains the word "games", but the dictionary only has the word "game", not "games". In Task 2, the function treats the word "game" as never appearing in the input text. This has a bigger impact than the previous situation: there, a stop word may appear many times but is still counted as appearing once, whereas a word that is considered not to appear at all is not taken into account in the distance calculation toward a particular category.

In [None]:
sc.stop()