In [1]:
# Install dependencies via unix terminal:
#xcode-select --install
#brew cask install java
#brew install scala
#brew install apache-spark
#pip install findspark 
#pip install pyspark

In [2]:
# Import Libraries:
#import findspark (not needed since spark PATH has been defined)
#findspark.init('/usr/hdp/2.6.3.0-235/spark2')
import pyspark
import matplotlib.pyplot as plt
from functools import reduce
from collections import Counter
from nltk.corpus import stopwords

# Inverted Indexing Problem

### Introduction:

The inverted indexing project is Thien Nghiem's submission for the MSDS Data Engineering class, Spring 2018.

This Jupyter Notebook contains the solution for the inverted index summarization of the Stack Overflow dataset. The objective of this project is to index large datasets on keywords, making it convenient to trace terms back to the records that contain them. This process is similar to how search engines build indexes to improve search performance.

In the context of big data, I will use Apache Spark to perform the MapReduce component of this project. Next, I will use Spark SQL and DataFrames to query and display the indexing results from the MapReduce job. This project illustrates how various Data Engineering components, such as Python, HDFS, Spark, SQL, and MapReduce, can be integrated to solve the problem of inverted indexing on large datasets.

### Motivation:

It is often convenient to index large data sets on keywords, so that searches can trace terms back to records that contain specific values. While building an inverted index does require extra processing up front, taking the time to do so can greatly reduce the amount of time it takes to find something.

Search engines build indexes to improve search performance. Imagine entering a keyword and letting the engine crawl the Internet and build a list of pages to return to you. Such a query would take an extremely long time to complete. By building an inverted index, the search engine knows all the web pages related to a keyword ahead of time, and these results are simply displayed to the user. These indexes are often ingested into a database for fast query responses. Building an inverted index is a fairly straightforward application of MapReduce because the framework handles most of the work.
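
As a rough illustration of the idea, independent of Spark, the map and reduce phases can be sketched in plain Python. The function name `build_inverted_index` and the sample records are hypothetical; the records only mirror the `Id, Tags` shape used in this notebook.

```python
from collections import defaultdict


def build_inverted_index(records):
    """Turn (post_id, tags) records into a {tag: [post_id, ...]} mapping."""
    index = defaultdict(list)
    for post_id, tags in records:
        for tag in tags:
            # "map": each tag of a post yields one (tag, post_id) pair;
            # "reduce": pairs that share a tag are collected into one list
            index[tag].append(post_id)
    return dict(index)


# Hypothetical sample records in the same shape as the query results
records = [
    ("1", ["html", "css"]),
    ("2", ["html"]),
    ("3", ["python"]),
]
index = build_inverted_index(records)
print(index["html"])
```

Looking up a tag is then a single dictionary access rather than a scan over every record, which is the trade-off described above.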


#### Data source:
The data has been retrieved from: https://data.stackexchange.com/stackoverflow/

***SQL query:***
SELECT Id, Tags FROM Posts WHERE Tags is not null;


In [3]:
# Import query results to spark:
sc = pyspark.SparkContext(appName="Reverse Indexing") # sc = spark context
text_file = sc.textFile("QueryResults.csv")

### Map:

In [4]:
# Step 1 - Preprocessing: drop the comma, turn the angle brackets into spaces, and split into [Id, tag, ...]
step1 = text_file.map(lambda x: x.replace(',','').replace('<', ' ').replace('>',' ').split())
step1.take(5)

[['1596943', 'html', 'asp.net', 'footer'],
 ['1596945', 'macos', 'gcc', 'otool'],
 ['1596947', 'android'],
 ['1596950', 'c#', '.net', 'wmi', 'device-manager'],
 ['1596951', 'java', 'database', 'hibernate', 'orm', 'jpa']]
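
The same preprocessing can also be written as a small named function, which is easier to test outside Spark. This is a minimal sketch, assuming every line has the form `Id,<tag1><tag2>...`; `parse_record` is a hypothetical helper, not part of the original notebook.

```python
import re


def parse_record(line):
    """Turn 'Id,<tag1><tag2>' into [Id, tag1, tag2]."""
    # Split once at the first comma: the Id is on the left, the tags on the right
    post_id, _, tags = line.partition(",")
    # Each tag is the text between one pair of angle brackets
    return [post_id] + re.findall(r"<([^<>]+)>", tags)


print(parse_record("1596943,<html><asp.net><footer>"))
```

Under that assumption, `text_file.map(parse_record)` could stand in for the chained `replace` calls.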

In [5]:
# Step 2: Invert the index so that each tag, rather than the post Id, becomes the key
step2 = step1.flatMap(lambda wlist: [[wlist[i],wlist[0]] for i in range(1,len(wlist))])
step2.take(5)

[['html', '1596943'],
 ['asp.net', '1596943'],
 ['footer', '1596943'],
 ['macos', '1596945'],
 ['gcc', '1596945']]

### Reduce:

Reduce the list by key. In this case, we reduce by concatenating all the post Ids that share the same key (tag).

In [6]:
# Create a tuple containing each tag and the list of Ids associated with that tag
step3 = step2.reduceByKey(lambda left, right: left + ',' + right).map(lambda x: (x[0], x[1].split(",")))
step3.take(1)

[('html',
  ['1596943',
   '33668105',
   '33681754',
   '33681755',
   '41673571',
   '42401334',
   '19020553',
   '33324159',
   '32689570',
   '24671562',
   '16359272',
   '8382136',
   '26420415',
   '26420430',
   '32946083',
   '32946104',
   '24672075',
   '40732817',
   '40732864',
   '40732888',
   '40732906',
   '40732072',
   '41429386',
   '24671028',
   '4923',
   '4973',
   '40733081',
   '40733153',
   '16360215',
   '33828277',
   '26062623',
   '39438711',
   '24674050',
   '24674064',
   '40733828',
   '40733840',
   '40733868',
   '32691429',
   '32691517',
   '40732437',
   '40732448',
   '24671968',
   '24671971',
   '40734234',
   '32692381',
   '32692422',
   '32691050',
   '32691120',
   '24673178',
   '24672104',
   '24672112',
   '24672122',
   '24672201',
   '4880',
   '24675355',
   '8236666',
   '32691857',
   '32691862',
   '24673953',
   '24674005',
   '24674020',
   '24674021',
   '16905095',
   '34472917',
   '33165353',
   '33165355',
   '16361606',


In [7]:
# Map each (tag, ids) pair to [tag, number of ids]:
step4 = step3.map(lambda k: [k[0], len(k[1])])
step4.take(5)

[['html', 1978],
 ['footer', 10],
 ['macos', 298],
 ['gcc', 96],
 ['android', 2594]]

## Spark SQL and DataFrames


#### Double check the result above using SQL query on the original dataset:

In [25]:
from pyspark import SparkContext, SQLContext
from pyspark.sql.types import Row
from pyspark.sql import SparkSession

In [35]:
spark1 = SparkSession.builder.appName('Data_queries').config("spark.master","local").getOrCreate()
df1 = spark1.read.csv("QueryResults.csv",mode="DROPMALFORMED", inferSchema=True, header=False)
df1.show(5)

+-------+--------------------+
|    _c0|                 _c1|
+-------+--------------------+
|1596943|<html><asp.net><f...|
|1596945| <macos><gcc><otool>|
|1596947|           <android>|
|1596950|<c#><.net><wmi><d...|
|1596951|<java><database><...|
+-------+--------------------+
only showing top 5 rows



In [39]:
df1.createOrReplaceTempView("StackOverflow")
sqlDF = spark1.sql("SELECT count(*) FROM StackOverflow where _c1 like '%<html>%'")
sqlDF.show()

+--------+
|count(1)|
+--------+
|    1978|
+--------+



The result for the HTML tag query (1978) is identical to the MapReduce result above. This confirms that **the script for Spark MapReduce worked correctly.**

### Find the top 5 tags:

In [40]:
spark2 = SparkSession.builder.appName('Inverted_index').config("spark.master","local").getOrCreate()
df2 = spark2.createDataFrame(step4, schema = ['Tag', 'Count'])

In [41]:
# The row order of the Spark DataFrame is not guaranteed,
# so I will perform a SQL query to find the most popular tags based on counts
df2.show(5)

+-------+-----+
|    Tag|Count|
+-------+-----+
|   html| 1978|
| footer|   10|
|  macos|  298|
|    gcc|   96|
|android| 2594|
+-------+-----+
only showing top 5 rows



In [42]:
# The data type of the "Count" column is int64 (long)
# this allows sorting based on numeric values
df2.printSchema()

root
 |-- Tag: string (nullable = true)
 |-- Count: long (nullable = true)



In [44]:
# Register df2 as a temporary view for SQL queries (this replaces the earlier "StackOverflow" view):
df2.createOrReplaceTempView("StackOverflow")
sqlDF = spark2.sql("SELECT COUNT(*) FROM StackOverflow")
sqlDF.show()

+--------+
|count(1)|
+--------+
|   12798|
+--------+



In [45]:
# SQL query for the most popular tags:
sqlDF = spark2.sql("SELECT tag, count FROM StackOverflow ORDER BY count DESC")
sqlDF.show()

+-------------+-----+
|          tag|count|
+-------------+-----+
|           c#| 4565|
|         java| 4277|
|   javascript| 4146|
|          php| 3809|
|       jquery| 2797|
|      android| 2594|
|       python| 2390|
|          c++| 2056|
|         html| 1978|
|        mysql| 1693|
|      asp.net| 1585|
|          sql| 1558|
|         .net| 1504|
|          css| 1473|
|          ios| 1302|
|       iphone| 1155|
|            c| 1102|
|ruby-on-rails| 1030|
|  objective-c|  963|
|   sql-server|  831|
+-------------+-----+
only showing top 20 rows



### So what are the 5 most popular tags?

From this project, I found that c# was the most popular tag in this sample of Stack Overflow posts.

The top 5 tags are c#, java, javascript, php and jquery. My favorite language, Python, is in 7th place on this list.
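
Sorting the whole table is not strictly needed when only the first few rows matter. As a minimal sketch, assuming the (tag, count) pairs fit in memory, `heapq.nlargest` picks the top entries directly. The counts below are copied from the query output above, and the helper name `top_tags` is hypothetical.

```python
import heapq


def top_tags(tag_counts, n=5):
    """Return the n (tag, count) pairs with the largest counts."""
    return heapq.nlargest(n, tag_counts, key=lambda pair: pair[1])


# Counts copied from the query output above, deliberately unsorted
tag_counts = [
    ("android", 2594), ("c#", 4565), ("python", 2390), ("java", 4277),
    ("jquery", 2797), ("php", 3809), ("javascript", 4146),
]
top5 = top_tags(tag_counts)
print(top5)
```

On an RDD, `takeOrdered` with a key that negates the count would play a similar role.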

## Additional queries and analysis: 

SQL query for this extension: SELECT Title, Tags FROM Posts WHERE Tags is not null;

### Find the most common words in the title of each Stack Overflow tag:

In this extension of the project, I would like to know more about the common words that appear in the titles of the posts. These popular words often reflect the topics of interest related to each tag. To obtain this information, I will solve the word count problem using Spark MapReduce. The additional packages are NLTK, to remove stopwords, and Counter, to count the words in the titles.

**Question: What are the top 5 words in the titles of each tag? Do these words make sense in the context of each tag?**

In [46]:
sc.stop() # need to stop the running Spark Context before starting a new one 
sc = pyspark.SparkContext(appName="Title Word Count") 
text_file = sc.textFile("QueryResults2.csv")

In [47]:
# step1 - preprocessing: split by commas
step1 = text_file.map(lambda x: x.split(",")).map(lambda y: [y[0].replace('-',''),y[1]])
step1.take(3)

[['How to use a variable storing an equal sign in an SQL query', '<php><sql>'],
 ['Javascript remote src not working', '<javascript>'],
 ['How to svn dump a specific folder and load it into a specific location',
  '<svn><tortoisesvn><svnadmin>']]
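
Splitting on every comma would cut a title short whenever the title itself contains a comma. Below is a minimal sketch using Python's `csv` module, assuming the export wraps such fields in double quotes (an assumption about the file, not something verified here). `parse_line` is a hypothetical helper and the first sample line is made up.

```python
import csv


def parse_line(line):
    """Parse one CSV line into [title, tags], honoring quoted fields."""
    fields = next(csv.reader([line]))
    return [fields[0], fields[1]]


# Made-up line whose title contains a comma
sample = '"Sorting, filtering and paging in one query",<sql><mysql>'
print(parse_line(sample))

# A line without quotes is parsed the same way as a plain split
print(parse_line("Javascript remote src not working,<javascript>"))
```

Under that assumption, `text_file.map(parse_line)` could stand in for the `split(",")` call in step1.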

In [48]:
# step2: further split the tags
step2 = step1.map(lambda x: (x[0],x[1].replace('<', ' ').replace('>','').split()))
step2.take(1)

[('How to use a variable storing an equal sign in an SQL query',
  ['php', 'sql'])]

In [49]:
# step3: mapping the title to each tag separately 
step3 = step2.flatMap(lambda w: ((x,w[0]) for x in w[1]))
step3.take(5)

[('php', 'How to use a variable storing an equal sign in an SQL query'),
 ('sql', 'How to use a variable storing an equal sign in an SQL query'),
 ('javascript', 'Javascript remote src not working'),
 ('svn',
  'How to svn dump a specific folder and load it into a specific location'),
 ('tortoisesvn',
  'How to svn dump a specific folder and load it into a specific location')]

In [50]:
# step4: reduce by tag, concatenating all the titles of that tag and splitting them into words
# notice that the list contains stopwords such as "to", "a", "in", etc.
step4 = step3.reduceByKey(lambda total, count: total + ',' + count).map(lambda x: (x[0],x[1].lower().replace(",",'').split()))
step4.take(5)

[('php',
  ['how',
   'to',
   'use',
   'a',
   'variable',
   'storing',
   'an',
   'equal',
   'sign',
   'in',
   'an',
   'sql',
   'querysplit',
   'sentence',
   'into',
   'character',
   'limited',
   'arraysfile_get_contents',
   'failure',
   'on',
   'windowsmysql',
   'selecting',
   'random',
   'rows',
   'with',
   'one',
   'unique',
   'row',
   'within',
   'one',
   'querysoap',
   'and',
   'ntlm',
   'authentication',
   'between',
   'debian',
   'an',
   'iis',
   'serverphp',
   'file',
   'renamephp:',
   'is',
   'it',
   'possible',
   'to',
   'retrieve',
   'the',
   'class',
   'name',
   'of',
   'a',
   'child',
   'class?greek',
   'character',
   'insertion',
   'in',
   'php',
   'compared',
   'to',
   'sql',
   'server',
   'management',
   'studiocheck',
   'for',
   'malicious',
   'xml',
   'before',
   'allowing',
   'dtd',
   'loading?mysql',
   'data',
   'query',
   'is',
   'not',
   'refreshingreplacing',
   'wordpress',
   'footer',
   '
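
The output above contains fused tokens such as 'querysplit' and 'windowsmysql'. These appear because the titles are concatenated with a comma and the commas are then deleted, so the last word of one title runs into the first word of the next. Below is a minimal sketch of one possible fix, replacing the comma with a space before splitting. `tokenize_titles` is a hypothetical helper, and the second title is inferred from the tokens above purely for illustration.

```python
def tokenize_titles(titles):
    """Lower-case and split titles, keeping a space between adjacent titles."""
    joined = ",".join(titles)  # same concatenation as the reduce step
    # Replace the separator with a space instead of deleting it
    return joined.lower().replace(",", " ").split()


titles = [
    "How to use a variable storing an equal sign in an SQL query",
    "Split sentence into character limited arrays",  # inferred from the tokens above
]
tokens = tokenize_titles(titles)
print(tokens[11:14])
```

With this change 'query' and 'split' stay separate tokens, which would also alter the word counts computed in the later steps.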

In [52]:
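# step5: remove the stopwords using the nltk package
# build the stopword set once instead of reloading the list for every word
stop_words = set(stopwords.words('english'))
step5 = step4.map(lambda w: (w[0], [word for word in w[1] if word not in stop_words]))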
step5.take(5)

[('php',
  ['use',
   'variable',
   'storing',
   'equal',
   'sign',
   'sql',
   'querysplit',
   'sentence',
   'character',
   'limited',
   'arraysfile_get_contents',
   'failure',
   'windowsmysql',
   'selecting',
   'random',
   'rows',
   'one',
   'unique',
   'row',
   'within',
   'one',
   'querysoap',
   'ntlm',
   'authentication',
   'debian',
   'iis',
   'serverphp',
   'file',
   'renamephp:',
   'possible',
   'retrieve',
   'class',
   'name',
   'child',
   'class?greek',
   'character',
   'insertion',
   'php',
   'compared',
   'sql',
   'server',
   'management',
   'studiocheck',
   'malicious',
   'xml',
   'allowing',
   'dtd',
   'loading?mysql',
   'data',
   'query',
   'refreshingreplacing',
   'wordpress',
   'footer',
   'custom',
   'html',
   'onemysql',
   'results',
   'getting',
   'repeated',
   'comparing',
   'results',
   'loopphp',
   'mysql',
   'login',
   'header',
   'issue$stmt',
   '=',
   '$conn>prepare($sql)',
   'returns',
   'unde

In [53]:
# step6: count the words with the Counter package and keep the 5 most common per tag
step6 = step5.flatMap(lambda z: [z[0], Counter(z[1]).most_common(5)])
step6.take(40)

['php',
 [('php', 496), ('using', 244), ('mysql', 159), ('array', 155), ('file', 136)],
 'svn',
 [('svn', 41),
  ('subversion', 12),
  ('repository', 10),
  ('files', 7),
  ('version', 6)],
 'tortoisesvn',
 [('svn', 5),
  ('tortoisesvn', 5),
  ('windows', 3),
  ('specific', 2),
  ('folder', 2)],
 'r',
 [('r', 65), ('data', 55), ('using', 41), ('values', 35), ('multiple', 27)],
 'bigdata',
 [('r', 1),
  ('custom', 1),
  ('parser', 1),
  ('functionreturning', 1),
  ('lots', 1)],
 'customization',
 [('custom', 7), ('add', 2), ('create', 2), ('cell', 1), ('action', 1)],
 'contacts',
 [('contacts', 3), ('add', 2), ('custom', 2), ('android', 2), ('cell', 1)],
 'parse.com',
 [('parse', 14), ('data', 8), ('using', 7), ('swift', 6), ('push', 5)],
 'push',
 [('push', 11), ('send', 3), ('notification', 3), ('git', 3), ('data', 2)],
 'html',
 [('html', 242), ('using', 136), ('div', 131), ('text', 93), ('page', 90)],
 'google-chrome',
 [('chrome', 52), ('google', 12), ('using', 9), ('working', 9), 

**Conclusion:** 

The final result contains the list of tags and the most common words used in their titles. As expected, the most common word for each tag is often the name of the tag itself. This suggests that users often include the tag word in the title for better clarity. It's interesting to see that some title words correlate closely with their tag, such as "php" and "mysql" or "numpy" and "python".

Given how closely these words relate to their tags, they could perhaps be used to enhance the website's suggestion engine, for example by surfacing topics commonly raised within a specific community.