In [57]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


The next cell stops any previously running SparkContext (only one SparkContext can run at a time).
It configures a new Spark application to run locally and names it "Lab".
It then starts a new SparkContext that allows you to create RDDs and execute Spark jobs.


In [59]:
from pyspark import SparkConf, SparkContext

if 'sc' in locals():
    sc.stop()
    
config = SparkConf().setMaster('local').setAppName('Lab')
sc = SparkContext(conf=config)

The textFile method reads each file and creates an RDD, where each line of the file becomes an element of the RDD.

In [61]:
wikidocs = sc.textFile('/Users/csuftitan/Desktop/avd DB/WikipediaPagesOneDocPerLine1000LinesSmall.txt')
wikicategories = sc.textFile('/Users/csuftitan/Desktop/avd DB/wiki-categorylinks-small.csv.bz2')

Mapping the wikicategories RDD to a key-value RDD where the key is the Wikipedia page ID and the value is the associated category.

In [63]:
wikiCats=wikicategories.map(lambda x: x.split(",")).map(lambda x: (x[0].replace('"', ''), x[1].replace('"', '') ))
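
The `split(",")` call above works for the rows shown in this notebook, but it would mis-split any category name that itself contains a comma. As a hedged alternative, here is a minimal sketch that parses one row with the standard `csv` module. It assumes every row has exactly two quoted fields; `parse_category_line` is a hypothetical helper name, not part of the notebook.

```python
import csv

def parse_category_line(line):
    """Parse one '"pageID","category"' row into a (page_id, category) tuple."""
    # csv.reader strips the surrounding quotes and keeps commas that sit
    # inside a quoted field. Assumption: exactly two fields per row.
    page_id, category = next(csv.reader([line]))
    return page_id, category

print(parse_category_line('"434042","1987_debut_albums"'))
# -> ('434042', '1987_debut_albums')
```

If this fits the data, something like `wikicategories.map(parse_category_line)` could stand in for the two chained `map` calls.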

# 1. Generating 20K dictionary

Showing the first 3 documents from the wikidocs RDD.

In [66]:
wikidocs.take(3)

                                                                                

['<doc id="431949" url="https://en.wikipedia.org/wiki?curid=431949" title="Black people and Mormonism">Black people and MormonismWhile at least two black men held the priesthood in the early church, from the mid-1800s until 1978, The Church of Jesus Christ of Latter-day Saints (LDS Church) had a policy which prevented most men of black African descent from being ordained to the church\'s lay priesthood. Under the temple and priesthood restrictions before 1978, black members of African descent could not receive the priesthood or participate in temple ordinances besides baptisms for the dead. For a time in the 1960s and 1970s, they were not allowed to perform baptisms for the dead either. For young men and men in the LDS church, holding the priesthood is required to hold leadership roles, perform baptisms, to bless the sacrament, to bless babies and to bless the sick. Since black men could not hold the priesthood, they were excluded from holding leadership roles and performing these ritu

Returning all elements from the wikicategories RDD as a list.

In [68]:
wikicategories.collect()

['"434042","1987_debut_albums"',
 '"434042","Albums_produced_by_Mike_Varney"',
 '"434042","Articles_with_hAudio_microformats"',
 '"434042","Articles_with_short_description"',
 '"434042","CS1_German-language_sources_(de)"',
 '"434042","Cacophony_(band)_albums"',
 '"434042","Jason_Becker_albums"',
 '"434042","Marty_Friedman_albums"',
 '"434042","Shrapnel_Records_albums"',
 '"455070","All_articles_needing_additional_references"',
 '"455070","All_articles_with_dead_external_links"',
 '"455070","Articles_needing_additional_references_from_February_2011"',
 '"455070","Articles_with_OS_grid_coordinates"',
 '"455070","Articles_with_dead_external_links_from_November_2016"',
 '"455070","Articles_with_dead_external_links_from_September_2017"',
 '"455070","Articles_with_permanently_dead_external_links"',
 '"455070","Borough_of_Broxbourne"',
 '"455070","Cheshunt"',
 '"455070","Commons_category_link_is_on_Wikidata"',
 '"455070","Coordinates_on_Wikidata"',
 '"455070","Towns_in_Hertfordshire"',
 '"455

Filtering the wikidocs RDD to keep only the documents (lines) that contain both 'id=' and 'url='.

In [70]:
validDocs = wikidocs.filter(lambda x: 'id=' in x and 'url=' in x)

Mapping each document in the validDocs RDD to a key-value pair, where the key is the document ID and the value is the document's text content.

In [72]:
keyAndText = validDocs.map(lambda x: (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('">') + 2:][:-6]))
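
To make the slicing in that lambda easier to follow, here is the same logic as a named function applied to a made-up line. The sample string and the name `extract_id_and_text` are illustrative assumptions. The sketch also assumes each line ends exactly with `</doc>`, with no trailing whitespace.

```python
def extract_id_and_text(line):
    """Return (doc_id, body) from one '<doc id=... url=... title=...>body</doc>' line."""
    # The ID sits between 'id="' and the '" url=' that follows it.
    doc_id = line[line.index('id="') + 4 : line.index('" url=')]
    # The body starts after the first '">' (the end of the opening tag);
    # [:-6] drops the 6-character closing '</doc>' tag.
    body = line[line.index('">') + 2 :][:-6]
    return doc_id, body

sample = '<doc id="1" url="https://example.org/1" title="Demo">Demo text here</doc>'
print(extract_id_and_text(sample))
# -> ('1', 'Demo text here')
```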

Taking first 3 documents

In [74]:
keyAndText.take(3)

[('431949',
  'Black people and MormonismWhile at least two black men held the priesthood in the early church, from the mid-1800s until 1978, The Church of Jesus Christ of Latter-day Saints (LDS Church) had a policy which prevented most men of black African descent from being ordained to the church\'s lay priesthood. Under the temple and priesthood restrictions before 1978, black members of African descent could not receive the priesthood or participate in temple ordinances besides baptisms for the dead. For a time in the 1960s and 1970s, they were not allowed to perform baptisms for the dead either. For young men and men in the LDS church, holding the priesthood is required to hold leadership roles, perform baptisms, to bless the sacrament, to bless babies and to bless the sick. Since black men could not hold the priesthood, they were excluded from holding leadership roles and performing these rituals. Temple ordinances are necessary for members to receive the endowment and marriage s

Cleaning the text: replacing every non-alphabetic character (special characters, punctuation, and digits) with a space, converting to lowercase, and splitting into words.

In [76]:
import re
regex = re.compile('[^a-zA-Z]')

keyAndListOfWords = keyAndText.map(lambda x : (str(x[0]), regex.sub(' ', x[1]).lower().split()))
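
A small stand-alone check of this cleaning step, using the same regular expression on a made-up sentence. The helper name `tokenize` is an assumption for illustration. It shows why stray tokens such as 's' appear in the word lists: apostrophes, hyphens, and digits all turn into spaces.

```python
import re

regex = re.compile('[^a-zA-Z]')

def tokenize(text):
    """Replace non-letters with spaces, lowercase, and split on whitespace."""
    return regex.sub(' ', text).lower().split()

print(tokenize("The church's mid-1800s policy"))
# -> ['the', 'church', 's', 'mid', 's', 'policy']
```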

In [77]:
keyAndListOfWords.take(2)

[('431949',
  ['black',
   'people',
   'and',
   'mormonismwhile',
   'at',
   'least',
   'two',
   'black',
   'men',
   'held',
   'the',
   'priesthood',
   'in',
   'the',
   'early',
   'church',
   'from',
   'the',
   'mid',
   's',
   'until',
   'the',
   'church',
   'of',
   'jesus',
   'christ',
   'of',
   'latter',
   'day',
   'saints',
   'lds',
   'church',
   'had',
   'a',
   'policy',
   'which',
   'prevented',
   'most',
   'men',
   'of',
   'black',
   'african',
   'descent',
   'from',
   'being',
   'ordained',
   'to',
   'the',
   'church',
   's',
   'lay',
   'priesthood',
   'under',
   'the',
   'temple',
   'and',
   'priesthood',
   'restrictions',
   'before',
   'black',
   'members',
   'of',
   'african',
   'descent',
   'could',
   'not',
   'receive',
   'the',
   'priesthood',
   'or',
   'participate',
   'in',
   'temple',
   'ordinances',
   'besides',
   'baptisms',
   'for',
   'the',
   'dead',
   'for',
   'a',
   'time',
   'in',
   

Counting the total frequency of each word across all documents in the RDD

In [79]:
allCounts = keyAndListOfWords.flatMap(lambda r: r[1]).map(lambda word: (word,1) ).reduceByKey(lambda a,b: a+b)
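
For intuition, the flatMap / map / reduceByKey chain computes the same thing as a plain word counter over all documents. The sketch below uses `collections.Counter` on two made-up documents. It runs on a single machine and is only meant to show the result shape, not to replace the Spark job.

```python
from collections import Counter

# Hypothetical (docID, listOfWords) records, shaped like keyAndListOfWords.
docs = [
    ('d1', ['the', 'church', 'the']),
    ('d2', ['the', 'policy']),
]

# Flatten every word list and count occurrences across all documents.
all_counts = Counter(word for _, words in docs for word in words)
print(all_counts['the'], all_counts['church'], all_counts['policy'])
# -> 3 1 1
```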

Extracting the top 20,000 most frequent words from the RDD

In [81]:
topWords = allCounts.top(20000, lambda x:x[1])

                                                                                

Printing the 10 most frequent words together with their counts.

In [83]:
print("Top Words in Corpus: \n", allCounts.top(10, key=lambda x: x[1]))

Top Words in Corpus: 
 [('the', 74530), ('of', 34512), ('and', 28479), ('in', 27758), ('to', 22583), ('a', 21212), ('was', 12160), ('as', 8811), ('for', 8773), ('on', 8435)]


In [84]:
topWordsK = sc.parallelize(range(20000))

In [85]:
dictionary = topWordsK.map (lambda x : (topWords[x][0], x))

dictionary.take(5)

[('the', 0), ('of', 1), ('and', 2), ('in', 3), ('to', 4)]
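
The dictionary built above maps each frequent word to its rank, which later becomes its column in the feature vector. Here is a minimal plain-Python sketch of the same idea. `build_dictionary` is a hypothetical helper. Ties are broken alphabetically, which is an assumption added here for determinism (the notebook's `top()` call orders by count only). Slicing also handles a corpus with fewer distinct words than the requested size.

```python
def build_dictionary(counts, size):
    """Map the `size` most frequent words to positions 0..size-1."""
    # Sort by descending count; break ties alphabetically (an assumption
    # made here so the result is deterministic).
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return {word: pos for pos, (word, _) in enumerate(ranked[:size])}

counts = {'the': 5, 'of': 3, 'and': 2, 'zebra': 1}
print(build_dictionary(counts, 3))
# -> {'the': 0, 'of': 1, 'and': 2}
```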

In [86]:
print("Word Positions in our Feature Matrix. Last 20 words in 20k positions: \n", dictionary.top(20, lambda x : x[1]))

Word Positions in our Feature Matrix. Last 20 words in 20k positions: 
 [('azzam', 19999), ('zawahiri', 19998), ('sadat', 19997), ('jahiliyyah', 19996), ('alim', 19995), ('osama', 19994), ('xenocide', 19993), ('buggers', 19992), ('mayhem', 19991), ('mbb', 19990), ('rlv', 19989), ('hystp', 19988), ('whiteknighttwo', 19987), ('composites', 19986), ('payloads', 19985), ('silbervogel', 19984), ('hotol', 19983), ('vthl', 19982), ('choisy', 19981), ('gisela', 19980)]


In [87]:
import numpy as np
from numpy import dot
from numpy.linalg import norm

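def buildArray(listOfIndices):
    # Term-frequency vector: count how often each dictionary position occurs,
    # then divide by the total so the entries sum to 1.
    returnVal = np.zeros(20000)
    
    for index in listOfIndices:
        returnVal[index] = returnVal[index] + 1
    
    mysum = np.sum(returnVal)
    
    returnVal = np.divide(returnVal, mysum)
    
    return returnVal


def build_zero_one_array (listOfIndices):
    # Presence vector: 1 if the dictionary position occurs at least once, else 0.
    returnVal = np.zeros (20000)
    
    for index in listOfIndices:
        if returnVal[index] == 0: returnVal[index] = 1
    
    return returnVal


def stringVector(x):
    # Serialize a (key, vector) pair as "key,v0,v1,...".
    returnVal = str(x[0])
    for j in x[1]:
        returnVal += ',' + str(j)
    return returnVal



def cousinSim (x,y):
    # Cosine similarity: dot product divided by the product of the vector norms.
    normA = np.linalg.norm(x)
    normB = np.linalg.norm(y)
    return np.dot(x,y)/(normA*normB)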
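
The difference between a raw dot product and the cosine similarity in `cousinSim` matters when vectors differ in length. The plain-Python sketch below uses made-up three-element vectors and a hypothetical `cosine_similarity` helper. Two vectors that point the same way get the same cosine score even though their dot products differ by a factor of ten.

```python
import math

def cosine_similarity(x, y):
    """Dot product of x and y divided by the product of their Euclidean norms."""
    dot = sum(a * b for a, b in zip(x, y))
    norm_x = math.sqrt(sum(a * a for a in x))
    norm_y = math.sqrt(sum(b * b for b in y))
    return dot / (norm_x * norm_y)

short_doc = [1.0, 1.0, 0.0]
long_doc = [10.0, 10.0, 0.0]   # same direction, ten times the magnitude
query = [1.0, 0.0, 0.0]

# The raw dot products differ (1.0 vs 10.0) ...
print(sum(a * b for a, b in zip(short_doc, query)),
      sum(a * b for a, b in zip(long_doc, query)))
# ... while the cosine similarities agree up to rounding.
print(cosine_similarity(short_doc, query), cosine_similarity(long_doc, query))
```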

Creating an RDD where each word from all documents is paired with its respective document ID

In [89]:
allWordsWithDocID = keyAndListOfWords.flatMap(lambda x: ((j, x[0]) for j in x[1]))
allWordsWithDocID.take(5)

[('black', '431949'),
 ('people', '431949'),
 ('and', '431949'),
 ('mormonismwhile', '431949'),
 ('at', '431949')]

Joining the dictionary, keyed by word, with the (word, docID) pairs. The inner join keeps only words that appear in the dictionary and yields (word, (dictionaryPosition, docID)) records.

In [91]:
allDictionaryWords = dictionary.join(allWordsWithDocID)
allDictionaryWords.take(3)

                                                                                

[('of', (1, '431949')), ('of', (1, '431949')), ('of', (1, '431949'))]

In [92]:
justDocAndPos = allDictionaryWords.map(lambda r: (r[1][1], r[1][0]))

justDocAndPos.take(5)

[('431949', 1), ('431949', 1), ('431949', 1), ('431949', 1), ('431949', 1)]

In [93]:
allDictionaryWordsInEachDoc = justDocAndPos.groupByKey()

In [94]:
allDictionaryWordsInEachDoc.take(3)

                                                                                

[('431949', <pyspark.resultiterable.ResultIterable at 0x125064d70>),
 ('431959', <pyspark.resultiterable.ResultIterable at 0x1351feea0>),
 ('431968', <pyspark.resultiterable.ResultIterable at 0x1351ffe00>)]
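
The join, re-keying, and groupByKey steps above can be pictured with ordinary Python containers. This is a single-machine sketch on made-up data, not the Spark implementation. It shows that a word missing from the dictionary ('rare' here) disappears in the join, and that each document ends up with the list of dictionary positions of its words.

```python
from collections import defaultdict

dictionary = {'the': 0, 'of': 1}                      # word -> position
words_with_doc = [('the', 'd1'), ('rare', 'd1'),      # (word, docID) pairs
                  ('of', 'd2'), ('the', 'd2')]

# Inner join on the word, then keep only (docID, position).
doc_and_pos = [(doc, dictionary[w]) for w, doc in words_with_doc if w in dictionary]

# Equivalent of groupByKey(): collect every position under its docID.
positions_by_doc = defaultdict(list)
for doc, pos in doc_and_pos:
    positions_by_doc[doc].append(pos)

print(dict(positions_by_doc))
# -> {'d1': [0], 'd2': [1, 0]}
```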

In [95]:
allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.map(lambda x: (x[0], np.array(list(x[1])))  )

In [96]:
allDocsAsNumpyArrays.take(3)

[('431949', array([    1,     1,     1, ..., 16561, 16562, 16563])),
 ('431959', array([    1,     1,     1, ..., 16586, 16590, 16591])),
 ('431968',
  array([    1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     1,     1,     1,     1,     1,
             2,     2,     2,     2,     2,     2,     2,     2,     2,
             2,     2,     2,     2,     2,     4,     4,     4,     4,
             4,     4,     4,     4,     4,     4,     4,     4,     4,
             4,     4,     4,     4,     4,     4,     4,     7,     7,
             7,     7,     7,     7,     7,     8,     8,     8,    12,
            12,    12,    12,    12,    12,    12,    12,    12,    13,
            13,    13,    16,    16,    19,    19,    19,    23,    23,
            23,    23,    23,    23,    26,    26,    26,    26,    26,
            33,    34,    34,    38,    44,    44,    49, 

# 2. Create the TF-IDF Array

In [98]:
allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.map(lambda x: (x[0], buildArray(x[1])))

print(allDocsAsNumpyArrays.take(3))

[('431949', array([0.07558791, 0.02900336, 0.02329227, ..., 0.        , 0.        ,
       0.        ])), ('431959', array([0.08685662, 0.04136029, 0.03584559, ..., 0.        , 0.        ,
       0.        ])), ('431968', array([0.07207207, 0.04864865, 0.02522523, ..., 0.        , 0.        ,
       0.        ]))]


In [99]:
# Now, create a version of allDocsAsNumpyArrays where, in the array,
# every entry is either zero or one.
# A zero means that the word does not occur,
# and a one means that it does.
zeroOrOne = allDocsAsNumpyArrays.mapValues(lambda x: np.array([0 if i==0 else 1 for i in x ]))

In [100]:
# Now, add up all of those arrays into a single array, where the
# i^th entry tells us how many
# individual documents the i^th word in the dictionary appeared in
dfArray = zeroOrOne.reduce(lambda x1, x2: ("", np.add(x1[1], x2[1])))[1]

                                                                                

In [101]:
dfArray

array([919, 920, 869, ...,   2,   1,   1])
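
The document-frequency array counts, for each dictionary position, how many documents contain that word at least once. Here is a plain-Python sketch of that counting on three made-up term-frequency vectors. `document_frequency` is a hypothetical helper, and lists stand in for NumPy arrays.

```python
def document_frequency(tf_vectors):
    """Count, for each dictionary position, how many documents use that word."""
    size = len(tf_vectors[0])
    df = [0] * size
    for vec in tf_vectors:
        for i, value in enumerate(vec):
            if value != 0:          # the word occurs in this document
                df[i] += 1
    return df

tf_vectors = [
    [0.5, 0.5, 0.0],
    [1.0, 0.0, 0.0],
    [0.2, 0.0, 0.8],
]
print(document_frequency(tf_vectors))
# -> [3, 1, 1]
```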

In [102]:
# Create an array of 20,000 entries, each entry with the value numberOfDocs (number of docs)
numberOfDocs = wikidocs.count()

multiplier = np.full(20000, numberOfDocs)


In [103]:
multiplier

array([1000, 1000, 1000, ..., 1000, 1000, 1000])

In [104]:
# Get the version of dfArray where the i^th entry is the inverse-document frequency for the
# i^th word in the corpus
idfArray = np.log(np.divide(multiplier, dfArray))

In [105]:
idfArray

array([0.08446916, 0.08338161, 0.14041215, ..., 6.2146081 , 6.90775528,
       6.90775528])
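
The values printed above can be checked by hand: with 1000 documents, a word found in 919 of them has idf = ln(1000/919), and a word found in a single document has idf = ln(1000). A short sketch using only the standard library follows; `idf` is a hypothetical helper name.

```python
import math

def idf(number_of_docs, doc_freq):
    """Inverse document frequency with the natural logarithm, as in the notebook."""
    return math.log(number_of_docs / doc_freq)

# Document frequencies taken from the dfArray output above.
for doc_freq in (919, 920, 2, 1):
    print(doc_freq, round(idf(1000, doc_freq), 8))
```

Common words get an idf close to 0, and words that appear in only one or two documents get the largest weights.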

In [106]:
# Finally, convert all of the tf vectors in allDocsAsNumpyArrays to tf * idf vectors
allDocsAsNumpyArraysTFidf = allDocsAsNumpyArrays.map(lambda x: (x[0], np.multiply(x[1], idfArray)))

allDocsAsNumpyArraysTFidf.take(2)

[('431949',
  array([0.00638485, 0.00241835, 0.00327052, ..., 0.        , 0.        ,
         0.        ])),
 ('431959',
  array([0.00733671, 0.00344869, 0.00503316, ..., 0.        , 0.        ,
         0.        ]))]
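
For reference, the quantities computed in this section can be written compactly. The symbols are introduced here for illustration and do not appear in the notebook: c_{d,i} is the number of times dictionary word i occurs in document d, N is the number of documents (numberOfDocs), and df_i is the number of documents containing word i (dfArray).

```latex
\mathrm{tf}_{d,i} = \frac{c_{d,i}}{\sum_{j} c_{d,j}}, \qquad
\mathrm{idf}_{i} = \ln\frac{N}{\mathrm{df}_{i}}, \qquad
\mathrm{tfidf}_{d,i} = \mathrm{tf}_{d,i} \cdot \mathrm{idf}_{i}
```

As a check against the outputs above, for document 431949 and word 0: 0.07558791 × 0.08446916 ≈ 0.00638485, which matches the first entry of its TF-IDF vector.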

# 3. Implement the getPrediction function

In [108]:
wikiCats.take(5)

[('434042', '1987_debut_albums'),
 ('434042', 'Albums_produced_by_Mike_Varney'),
 ('434042', 'Articles_with_hAudio_microformats'),
 ('434042', 'Articles_with_short_description'),
 ('434042', 'CS1_German-language_sources_(de)')]

In [109]:
# Now, we join it with categories, and map it after the join so that we keep only
# the (category, tf-idf vector) pairs and drop the wikipageID.
# This join can take time on your laptop.
# You can do the join once, generate a new wikiCats data set, and store it.
# Our WikiCategories includes all categories of Wikipedia.

featuresRDD = wikiCats.join(allDocsAsNumpyArraysTFidf).map(lambda r: (r[1][0], r[1][1]) )

# Cache this important data because we need to run kNN on this data set
featuresRDD.cache()

featuresRDD.take(10)

24/12/13 13:48:31 WARN BlockManager: Task 27 already completed, not releasing lock for rdd_44_0
                                                                                

[('All_articles_with_unsourced_statements',
  array([0.00619833, 0.00281363, 0.00376037, ..., 0.        , 0.        ,
         0.        ])),
 ('Articles_with_inconsistent_citation_formats',
  array([0.00619833, 0.00281363, 0.00376037, ..., 0.        , 0.        ,
         0.        ])),
 ('Articles_with_unsourced_statements_from_February_2013',
  array([0.00619833, 0.00281363, 0.00376037, ..., 0.        , 0.        ,
         0.        ])),
 ('CS1:_long_volume_value',
  array([0.00619833, 0.00281363, 0.00376037, ..., 0.        , 0.        ,
         0.        ])),
 ('CS1_maint:_archived_copy_as_title',
  array([0.00619833, 0.00281363, 0.00376037, ..., 0.        , 0.        ,
         0.        ])),
 ('Continued_fractions',
  array([0.00619833, 0.00281363, 0.00376037, ..., 0.        , 0.        ,
         0.        ])),
 ('Gamma_and_related_functions',
  array([0.00619833, 0.00281363, 0.00376037, ..., 0.        , 0.        ,
         0.        ])),
 ('All_articles_with_unsourced_statem

In [110]:
featuresRDD.count()

                                                                                

13780

In [111]:
def getPrediction(textInput, k):
    # Create an RDD out of the textInput. parallelize() receives a 2-tuple here,
    # so the RDD holds two string elements: '' and textInput.
    myDoc = sc.parallelize(('', textInput))

    # Flat map the text to (word, 1) pairs, one for each word in the doc
    wordsInThatDoc = myDoc.flatMap(lambda x : ((j, 1) for j in regex.sub(' ', x).lower().split()))

    # The join gives (word, (dictionaryPos, 1)) pairs; re-key them as (1, dictionaryPos)
    # and group, so all dictionary positions of the input end up under the single key 1
    allDictionaryWordsInThatDoc = dictionary.join(wordsInThatDoc).map(lambda x: (x[1][1], x[1][0])).groupByKey()

    # Get tf array for the input string
    myArray = buildArray(allDictionaryWordsInThatDoc.top(1)[0][1])

    # Get the tf * idf array for the input string, using the idf of the training data
    myArray = np.multiply(myArray, idfArray)

    # Score the input against every (category, tf-idf vector) row with a dot product.
    # np.dot() alone is not length-normalized; the commented-out line uses cousinSim(),
    # which is the actual cosine similarity.
    distances = featuresRDD.map(lambda x : (x[0], np.dot(x[1], myArray)))
    # distances = allDocsAsNumpyArraysTFidf.map (lambda x : (x[0], cousinSim (x[1],myArray)))
    # Get the k highest scores
    topK = distances.top(k, lambda x : x[1])
    
    # Transform the top k rows into a set of (category, 1) pairs
    docIDRepresented = sc.parallelize(topK).map(lambda x : (x[0], 1))

    # Now, for each category, count the number of times it appeared in the top k
    numTimes = docIDRepresented.reduceByKey(lambda a,b: a+b)
    
    # Return up to k categories, ordered by that count.
    # Ask yourself: why are we using the top() operation twice here?
    return numTimes.top(k, lambda x: x[1])
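
The two ranking steps in `getPrediction` do different jobs: the first keeps the k best-scoring neighbours, the second orders the categories by how many of those neighbours carry them. A minimal single-machine sketch of that voting logic follows. The scores and category names are made up, and `vote` is a hypothetical helper rather than part of the notebook.

```python
from collections import Counter
import heapq

def vote(scored_labels, k):
    """Rank labels by how often they occur among the k highest-scoring rows."""
    # Step 1: nearest neighbours, i.e. the k rows with the largest score.
    top_k = heapq.nlargest(k, scored_labels, key=lambda pair: pair[1])
    # Step 2: majority vote over the labels of those rows.
    votes = Counter(label for label, _ in top_k)
    return votes.most_common()

scored = [('Sports', 0.9), ('Sports', 0.8), ('Music', 0.7), ('Ships', 0.1)]
print(vote(scored, 3))
# -> [('Sports', 2), ('Music', 1)]
```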

In [112]:
print(getPrediction('Sport Basketball Volleyball Soccer', 10))

                                                                                

[('All_article_disambiguation_pages', 1), ('All_disambiguation_pages', 1), ('Disambiguation_pages_with_short_description', 1), ('Human_name_disambiguation_pages', 1), ('Bullfighters', 1), ('Lists_of_sportspeople_by_sport', 1), ('1931_births', 1), ('2015_deaths', 1), ("Air_Force_Falcons_men's_basketball_coaches", 1), ('All_articles_with_dead_external_links', 1)]


In [113]:
print(getPrediction('What is the capital city of Australia?', 10))

                                                                                

[('All_Wikipedia_articles_written_in_Australian_English', 2), ('All_set_index_articles', 2), ('Articles_with_short_description', 2), ('Royal_Australian_Navy_ship_names', 1), ('Set_indices_on_ships', 1), ('Use_Australian_English_from_April_2018', 1), ('Use_dmy_dates_from_April_2018', 1)]


In [114]:
print(getPrediction('How many goals Vancouver score last year?', 10))

                                                                                

[('All_stub_articles', 2), ('1990s_Canadian_teen_drama_television_series', 1), ('1991_Canadian_television_series_debuts', 1), ('1994_Canadian_television_series_endings', 1), ('CBC_Television_shows', 1), ('Canadian_television_program_stubs', 1), ('Television_shows_set_in_Vancouver', 1), ('1979_births', 1), ('Ak_Bars_Kazan_players', 1)]


# 4. Implement the code using Dataframes

In [116]:
import pyspark
from pyspark.sql import SparkSession

In [117]:
spark = SparkSession.builder.appName('With Dataframe').getOrCreate()

In [118]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [119]:
df =spark.read.text('/Users/csuftitan/Desktop/avd DB/WikipediaPagesOneDocPerLine1000LinesSmall.txt')

In [120]:
df.show(10)
df.printSchema()

+--------------------+
|               value|
+--------------------+
|<doc id="431949" ...|
|<doc id="431952" ...|
|<doc id="431953" ...|
|<doc id="431959" ...|
|<doc id="431962" ...|
|<doc id="431968" ...|
|<doc id="431969" ...|
|<doc id="431970" ...|
|<doc id="431971" ...|
|<doc id="431972" ...|
+--------------------+
only showing top 10 rows

root
 |-- value: string (nullable = true)



In [121]:
df.columns

['value']

In [122]:
from pyspark.sql.functions import regexp_extract, split, explode, col, lower, regexp_replace

In [123]:
doc_start_pattern = r'<doc id="(\d+)"'
doc_end_pattern = r'</doc>'

In [124]:
df_with_id = df.withColumn("id", regexp_extract(col("value"), doc_start_pattern, 1))

In [125]:
df_with_id_text = df_with_id.withColumn('text', 
                           regexp_replace(col('value'), r'<doc[^>]*>|</doc>', ''))

In [126]:
df_idAndText = df_with_id_text.drop('value')

In [127]:
df_idAndText.show(10)

+------+--------------------+
|    id|                text|
+------+--------------------+
|431949|Black people and ...|
|431952|Norwegian Militar...|
|431953|Doug Harvey (ice ...|
|431959|Staten Island Gre...|
|431962|USS Dale (DLG-19)...|
|431968|Cyclic guanosine ...|
|431969|Touch (disambigua...|
|431970|USS DaleUSS "Dale...|
|431971|Touch (manga)Touc...|
|431972|Touch (Unix)touch...|
+------+--------------------+
only showing top 10 rows



In [128]:
df = df_idAndText

In [129]:
df.count() # no. of rows

1000

In [130]:
len(df.columns) # no. of columns

2

In [131]:
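# Steps 1-2: Lowercase the text, drop every character that is not a letter or whitespace,
# then split on runs of whitespace
df_cleaned = df.withColumn('cleaned_text', 
                           regexp_replace(lower(col('text')), r'[^a-z\s]', '')) \
               .withColumn('words', split(col('cleaned_text'), r'\s+'))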

# Step 3: Explode the words column to have one word per row
df_words = df_cleaned.select(explode(col('words')).alias('word'))

# Step 4: Filter out empty strings and null values (remove non-words)
df_filtered = df_words.filter(col('word') != "")

# Step 5: Group by the word and count the frequency
word_counts = df_filtered.groupBy('word').count().orderBy(col('count').desc())

# Step 6: Get the top 20,000 most frequent words
top_20k_words_df = word_counts.limit(20000)

# Step 7: Collect the top words and save them in an array
# top_20k_words = top_20k_words_df.select('word').rdd.flatMap(lambda x: x).collect()

# # Step 8: Display the top 10 most frequent words (optional)
# print(top_20k_words[:10])



In [132]:
df_cleaned

id,text,cleaned_text,words
431949,Black people and ...,black people and ...,"[black, people, a..."
431952,Norwegian Militar...,norwegian militar...,"[norwegian, milit..."
431953,Doug Harvey (ice ...,doug harvey ice h...,"[doug, harvey, ic..."
431959,Staten Island Gre...,staten island gre...,"[staten, island, ..."
431962,USS Dale (DLG-19)...,uss dale dlguss d...,"[uss, dale, dlgus..."
431968,Cyclic guanosine ...,cyclic guanosine ...,"[cyclic, guanosin..."
431969,Touch (disambigua...,touch disambiguat...,"[touch, disambigu..."
431970,"USS DaleUSS ""Dale...",uss daleuss dale ...,"[uss, daleuss, da..."
431971,Touch (manga)Touc...,touch mangatouch ...,"[touch, mangatouc..."
431972,Touch (Unix)touch...,touch unixtouch i...,"[touch, unixtouch..."
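
Tokens such as `dlguss` in the output above come from the cleaning rule used in this section: non-letters are deleted rather than replaced with a space, so words on either side of punctuation or digits fuse together. The sketch below contrasts the two rules with Python's `re` module on a made-up string modelled on the row for 431962. This is one likely reason the word counts in this section differ from those in the RDD section.

```python
import re

text = 'USS Dale (DLG-19)USS Dale'.lower()

# DataFrame cell: non-letters are deleted, so neighbours fuse together.
removed = re.sub(r'[^a-z\s]', '', text).split()

# RDD cell: non-letters become spaces, so neighbours stay separate.
spaced = re.sub(r'[^a-z]', ' ', text).split()

print(removed)   # -> ['uss', 'dale', 'dlguss', 'dale']
print(spaced)    # -> ['uss', 'dale', 'dlg', 'uss', 'dale']
```

Replacing with `' '` instead of `''` in the `regexp_replace` call would bring the two tokenizations closer together.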


In [133]:
top_20k_words_df.show(10)


+----+-----+
|word|count|
+----+-----+
| the|72168|
|  of|34447|
| and|28353|
|  in|26124|
|  to|22458|
|   a|20583|
| was|12146|
| for| 8677|
|  as| 8642|
|  is| 8377|
+----+-----+
only showing top 10 rows



                                                                                

In [134]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, expr

# Create a window specification for row numbering
window_spec = Window.orderBy(col('count').desc())

# Add an 'index' column with row numbers, starting from 0
top_20k_words_df_with_index = top_20k_words_df.withColumn('index', (row_number().over(window_spec) - 1))

# Show the result
top_20k_words_df_with_index.show(10)


24/12/13 13:49:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----+-----+-----+
|word|count|index|
+----+-----+-----+
| the|72168|    0|
|  of|34447|    1|
| and|28353|    2|
|  in|26124|    3|
|  to|22458|    4|
|   a|20583|    5|
| was|12146|    6|
| for| 8677|    7|
|  as| 8642|    8|
|  is| 8377|    9|
+----+-----+-----+
only showing top 10 rows



                                                                                

In [135]:
top_20k = top_20k_words_df_with_index.drop('count')

In [136]:
top_20k.show(5)

24/12/13 13:49:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----+-----+
|word|index|
+----+-----+
| the|    0|
|  of|    1|
| and|    2|
|  in|    3|
|  to|    4|
+----+-----+
only showing top 5 rows



                                                                                

In [137]:
get_idx_top20k = lambda w: top_20k.filter(col('word')==w).head()[1]

In [138]:
get_idx_top20k('the')

24/12/13 13:49:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

0

In [139]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)



In [140]:
df = df.withColumn("id", col("id").cast("integer"))

In [141]:
df.filter(col('id')==431949).text

Column<'text'>

In [142]:
top_20k_words_df_with_index.printSchema()

root
 |-- word: string (nullable = false)
 |-- count: long (nullable = false)
 |-- index: integer (nullable = false)



In [143]:
top_20k_words_dict = {row['word']: row['index'] for row in top_20k_words_df_with_index.collect()}

24/12/13 13:49:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

In [144]:
df_tokenized = df_cleaned.drop('cleaned_text','text')

In [145]:
df_tokenized.show(10)

+------+--------------------+
|    id|               words|
+------+--------------------+
|431949|[black, people, a...|
|431952|[norwegian, milit...|
|431953|[doug, harvey, ic...|
|431959|[staten, island, ...|
|431962|[uss, dale, dlgus...|
|431968|[cyclic, guanosin...|
|431969|[touch, disambigu...|
|431970|[uss, daleuss, da...|
|431971|[touch, mangatouc...|
|431972|[touch, unixtouch...|
+------+--------------------+
only showing top 10 rows



In [146]:
## TF array of each document
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, FloatType

# # Define a UDF to map words to their positions in the Top 20K dictionary
# def make_tf_array(words):
    
#     positions = [top_20k_words_dict[word] for word in words if word in top_20k_words_dict]

#     tf = [0] * 20000
#     for pos in positions:
#         tf[pos] = tf[pos] + 1
        
#     # Sum of all frequencies
#     total_frequency = sum(tf)  

#     if total_frequency > 0:  # Avoid division by zero
#         tf = [freq/total_frequency for freq in tf]

#     return tf

# # Register the UDF
# tf_udf = udf(make_tf_array, ArrayType(FloatType()))

In [147]:
def make_tf_array(words):
    # Initialize numpy array with zeros for 20,000 elements
    tf = np.zeros(20000)

    # Update the tf array based on the positions in top_20k_words_dict
    for word in words:
        if word in top_20k_words_dict:
            pos = top_20k_words_dict[word]
            tf[pos] += 1

    # Normalize by total frequency (sum of all counts)
    total_frequency = np.sum(tf)
    
    if total_frequency > 0:  # Avoid division by zero
        tf = tf / total_frequency

    # Return as list for Spark compatibility
    return tf.tolist()

# Register the UDF
tf_udf = udf(make_tf_array, ArrayType(FloatType()))
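
The logic inside the UDF can be tested without Spark. The sketch below is a plain-Python version with a three-word dictionary. `make_tf`, `word_to_pos`, and the `size` parameter are illustrative assumptions. It shows that words outside the dictionary are ignored and that a document with no dictionary words yields an all-zero vector instead of a division by zero.

```python
def make_tf(words, word_to_pos, size):
    """Normalized term-frequency list over a dictionary of `size` words."""
    tf = [0.0] * size
    for word in words:
        pos = word_to_pos.get(word)
        if pos is not None:         # skip words outside the dictionary
            tf[pos] += 1
    total = sum(tf)
    if total > 0:                   # guard against an empty document
        tf = [count / total for count in tf]
    return tf

word_to_pos = {'the': 0, 'of': 1, 'and': 2}
print(make_tf(['the', 'of', 'the', 'xyzzy'], word_to_pos, 3))
print(make_tf(['xyzzy'], word_to_pos, 3))
```

Note that the UDF above declares `FloatType`, which is single precision in Spark; `DoubleType` would keep the full precision of the NumPy values if that matters downstream.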

In [148]:
df_tf = df_tokenized.select('id', tf_udf('words').alias('tf_array'))

In [149]:
df_tf.printSchema()

root
 |-- id: string (nullable = true)
 |-- tf_array: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [150]:
df_tf.show(10)


+------+--------------------+
|    id|            tf_array|
+------+--------------------+
|431949|[0.077315405, 0.0...|
|431952|[0.07973422, 0.03...|
|431953|[0.09065156, 0.02...|
|431959|[0.08871745, 0.04...|
|431962|[0.085910656, 0.0...|
|431968|[0.078125, 0.0527...|
|431969|[0.15384616, 0.07...|
|431970|[0.0, 0.0, 0.0, 0...|
|431971|[0.087027915, 0.0...|
|431972|[0.071428575, 0.0...|
+------+--------------------+
only showing top 10 rows



                                                                                

In [151]:
import pyspark.sql.functions as F

# Step 1: Count the documents in the corpus (N in the IDF formula)
total_documents = df_tokenized.select(F.countDistinct('id')).collect()[0][0]

# Step 2: Explode the `words` column so that each row holds one (id, word) pair
exploded_df = df_tokenized.select(F.col('id'), F.explode(F.col('words')).alias('word'))

# Step 3: Document frequency (df) = number of distinct documents containing each word
doc_freq_df = exploded_df.groupBy('word').agg(F.countDistinct('id').alias('df'))

# Step 4: Join with `top_20k_words_df_with_index` to keep only dictionary words and their index
idf_df = top_20k_words_df_with_index.join(doc_freq_df, 'word')

# Step 5: Apply idf = ln(N / df) with the built-in natural log instead of a Python UDF;
# the cast keeps the column a float
idf_df = idf_df.withColumn('idf', F.log(F.lit(total_documents) / F.col('df')).cast('float'))

# Select the final columns and preview the result
df_idf = idf_df.select('word', 'df', 'idf', 'index')

df_idf.show(10)

24/12/13 13:49:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+---------+---+----------+-----+
|     word| df|       idf|index|
+---------+---+----------+-----+
| carnegie|  4|  5.521461|10982|
|     some|420|0.86750054|   55|
|      few|179| 1.7203695|  323|
|connected| 67| 2.7030628| 1198|
|     earl| 34| 3.3813949|  466|
|    those|216| 1.5324769|  182|
|  melodic|  6|  5.115996| 7541|
|      art| 69| 2.6736488|  519|
|   travel| 49|  3.015935| 1487|
|    crest| 11|   4.50986| 3526|
+---------+---+----------+-----+
only showing top 10 rows
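
The idf values in the table can be checked by hand. The sketch below recomputes a few of them with the same natural-log formula. It assumes the corpus holds 1000 documents, which is inferred from the input file name and from the printed values rather than stated in the notebook; the helper name `idf` is only for illustration.

```python
import math

# Assumption: 1000 documents in the corpus (inferred, not stated in the notebook).
total_docs = 1000

def idf(doc_freq, n_docs=total_docs):
    """Natural-log inverse document frequency: ln(N / df)."""
    return math.log(n_docs / doc_freq)

# Document frequencies copied from the table above.
for word, doc_freq in [("carnegie", 4), ("some", 420), ("few", 179)]:
    print(f"{word:10s} df={doc_freq:4d} idf={idf(doc_freq):.4f}")
```

A word that appears in few documents (carnegie, df = 4) gets a large weight, while a common word (some, df = 420) gets a weight below 1.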

In [152]:
idf_dict = {row['index']: row['idf'] for row in df_idf.collect()}

24/12/13 13:49:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

In [153]:
idf_dict

{10982: 5.521461009979248,
 55: 0.8675005435943604,
 323: 1.7203694581985474,
 1198: 2.7030627727508545,
 466: 3.381394863128662,
 182: 1.5324769020080566,
 7541: 5.11599588394165,
 519: 2.6736488342285156,
 1487: 3.015934944152832,
 3526: 4.509860038757324,
 3651: 3.8167128562927246,
 2286: 3.296837329864502,
 4773: 4.199705123901367,
 1630: 3.10109281539917,
 3525: 4.268697738647461,
 195: 1.487220287322998,
 14289: 5.80914306640625,
 5648: 4.342805862426758,
 9159: 4.9618449211120605,
 3264: 4.074542045593262,
 18907: 6.214608192443848,
 5647: 4.422848701477051,
 2531: 3.296837329864502,
 14535: 6.907755374908447,
 7982: 5.11599588394165,
 8476: 4.9618449211120605,
 16202: 5.521461009979248,
 7538: 4.9618449211120605,
 6240: 4.342805862426758,
 5927: 4.268697738647461,
 3097: 3.8632328510284424,
 5929: 4.199705123901367,
 6591: 4.605170249938965,
 4963: 5.2983174324035645,
 7820: 4.710530757904053,
 7366: 4.710530757904053,
 5179: 4.9618449211120605,
 1878: 3.2701690196990967,
 4580

In [154]:
df_tf

id,tf_array
431949,"[0.077315405, 0.0..."
431952,"[0.07973422, 0.03..."
431953,"[0.09065156, 0.02..."
431959,"[0.08871745, 0.04..."
431962,"[0.085910656, 0.0..."
431968,"[0.078125, 0.0527..."
431969,"[0.15384616, 0.07..."
431970,"[0.0, 0.0, 0.0, 0..."
431971,"[0.087027915, 0.0..."
431972,"[0.071428575, 0.0..."


In [155]:
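# First version: multiply each term frequency by the IDF stored at the same index.
# (Redefined with NumPy in the next cell.)
def make_tfidf_array(tf_array):
    # .get(i, 0.0) avoids a KeyError for any index missing from idf_dict
    return [tf * idf_dict.get(i, 0.0) for i, tf in enumerate(tf_array)]

# Register the UDF
tfidf_udf = udf(make_tfidf_array, ArrayType(FloatType()))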

In [156]:
import numpy as np

# Dense IDF vector aligned with the tf_array positions; indices missing from idf_dict get 0
idf_array = np.array([idf_dict.get(i, 0.0) for i in range(20000)])  # 20000 is the assumed size of top words

def make_tfidf_array(tf_array):
    # Convert the input list to a NumPy array for vectorized operations
    tf_np_array = np.array(tf_array)
    
    # Element-wise product of term frequencies and IDF weights
    tfidf_array = tf_np_array * idf_array
    
    # Return a plain list so Spark can store it as ArrayType(FloatType())
    return tfidf_array.tolist()

# Register the vectorized function as the UDF (replaces the previous definition)
tfidf_udf = udf(make_tfidf_array, ArrayType(FloatType()))

In [157]:
df_tfidf = df_tf.select('id', tfidf_udf('tf_array').alias('tfidf_array'))

In [158]:
df_tfidf

id,tfidf_array
431949,"[0.0066149426, 0...."
431952,"[0.0068218913, 0...."
431953,"[0.0077559557, 0...."
431959,"[0.007590478, 0.0..."
431962,"[0.007350334, 0.0..."
431968,"[0.0066842097, 0...."
431969,"[0.013162752, 0.0..."
431970,"[0.0, 0.0, 0.0, 0..."
431971,"[0.0074459244, 0...."
431972,"[0.0061112777, 0...."
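
Each TF-IDF array above is the element-wise product of a document's term-frequency array and the IDF vector. The following is a minimal NumPy sketch of that step on a made-up 5-word vocabulary; the names `toy_idf_dict`, `toy_tf`, and the numbers are hypothetical and only illustrate how missing indices default to zero.

```python
import numpy as np

# Hypothetical 5-word vocabulary; the notebook uses 20,000 positions.
vocab_size = 5
toy_idf_dict = {0: 0.1, 1: 2.0, 3: 4.0}  # positions 2 and 4 have no IDF entry

# Missing positions default to 0.0, so those words contribute nothing.
toy_idf_array = np.array([toy_idf_dict.get(i, 0.0) for i in range(vocab_size)])

toy_tf = np.array([0.5, 0.25, 0.25, 0.0, 0.0])  # relative term frequencies
toy_tfidf = toy_tf * toy_idf_array               # element-wise product

print(toy_tfidf.tolist())
```

Position 2 has a non-zero term frequency but no IDF entry, so its TF-IDF value is zero.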


In [159]:
## Join each document's categories with its TF-IDF array

In [160]:
df = spark.read.csv('/Users/csuftitan/Desktop/avd DB/wiki-categorylinks-small.csv.bz2')

In [161]:
df_cat = df.toDF('index','category')

In [162]:
df_cat

index,category
434042,1987_debut_albums
434042,Albums_produced_b...
434042,Articles_with_hAu...
434042,Articles_with_sho...
434042,CS1_German-langua...
434042,Cacophony_(band)_...
434042,Jason_Becker_albums
434042,Marty_Friedman_al...
434042,Shrapnel_Records_...
455070,All_articles_need...


In [163]:
df_cat_tfidf = df_tfidf.join(df_cat, df_tfidf.id==df_cat.index).select(df_tfidf.id, df_cat.category, df_tfidf.tfidf_array)

In [164]:
df_cat_tfidf.show(10)

+------+--------------------+--------------------+
|    id|            category|         tfidf_array|
+------+--------------------+--------------------+
|431949|Webarchive_templa...|[0.0066149426, 0....|
|431949|Use_mdy_dates_fro...|[0.0066149426, 0....|
|431949|The_Church_of_Jes...|[0.0066149426, 0....|
|431949|Pages_using_web_c...|[0.0066149426, 0....|
|431949|  Mormonism_and_race|[0.0066149426, 0....|
|431949|Latter_Day_Saint_...|[0.0066149426, 0....|
|431949|History_of_The_Ch...|[0.0066149426, 0....|
|431949|Criticism_of_Morm...|[0.0066149426, 0....|
|431949|CS1_maint:_BOT:_o...|[0.0066149426, 0....|
|431949|CS1:_Julian–Grego...|[0.0066149426, 0....|
+------+--------------------+--------------------+
only showing top 10 rows
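
The output shows the same TF-IDF array repeated once per category of document 431949, which is what an inner join on the document ID produces. The plain-Python sketch below mimics that fan-out on made-up rows; `toy_tfidf_rows` and `toy_cat_rows` are hypothetical stand-ins for `df_tfidf` and `df_cat`.

```python
# Toy stand-ins for df_tfidf and df_cat; all values here are made up.
toy_tfidf_rows = [("d1", [0.1, 0.0]), ("d2", [0.0, 0.3])]
toy_cat_rows = [("d1", "cat_a"), ("d1", "cat_b"), ("d3", "cat_c")]

tfidf_by_id = dict(toy_tfidf_rows)

# Inner join on document ID: keep only category rows whose ID has a vector.
joined = [
    (doc_id, category, tfidf_by_id[doc_id])
    for doc_id, category in toy_cat_rows
    if doc_id in tfidf_by_id
]

for row in joined:
    print(row)
```

Document `d1` appears twice (once per category), while `d2` (no categories) and `d3` (no vector) drop out of the result.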

In [165]:
df_cat_tfidf.select('tfidf_array')

24/12/13 13:49:58 WARN PythonUDFRunner: Detected deadlock while completing task 0.0 in stage 125 (TID 105): Attempting to kill Python Worker

tfidf_array
"[0.0066149426, 0...."
"[0.0066149426, 0...."
"[0.0066149426, 0...."
"[0.0066149426, 0...."
"[0.0066149426, 0...."
"[0.0066149426, 0...."
"[0.0066149426, 0...."
"[0.0066149426, 0...."
"[0.0066149426, 0...."
"[0.0066149426, 0...."


In [166]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
import numpy as np
import re

# Assumes `top_20k_words_dict`, `idf_dict`, `df_cat_tfidf`, and `sc` are already defined

# Step 1: Optimize the `calculate_TFIDF` function using NumPy
def calculate_TFIDF(textInput):
    """
    Calculate the TF-IDF vector for the given input text using NumPy.
    """
    clean_text = re.sub(r'[^A-Za-z0-9 ]+', '', textInput)
    words = clean_text.lower().split()
    
    # Step 1.1: Get the positions of words from the dictionary
    positions = [top_20k_words_dict[word] for word in words if word in top_20k_words_dict]
    
    # Step 1.2: Create the term frequency array
    tf = np.zeros(20000)  # Initialize with zeros
    
    # Step 1.3: Update the frequency for each word
    np.add.at(tf, positions, 1)
    
    # Step 1.4: Normalize the term frequencies
    total_frequency = np.sum(tf)
    if total_frequency > 0:  # Avoid division by zero
        tf /= total_frequency
    
    # Step 1.5: Calculate the TF-IDF by multiplying with the IDF array
    idf_array = np.array([idf_dict.get(i, 0) for i in range(20000)])  # Convert IDF dictionary to NumPy array
    tfidf = tf * idf_array  # Element-wise multiplication to compute TF-IDF
    
    return tfidf

# Step 2: Optimize the dot product function using NumPy
def dot_product(tfidf_array, text_tfidf_broadcast):
    """
    Compute the dot product between the input TF-IDF vector and the document's TF-IDF vector.
    """
    text_tfidf = text_tfidf_broadcast.value  # Access the broadcasted vector
    return float(np.dot(text_tfidf, tfidf_array))  # Use NumPy dot product

# Step 3: Define the getPrediction function with NumPy optimizations
def getPrediction(textInput, k):
    """
    Returns top k categories that are closest to the given text input based on TF-IDF similarity.
    
    Args:
    textInput (str): The input text for which the closest categories are to be found.
    k (int): The number of top categories to return.
    
    Returns:
    list: A list of top k categories.
    """
    # Step 3.1: Calculate the TF-IDF of the input text using the optimized function
    text_tfidf = calculate_TFIDF(textInput)  # This function returns the NumPy TF-IDF vector for the input text
    
    # Step 3.2: Broadcast the TF-IDF vector to all workers
    text_tfidf_broadcast = sc.broadcast(text_tfidf)

    # Step 3.3: Define a UDF using the optimized dot product function
    dot_product_udf = udf(lambda tfidf_array: dot_product(tfidf_array, text_tfidf_broadcast), FloatType())

    # Step 3.4: Apply the UDF to calculate the dot product (similarity score) for each category's TF-IDF
    df_with_dot = df_cat_tfidf.withColumn("distance", dot_product_udf("tfidf_array"))

    # Step 3.5: Keep the k rows with the highest dot product (the column is named "distance", but larger values mean more similar)
    top_k_df = df_with_dot.orderBy(col("distance").desc()).limit(k)

    # Step 3.6: Collect and return the top k categories
    result = [row['category'] for row in top_k_df.select('category').collect()]

    # Step 3.7: Unpersist the broadcast variable to free up memory
    text_tfidf_broadcast.unpersist()

    return result
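
The query-side vectorization in `calculate_TFIDF` can be tried without Spark. This is a small sketch under stated assumptions: a hypothetical 4-word dictionary `toy_word_index` and made-up weights `toy_idf` stand in for `top_20k_words_dict` and `idf_dict`.

```python
import re
import numpy as np

# Hypothetical 4-word dictionary and IDF values (not from the notebook).
toy_word_index = {"the": 0, "city": 1, "capital": 2, "goal": 3}
toy_idf = np.array([0.1, 2.0, 3.0, 4.0])

def toy_query_tfidf(text):
    """Tokenize text, count in-vocabulary words, normalize, weight by IDF."""
    cleaned = re.sub(r"[^A-Za-z0-9 ]+", "", text)
    words = cleaned.lower().split()
    positions = np.array(
        [toy_word_index[w] for w in words if w in toy_word_index], dtype=int
    )
    tf = np.zeros(len(toy_word_index))
    np.add.at(tf, positions, 1)  # accumulates correctly for repeated positions
    total = tf.sum()
    if total > 0:  # avoid division by zero when no word is in the dictionary
        tf /= total
    return tf * toy_idf

vec = toy_query_tfidf("What is the capital city of the country?")
print(vec.tolist())
```

Words outside the dictionary are ignored, so a query with no known words yields an all-zero vector and a dot product of 0 with every document.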


In [167]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
import numpy as np
import re

def calculate_TFIDF(textInput):
    """Return the TF-IDF vector of the input text as a plain Python list."""
    # Strip punctuation, lowercase, and split on whitespace
    clean_text = re.sub(r'[^A-Za-z0-9 ]+', '', textInput)
    words = clean_text.lower().split()
    # Dictionary positions of the in-vocabulary words
    positions = [top_20k_words_dict[word] for word in words if word in top_20k_words_dict]

    # Raw term counts per dictionary position
    tf = [0] * 20000
    for pos in positions:
        tf[pos] = tf[pos] + 1
        
    # Sum of all frequencies
    total_frequency = sum(tf)  

    if total_frequency > 0:  # Avoid division by zero
        tf = [freq/total_frequency for freq in tf]

    # Calculate tf-idf by multiplying each term frequency with its idf
    tfidf = [tf[i] * idf_dict.get(i, 0) for i in range(20000)]
    
    return tfidf

# Assumes `df_cat_tfidf`, `top_20k_words_dict`, `idf_dict`, and `sc` are already defined

# Step 1: Define a helper that computes the dot product against the broadcasted text_tfidf
def dot_product(tfidf_array, text_tfidf_broadcast):
    text_tfidf = text_tfidf_broadcast.value  # Access the broadcasted vector
    return float(np.dot(text_tfidf, tfidf_array))

# Step 2: Define the getPrediction function
def getPrediction(textInput, k):
    """
    Returns top k categories that are closest to the given text input based on TF-IDF similarity.
    
    Args:
    textInput (str): The input text for which the closest categories are to be found.
    k (int): The number of top categories to return.
    
    Returns:
    list: The categories of the k highest-scoring (document, category) rows; a category may appear more than once.
    """
    # Step 3: Calculate the TF-IDF of the input text
    text_tfidf = calculate_TFIDF(textInput)  # This function should return the TF-IDF vector for the input text
    
    # Step 4: Broadcast the TF-IDF vector to all workers (recreate the broadcast variable for every input)
    text_tfidf_broadcast = sc.broadcast(text_tfidf)

    # Step 5: Register the UDF inside the function after broadcasting the vector
    dot_product_udf = udf(lambda tfidf_array: dot_product(tfidf_array, text_tfidf_broadcast), FloatType())

    # Step 6: Apply the UDF to calculate the dot product (similarity score) for each category's TF-IDF
    df_with_dot = df_cat_tfidf.withColumn("distance", dot_product_udf("tfidf_array"))

    # Step 7: Keep the k rows with the highest dot product (the column is named "distance", but larger values mean more similar)
    top_k_df = df_with_dot.orderBy(col("distance").desc()).limit(k)

    # Step 8: Collect and return the top k categories
    result = [row['category'] for row in top_k_df.select('category').collect()]

    # Step 9: Unpersist the broadcast variable to free up memory
    text_tfidf_broadcast.unpersist()

    return result
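
The ranking step in `getPrediction` amounts to scoring every (category, vector) row by its dot product with the query and keeping the k best. The sketch below reproduces that logic locally with NumPy on made-up data; `toy_rows`, `query`, and `top_k_categories_local` are hypothetical names, not part of the notebook.

```python
import numpy as np

# Made-up (category, tfidf vector) rows standing in for df_cat_tfidf.
toy_rows = [
    ("cat_a", np.array([0.0, 0.5, 0.0])),
    ("cat_b", np.array([0.2, 0.0, 0.1])),
    ("cat_c", np.array([0.0, 0.4, 0.3])),
]
query = np.array([0.0, 1.0, 1.0])

def top_k_categories_local(rows, query_vec, k):
    """Rank rows by dot product with the query, highest first."""
    scored = [(float(np.dot(vec, query_vec)), cat) for cat, vec in rows]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [cat for _, cat in scored[:k]]

print(top_k_categories_local(toy_rows, query, 2))
```

Because the score is a raw dot product rather than a normalized similarity, rows with larger TF-IDF values can rank higher regardless of direction; that is a property of this scoring choice, not of the toy data.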


In [168]:
# Example usage:


In [169]:
top_k_categories = getPrediction('Sport Basketball Volleyball Soccer', 10)
print(top_k_categories)

['Human_name_disambiguation_pages', 'Disambiguation_pages_with_short_description', 'All_disambiguation_pages', 'All_article_disambiguation_pages', 'Wikipedia_articles_with_ISNI_identifiers', 'Wikipedia_articles_with_WorldCat-VIAF_identifiers', 'Wikipedia_articles_with_VIAF_identifiers', 'Wikipedia_articles_with_SNAC-ID_identifiers', 'Wikipedia_articles_with_NLK_identifiers', 'Wikipedia_articles_with_LCCN_identifiers']

In [170]:
top_k_categories = getPrediction('What is the capital city of Australia?', 10)
print(top_k_categories)

['All_set_index_articles', 'Use_dmy_dates_from_April_2018', 'Use_Australian_English_from_April_2018', 'Set_indices_on_ships', 'Royal_Australian_Navy_ship_names', 'Articles_with_short_description', 'All_Wikipedia_articles_written_in_Australian_English', 'All_Wikipedia_articles_written_in_Australian_English', 'Royal_Australian_Navy_ship_names', 'Use_Australian_English_from_April_2018']

In [171]:
top_k_categories = getPrediction('How many goals Vancouver score last year?', 10)
print(top_k_categories)

['Television_shows_set_in_Vancouver', 'Canadian_television_program_stubs', 'CBC_Television_shows', 'All_stub_articles', '1994_Canadian_television_series_endings', '1991_Canadian_television_series_debuts', '1990s_Canadian_teen_drama_television_series', 'Torpedo_Nizhny_Novgorod_players', 'HC_CSKA_Moscow_players', 'Sportspeople_from_Kazan']
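
The Australia query returns some categories more than once, because the ranking is over (document, category) rows and several top documents share categories. If distinct categories are wanted, one possible post-processing step (an assumption, not something the notebook does) is to drop repeats while keeping the ranked order; `unique_in_order` is a hypothetical helper.

```python
def unique_in_order(categories):
    """Drop repeated categories while keeping first-seen order."""
    return list(dict.fromkeys(categories))

# A few entries copied from the Australia query output above.
sample = [
    "All_set_index_articles",
    "Royal_Australian_Navy_ship_names",
    "All_Wikipedia_articles_written_in_Australian_English",
    "All_Wikipedia_articles_written_in_Australian_English",
    "Royal_Australian_Navy_ship_names",
]
print(unique_in_order(sample))
```

Note that deduplicating after `limit(k)` can return fewer than k categories; requesting more rows before deduplicating would be one way around that.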
