In [1]:
spark

In [2]:
sc

In [3]:
from pyspark.sql.functions import regexp_replace
from google.cloud import storage

In [4]:
df = spark.read.csv("gs://dataproc-staging-us-central1-330795877686-cs3qbyk4/Lacey_Conrad_FinalQuery.csv",
                    mode="DROPMALFORMED",
                    inferSchema=True,
                    header=True)

In [5]:
type(df)

pyspark.sql.dataframe.DataFrame

In [6]:
df.take(5)

[Row(Id=30246138, Tags=u'<html><webvtt>'),
 Row(Id=30246145, Tags=u'<html><css>'),
 Row(Id=30246147, Tags=u'<javascript><jquery><ajax><get><ip>'),
 Row(Id=30246148, Tags=u'<c#><windows-phone-8.1>'),
 Row(Id=30246149, Tags=u'<vb.net>')]

In [7]:
df.show(5)

+--------+--------------------+
|      Id|                Tags|
+--------+--------------------+
|30246138|      <html><webvtt>|
|30246145|         <html><css>|
|30246147|<javascript><jque...|
|30246148|<c#><windows-phon...|
|30246149|            <vb.net>|
+--------+--------------------+
only showing top 5 rows



In [8]:
# keep this, it probably works
df2 = df.withColumn('Tags', regexp_replace(df['Tags'], '[<>]', ' '))

In [9]:
df2

DataFrame[Id: int, Tags: string]

In [10]:
type(df2)

pyspark.sql.dataframe.DataFrame

In [11]:
df2.show(5)

+--------+--------------------+
|      Id|                Tags|
+--------+--------------------+
|30246138|       html  webvtt |
|30246145|          html  css |
|30246147| javascript  jque...|
|30246148| c#  windows-phon...|
|30246149|             vb.net |
+--------+--------------------+
only showing top 5 rows



In [12]:
df2.select("Tags").show()

+--------------------+
|                Tags|
+--------------------+
|       html  webvtt |
|          html  css |
| javascript  jque...|
| c#  windows-phon...|
|             vb.net |
| javascript  jque...|
| c#  json  linq-t...|
| python  profilin...|
|             jquery |
| php  ajax  json ...|
| java  api  rest ...|
|             pandas |
|       google-oauth |
| python  python-3...|
|     c#  sql-server |
|             python |
| html  css  flexbox |
| python  django  ...|
| c#  asp.net-mvc ...|
| ios  parse-platf...|
+--------------------+
only showing top 20 rows



In [13]:
type(df2)

pyspark.sql.dataframe.DataFrame

In [14]:
blank_tags = df2.rdd.filter(lambda x: x.Tags is not None)

In [15]:
type(blank_tags)

pyspark.rdd.PipelinedRDD

In [16]:
def split_strip(x):
    return (x.Tags.strip().split(' '), x.Id)

In [17]:
strip = blank_tags.map(split_strip)

In [18]:
strip.take(5)

[([u'html', u'', u'webvtt'], 30246138),
 ([u'html', u'', u'css'], 30246145),
 ([u'javascript', u'', u'jquery', u'', u'ajax', u'', u'get', u'', u'ip'],
  30246147),
 ([u'c#', u'', u'windows-phone-8.1'], 30246148),
 ([u'vb.net'], 30246149)]

In [19]:
type(strip)

pyspark.rdd.PipelinedRDD

In [20]:
word_to_id = strip.flatMap(lambda x: [(word, x[1]) for word in x[0]])

In [21]:
word_to_id.take(10)

[(u'html', 30246138),
 (u'', 30246138),
 (u'webvtt', 30246138),
 (u'html', 30246145),
 (u'', 30246145),
 (u'css', 30246145),
 (u'javascript', 30246147),
 (u'', 30246147),
 (u'jquery', 30246147),
 (u'', 30246147)]

In [22]:
grouped_by_word = word_to_id.groupByKey()

In [23]:
grouped_by_word.take(10)

[(u'', <pyspark.resultiterable.ResultIterable at 0x7f30d4b39cd0>),
 (u'h.265', <pyspark.resultiterable.ResultIterable at 0x7f30d4b39d10>),
 (u'h.264', <pyspark.resultiterable.ResultIterable at 0x7f30d4b39d50>),
 (u'biopython', <pyspark.resultiterable.ResultIterable at 0x7f30d4b39d90>),
 (u'screen-resolution',
  <pyspark.resultiterable.ResultIterable at 0x7f30d4b39dd0>),
 (u'userscripts', <pyspark.resultiterable.ResultIterable at 0x7f30d4b39e10>),
 (u'prefix', <pyspark.resultiterable.ResultIterable at 0x7f30d4b39e50>),
 (u'tcp-ip', <pyspark.resultiterable.ResultIterable at 0x7f30d4b39e90>),
 (u'netcdf', <pyspark.resultiterable.ResultIterable at 0x7f30d4b39ed0>),
 (u'sublime-text-plugin',
  <pyspark.resultiterable.ResultIterable at 0x7f30d4b39f10>)]

In [24]:
inverse_index = grouped_by_word.map(lambda x: {x[0]: list(x[1])})

In [25]:
inverse_index.take(10)

[{u'': [30246138,
   30246145,
   30246147,
   30246147,
   30246147,
   30246147,
   30246148,
   30246152,
   30246152,
   30246152,
   30246156,
   30246156,
   30246166,
   30246166,
   30246166,
   30246170,
   30246170,
   30246170,
   30246172,
   30246172,
   30246172,
   30246172,
   30246182,
   30246182,
   30246185,
   30246187,
   30246187,
   30246188,
   30246188,
   30246188,
   30246190,
   30246190,
   30246190,
   30246191,
   30246191,
   30246191,
   30246192,
   30246192,
   30246194,
   30246194,
   30246197,
   30246197,
   30246197,
   30246197,
   30246198,
   30246198,
   30246198,
   30246200,
   30246200,
   30246200,
   30246209,
   30246209,
   30246210,
   30246210,
   30246214,
   30246214,
   30246218,
   30246218,
   30246218,
   30246218,
   30246219,
   30246221,
   30246221,
   30246221,
   30246225,
   30246225,
   30246228,
   30246228,
   30246228,
   30246230,
   30246237,
   30246237,
   30406055,
   30406056,
   30406058,
   30406058,
   3040

In [26]:
type(inverse_index)

pyspark.rdd.PipelinedRDD

In [27]:
client = storage.Client()

In [33]:
inverse_index.saveAsTextFile('gs://dataproc-staging-us-central1-330795877686-cs3qbyk4/inverted_index2')