In [1]:
spark
#loads spark

In [2]:
sc
#loads spark context: main entry point for Spark functionality. 
#Represents the connection to a spark cluster and can be used to create RDDs 
#https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html

In [3]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#main entry point for dataframe and SQL functionality
#https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=sparksession

In [5]:
df = spark.read.csv('gs://dataproc-staging-us-central1-18725869504-rzbbn2fq/QueryResults_Shrode.csv')
type(df)
#get data from bucket (google storage)
#look at data type
#data originally from stack overflow data explorer

pyspark.sql.dataframe.DataFrame

In [6]:
#make sure data loaded properly -- remember the data load doesnt't run until we run .take or .collect or save
df.take(5)

[Row(_c0=u'Id', _c1=u'Tags'),
 Row(_c0=u'41918662', _c1=u'amazon-web-services,docker,kubernetes'),
 Row(_c0=u'41918667', _c1=u'python,r'),
 Row(_c0=u'41918669', _c1=u'restler'),
 Row(_c0=u'41918671', _c1=u'php,preg-match')]

In [7]:
df.show(5)

+--------+--------------------+
|     _c0|                 _c1|
+--------+--------------------+
|      Id|                Tags|
|41918662|amazon-web-servic...|
|41918667|            python,r|
|41918669|             restler|
|41918671|      php,preg-match|
+--------+--------------------+
only showing top 5 rows



In [8]:
rdd = df.rdd
type(rdd)

pyspark.rdd.RDD

In [9]:
rdd.take(10)

[Row(_c0=u'Id', _c1=u'Tags'),
 Row(_c0=u'41918662', _c1=u'amazon-web-services,docker,kubernetes'),
 Row(_c0=u'41918667', _c1=u'python,r'),
 Row(_c0=u'41918669', _c1=u'restler'),
 Row(_c0=u'41918671', _c1=u'php,preg-match'),
 Row(_c0=u'41918674', _c1=u'javascript,node.js,express,ejs'),
 Row(_c0=u'41918675', _c1=u'php,codeigniter,ccavenue'),
 Row(_c0=u'41918686', _c1=u'javascript,angular,webpack,webpack-2,html-webpack-plugin'),
 Row(_c0=u'41918687', _c1=u'javascript,node.js,phpstorm,webstorm,jetbrains-ide'),
 Row(_c0=u'41918690', _c1=u'c,stack,dynamic-memory-allocation,realloc')]

In [10]:
no_null = rdd.filter(lambda x: x._c0 !='' and x._c1 !='') 

In [11]:
no_null.take(5)

[Row(_c0=u'Id', _c1=u'Tags'),
 Row(_c0=u'41918662', _c1=u'amazon-web-services,docker,kubernetes'),
 Row(_c0=u'41918667', _c1=u'python,r'),
 Row(_c0=u'41918669', _c1=u'restler'),
 Row(_c0=u'41918671', _c1=u'php,preg-match')]

In [26]:
def split_strip(x):
    return (x._c0.strip(), x._c1.split(','))
#.strip() strips whitespaces, .split(,) splits on commas

In [13]:
no_null.map(split_strip).take(5)

[(u'Id', [u'Tags']),
 (u'41918662', [u'amazon-web-services', u'docker', u'kubernetes']),
 (u'41918667', [u'python', u'r']),
 (u'41918669', [u'restler']),
 (u'41918671', [u'php', u'preg-match'])]

In [14]:
split_id = no_null.map(split_strip)

In [27]:
#MAP
tag_to_id = split_id.flatMap(lambda x: [(tag, x[0]) for tag in x[1]])
#basically a looping option 'for each tag in column 1, do tag, x[0]'

In [16]:
tag_to_id.take(10)

[(u'Tags', u'Id'),
 (u'amazon-web-services', u'41918662'),
 (u'docker', u'41918662'),
 (u'kubernetes', u'41918662'),
 (u'python', u'41918667'),
 (u'r', u'41918667'),
 (u'restler', u'41918669'),
 (u'php', u'41918671'),
 (u'preg-match', u'41918671'),
 (u'javascript', u'41918674')]

In [17]:
#REDUCE
grouped_by_key = tag_to_id.groupByKey()
grouped_by_key.take(5)

[(u'h.264', <pyspark.resultiterable.ResultIterable at 0x7fc7bc0abb90>),
 (u'biopython', <pyspark.resultiterable.ResultIterable at 0x7fc7bc0abbd0>),
 (u'screen-resolution',
  <pyspark.resultiterable.ResultIterable at 0x7fc7bc0abc50>),
 (u'nsstackview', <pyspark.resultiterable.ResultIterable at 0x7fc7bc0abc90>),
 (u'userscripts', <pyspark.resultiterable.ResultIterable at 0x7fc7bc0abcd0>)]

In [18]:
#FINISH INDEX
inverted_index = grouped_by_key.map(lambda x: (x[0], list(x[1])))

In [19]:
#inverted_index.collect()

In [20]:
inverted_index.take(5)

[(u'h.264',
  [u'45691558',
   u'43218009',
   u'45816914',
   u'45933221',
   u'42300009',
   u'42319421',
   u'45066008',
   u'41470525']),
 (u'biopython', [u'43216346', u'46990284', u'45869094']),
 (u'screen-resolution', [u'43345440', u'41468537']),
 (u'nsstackview', [u'46009170']),
 (u'userscripts', [u'42319639'])]

In [21]:
#Write data to disk
    #We can use saveAsTextFile function, which saves the data in a folder similar to when hadoop outputs it's mapreduce files

In [22]:
#Import note: Just like with HDFS, the output directory we write to cannot already exist. 
#So if you run this next cell once and want to run it again, you'll have to delete the folder from the bucket. 
#This can be either through the GUI or using the Google's API. In the GUI you'll have to refresh the page (or click the refresh button)
#to see any new files/folders.

In [23]:
from google.cloud import storage
client = storage.Client()
# https://console.cloud.google.com/storage/browser/[bucket-id]/
# bucket = client.get_bucket('bucket-id-here')
# command to delete the folder here

In [24]:
#remember we will write to our google bucket for our cluster.

In [29]:
inverted_index.saveAsTextFile('gs://dataproc-staging-us-central1-18725869504-rzbbn2fq/Shrode_inverted_index.csv')

In [25]:
#notice the output format in the folder is like HDFS, with files broken up in parts. 