# XKE Text Mining - Solutions
----
****

This notebook introduces the processing and analysis of text data in Spark with Python (PySpark).

## Settings
----

In [1]:
import os
# Run the Python workers spawned by Spark with python2
os.environ['PYSPARK_PYTHON'] = 'python2'

In [2]:
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext('local[*]')
# sqlContext is used below to build DataFrames
sqlContext = SQLContext(sc)

## 1 - Load data
----

### Text Corpus

##### Load the corpus of texts stored in `/opt/docker/notebooks/data/articles_blog/` into a DataFrame

> Hint 1: Texts are stored in directories named after their year and month of release. To load all texts into one RDD, you can use `*` in place of a directory name:

`/articles_blog/*/*/*.txt`
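To see which files such a pattern picks up, here is a small local sketch (Python 3) using the `glob` module on a throwaway directory tree. It assumes that Python's `*`, like the Hadoop glob used by `sc.textFile`, does not cross `/`; the file names are hypothetical.

```python
import glob
import os
import tempfile

def matching_files(root):
    """Return the .txt files exactly two directory levels below root (year/month/file)."""
    return sorted(glob.glob(os.path.join(root, '*', '*', '*.txt')))

# Build a hypothetical year/month layout in a temporary directory
root = tempfile.mkdtemp()
for rel in ['2014/01/a.txt', '2014/08/b.txt', '2015/02/c.txt', '2014/stray.txt']:
    path = os.path.join(root, rel)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'w') as f:
        f.write('title|category|content\n')

found = [os.path.relpath(p, root) for p in matching_files(root)]
# '2014/stray.txt' sits one level too shallow, so the pattern skips it
print(found)
```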

> Hint 2: Each file has the following structure: three fields separated by `|`. The resulting DataFrame must therefore have three columns:
- title: String
- category: String
- content: String

> Hint 3: One way to proceed is to create a case class (a `Row` in PySpark) with the three targeted columns, load the data into an RDD with the [sc.textFile](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext) function and then map each line to the case class. The RDD will then have an associated schema, so you can create a DataFrame directly with the [createDataFrame](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLContext) function, passing only the resulting RDD as argument.
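The parsing step itself does not depend on Spark. The snippet below uses a `namedtuple` as a plain-Python stand-in for the case class (or PySpark `Row`) and assumes each record sits on a single line; the sample line is made up. A function like `parse_line` is what you would pass to `rdd.map`.

```python
from collections import namedtuple

# Plain-Python stand-in for the Row / case class described in the hint
TextInfo = namedtuple('TextInfo', ['title', 'category', 'content'])

def parse_line(line):
    """Split a 'title|category|content' line into a TextInfo record.

    maxsplit=2 keeps any extra '|' characters inside the content field
    instead of producing a fourth field.
    """
    title, category, content = line.split('|', 2)
    return TextInfo(title, category, content)

record = parse_line('2014-01-08-example|Craft|Pour coder | il faut...')
print(record.category)  # Craft
print(record.content)   # Pour coder | il faut...
```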

In [221]:
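from pyspark.sql import Row

# Assumes each file holds one 'title|category|content' record on a single line
rdd = sc.textFile('/opt/docker/notebooks/data/articles_blog/*/*/*.txt')
TextInfo = Row('title', 'category', 'content')
# maxsplit=2 keeps any '|' inside the content field
rdd_parsed = rdd.map(lambda text: TextInfo(*text.split('|', 2)))
# Pass the whole RDD: .first() would return a single Row, from which no schema can be inferred
df = sqlContext.createDataFrame(rdd_parsed)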

In [1]:
# Requires the Databricks spark-csv package on the classpath
df_corpus = (sqlContext
      .read
      .format('com.databricks.spark.csv')
      .options(header='false', delimiter='|')
      .load('/opt/docker/notebooks/data/articles_blog/*/*/*.txt')
      .withColumnRenamed('C0', 'title')
      .withColumnRenamed('C1', 'category')
      .withColumnRenamed('C2', 'content')
     )

In [2]:
df_corpus.printSchema()

root
 |-- title: string (nullable = true)
 |-- category: string (nullable = true)
 |-- content: string (nullable = true)



In [3]:
# Run this test to check if you got the right DataFrame size
assert(df_corpus.count() == 335)

### Stopwords

##### Load the stopwords stored in `/opt/docker/notebooks/data/stopwords_french.txt` into a list of strings

In [11]:
stopwords = sc.textFile("/opt/docker/notebooks/data/stopwords_french.txt").collect()

In [12]:
len(stopwords)

237

In [14]:
# Run this test to check if you got the right array size
assert(len(stopwords) == 237)

## 2 - Tokenizer
----

Now that the data is available, it is time to pre-process it before we can use it in algorithms. The first thing to do is to tokenize each text to get an array of tokens (words) that will be used afterwards.

##### Create a function tokens: String => Array[String] which transforms a String into an array of tokens. The transformation can perform the following actions:

- Split on spaces (mandatory)
- Remove punctuation (can be done with `replaceAll("[^a-z\\sA-Z]","")` in Scala)
- Convert to lowercase
- Remove all stopwords
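The four steps can be illustrated in plain Python. This is not the notebook's solution: the stopword set is a hypothetical subset, and the example contrasts the hint's `[^a-z\sA-Z]` class (ported to Python's `re`) with a Unicode-aware class, because the ASCII-only version silently mangles accented French words.

```python
import re

# Hypothetical stopword subset; the notebook loads the real list from stopwords_french.txt
STOPWORDS = {'pour', 'les', 'tous', 'une'}

# The hint's character class: everything except ASCII letters and whitespace
ascii_only = re.compile(r'[^a-z\sA-Z]')
# Unicode-aware alternative: anything that is not a letter/digit/underscore or whitespace,
# plus digits and underscores themselves
non_letters = re.compile(r'[^\w\s]|[\d_]')

def tokenize(text, pattern=non_letters, repl=' '):
    """Remove punctuation, lowercase, split on whitespace, then drop stopwords."""
    cleaned = pattern.sub(repl, text).lower()
    return [w for w in cleaned.split() if w not in STOPWORDS]

sentence = "Pour les développeurs, une API élégante !"
print(tokenize(sentence, ascii_only, ''))  # ['dveloppeurs', 'api', 'lgante']
print(tokenize(sentence))                  # ['développeurs', 'api', 'élégante']
```

Replacing with a space rather than an empty string also keeps words joined by punctuation (such as `l'API`) apart.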

In [15]:
import re
punctuation = re.compile(r'[-.?!,":;()|0-9]')

In [17]:
# Set lookup is faster than scanning the stopword list for every token
stopwords_set = set(stopwords)

def extract_tokens(sentence):
    """Tokenization of a given sentence.
       Replace apostrophes, punctuation and digits with spaces, lowercase,
       then drop stopwords and words of 2 characters or fewer."""
    if not sentence:
        return []
    tokens = punctuation.sub(' ', sentence.replace("'", " ")).lower().split()
    return [word for word in tokens if word not in stopwords_set and len(word) > 2]

In [18]:
# Test your function
extract_tokens("Hello, World!")

['hello', 'world']

##### Create a Spark UDF (User Defined Function) which uses the previous tokenizer function

> Hint: Use the [udf](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.UserDefinedFunction) function with your previous function as its argument. In PySpark, also pass the return type, here `ArrayType(StringType())`

##### Add a new column named `tokens` to the df_corpus DataFrame containing the result of the tokenizer UDF used on the `content` column

> Hint 1: Use the [withColumn](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame) method of the df_corpus DataFrame

> Hint 2: To apply the UDF on a DataFrame column, call it on the column: `yourFunction(yourDF(col_name))` in Scala, `your_udf(your_df[col_name])` in Python
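Conceptually, `withColumn` with a UDF calls the Python function once per row on the value of the input column and stores the result in the new column. The snippet below mimics that on a list of dicts; the helpers `with_column` and `simple_tokens` and the data are hypothetical. It also shows why the tokenizer guards against empty input: `content` is nullable, and a null value reaches a Python UDF as `None`.

```python
# Rows as plain dicts standing in for DataFrame rows (hypothetical data)
rows = [
    {'title': 'a', 'category': 'Craft', 'content': 'Coder tous les jours'},
    {'title': 'b', 'category': 'Mobile', 'content': None},
]

def with_column(rows, name, func, source):
    """Return new rows with `name` set to func(row[source]), like withColumn + a UDF."""
    return [dict(row, **{name: func(row[source])}) for row in rows]

def simple_tokens(text):
    # Guard against None, as a null cell would be passed to a Python UDF
    return text.lower().split() if text else []

result = with_column(rows, 'tokens', simple_tokens, 'content')
print(result[1]['tokens'])  # []
```

As with a DataFrame, the input rows are left unchanged and a new collection is returned.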

In [23]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Wrap the tokenizer in a UDF that returns an array of strings
tokenize_udf = udf(extract_tokens, ArrayType(StringType()))
df_tokens = df_corpus.withColumn('tokens', tokenize_udf(df_corpus.content))

In [24]:
# Check if your DataFrame has the right column names
df_tokens.dtypes

[('title', 'string'),
 ('category', 'string'),
 ('content', 'string'),
 ('tokens', 'array<string>')]

In [25]:
df_tokens.show(2)

+--------------------+-------------+--------------------+--------------------+
|               title|     category|             content|              tokens|
+--------------------+-------------+--------------------+--------------------+
|2014-01-08-crafts...|        Craft|  Pour coder tous...|[coder, jours, pl...|
|2014-01-10-androi...|AndroidMobile|  En tant que dév...|[tant, développeu...|
+--------------------+-------------+--------------------+--------------------+
only showing top 2 rows



##### What are the 10 most used words in the corpus?

> Hint 1: Use the `explode` function to get a DataFrame with one word per row

> Hint 2: You can perform the following operations:
- Use the `select` function and, inside it, apply the `explode` function to the `tokens` column; name the resulting column "word"
- Group by the "word" column
- Use the count function to count the number of occurrences of each word
- Order by the count result, descending, and show the first 10 resulting words
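The same pipeline has a direct plain-Python analogue, which may help when reading the Spark version: flattening the token lists plays the role of `explode`, and `collections.Counter` with `most_common` covers group by, count and descending order. The token lists here are made up.

```python
from collections import Counter
from itertools import chain

# Hypothetical token lists, one per article (the `tokens` column)
token_lists = [
    ['code', 'test', 'plus'],
    ['plus', 'code', 'spark'],
    ['plus', 'données'],
]

# explode: one word per element
words = chain.from_iterable(token_lists)
# groupBy('word').count() then orderBy(desc('count')): Counter + most_common
top = Counter(words).most_common(2)
print(top)  # [('plus', 3), ('code', 2)]
```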

In [26]:
from pyspark.sql.functions import explode, desc

df_words = df_tokens.select(explode(df_tokens.tokens).alias('word'))
df_words.groupBy('word').count().orderBy(desc('count')).show(10)

+------+-----+
|  word|count|
+------+-----+
|  plus| 1576|
| cette|  909|
| c’est|  804|
|  d’un|  787|
|  code|  625|
| faire|  617|
|projet|  584|
| d’une|  583|
|  bien|  576|
|permet|  559|
+------+-----+
only showing top 10 rows



> You should observe that even with a decent tokenization, the most frequent words are still not very useful for characterizing the articles.
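Several of the top words (`c’est`, `d’un`, `d’une`, `n’est`) are contractions: the tokenizer only replaces the ASCII apostrophe `'`, while the articles use the typographic apostrophe `’` (U+2019), so these survive as single tokens. One possible refinement, shown below with a hypothetical stopword subset and a hypothetical `extract_tokens_v2` name, adds `’` to the character class so that contractions split into short fragments and stopwords that the length and stopword filters can then drop.

```python
import re

# The notebook's punctuation class plus both the ASCII and the typographic apostrophe
punctuation_and_quotes = re.compile("[-.?!,\":;()|0-9'\u2019]")

# Hypothetical stopword subset for the example
stopwords = {'est', 'une', 'cette', 'plus'}

def extract_tokens_v2(sentence):
    """Like extract_tokens, but also splits on the typographic apostrophe."""
    if not sentence:
        return []
    tokens = punctuation_and_quotes.sub(' ', sentence).lower().split()
    return [w for w in tokens if w not in stopwords and len(w) > 2]

print(extract_tokens_v2("C’est d’une simplicité étonnante"))  # ['simplicité', 'étonnante']
```

This only removes contraction noise; generic words such as `plus` or `faire` would still need to be added to the stopword list.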

## 3 - Word2Vec
----

Now that we have tokens, we can feed them to algorithms that extract useful features from them. One such algorithm is [Word2Vec](https://spark.apache.org/docs/latest/ml-features.html#word2vec), which has an implementation in Spark ML.
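Spark's Word2Vec is based on the skip-gram model: word vectors are trained so that each word helps predict the words around it. The training examples are (center word, context word) pairs. The snippet below only shows how such pairs are enumerated from a token list with a hypothetical window of 1; it does not perform the training itself.

```python
def skipgram_pairs(tokens, window=2):
    """Return (center, context) pairs: each word paired with its neighbours
    up to `window` positions away on either side."""
    pairs = []
    for i, center in enumerate(tokens):
        lo = max(0, i - window)
        hi = min(len(tokens), i + window + 1)
        for j in range(lo, hi):
            if j != i:
                pairs.append((center, tokens[j]))
    return pairs

pairs = skipgram_pairs(['spark', 'traite', 'données', 'massives'], window=1)
print(len(pairs))  # 6
```

Words that often share contexts end up with similar vectors, which is what `findSynonyms` exploits later.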

In [29]:
from pyspark.ml.feature import Word2Vec

### Training model

##### Instantiate a new Word2Vec object with the following settings
- inputCol: "tokens"
- outputCol: "w2c_features"
- vectorSize: 20
- minCount: 10
- maxIter: 20
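`minCount` is the minimum number of times a token must appear to be included in the model's vocabulary; rarer words get no vector at all, so they cannot be queried afterwards. A plain-Python illustration of that filter, with made-up documents and a hypothetical `build_vocabulary` helper:

```python
from collections import Counter

def build_vocabulary(token_lists, min_count):
    """Keep only words that occur at least `min_count` times across all documents."""
    counts = Counter(word for tokens in token_lists for word in tokens)
    return {word for word, n in counts.items() if n >= min_count}

docs = [['spark', 'rdd', 'spark'], ['spark', 'pandas'], ['rdd']]
print(sorted(build_vocabulary(docs, min_count=2)))  # ['rdd', 'spark']
```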

In [30]:
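# vectorSize and maxIter are set higher than the suggested values; lower them if training is slow locally
word2Vec = Word2Vec(vectorSize=50, minCount=10, maxIter=50, inputCol='tokens', outputCol='w2c_features')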

##### Train a model on the `df_tokens` DataFrame using the fit method

> This might take a few minutes to run depending on the parameters you chose

> You can try parameters other than the proposed ones, but avoid values that are too high when you are not running on a cluster

In [31]:
word2Vec_model = word2Vec.fit(df_tokens)

We now have a trained Word2Vec model that we can query.

##### Check how the learning phase went by finding synonyms of a few words of your choice

> Hint 1: Use the `findSynonyms(word, num_synonyms)` method called on the learned model

> Hint 2: The result of that function is a DataFrame. Use the show method on it to print the results

In [32]:
word2Vec_model.findSynonyms("data", 10).show()

+------------+------------------+
|        word|        similarity|
+------------+------------------+
|  randomrdds|1.5048996962724155|
|       spark|1.4857349004700415|
|      julien|1.4086488466388656|
| rdd[vector]|1.4075429554443495|
|     science|1.3309079500675987|
|         rdd|1.3281083572419359|
|labeledpoint|1.2972150087372551|
|         big|1.2776634622492866|
|      vector| 1.250472278752603|
|      pandas|1.2460220934070498|
+------------+------------------+
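The table ranks words by their similarity to the query word. The values above exceed 1, which a normalized cosine similarity never does, so in the Spark version used here the scores are best compared relative to one another. For reference, here is a plain-Python illustration of ranking by true cosine similarity; the 2-dimensional vectors are invented and `find_synonyms` is a hypothetical helper, not the Spark method.

```python
import math

def cosine(u, v):
    """Cosine similarity: dot product over the product of the norms (in [-1, 1]).
    Assumes non-zero vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    return dot / (math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v)))

def find_synonyms(word, vectors, num):
    """Rank every other word by cosine similarity to `word`, highest first."""
    query = vectors[word]
    scores = [(other, cosine(query, vec)) for other, vec in vectors.items() if other != word]
    return sorted(scores, key=lambda pair: pair[1], reverse=True)[:num]

# Hypothetical 2-dimensional word vectors
vectors = {'data': [1.0, 0.0], 'spark': [0.9, 0.1], 'pandas': [0.5, 0.5], 'julien': [0.0, 1.0]}
synonyms = find_synonyms('data', vectors, 2)
print([w for w, _ in synonyms])  # ['spark', 'pandas']
```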

