# Analyzing the Gutenberg Books Corpus - part 2

In this notebook, we will use the Gutenberg Corpus in the same form as last week. 

In the [first analysis notebook](https://github.com/dslab2018/dslab2018.github.io/blob/master/notebooks/DSLab_week7_gutenberg_corpus.ipynb) we explored various RDD methods and ended by building an N-gram viewer for the Gutenberg books project. Now we will use the corpus to train a simple language classification model using [Spark's machine learning library](http://spark.apache.org/docs/latest/mllib-guide.html) and Spark DataFrames.

<div class="alert alert-success">
<h3>The structure of this lab is as follows:</h3>

<ol>
    <li>initializing Spark and loading data</li>
    <li>construction of Spark DataFrames</li>
    <li>using core DataFrame functionality and comparisons to RDD methods</li>
    <li>using the Spark ML library for vectorization</li>
    <li>building a classifier pipeline</li>
</ol>
</div>

## Set up and launch the Spark runtime

Remember from the previous notebook that we have a saved configuration in `./spark_config/` -- so all we need to do is set the `SPARK_CONF_DIR` environment variable and our default configuration will be used: 

In [1]:
# set this to the base spark directory on your system
spark_home = '/users/mathilde/spark-2.2.0-bin-hadoop2.7'
try:
    import findspark
    findspark.init(spark_home)
except ModuleNotFoundError as e:
    print('Info: {}'.format(e))

import getpass
import pyspark

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Gutenberg text modelling") \
    .config("spark.driver.host", "localhost") \
    .getOrCreate()

In [3]:
sc = spark.sparkContext
spark

## Load the data

**TODO**: 
* download the gutenberg_cleaned_rdd from [here](https://polybox.ethz.ch/index.php/s/rv4VTSmXvJhvq9B) and extract it into the `data` directory in the base path of this repository.
* load this as `cleaned_rdd` using `sc.sequenceFile`.

In [4]:
cleaned_rdd = sc.sequenceFile( '../data/gutenberg_cleaned_rdd' )

In [5]:
%time cleaned_rdd.cache().count()

CPU times: user 38.1 ms, sys: 13.6 ms, total: 51.7 ms
Wall time: 5.48 s


25198

In [6]:
cleaned_rdd.first()[1][:200]

'et_dunlap_publishers_made_in_the_united_states_of_america_copyright_1909_by_w_j_watt_company_with_love_and_gratitude_i_dedicate_this_book_to_my_father_tess_of_the_storm_country_chapter_i_one_september'

Note that there were a few further pre-processing steps: we removed all punctuation, made the text lowercase, and replaced whitespace characters with "_".
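
The cleaning code itself is not reproduced in this notebook, so the following is only a minimal sketch of the three steps just described, assuming a regular-expression approach. `clean_text` is a hypothetical helper name, and the sample string is made up for illustration.

```python
import re

def clean_text(raw):
    """Hypothetical helper mirroring the three cleaning steps described above."""
    # 1. drop punctuation: keep only word characters and whitespace
    no_punct = re.sub(r"[^\w\s]", "", raw)
    # 2. make everything lowercase
    lowered = no_punct.lower()
    # 3. collapse each run of whitespace into a single underscore
    return re.sub(r"\s+", "_", lowered.strip())

sample = clean_text("Tess of the Storm Country.\nChapter I")
print(sample)
```

Keeping the word boundaries as `_` matters later on, because the character N-grams can then capture how words begin and end.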

### Load in the metadata dictionary and broadcast it

Just as in the previous notebook, we will load our pre-generated metadata dictionary and broadcast it to all the executors. 

In [7]:
import json

with open('../data/gutenberg_metadata.json', 'r') as f :
    meta = json.load(f)

In [8]:
# TODO: create meta_b by broadcasting the meta dictionary
meta_b = sc.broadcast(meta)

## DataFrames

A [`DataFrame`](http://spark.apache.org/docs/latest/sql-programming-guide.html#creating-dataframes) is analogous to a Pandas or R dataframe. Since v2.0, DataFrames have been the "official" API for Spark and, importantly, the development of the [machine learning library](http://spark.apache.org/docs/latest/ml-guide.html) is focused exclusively on the DataFrame API. Many low-level optimizations have been developed for DataFrames in recent versions of Spark, which also reduces the overhead of using Python with Spark. Using DataFrames lets you specify types for your operations, which means that they can be offloaded to the Scala backend and optimized by the runtime.

However, you frequently will find that there simply is no easy way of doing a particular operation with the DataFrame methods and will need to resort to the lower-level RDD API. 

## Creating a DataFrame

Here we will create a DataFrame out of the RDD that we were using in the previous exercises. The DataFrame is a much more natural fit for this dataset: the book metadata simply become columns, which can then be used in queries.

To begin, we will map the RDD elements to type [Row](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row) and recast the data as a DataFrame. Note that we are lazy here and are just using the default `StringType` for all columns, but we could be more specific and use e.g. `IntegerType` for the `gid` field. 

In [9]:
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, StringType, ArrayType, StructField, StructType

# set up the Row 
df = spark.createDataFrame(
    cleaned_rdd.map(lambda x: Row(**meta_b.value[x[0]], text=x[1])), 
).cache()

For inspection, the `Row` class can be conveniently cast into a `dict`:

In [10]:
# first row
df.first().asDict()

{'author_id': '7802',
 'author_name': ['White', ' Grace Miller'],
 'birth_year': '1873',
 'death_year': '1952',
 'downloads': '29',
 'first_name': 'Grace Miller',
 'gid': '22064',
 'language': 'en',
 'last_name': 'White',
 'license': 'Public domain in the USA.',
 'subtitle': '',
 'text': 'et_dunlap_publishers_made_in_the_united_states_of_america_copyright_1909_by_w_j_watt_company_with_love_and_gratitude_i_dedicate_this_book_to_my_father_tess_of_the_storm_country_chapter_i_one_september_afternoon_not_many_years_ago_three_men_sat_on_the_banks_of_cayuga_lake_cleaning_the_fish_they_had_caught_in_their_nets_the_previous_night_when_they_glanced_up_from_their_work_and_looked_beyond_the_southern_borders_of_the_lake_they_could_see_rising_from_the_mantle_of_forestry_the_towers_and_spires_of_cornell_university_in_ithaca_city_an_observer_would_have_noticed_a_sullen_look_of_hatred_pass_unconsciously_over_their_faces_as_their_eyes_lighted_on_the_distant_buildings_for_the_citizens_of_ithaca_were_the_

In [11]:
df.columns

['author_id',
 'author_name',
 'birth_year',
 'death_year',
 'downloads',
 'first_name',
 'gid',
 'language',
 'last_name',
 'license',
 'subtitle',
 'text',
 'title']

The DataFrame includes convenience methods for quickly inspecting the data. For example:

In [12]:
df.describe('birth_year').show()

+-------+------------------+
|summary|        birth_year|
+-------+------------------+
|  count|             20934|
|   mean|1829.9672587614018|
| stddev|114.48079532175812|
|    min|           -100 BC|
|    max|               973|
+-------+------------------+
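
The `min` and `max` rows above look odd because `birth_year` is a string column, so those two values are compared character by character rather than numerically. The plain-Python sketch below reproduces the effect on a few values that appear in this notebook; the `to_year` helper is an illustrative assumption, not part of the lab code.

```python
# a few birth_year values as they are stored in the DataFrame (strings)
birth_years = ["1873", "1871", "973", "-100 BC"]

# string comparison: "9" sorts after "1", and "-" sorts before any digit
string_max = max(birth_years)
string_min = min(birth_years)

def to_year(value):
    """Hypothetical helper: parse a year, returning None if it is not a plain integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

# numeric comparison after parsing; unparseable entries are dropped
numeric = [y for y in map(to_year, birth_years) if y is not None]

print(string_min, string_max)
print(min(numeric), max(numeric))
```

In Spark, the analogous step would be to cast the column to an integer type (for example with `Column.cast`) before calling `describe`.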



Certain operations are much more convenient with the DataFrame API, such as `groupBy`, which yields a special [`GroupedData`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) object. Check out the API for the different operations you can perform on grouped data -- here we will use `count` to get the equivalent of our author-count from the previous exercise:

In [13]:
# TODO: complete the cell to use groupBy to show the most common authors
from pyspark.sql.functions import desc

(df.groupby( "author_name" )
   .count()
   .sort(desc("count"))
   .show()
)

+--------------------+-----+
|         author_name|count|
+--------------------+-----+
|           [Various]| 1654|
|                null|  835|
|         [Anonymous]|  278|
|[Balzac,  Honoré de]|  121|
|[Kingston,  Willi...|  113|
|      [Twain,  Mark]|  104|
|[Ballantyne,  R. ...|   95|
|[Jacobs,  W. W. (...|   94|
|           [Unknown]|   92|
|[Shakespeare,  Wi...|   87|
|    [Pepys,  Samuel]|   85|
|[Fenn,  George Ma...|   83|
| [Dumas,  Alexandre]|   75|
|     [Verne,  Jules]|   74|
|     [Sand,  George]|   73|
|[Howells,  Willia...|   70|
|[Churchill,  Wins...|   67|
| [Dickens,  Charles]|   61|
|[Henty,  G. A. (G...|   60|
|[Doyle,  Arthur C...|   58|
+--------------------+-----+
only showing top 20 rows
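
Conceptually, `groupBy(...).count()` followed by a descending sort amounts to tallying keys and ranking them. Here is a plain-Python sketch of that logic on a made-up sample (this is neither Spark code nor the actual corpus):

```python
from collections import Counter

# made-up sample of author names, one entry per book; None plays the role of null
authors = ["Twain, Mark", "Verne, Jules", "Twain, Mark", None, "Twain, Mark", "Verne, Jules"]

# groupBy + count: one tally per distinct key
counts = Counter(authors)

# sort(desc("count")): rank the keys by their tally, largest first
ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

for author, n in ranked:
    print(author, n)
```

As in the Spark output above, missing author names form their own group instead of being dropped.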



### Accessing columns

Columns can be accessed in a variety of ways, and usually you can just pass a column name to DataFrame methods: 

In [14]:
df.select('birth_year').show()

+----------+
|birth_year|
+----------+
|      1873|
|      1871|
|      1870|
|      1842|
|      1774|
|      1850|
|      1802|
|      1802|
|      null|
|      null|
|      1925|
|      1863|
|      1843|
|      1873|
|      1886|
|      1886|
|      1856|
|      1791|
|      1858|
|      null|
+----------+
only showing top 20 rows



However, columns are also objects, so they have methods of their own that can be useful. See [here](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=isin#pyspark.sql.Column). You can access them very simply like this:

In [15]:
df.birth_year

Column<b'birth_year'>

### Creating new columns

Let's make a new column with an estimated publication year, as in the previous notebook (here simply the author's birth year plus 40):

In [16]:
df = df.withColumn('publication_year', (df.birth_year + 40))

**TODO**: Show author name, title and publication year, sorted by `publication_year` in descending order.

Hint: you can pass column names, column instances (e.g. `df.birth_year`) or an SQL query (for this you need to register a table first)

In [17]:
df.select(df.author_name,df.title,df.publication_year).sort(df.publication_year.desc()).show()

+-----------------+--------------------+----------------+
|      author_name|               title|publication_year|
+-----------------+--------------------+----------------+
|    [Blade,  Zoë]|            Identity|          2021.0|
|    [Blade,  Zoë]|     Less than Human|          2021.0|
|[Doctorow,  Cory]|Super Man and the...|          2011.0|
|[Doctorow,  Cory]|          Printcrime|          2011.0|
|[Doctorow,  Cory]|           Craphound|          2011.0|
|[Doctorow,  Cory]|Return to Pleasur...|          2011.0|
|[Doctorow,  Cory]|Shadow of the Mot...|          2011.0|
|[Doctorow,  Cory]|  A Place so Foreign|          2011.0|
|[Doctorow,  Cory]|      Little Brother|          2011.0|
|[Doctorow,  Cory]|Someone Comes to ...|          2011.0|
|[Doctorow,  Cory]|Home Again, Home ...|          2011.0|
|[Doctorow,  Cory]|Eastern Standard ...|          2011.0|
|[Doctorow,  Cory]|Ebooks: Neither E...|          2011.0|
|[Camacho,  Jorge]|La Majstro kaj Ma...|          2006.0|
|[Camacho,  Jo

# Language classification with Spark ML

Here we will use some of the same techniques we developed in the last exercise, but this time we will use the built-in methods of the [Spark ML library](http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml#) instead of coding up our own transformation functions. We will apply the N-Gram technique to build a simple language classification model. 

The method is rather straightforward and outlined in [Cavnar & Trenkle 1994](http://odur.let.rug.nl/~vannoord/TextCat/textcat.pdf):

For each of the English/German training sets:

1. tokenize the text (spaces are also tokens, so we replace them with "_")
2. extract N-grams where 1 ≤ N ≤ 5
3. determine the most common N-grams for each corpus
4. encode both sets of documents using the combined top ngrams


## Character tokens vs. Word tokens
In the last notebook, we used words as "tokens" -- now we will use characters, even accounting for white space (which we have replaced with "_" above). We will use the two example sentences again:

    document 1: "a dog bit me"
    document 2: "i bit the dog back"
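
To make the difference concrete, the sketch below tokenizes the first example sentence both ways. `word_tokens` and `char_tokens` are hypothetical helper names used only for this illustration.

```python
def word_tokens(text):
    # word-level tokens, as in the previous notebook
    return text.split()

def char_tokens(text):
    # character-level tokens; whitespace is kept as a token by writing it as "_"
    return list("_".join(text.split()))

document_1 = "a dog bit me"
words = word_tokens(document_1)
chars = char_tokens(document_1)

print(words)
print(chars)
```

The word version yields four tokens, while the character version yields twelve, three of which are the `_` markers for word boundaries.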

## SparkML feature transformers

The SparkML library includes many data transformers that all support the same API (much in the same vein as Scikit-Learn). Here we are using the [`CountVectorizer`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizer), [`NGram`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.NGram) and [`RegexTokenizer`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.RegexTokenizer). 

In [18]:
from pyspark.ml.feature import CountVectorizer, NGram, RegexTokenizer

### Define the transformations

We instantiate the three transformers that will be applied in turn. We will pass the output of one as the input of the next -- in the end our DataFrame will contain a column `vectors` that will be the vectorized version of the documents. 

In [19]:
# gaps=False: the pattern matches the tokens themselves (each non-whitespace character)
regex_tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", gaps=False, pattern=r'\S')
ngram = NGram(n=2, inputCol='tokens', outputCol='ngrams')
count_vectorizer = CountVectorizer(inputCol="ngrams", outputCol="vectors", vocabSize=1000)

Let's see what this does to our test sentences:

In [20]:
test_df = spark.createDataFrame([('a dog bit me',), ('i bit the dog back',)], ['text'])

test_df.collect()

[Row(text='a dog bit me'), Row(text='i bit the dog back')]

**TODO** Figure out how to run the `test_df` through the two transformers and generate a `test_ngram_df`. `show()` the `text`, `tokens`, and `ngrams` columns.

In [21]:
test_ngram_df = ngram.transform( regex_tokenizer.transform( test_df ) )
test_ngram_df.show()

+------------------+--------------------+--------------------+
|              text|              tokens|              ngrams|
+------------------+--------------------+--------------------+
|      a dog bit me|[a, d, o, g, b, i...|[a d, d o, o g, g...|
|i bit the dog back|[i, b, i, t, t, h...|[i b, b i, i t, t...|
+------------------+--------------------+--------------------+
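
To see what ends up in the truncated `tokens` and `ngrams` columns, here is a plain-Python approximation of the two transformers as configured above. It assumes that the pattern `\S` picks out every non-whitespace character, that the tokenizer lowercases its input (the `RegexTokenizer` default), and that `NGram` joins consecutive tokens with a space. `tokenize` and `ngrams` are hypothetical helpers, not Spark's implementation.

```python
import re

def tokenize(text):
    # every non-whitespace character becomes one token
    return re.findall(r"\S", text.lower())

def ngrams(tokens, n):
    # sliding window of n consecutive tokens, joined by a single space
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

tokens = tokenize("a dog bit me")
bigrams = ngrams(tokens, 2)

print(tokens)
print(bigrams)
```

Because the test sentences contain real spaces, the word boundaries disappear here and a bigram such as `g b` spans two words. In the cleaned corpus the whitespace was replaced with `_`, so the boundaries survive as tokens.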



**TODO**: Fit the `CountVectorizer` with `n=2` ngrams and store in `test_cv_model`:

In [22]:
test_cv_model = count_vectorizer.fit(test_ngram_df)

In [23]:
# inspect the constructed vocabulary
test_cv_model.vocabulary

['d o',
 'g b',
 'b i',
 'o g',
 'i t',
 'a d',
 'b a',
 't m',
 'i b',
 'h e',
 'a c',
 'e d',
 't t',
 'm e',
 't h',
 'c k']

**TODO**: transform `test_ngram_df` into vectors and `show` them:

In [24]:
test_cv_model.transform( test_ngram_df ).show()

+------------------+--------------------+--------------------+--------------------+
|              text|              tokens|              ngrams|             vectors|
+------------------+--------------------+--------------------+--------------------+
|      a dog bit me|[a, d, o, g, b, i...|[a d, d o, o g, g...|(16,[0,1,2,3,4,5,...|
|i bit the dog back|[i, b, i, t, t, h...|[i b, b i, i t, t...|(16,[0,1,2,3,4,6,...|
+------------------+--------------------+--------------------+--------------------+
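
The sparse vectors are cut off in the output above, so here is a plain-Python sketch of what a count vectorizer does: it builds a vocabulary ranked by how often each term occurs, then encodes each document as counts per vocabulary index. The helper names are hypothetical, and the alphabetical tie-break is an assumption made for reproducibility, so the indices need not match Spark's.

```python
import re
from collections import Counter

def char_bigrams(text):
    tokens = re.findall(r"\S", text.lower())
    return [" ".join(tokens[i:i + 2]) for i in range(len(tokens) - 1)]

def fit_vocabulary(docs, vocab_size=1000):
    # total count of every term over the whole corpus
    totals = Counter()
    for doc in docs:
        totals.update(doc)
    # most frequent terms first; ties broken alphabetically (an assumption)
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))
    return [term for term, _ in ranked[:vocab_size]]

def to_sparse(doc, vocabulary):
    # sparse representation: {vocabulary index: count}, zero entries omitted
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[term] for term in doc if term in index)
    return dict(sorted(counts.items()))

docs = [char_bigrams(t) for t in ("a dog bit me", "i bit the dog back")]
vocabulary = fit_vocabulary(docs)
vectors = [to_sparse(doc, vocabulary) for doc in docs]

print(len(vocabulary))
print(vectors[0])
```

The vocabulary has 16 entries, in line with the vector size of 16 shown above, and the five bigrams shared by both sentences come first because they occur twice.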



## ML Pipelines

Keeping track of these steps is a bit tedious -- if we wanted to repeat the above steps on different data, we would either have to write a wrapper function or re-execute all the cells again. It would be great if we could create a *pipeline* that encapsulated these steps and all we had to do was provide the inputs and parameters. 

The Spark ML library includes this concept of [Pipelines](https://spark.apache.org/docs/2.2.0/ml-pipeline.html) and we can use it to simplify complex ML workflows.

In [25]:
from pyspark.ml import Pipeline

**TODO:** define a `list` of pipeline stages that consists of the tokenizer, the `NGram` transformer and the `CountVectorizer` we defined above:

In [26]:
cv_pipeline = Pipeline(
    stages=[
        regex_tokenizer,
        ngram,
        count_vectorizer
    ]
)

In [27]:
# Executing the pipeline
(
    cv_pipeline.fit(test_df)
               .transform(test_df)
               .show()
)

+------------------+--------------------+--------------------+--------------------+
|              text|              tokens|              ngrams|             vectors|
+------------------+--------------------+--------------------+--------------------+
|      a dog bit me|[a, d, o, g, b, i...|[a d, d o, o g, g...|(16,[0,1,2,3,4,5,...|
|i bit the dog back|[i, b, i, t, t, h...|[i b, b i, i t, t...|(16,[0,1,2,3,4,6,...|
+------------------+--------------------+--------------------+--------------------+
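
The fit-then-transform logic of a pipeline can be sketched in a few lines of plain Python. This is a toy model of the idea under simplifying assumptions (documents are plain strings, and a stage counts as an estimator if it has a `fit` method); every class here is hypothetical and none of it is Spark's implementation.

```python
class Lowercase:
    """A transformer: has no state to learn, only transform()."""
    def transform(self, docs):
        return [d.lower() for d in docs]

class VocabularyEstimator:
    """An estimator: fit() learns a vocabulary and returns a fitted model."""
    def fit(self, docs):
        vocab = sorted({ch for d in docs for ch in d if not ch.isspace()})
        return VocabularyModel(vocab)

class VocabularyModel:
    """The fitted result: a transformer that counts each vocabulary character."""
    def __init__(self, vocab):
        self.vocab = vocab
    def transform(self, docs):
        return [[d.count(ch) for ch in self.vocab] for d in docs]

class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, docs):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):       # estimator: fit on the current data
                stage = stage.fit(docs)
            fitted.append(stage)            # transformers are kept as they are
            docs = stage.transform(docs)    # feed the output to the next stage
        return ToyPipelineModel(fitted)

class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages
    def transform(self, docs):
        for stage in self.stages:
            docs = stage.transform(docs)
        return docs

model = ToyPipeline([Lowercase(), VocabularyEstimator()]).fit(["A dog", "bit"])
print(model.stages[1].vocab)
print(model.transform(["Dog"]))
```

The point of the design is that a stage which is already a transformer is only applied during `fit`, never refitted, while every estimator is replaced by its fitted model.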



This is much more concise and much less error prone! The really useful thing about pipelines is that we can now very easily change the parameters of the different components. Imagine we wanted to fit trigrams (`n=3`) instead of bigrams (`n=2`), and we wanted to change the name of the final column. We can reuse the same pipeline but feed it a *parameter map* specifying the changed parameter value:

In [28]:
# note the dictionaries added to fit() and transform() arguments
(
    cv_pipeline.fit(test_df, {ngram.n:3})
               .transform(test_df, {count_vectorizer.outputCol: 'new_vectors'})
               .show()
)

+------------------+--------------------+--------------------+--------------------+
|              text|              tokens|              ngrams|             vectors|
+------------------+--------------------+--------------------+--------------------+
|      a dog bit me|[a, d, o, g, b, i...|[a d o, d o g, o ...|(16,[0,1,2,9,11,1...|
|i bit the dog back|[i, b, i, t, t, h...|[i b i, b i t, i ...|(16,[0,1,2,3,4,5,...|
+------------------+--------------------+--------------------+--------------------+



### Building a more complex pipeline

For our language classification we want to use ngrams 1-3. We can build a function that will yield a pipeline with this more complex setup. The procedure is as follows:

1. tokenize as before
2. set up one `NGram` transformer per value of n, yielding the columns `ngrams_1`, `ngrams_2`, etc.
3. vectorize using each set of ngrams giving partial vectors
4. assemble the vectors into one complete feature vector using [VectorAssembler](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler)

In [30]:
from pyspark.ml.feature import VectorAssembler

def ngram_vectorize(min_n=1, max_n=1, min_df=1):
    """Use a range of ngrams to vectorize a corpus"""
    
    # raw string, so that Python does not treat '\S' as an (invalid) escape sequence
    tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", gaps=False, pattern=r'\S')
    ngrams = []    
    count_vectorizers = []
    
     
    for i in range(min_n, max_n+1):
        ngrams.append(
            NGram(n=i, inputCol='tokens', outputCol='ngrams_'+str(i))
        )
        count_vectorizers.append(
            CountVectorizer(inputCol='ngrams_'+str(i), outputCol='vectors_'+str(i), vocabSize=1000, minDF=min_df)
        )
    
    assembler = VectorAssembler(
        inputCols=['vectors_'+str(i) for i in range(min_n, max_n+1)], outputCol='features')
    
    return Pipeline(stages=[tokenizer] + ngrams + count_vectorizers + [assembler])

In [31]:
ngram_vectorize(1,3).fit(test_df).transform(test_df).select('features').show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(44,[0,1,2,3,4,5,6,7,11,12,13,14,15,16,17,19,25,28,29,30,37,39,40,41],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])         |
|[2.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,1.0,1.0]|
+---------------------------------------------------------------------------------------------------
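
The assembled feature vector is simply the per-n count vectors laid end to end. The plain-Python sketch below mimics this for the two test sentences with n = 1, 2, 3, using dense lists for readability. The helper name and the alphabetical tie-break in the vocabulary ordering are assumptions, so individual positions may differ from Spark's output even though the overall size should agree.

```python
import re
from collections import Counter

def ngrams(tokens, n):
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

texts = ["a dog bit me", "i bit the dog back"]
token_lists = [re.findall(r"\S", t) for t in texts]

# one growing feature row per document
features = [[] for _ in texts]

for n in range(1, 4):
    per_doc = [ngrams(tokens, n) for tokens in token_lists]
    # vocabulary for this n: most frequent terms first, ties alphabetical
    totals = Counter(term for doc in per_doc for term in doc)
    vocabulary = [t for t, _ in sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))]
    for row, doc in zip(features, per_doc):
        counts = Counter(doc)
        # the "assembler" step: append this block of counts to the row
        row.extend(float(counts[t]) for t in vocabulary)

print(len(features[0]))
print(sum(features[0]), sum(features[1]))
```

The 44 features break down into 12 distinct characters, 16 bigrams and 16 trigrams.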

### Preparing the DataFrames and models

For our language classifier we will use just three languages (English, German and French). We need to create a DataFrame that is filtered to just include those languages. 

In addition, we will need this step of transforming raw string documents into vectors when we try the classifier on new data. We should therefore save the fitted NGram model for later. 

**TODO:** use the [`isin`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=isin#pyspark.sql.Column.isin) method of the `language` column to filter the DF down to "en", "de" and "fr" languages.

In [32]:
lang_df = df.filter( df.language.isin('en','de','fr') ).cache()

**TODO:** Construct the `ngram_model` by using `ngram_vectorize`:

In [39]:
ngram_model = ngram_vectorize().fit(lang_df)

In [40]:
ngram_model.transform(lang_df).select('features').first()

Row(features=SparseVector(700, {0: 1720.0, 1: 973.0, 2: 690.0, 3: 522.0, 4: 471.0, 5: 518.0, 6: 520.0, 7: 500.0, 8: 402.0, 9: 541.0, 10: 284.0, 11: 329.0, 12: 187.0, 13: 156.0, 14: 136.0, 15: 142.0, 16: 164.0, 17: 175.0, 18: 109.0, 19: 153.0, 20: 145.0, 21: 48.0, 22: 82.0, 23: 4.0, 24: 13.0, 25: 1.0, 26: 9.0, 28: 3.0, 32: 1.0, 33: 2.0}))

## Building the classifier

We have successfully transformed the dataset into a representation that we can (almost) feed into a classifier. What we still need is a label column as well as the final stage of the pipeline that will fit the actual model. 

To generate labels from the language column, we will use the `StringIndexer` as a part of our pipeline. For the classification we will use the simplest possible `LogisticRegression` -- once you've convinced yourself that you know how it works, go ahead and experiment with other [classifiers](http://spark.apache.org/docs/latest/api/python/pyspark.ml#module-pyspark.ml.classification).

In [41]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer

**TODO:** Set up a `classification_pipeline`. Use the N-gram model we defined above as a starting stage, followed by a `StringIndexer` and a `LogisticRegression` classifier. Make sure you read the documentation on these!

Note that we can reuse the pre-trained N-gram model: it is already a fitted `PipelineModel`, i.e. a transformer, so the `Pipeline` will not fit it again and will only apply it in the transformation step. 

In [42]:
classification_pipeline = Pipeline(
    stages=[ngram_model, 
            StringIndexer(inputCol='language', outputCol='label'),
            LogisticRegression(regParam=0.002, elasticNetParam=1, maxIter=10)
           ]
)
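
The `StringIndexer` stage turns the language strings into numeric labels. By default it orders labels by frequency, so the most common language gets index 0. The sketch below imitates that behaviour in plain Python on a made-up list; `fit_string_index` is a hypothetical helper, and the alphabetical tie-break is an assumption.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return {value: float(i) for i, (value, _) in enumerate(ranked)}

# made-up language column
languages = ["en", "en", "fr", "en", "de", "fr"]

mapping = fit_string_index(languages)
labels = [mapping[lang] for lang in languages]

print(mapping)
print(labels)
```

With English books dominating the corpus, this ordering is presumably why the English rows carry the label `0.0` in the predictions further down.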

Run the classifier! The fitting will take a while -- you may want to run this first on a subset of the data

**TODO:** Use the `randomSplit` DataFrame method to generate `training` and `test` data:

In [44]:
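# Split into training and test sets. The weights are normalized, so roughly
# 1/3 of the rows go to training and 2/3 to test; 24 is the random seed.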
training, test = lang_df.randomSplit([1.0, 2.0], 24)
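
The weights passed to `randomSplit` are relative, not absolute. Below is a plain-Python sketch of a weighted random split, assuming each row is assigned independently by drawing a uniform random number; `random_split` is a hypothetical helper and the exact rows selected will not match Spark's.

```python
import random

def random_split(rows, weights, seed):
    total = sum(weights)
    # cumulative upper bounds of each split, e.g. [1/3, 1.0] for weights [1.0, 2.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point rounding

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
    return splits

rows = list(range(3000))
training, test = random_split(rows, [1.0, 2.0], seed=24)

print(len(training), len(test))
```

Because every row is assigned independently, the split sizes are only approximately in the ratio 1:2, and fixing the seed makes the split reproducible.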

In [45]:
%%time 
classifier = classification_pipeline.fit(training)

CPU times: user 341 ms, sys: 138 ms, total: 479 ms
Wall time: 60 s


In [46]:
# check the predictions 
for lang in ['en', 'fr', 'de']:
    print('Predictions for {0}'.format(lang))
    (classifier.transform(
        test.filter(test.language == lang))
            .select('label', 'probability', 'prediction')
            .show(10, truncate=False))

Predictions for en
+-----+----------------------------------------------------------------+----------+
|label|probability                                                     |prediction|
+-----+----------------------------------------------------------------+----------+
|0.0  |[0.9979993781099208,0.001039362152528426,9.612597375505918E-4]  |0.0       |
|0.0  |[0.9976538228914836,0.0015818677645751845,7.643093439411799E-4] |0.0       |
|0.0  |[0.9979762175896993,0.0011322976565881265,8.914847537125246E-4] |0.0       |
|0.0  |[0.996407261995345,0.0021970487479567513,0.0013956892566981687] |0.0       |
|0.0  |[0.9986060039348046,8.060316509569959E-4,5.879644142384917E-4]  |0.0       |
|0.0  |[0.997611633941847,0.001537629594641789,8.507364635112391E-4]   |0.0       |
|0.0  |[0.996996826984687,0.002120659447283608,8.825135680295064E-4]   |0.0       |
|0.0  |[0.9979594567837877,8.954057733882497E-4,0.0011451374428241771] |0.0       |
|0.0  |[0.9958908037837282,0.0028649234477379553,0.001244

You should be seeing mostly good agreement between `label` and `prediction`.
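
To put a number on "mostly good agreement", one can compute the accuracy, i.e. the fraction of rows where `label` equals `prediction`. The sketch below does this in plain Python on made-up pairs; with the real data the pairs would first have to be collected from the transformed DataFrame.

```python
def accuracy(pairs):
    """Fraction of (label, prediction) pairs that agree."""
    pairs = list(pairs)
    correct = sum(1 for label, prediction in pairs if label == prediction)
    return correct / len(pairs)

# made-up (label, prediction) pairs
example = [(0.0, 0.0), (1.0, 1.0), (2.0, 1.0), (0.0, 0.0)]

print(accuracy(example))
```

Spark ML also provides evaluators for this purpose, such as `MulticlassClassificationEvaluator` in `pyspark.ml.evaluation`, which avoid collecting the predictions to the driver.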

### Improving the model and continuing the exploration of the data

We have completed the basic model training, but many improvements are possible. One obvious improvement is hyperparameter tuning -- check out the [docs](http://spark.apache.org/docs/latest/ml-tuning.html#ml-tuning-model-selection-and-hyperparameter-tuning) for some examples and try it out!

Some other ideas for things you could do with this dataset: 

* try other [classifiers that are included in MLlib](http://spark.apache.org/docs/latest/mllib-classification-regression.html)
* build a regression model to predict year of publication (may be better with word ngrams)
* do clustering on the English books and see if sub-groups of the language pop up
* cluster by author -- do certain authors write in similar ways?

In [47]:
spark.stop()