# Using GraphLab Create with Apache Spark

This notebook demonstrates how to use GraphLab Create together with [Apache Spark](http://spark.apache.org/), which serves here as the platform for large-scale data engineering.

The goal is to learn a topic model from Wikipedia data and see which topics are most represented in Wikipedia. The project has six steps:
1. [Set up environment](#Step-1:-Set-up-environment)
1. [Turn Raw Wikipedia text into Bag of Words, Using Spark](#Step-2:-Turn-Raw-Wikipedia-text-into-Bag-of-Words,-Using-Spark)
1. [Ingest Spark RDD as SFrame](#Step-3:-Ingest-Spark-RDD-as-SFrame)
1. [Learn Topic Model](#Step-4:-Learn-Topic-Model)
1. [Explore topics](#Step-5:-Explore-the-Topics)
1. [Save Results to Spark RDD](#Step-6:-Save-Results-to-Spark-RDD)

**Note:** Setting up Spark and PySpark is out of scope for this notebook, but both are required to follow along.

This notebook uses PySpark and GraphLab Create side by side to show how easily the two systems interoperate. If you are interested in the details of how the Apache Spark integration with GraphLab Create works, check out our open-source [spark-sframe package](https://github.com/turi-code/spark-sframe).

#### Note: This notebook requires GraphLab Create >=1.7 and Spark >=1.3

### Step 1: Set up environment

There are many ways to configure PySpark, but to use it in a standalone Python script (not in the pyspark shell or via spark-submit), a handful of environment variables must be set correctly. The most convenient way is to set them in your shell configuration (e.g., ```~/.bash_profile``` or ```~/.zshrc```). For instructive purposes, here are the variables that need to be set.

**Note:** Running this notebook as is may not configure these environment variables correctly.

In [1]:
# To use GraphLab Create within PySpark, you need to set the $SPARK_HOME and $PYTHONPATH
# environment variables on the driver. A common usage:
!export SPARK_HOME="your-spark-home-dir"
!export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
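
Because each `!` command in a notebook runs in its own subshell, variables exported that way do not persist into the Python process running the notebook. As a minimal sketch, the same values can be set from Python instead. `configure_pyspark` is a hypothetical helper name, and the py4j archive name is copied from the cell above; it may differ in your Spark distribution.

```python
import os
import sys

def configure_pyspark(spark_home, py4j_zip='py4j-0.8.2.1-src.zip'):
    """Expose a Spark installation to the current Python process.

    Hypothetical helper: mirrors the SPARK_HOME and PYTHONPATH exports above.
    """
    os.environ['SPARK_HOME'] = spark_home
    python_dir = os.path.join(spark_home, 'python')
    py4j_path = os.path.join(python_dir, 'lib', py4j_zip)
    # Prepend so these entries take precedence, like the PYTHONPATH export.
    for path in (py4j_path, python_dir):
        if path not in sys.path:
            sys.path.insert(0, path)
    return python_dir, py4j_path

# Replace the placeholder with your actual Spark directory.
python_dir, py4j_path = configure_pyspark('your-spark-home-dir')
```

This has to run before `from pyspark import SparkContext`, since the import relies on the updated `sys.path`.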

### Step 2: Turn Raw Wikipedia text into Bag of Words, Using Spark

In [2]:
import graphlab as gl
from pyspark import SparkContext
import os
import requests

In [3]:
# Set up the SparkContext object
# this can be 'local' or 'yarn-client' in PySpark
# Remember if using yarn-client then all the paths should be accessible
# by all nodes in the cluster.
sc = SparkContext('local')

Now that we have the SparkContext set up, let's download the Wikipedia data and load it as an RDD. For this notebook we will only use a subset of the data, but there is code below to use the full dataset (which is about 5GB).

#### Download the Wikipedia Data

In [4]:
import requests

def download_file(url, save_path):
    local_filename = url.split('/')[-1]
    r = requests.get(url, stream=True)
    with open(os.path.join(save_path, local_filename), 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024): 
            if chunk: # filter out keep-alive new chunks
                f.write(chunk)
                f.flush()
    return local_filename

# File to download
file_list = [16] 

# If you want to use this entire Wikipedia dataset, uncomment the following line.
# This will download ~5GB of data split over 36 files.
# file_list = range(37)

# Download location for Wikipedia data
save_path = '/tmp/wikipedia'

# Actually download the files, if the location doesn't exist yet.
if not os.path.exists(save_path):
    os.mkdir(save_path)
    for idx in file_list: 
        url = 'https://static.turi.com/datasets/wikipedia/raw/w%d' % idx
        print "Downloading '%s', saving to: '%s'" % (url, save_path)
        download_file(url, save_path) # This will download 146MB of data.

In [5]:
# save_path is absolute, so 'file://' + save_path yields a well-formed file:/// URI
rawRdd = sc.textFile('file://%s/' % save_path).zipWithIndex()

Now that the RDD is defined, let's look at the first element to confirm it is structured the way we want.

In [6]:
rawRdd.take(1)

[(u'alainconnes alain connes is one of the leading specialists on operator algebras  in his early work on von neumann algebras in the 1970s he succeeded in obtaining the almost complete classification of injective factors  following this he made contributions in operator ktheory and index theory which culminated in the baumconnes conjecture he also introduced cyclic cohomology in the early 1980s as a first step in the study of noncommutative differential geometry connes has applied his work in areas of mathematics and theoretical physics including number theory differential geometry and particle physics connes was awarded the fields medal in 1982 the crafoord prize in 2001 and the gold medal of the cnrs in 2004   he is a member of the french academy of sciences and several foreign academies and societies including the danish academy of sciences norwegian academy of sciences russian academy of sciences and us national academy of sciences ',
  0)]

This looks good: each line holds one document, followed by its index value. Since we want to split documents on spaces, it is important to remove any extra spaces first. A close look at the document shows runs of multiple spaces, so let's clean those up and split each document on single spaces. Let's also put the document's index first, so each record has an 'id' key followed by the words.

In [7]:
# replace multiple spaces with a single space using re.sub, then trim whitespace and split on single space.
import re
splitRdd = rawRdd.map(lambda (a,b): (b, re.sub("[ ]+", " ", a).strip().split(" ")))
splitRdd.take(1)

[(0,
  [u'alainconnes',
   u'alain',
   u'connes',
   u'is',
   u'one',
   u'of',
   u'the',
   u'leading',
   u'specialists',
   u'on',
   u'operator',
   u'algebras',
   u'in',
   u'his',
   u'early',
   u'work',
   u'on',
   u'von',
   u'neumann',
   u'algebras',
   u'in',
   u'the',
   u'1970s',
   u'he',
   u'succeeded',
   u'in',
   u'obtaining',
   u'the',
   u'almost',
   u'complete',
   u'classification',
   u'of',
   u'injective',
   u'factors',
   u'following',
   u'this',
   u'he',
   u'made',
   u'contributions',
   u'in',
   u'operator',
   u'ktheory',
   u'and',
   u'index',
   u'theory',
   u'which',
   u'culminated',
   u'in',
   u'the',
   u'baumconnes',
   u'conjecture',
   u'he',
   u'also',
   u'introduced',
   u'cyclic',
   u'cohomology',
   u'in',
   u'the',
   u'early',
   u'1980s',
   u'as',
   u'a',
   u'first',
   u'step',
   u'in',
   u'the',
   u'study',
   u'of',
   u'noncommutative',
   u'differential',
   u'geometry',
   u'connes',
   u'has',
   u'applie
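
The cleanup and split applied in the cell above can also be tried on an ordinary Python string, without Spark. This is a minimal sketch; `tokenize` is a hypothetical helper name, and the sample text is a shortened excerpt of the document shown earlier.

```python
import re

def tokenize(text):
    """Collapse runs of spaces, trim the ends, and split on single spaces."""
    return re.sub("[ ]+", " ", text).strip().split(" ")

# Shortened excerpt; note the double space after 'algebras' and the trailing space.
sample = "alainconnes alain connes is one of the leading specialists on operator algebras  in his early work "
tokens = tokenize(sample)
print(tokens[:3])
```

Without the `re.sub` step, splitting on a single space would produce empty strings wherever two spaces were adjacent.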

Now each document is a tuple of (index, list of words). Let's transform that into a list of (index, word) tuples instead. We will use flatMap for that.

In [8]:
zipRdd = splitRdd.flatMap(lambda (a,b): zip([a] * len(b),b))
zipRdd.take(1)

[(0, u'alainconnes')]
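
For intuition, the same flattening can be written in plain Python. The sketch below is only an illustration on made-up sample documents; `flatten_documents` is a hypothetical name, and on real data the Spark `flatMap` call does this work in parallel.

```python
def flatten_documents(docs):
    """Turn (doc_id, [words]) tuples into a flat list of (doc_id, word) tuples."""
    pairs = []
    for doc_id, words in docs:
        # Same idea as zip([doc_id] * len(words), words) in the Spark lambda.
        pairs.extend((doc_id, word) for word in words)
    return pairs

# Made-up documents for illustration.
docs = [(0, ['alain', 'connes']), (1, ['operator', 'algebras', 'operator'])]
pairs = flatten_documents(docs)
print(pairs)
```

A plain `map` would return one list per document; `flatMap` concatenates those lists into a single collection of pairs.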

Great, now that things are formatted the way we want, let's start aggregating to generate word counts per document.

In [9]:
wordRdd = zipRdd.map(lambda composite_word: (composite_word, 1)).reduceByKey(lambda a, b: a + b)
wordRdd.take(2)

[((12267, u'from'), 1), ((45931, u'corporate'), 1)]

Finally, for each document, let's create a dictionary with each word as the key and its count as the value.

In [10]:
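# Re-key by document id, group the (word, count) pairs, and build one dict per document.
# Iterating the grouped values directly avoids relying on the internal .data attribute.
bagRdd = wordRdd.map(lambda (a,b):(a[0],(a[1],b))).groupByKey().map(lambda (a,b):(a, dict(b)))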
bagRdd.take(1)

[(44715,
  {u'1810': 1,
   u'1920': 2,
   u'8000': 1,
   u'about': 2,
   u'ago': 1,
   u'and': 1,
   u'applecross': 2,
   u'at': 1,
   u'back': 1,
   u'bay': 1,
   u'be': 1,
   u'been': 1,
   u'camas': 1,
   u'can': 1,
   u'carron': 1,
   u'church': 1,
   u'coast': 1,
   u'corner': 1,
   u'cottages': 1,
   u'course': 1,
   u'eilean': 3,
   u'eileanmrcrowlinislands': 1,
   u'evidence': 1,
   u'excavations': 1,
   u'families': 2,
   u'from': 1,
   u'gaelic': 1,
   u'hannait': 1,
   u'has': 1,
   u'have': 1,
   u'home': 1,
   u'human': 1,
   u'in': 4,
   u'is': 1,
   u'island': 1,
   u'loch': 1,
   u'mesolithic': 1,
   u'mouth': 1,
   u'mr': 3,
   u'na': 1,
   u'near': 1,
   u'northeast': 1,
   u'of': 4,
   u'off': 1,
   u'on': 1,
   u'ross': 1,
   u'ruined': 1,
   u'scottish': 1,
   u'seen': 1,
   u'settlement': 1,
   u'several': 1,
   u'shown': 1,
   u'since': 1,
   u'situated': 1,
   u'slipped': 1,
   u'south': 1,
   u'the': 6,
   u'time': 1,
   u'times': 1,
   u'to': 3,
   u'uninhabit
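
The two aggregation steps above (count each (id, word) pair, then gather the counts for each document into a dictionary) can be mirrored locally with the standard library. This is a sketch on made-up pairs, not the Spark implementation; `bag_of_words` is a hypothetical helper.

```python
from collections import Counter, defaultdict

def bag_of_words(pairs):
    """Build {doc_id: {word: count}} from (doc_id, word) tuples."""
    # Counterpart of mapping each pair to (pair, 1) and reducing by key.
    pair_counts = Counter(pairs)
    # Counterpart of regrouping by doc_id and building one dict per document.
    bags = defaultdict(dict)
    for (doc_id, word), count in pair_counts.items():
        bags[doc_id][word] = count
    return dict(bags)

# Made-up pairs for illustration.
pairs = [(0, 'alain'), (0, 'connes'), (1, 'operator'), (1, 'algebras'), (1, 'operator')]
bags = bag_of_words(pairs)
print(bags[1])
```

Running a small sample through a local version like this is a cheap way to check the expected shape of each record before ingesting the RDD.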

### Step 3: Ingest Spark RDD as SFrame

Now that we have the raw Wikipedia text converted into a bag-of-words using Spark, it is easy to ingest that into GraphLab Create as an SFrame.

In [11]:
data = gl.SFrame.from_rdd(bagRdd,sc)
data = data.unpack('X1')
data.rename({'X1.0':'id','X1.1':'bag_of_words'})

[INFO] 1447228103 : INFO:     (initialize_globals_from_environment:282): Setting configuration variable GRAPHLAB_FILEIO_ALTERNATIVE_SSL_CERT_FILE to /Users/soroush/projects/graphlab-release-candidate/lib/python2.7/site-packages/certifi/cacert.pem
1447228103 : INFO:     (initialize_globals_from_environment:282): Setting configuration variable GRAPHLAB_FILEIO_ALTERNATIVE_SSL_CERT_DIR to 
This commercial license of GraphLab Create is assigned to engr@turi.com.

[INFO] Start server at: ipc:///tmp/graphlab_server-44276 - Server binary: /Users/soroush/projects/graphlab-release-candidate/lib/python2.7/site-packages/graphlab/unity_server - Server log: /tmp/graphlab_server_1447228103.log
[INFO] GraphLab Server Version: 1.6.916


id,bag_of_words
35840,"{'deco': 1, 'being': 1, 'able': 1, 'years': 1, ..."
33285,"{'september': 2, 'held': 13, 'mile': 1, ..."
17935,"{'switched': 1, 'money': 1, 'both': 1, 'years' ..."
22545,"{'and': 2, 'geologicallydistinct': ..."
23060,"{'playoffs': 1, 'september': 1, 'money': ..."
68270,"{'show': 1, 'german': 1, 'caused': 1, 'results': ..."
63510,"{'limited': 1, 'all': 4, 'code': 1, 'minors': 2, ..."
31255,"{'skip': 1, 'lack': 1, 'themes': 1, 'tenuous': ..."
42520,"{'and': 1, 'layer': 1, 'often': 2, 'process' ..."
43035,"{'help': 1, 'themed': 1, 'money': 1, 'global': 1, ..."


In [12]:
gl.canvas.set_target('ipynb')
data.show()

Looking at the most frequent words in the bag of words, it is clear that stop words dominate. Let's remove them with one line, using GraphLab Create.

In [13]:
# Trim out stopwords
data['bag_of_words'] = data['bag_of_words'].dict_trim_by_keys(gl.text_analytics.stopwords(), exclude=True)
data.show()

Great, now the most frequent words are no longer stop words. We are ready to train a Topic Model on the data.
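
To make the effect of `dict_trim_by_keys(..., exclude=True)` concrete, here is a plain-Python sketch of the same filtering on a single dictionary. The stop-word set is a tiny made-up stand-in for `gl.text_analytics.stopwords()`, `trim_by_keys` is a hypothetical name, and the counts are taken from the bag-of-words output shown in Step 2.

```python
def trim_by_keys(bag, keys, exclude=True):
    """Drop the given keys from a dict (exclude=True) or keep only them."""
    keys = set(keys)
    if exclude:
        return {k: v for k, v in bag.items() if k not in keys}
    return {k: v for k, v in bag.items() if k in keys}

stopwords = {'the', 'of', 'in', 'and', 'to'}  # illustrative subset only
bag = {'the': 6, 'of': 4, 'in': 4, 'eilean': 3, 'mr': 3, 'applecross': 2}
trimmed = trim_by_keys(bag, stopwords)
print(trimmed)
```

With the stop words gone, the remaining high-count words are the ones that actually characterize the document.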

### Step 4: Learn Topic Model

Once we have an SFrame, training a Topic Model is one line. Here we ask the model to learn thirty topics and to train for fifty iterations.

In [18]:
# If running on entire dataset, might want to increase num_topics and num_iterations
model = gl.topic_model.create(data['bag_of_words'], num_topics=30, num_iterations=50)

PROGRESS: Learning a topic model
PROGRESS:        Number of documents     72269
PROGRESS:            Vocabulary size    631799
PROGRESS:    Running collapsed Gibbs sampling
PROGRESS: +-----------+---------------+----------------+-----------------+
PROGRESS: | Iteration | Elapsed Time  | Tokens/Second  | Est. Perplexity |
PROGRESS: +-----------+---------------+----------------+-----------------+
PROGRESS: | 10        | 33.58s        | 4.13992e+06    | 0               |
PROGRESS: | 20        | 58.34s        | 3.45949e+06    | 0               |
PROGRESS: | 30        | 1m 22s        | 3.81003e+06    | 0               |
PROGRESS: | 40        | 1m 48s        | 4.99686e+06    | 0               |
PROGRESS: | 50        | 2m 13s        | 3.10397e+06    | 0               |
PROGRESS: +-----------+---------------+----------------+-----------------+


### Step 5: Explore the Topics

First, let's get topic ids predicted from the model.

In [19]:
pred = model.predict(data['bag_of_words'])
pred

dtype: int
Rows: 72269
[25, 22, 24, 3, 11, 17, 0, 6, 4, 27, 7, 4, 19, 6, 20, 27, 23, 17, 9, 18, 14, 6, 28, 16, 7, 4, 20, 29, 10, 28, 3, 18, 21, 29, 2, 25, 6, 22, 23, 3, 23, 5, 26, 17, 7, 0, 21, 25, 23, 22, 12, 5, 22, 27, 6, 4, 12, 20, 17, 24, 1, 13, 5, 20, 20, 7, 20, 24, 29, 17, 20, 29, 6, 7, 15, 5, 20, 27, 5, 19, 13, 7, 22, 7, 10, 6, 29, 23, 18, 15, 4, 27, 17, 26, 18, 6, 11, 15, 17, 29, ... ]

That only shows the predicted topic_id for each document. Instead, let's build a results SFrame that pairs each document with the topic_id we just predicted.

In [20]:
results = gl.SFrame({'doc_id':data['id'], 'topic_id':pred, 'bag_of_words':data['bag_of_words']})
results.swap_columns('doc_id', 'bag_of_words') # better SFrame formatting
results.print_rows(max_column_width=60)

+--------+-------------------------------------------------------------+----------+
| doc_id |                         bag_of_words                        | topic_id |
+--------+-------------------------------------------------------------+----------+
| 35840  | {'deco': 1, 'years': 1, 'held': 1, 'lyric': 4, 'headline... |    25    |
| 33285  | {'september': 2, 'held': 13, 'mile': 1, 'milwaukee': 1, ... |    22    |
| 17935  | {'strongly': 1, 'week': 1, 'google': 1, 'employees': 1, ... |    24    |
| 22545  | {'geologicallydistinct': 1, 'valleys': 1, 'upper': 1, 'n... |    3     |
| 23060  | {'playoffs': 1, 'september': 1, 'money': 1, 'lack': 1, '... |    11    |
| 68270  | {'8501': 1, 'specifies': 1, 'representative': 1, 'method... |    17    |
| 63510  | {'limited': 1, 'code': 1, 'minors': 2, 'chinese': 1, 'gl... |    0     |
| 31255  | {'show': 1, 'german': 7, 'skip': 1, 'differences': 2, 'l... |    6     |
| 42520  | {'layer': 1, 'process': 1, 'text': 1, 'distinguish': 1, ... |    

Now let's see which topic ids appear most frequently in this set of Wikipedia data.

In [21]:
gl.canvas.set_target('ipynb')
results['topic_id'].show('Categorical')

Looking at this tells us that topic ids 22 and 6 are the most common in this dataset. Let's find out what words are associated with those topics.
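
The same kind of tally can be made without the Canvas view. The sketch below counts topic ids with `collections.Counter` on the first twenty predictions printed earlier, so its counts are only illustrative and do not reproduce the full-dataset ranking.

```python
from collections import Counter

# First twenty predicted topic ids, copied from the prediction output above.
sample_pred = [25, 22, 24, 3, 11, 17, 0, 6, 4, 27,
               7, 4, 19, 6, 20, 27, 23, 17, 9, 18]

counts = Counter(sample_pred)
# List topic ids from most to least frequent within this small sample.
for topic_id, n in counts.most_common(4):
    print('topic %d: %d documents' % (topic_id, n))
```

On the full `results` SFrame, a `groupby` on `topic_id` with a count aggregator (for example `gl.aggregate.COUNT()`, assuming it is available in your GraphLab Create version) would give the complete ranking.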

In [22]:
model.get_topics([22], output_type='topic_words').print_rows(max_column_width=100)

+--------------------------------+
|             words              |
+--------------------------------+
| [club, team, won, world, game] |
+--------------------------------+
[1 rows x 1 columns]



In [23]:
model.get_topics([6], output_type='topic_words').print_rows(max_column_width=100)

+--------------------------------------+
|                words                 |
+--------------------------------------+
| [music, band, song, released, songs] |
+--------------------------------------+
[1 rows x 1 columns]



Interesting. To see what else this set of documents covers, let's get the full list of topic words learned by the model.

In [24]:
topics = model.get_topics()
topics = topics.rename({'topic':'topic_id'})
topics

topic_id,word,score
0,government,0.0117483595063
0,court,0.0107839563289
0,law,0.00991599346935
0,police,0.00893230222847
0,act,0.00818006775015
1,game,0.0160061392737
1,made,0.00810626617731
1,player,0.00724486342588
1,original,0.00677612289733
1,version,0.00654543384139


That SFrame is harder to read, so let's group by topic id and collect each topic's words into a list.

In [25]:
topics.groupby(['topic_id'], {'topic_words':gl.aggregate.CONCAT("word")}).print_rows(max_column_width=80)

+----------+----------------------------------------------------+
| topic_id |                    topic_words                     |
+----------+----------------------------------------------------+
|    13    |    [chinese, china, japanese, japan, language]     |
|    24    | [company, business, million, services, management] |
|    2     |      [season, league, played, team, football]      |
|    26    |       [age, population, 18, years, average]        |
|    8     |     [states, members, people, united, public]      |
|    15    |       [war, army, military, forces, battle]        |
|    28    |       [station, line, city, railway, built]        |
|    14    |            [2009, 2008, 2010, 2007, 1]             |
|    20    |  [school, university, college, church, building]   |
|    19    | [research, medical, science, development, project] |
+----------+----------------------------------------------------+
[30 rows x 2 columns]



This is the appropriate format for recording the topics learned, by topic_id.
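
What the `groupby` with `CONCAT` does can be sketched in plain Python on the ten rows shown in the earlier topics table. This only illustrates the grouping; `group_topic_words` is a hypothetical name.

```python
from collections import OrderedDict

def group_topic_words(rows):
    """Collect words per topic_id from (topic_id, word, score) rows."""
    grouped = OrderedDict()
    for topic_id, word, _score in rows:
        grouped.setdefault(topic_id, []).append(word)
    return grouped

# Rows copied from the topics output above.
rows = [
    (0, 'government', 0.0117483595063),
    (0, 'court', 0.0107839563289),
    (0, 'law', 0.00991599346935),
    (0, 'police', 0.00893230222847),
    (0, 'act', 0.00818006775015),
    (1, 'game', 0.0160061392737),
    (1, 'made', 0.00810626617731),
    (1, 'player', 0.00724486342588),
    (1, 'original', 0.00677612289733),
    (1, 'version', 0.00654543384139),
]
grouped = group_topic_words(rows)
for topic_id, words in grouped.items():
    print('%d: %s' % (topic_id, ', '.join(words)))
```

The scores are dropped here, as they are in the grouped SFrame, which keeps only the word list per topic.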

Great, so now we have the results SFrame and the Topics SFrame, both of which can be saved back as Spark RDDs.

### Step 6: Save Results to Spark RDD

So now we have all the results ready as two SFrames. The first has the bag-of-words with the predicted topic_id, and the second has the topic words for each topic_id. These are both tables we can save as Spark RDDs, so subsequent Spark programs can utilize the findings from the Topic Model.

In [26]:
# Save the predictions as an RDD; `results` holds the predicted topic_id for each document
predictions_rdd = results.to_rdd(sc)
predictions_rdd.saveAsTextFile('file:///tmp/predictions.rdd')

/var/tmp/graphlab-soroush/44276/196a51bf-f47b-4dba-957f-82d885f05cec


In [27]:
# save the topic_ids with their topic words
topics_rdd = topics.to_rdd(sc)
topics_rdd.saveAsTextFile('file:///tmp/topics.rdd')

/var/tmp/graphlab-soroush/44276/a5bd2c8a-c9f0-4a33-8d42-993ecd5167e1
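
One practical caveat when rerunning these cells: with Spark's default settings, `saveAsTextFile` raises an error if the output directory already exists. A minimal sketch of clearing a local output directory first is shown below. `clear_local_output` is a hypothetical helper, it only handles local filesystem paths, and it deletes data, so use it with care.

```python
import os
import shutil
import tempfile

def clear_local_output(path):
    """Remove a local output directory if present; return True if removed."""
    if os.path.isdir(path):
        shutil.rmtree(path)
        return True
    return False

# Demonstration on a throwaway directory rather than a real output path.
demo_dir = tempfile.mkdtemp()
removed = clear_local_output(demo_dir)
print('removed: %s, still exists: %s' % (removed, os.path.exists(demo_dir)))
```

In this notebook it would be called as `clear_local_output('/tmp/predictions.rdd')` before the corresponding `saveAsTextFile` call.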


And that's it! GraphLab Create works well with Apache Spark, allowing you to leverage what you've already built in Spark with GraphLab Create. No need to save to intermediary formats just to train ML models in GraphLab Create.

For more information on using Apache Spark with GraphLab Create, check out the [User Guide section](https://turi.com/learn/userguide/#Spark_Integration) or [spark-sframe package](https://github.com/turi-code/spark-sframe).