# Prerequisites

In [1]:
import pyspark
import stackoverflow
import logging


# Set up logging ... DEBUG for my code, WARNING for libraries
logging.getLogger("py4j").setLevel(logging.WARNING)
logging.basicConfig(format='%(asctime)s - %(levelname)-6s - %(name)10s - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    level=logging.DEBUG)

LOGGER = logging.getLogger('my-spark')

# Connect a Spark session to the spark-master node
sc = pyspark.sql.SparkSession.builder.appName("InvertedIndex") \
                                     .master('spark://spark-master:7077') \
                                     .getOrCreate()

# Load the Stack Exchange Data Explorer CSV file from Hadoop:
contents = sc.read.csv('hdfs://hadoop:9000/final-project/QueryResults.csv',
                       header=True)

# As in other labs, I've isolated some code to other Python modules
# The 'stackoverflow' module has knowledge of (e.g.) tag formats and other
# useful utilities
sc.sparkContext.addPyFile('./stackoverflow.py')

21/10/09 17:15:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Inverse Index Implementation

In [2]:
# The end result will be something like
#    [(tag1, Post ID1), (tag2, Post ID1), (tag3, Post ID1),
#     (tag1, Post ID2), (tagX, Post ID2), (tagY, Post IDn)..., ]
mapped = contents.rdd.flatMap(lambda post: [(tag, post.Id) for tag in stackoverflow.extractTags(post.Tags)])

# groupByKey: For each Stack Overflow Tag, create an iterable of Post IDs
# mapValues: Convert the iterable to a Python 'list'
reduced = mapped.groupByKey().mapValues(list)

# Print a sample of the results as a sanity check
# take: Returns a list of tuples
#       Tag is the first element of the tuple
#       the list of associated Post IDs is the second element of the tuple
for tag, posts in reduced.take(5):
    print("Tag '{tag}' is associated with {count} Posts".format(tag = tag,
                                                                count = len(posts)))



Tag 'record' is associated with 3 Posts
Tag 'asp.net-mvc' is associated with 174 Posts
Tag 'android' is associated with 1921 Posts
Tag 'c++' is associated with 1279 Posts
Tag 'winapi' is associated with 67 Posts




# Verification of Results

In [3]:
import termcolor
import textwrap

SUCCESS_CHECK = termcolor.colored('\N{check mark}', 'green')
FAILED_X = termcolor.colored('\N{ballot x}', 'red')

# qt-creator is a good example to verify the results
expected_tag = 'qt-creator'

# Retrieve the inverse index for 'qt-creator'
# RDD.lookup will return a list of items that match the given key
#            (we know that there is only one entry for each key (tag), so
#            we grab the first element, which is the list of post IDs
#            for the qt-creator tag)
qt_creator_posts = reduced.lookup(expected_tag)[0]

# Create an RDD that is indexed by post ID, which will make it much
# easier to perform our test
indexed_contents = contents.rdd.map(lambda post: (post.Id, {'title': post.Title, 'tags': post.Tags}))

# For each post in the inverse index list, retrieve its tags and verify
# that qt-creator is present -- this spot-checks that the inverse index
# is correct
for p in qt_creator_posts:
    post = indexed_contents.lookup(p)[0]
    title = textwrap.shorten(post['title'], width=30, placeholder='...')
    tag_is_present = '<{tag}>'.format(tag=expected_tag) in post['tags']
    LOGGER.info("Post {id} ({title}) has tag '{tag}' {success}".format(id=p,
                                                                       title=title,
                                                                       tag=expected_tag,
                                                                       success=SUCCESS_CHECK if tag_is_present else FAILED_X))

2021-10-09 17:17:05 - INFO   -   my-spark - Post 65728407 (Can't build APK using Qt...) has tag 'qt-creator' [32m✓[0m
2021-10-09 17:17:10 - INFO   -   my-spark - Post 65881568 (how it works QMessageBox) has tag 'qt-creator' [32m✓[0m
2021-10-09 17:17:14 - INFO   -   my-spark - Post 65576972 (Qt Creator exclude...) has tag 'qt-creator' [32m✓[0m
2021-10-09 17:17:17 - INFO   -   my-spark - Post 65578658 (How to use valgrind...) has tag 'qt-creator' [32m✓[0m
2021-10-09 17:17:21 - INFO   -   my-spark - Post 65590944 (Qt Creator FakeVim using...) has tag 'qt-creator' [32m✓[0m
2021-10-09 17:17:25 - INFO   -   my-spark - Post 65774060 (Build a library in C++...) has tag 'qt-creator' [32m✓[0m
2021-10-09 17:17:28 - INFO   -   my-spark - Post 65777697 (Updating QLineSeries in...) has tag 'qt-creator' [32m✓[0m
2021-10-09 17:17:32 - INFO   -   my-spark - Post 65781403 (PyQt5 Designer - Wrong...) has tag 'qt-creator' [32m✓[0m


# Save the Results

In [4]:
reduced.saveAsTextFile('hdfs://hadoop:9000/final-project/inverted-index.txt')

