# Extração de Features e Aprendizagem Não-Supervisionada com PySpark

## 1. Descrição do problema

### As Ferramentas

Neste tutorial, demonstraremos algumas funcionalidades do framework Spark para Python, especialmente o Word2Vec (algorítmo utilizado para extraçãode features numericas a partir de textos) e o K-means (algorítmo de aprendizagem não supervisionada muito utilizado para clusterização).

### O Dataset

O dataset é composto por abstracts curtos extraídos da Wikipedia, e está disponível <a href="http://wiki.dbpedia.org/services-resources/datasets/data-set-35/data-set-351">aqui</a> sob a categoria "short abstracts - en". O arquivo nt extraído ocupa cerca de 1.3GB. 

### O Problema

Nosso objetivo neste tutorial é dividir nossos documentos em grupos de documentos similares entre si. Para isso, precisaremos primeiramente preparar o texto utilizando a ferramenta <b>Word2Vec</b>, que extrairá para cada abstract um vetor de features númericas correspondentes. Utilizaremos, então, o algorítmo <b>K-means</b>, que utilizará a distância entre esses vetores de features númericas para dividir os dados em grupos. Para obter melhores resultados e performance no K-means, utilizaremos em conjunto a ferramenta <b>PCA</b>, que diminuíra a dimensionalidade dos nossos vetores de features númericas para que possamos trabalhar somente com as informações essenciais.

## 2. Configurações Básicas

Primeiramente, utilizamos a ferramenta "findpsark" para localizar o Spark (que já deve estar previamente instalado):

In [2]:
import findspark
findspark.init("/usr/local/spark") # substitua esse campo com o path para o Spark nos seu computador

Agora que o Spark foi localizado, podemos importá-lo para o nosso Jupyter Notebook e criar uma nova sessão:

In [23]:
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder    \
        .master("local[3]")    \
        .appName("Clustering")    \
        .config("spark.executor.memory", "4g")    \
        .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

Pronto! Agora que o Spark já está configurado, podemos ler os dados. 

In [24]:
data = sc.textFile("../short_abstracts_en.nt")

Vamos analisar os três primeiros item de nosso dataset para termos uma noção melhor sobre os dados que estamos utilizando:

In [5]:
data.take(3)

[u'<http://dbpedia.org/resource/Albedo> <http://www.w3.org/2000/01/rdf-schema#comment> "The albedo of an object is a measure of how strongly it reflects light from light sources such as the Sun. It is therefore a more specific form of the term reflectivity. Albedo is defined as the ratio of total-reflected to incident electromagnetic radiation. It is a unitless measure indicative of a surface\'s or body\'s diffuse reflectivity."@en .',
 u'<http://dbpedia.org/resource/Anarchism> <http://www.w3.org/2000/01/rdf-schema#comment> "Anarchism is a political philosophy which considers the state undesirable, unnecessary and harmful, and instead promotes a stateless society, or anarchy. It seeks to diminish or even abolish authority in the conduct of human relations. Anarchists may widely disagree on what additional criteria are required in anarchism."@en .',
 u'<http://dbpedia.org/resource/Achilles> <http://www.w3.org/2000/01/rdf-schema#comment> "In Greek mythology, Achilles was a Greek hero of 

Vemos acima que os dados seguem um mesmo formato: começando com dois links entre tags, seguidos por uma definição curta de algum conceito, seguido por "@en .", conotação que deve indicar a linguagem dos textos. Poderiamos limpar os dados removendo estes links, mas talvez eles sejam uteis para o agrupamento dos textos, então, primeiramente, vamos deixa-los aonde estão. <br> Dando continuidade à nossa rápida exploração do dataset, vamos ver quantas instâncias ele contém:

In [6]:
data.count()

3129527

Vemos que noss dataset possui mais de 3 milhões destes textos curtos. Felizmente, o Spark foi projetado para trabalhar com grandes volumes de dados e nos permitirá trabalhar com este dataset com facilidade. 

## 3. Extração de Features

Agora que o Spark está funcionando e os dados já foram lidos, damos início ao próximo passo de nosso projeto: a extração de features numéricas à partir dos textos. Esse tipo de transformação é aplicado pois facilita o trabalho dos algorítmos de aprendizagem de máquina. O k-means, por exemplo, faz a "clusterização" de dados à partir das distâncias entre eles. Como calcular a distância entre dois textos? Primeiro, representamos estes textos númericamente (utilizando algorítmos de extração de features numéricas), e então fornecemos estes vetores aos algorítmos de aprendizagem de máquina ao invés de utilizar os textos completos. 

Primeiramente, precisamos transformar nossa RDD de dados em uma dataframe, já dividindo nossos textos em palavras:

In [25]:
documentDF = spark.createDataFrame(data.map(lambda text: (text.split(" "),)), ["text"])

<b>Observação</b>: o Spark possui duas bibliotecas de Machine Learning diferentes, spark.ml e spark.mllib. A diferença entre as duas é que a spark.mllib é focada em RDDs (tipo de dados "básico" do Spark) e a spark.ml é focada em Spark Dataframes (tipo de abstração em cima das RDDs que facilita o trabalho com "colunas" de features). Neste tutorial, trabalharemos com a biblioteca de Dataframes spark.ml.

Vamos ver se a dataframe foi criada corretamente analisando seu primeiro item:

In [19]:
documentDF.first()

Row(text=[u'<http://dbpedia.org/resource/Albedo>', u'<http://www.w3.org/2000/01/rdf-schema#comment>', u'"The', u'albedo', u'of', u'an', u'object', u'is', u'a', u'measure', u'of', u'how', u'strongly', u'it', u'reflects', u'light', u'from', u'light', u'sources', u'such', u'as', u'the', u'Sun.', u'It', u'is', u'therefore', u'a', u'more', u'specific', u'form', u'of', u'the', u'term', u'reflectivity.', u'Albedo', u'is', u'defined', u'as', u'the', u'ratio', u'of', u'total-reflected', u'to', u'incident', u'electromagnetic', u'radiation.', u'It', u'is', u'a', u'unitless', u'measure', u'indicative', u'of', u'a', u"surface's", u'or', u"body's", u'diffuse', u'reflectivity."@en', u'.'])

Tudo certo! Podemos ver acima que o primeiro item é uma "Row" contendo uma única coluna "text" que engloba todo o nosso texto, já corretamente separado palavra por palavra.

Vamos agora aplicar o word2vec. Para uma primeira aplicação, começaremos com um numero relativamente grande de features (300) para garantir que a perda de informação seja pequena:

In [26]:
from pyspark.ml.feature import Word2Vec
word2Vec = Word2Vec(vectorSize=300, inputCol="text", outputCol="model")
model = word2Vec.fit(documentDF)

Py4JJavaError: An error occurred while calling o302.fit.
: java.lang.OutOfMemoryError: Java heap space
	at scala.collection.mutable.ArrayBuilder$ofFloat.mkArray(ArrayBuilder.scala:453)
	at scala.collection.mutable.ArrayBuilder$ofFloat.resize(ArrayBuilder.scala:459)
	at scala.collection.mutable.ArrayBuilder$ofFloat.sizeHint(ArrayBuilder.scala:464)
	at scala.Array$.fill(Array.scala:264)
	at org.apache.spark.mllib.feature.Word2Vec.doFit(Word2Vec.scala:354)
	at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:319)
	at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:187)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
