In [1]:
import os

import findspark
# Prefer SPARK_HOME from the environment; fall back to the HDP 2.6.3 install path this
# notebook was written against. This must run before `import pyspark` below so that the
# import resolves against this Spark install.
findspark.init(os.environ.get('SPARK_HOME', '/usr/hdp/2.6.3.0-235/spark2'))

import pyspark
# getOrCreate makes this cell safe to re-run: constructing a second SparkContext directly
# raises "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("InvertedIndex"))

In [2]:
from pyspark.sql import SparkSession, SQLContext  # NOTE(review): SQLContext is unused in the visible cells

# Location of the input CSV on HDFS.
QUERY_RESULTS_PATH = 'hdfs:///user/vagrant/QueryResults.csv'

# Reuse the SparkContext from the first cell. The app name matches that context because
# builder options such as appName cannot rename an already-running context.
spark = SparkSession.builder.appName('InvertedIndex').getOrCreate()

# Read the CSV into a DataFrame:
#   header=True          -> column names come from the first line
#   inferSchema=True     -> an extra pass over the data to detect column types
#   mode="DROPMALFORMED" -> rows that fail to parse are silently discarded
df = spark.read.csv(QUERY_RESULTS_PATH, mode="DROPMALFORMED", inferSchema=True, header=True)

In [3]:
# Column names of the loaded DataFrame, in schema order
df.schema.names

['Id', 'PostTypeID', 'CreationDate', 'Tags']

In [4]:
# (column name, Spark SQL type) pairs, as inferred by inferSchema when the CSV was read
[(field.name, field.dataType.simpleString()) for field in df.schema.fields]

[('Id', 'int'),
 ('PostTypeID', 'int'),
 ('CreationDate', 'string'),
 ('Tags', 'string')]

In [5]:
# Preview the first 5 rows; long values (e.g. Tags) are truncated in the display
df.show(n=5)

+--------+----------+-------------+--------------------+
|      Id|PostTypeID| CreationDate|                Tags|
+--------+----------+-------------+--------------------+
|41412454|         1|1/1/2017 0:00|<postgresql><grai...|
|41412456|         1|1/1/2017 0:00|<python><shell><e...|
|41412462|         1|1/1/2017 0:02|<angularjs><provi...|
|41412463|         1|1/1/2017 0:02|<javascript><node...|
|41412468|         1|1/1/2017 0:03|    <mysql><boolean>|
+--------+----------+-------------+--------------------+
only showing top 5 rows



In [11]:
# Split each row's Tags string into a list of tags and pair it with the post Id:
# ([list of tags], Id). The first 10 results are shown below.


[(['<postgresql>', '<grails>', '<gsp>'], 41412454),
 (['<python>', '<shell>', '<expect>'], 41412456),
 (['<angularjs>', '<provider>', '<hybrid>'], 41412462),
 (['<javascript>', '<node.js>'], 41412463),
 (['<mysql>', '<boolean>'], 41412468),
 (['<arrays>', '<powershell>', '<append>'], 41412470),
 (['<tableau>'], 41412471),
 (['<html>', '<css>', '<html-lists>'], 41412472),
 (['<javascript>', '<html>'], 41412475),
 (['<sql>', '<sql-server>', '<greatest-n-per-group>'], 41412477)]

In [12]:
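# Flatten to one (tag, Id) pair per tag, where x is one tuple from the previous step:
#   x[0]    = the list of tags
#   x[0][0] = the first tag in that list
#   x[1]    = the post Id (a string in the output below)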


[('<postgresql>', '41412454'),
 ('<grails>', '41412454'),
 ('<gsp>', '41412454'),
 ('<python>', '41412456'),
 ('<shell>', '41412456')]

In [13]:
# Store the (tag, Id) pairs shown above in a variable for the grouping step.


In [14]:
# Group the pairs by key: the tag is the key and the IDs it appears in are the values.
# The grouped values are lazy ResultIterable objects (see output), so they still need
# to be converted to lists. (Normally this grouping would be chained onto the previous step.)


[('<postgresql>', <pyspark.resultiterable.ResultIterable at 0x7f45e05c5198>),
 ('<grails>', <pyspark.resultiterable.ResultIterable at 0x7f45e05c5208>),
 ('<gsp>', <pyspark.resultiterable.ResultIterable at 0x7f45e05c52e8>),
 ('<python>', <pyspark.resultiterable.ResultIterable at 0x7f45e05c51d0>),
 ('<shell>', <pyspark.resultiterable.ResultIterable at 0x7f45e05c53c8>)]

In [15]:
# Store the grouped result shown above in a variable.


In [16]:
# Final output: each tag paired with the list of post IDs it appears in.




[{'<postgresql>': ['41412454',
   '41412802',
   '41412967',
   '41413020',
   '41414313',
   '41414526',
   '41414638',
   '41415004',
   '41415357',
   '41418420',
   '41419008',
   '41419034',
   '41419860',
   '41421348',
   '41421429',
   '41421566',
   '41423132',
   '41423932',
   '41424233',
   '41424654',
   '41425472',
   '41425860',
   '41426077',
   '41426195',
   '41426207',
   '41426343',
   '41426992',
   '41427115',
   '41427678',
   '41429136',
   '41430110',
   '41430826',
   '41430877',
   '41431243',
   '41431273',
   '41431291',
   '41433483',
   '41433500',
   '41433609',
   '41434205',
   '41435107',
   '41435233',
   '41435339',
   '41435408',
   '41435811',
   '41436051',
   '41436997',
   '41437503',
   '41439346',
   '41440840',
   '41441045',
   '41441067',
   '41441139',
   '41441554',
   '41442451',
   '41443278',
   '41443338',
   '41443409',
   '41443722',
   '41443890',
   '41444014',
   '41444296',
   '41444890',
   '41445767',
   '41446269',
   '41446

In [17]:
# The inverted index is a list of dictionaries.
# Each dictionary has the form {'<tag>': [list of IDs the tag appears in]}


[{'<postgresql>': ['41412454',
   '41412802',
   '41412967',
   '41413020',
   '41414313',
   '41414526',
   '41414638',
   '41415004',
   '41415357',
   '41418420',
   '41419008',
   '41419034',
   '41419860',
   '41421348',
   '41421429',
   '41421566',
   '41423132',
   '41423932',
   '41424233',
   '41424654',
   '41425472',
   '41425860',
   '41426077',
   '41426195',
   '41426207',
   '41426343',
   '41426992',
   '41427115',
   '41427678',
   '41429136',
   '41430110',
   '41430826',
   '41430877',
   '41431243',
   '41431273',
   '41431291',
   '41433483',
   '41433500',
   '41433609',
   '41434205',
   '41435107',
   '41435233',
   '41435339',
   '41435408',
   '41435811',
   '41436051',
   '41436997',
   '41437503',
   '41439346',
   '41440840',
   '41441045',
   '41441067',
   '41441139',
   '41441554',
   '41442451',
   '41443278',
   '41443338',
   '41443409',
   '41443722',
   '41443890',
   '41444014',
   '41444296',
   '41444890',
   '41445767',
   '41446269',
   '41446

In [18]:
# Save the inverted index to a JSON file so it can be reused without re-running the Spark job.
import json

INVERTED_INDEX_PATH = "inverted_index.json"


def save_inverted_index(index, path=INVERTED_INDEX_PATH):
    """Write an inverted index to `path` as JSON.

    Parameters
    ----------
    index : iterable of dict
        Entries of the form {'<tag>': ids}, as in the outputs above. `ids` may be any
        iterable -- a list, or a ResultIterable straight out of groupByKey -- and is
        converted to a list, because json cannot serialize arbitrary iterables.
    path : str
        Destination file on the driver's local filesystem (opened with built-in `open`,
        not HDFS).
    """
    serializable = [{tag: list(ids) for tag, ids in entry.items()} for entry in index]
    # Serialize fully before opening the file: json.dump writes incrementally, so a
    # serialization error part-way through would otherwise leave a truncated file behind.
    text = json.dumps(serializable)
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)


# NOTE(review): `inverted_index` must be built by the cells above; their code is not present
# in this export, so on a fresh kernel (Restart & Run All) this line raises NameError.
save_inverted_index(inverted_index)