In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

### Nesse notebook vamos aprender a usar a biblioteca `ml`.

### Como primeiro passo, vamos carregar a base de dados Tweets, obtida no [Kaggle](http://kaggle.com) e verificar a estrutura dela, pelo cabeçalho.

In [None]:
airlineRDD = sc.textFile('Tweets.csv')

header = airlineRDD.take(1)[0]
print header.split(',')

airlineDataRDD = (airlineRDD
                  .filter(lambda x: x!=header)
                  )

### Em seguida, vamos extrair apenas os campos de interesse: o campo \#1 que contém o sentimento e o campo \#10 que contém o texto.

### Para garantir compatibilidade precisamos fazer para cada linha:
* Codificar o texto em utf-8
* Dividir a linha em campos, com um parser de csv
* filtrar linhas que contém um número inválido de campos (<11)
* criar uma RDD de tuplas no formato (sentimento, texto), com sentimento transformado em 0 ou 1
* eliminar sentimentos inválidos
* eliminar lihas sem texto
* eliminar tuplas inválidas

In [None]:
def csvParse(s):
    import csv
    from StringIO import StringIO
    sio = StringIO(s)
    value = csv.reader(sio).next()
    sio.close()
    return value

def Sent2Id(sent):
    if sent == 'negative':
        return 0.0
    elif sent == 'positive':
        return 1.0
    else:
        return -1.0

sentimentLocRDD = (airlineDataRDD
                   .map(lambda line: line.encode('utf-8'))
                   .map(csvParse)
                   .filter(lambda x: len(x) >= 11)
                   .map(lambda x: (Sent2Id(x[1]),x[10]))
                   .filter(lambda x: x[0]>=0)
                   .filter(lambda x: len(x[1]))
                   .filter(lambda x: len(x)==2)
                  )

sentimentLocRDD.take(10)

### A biblioteca `ml` trabalha com DataFrames (que no futuro se tornarão DataSet), que são bases de dados estruturadas.

### Vamos criar nossa DataFrame com:

```
sqlContext.createDataFrame( RDD, lista de campos )
```

In [None]:
# sentiment classification
df = sqlContext.createDataFrame(sentimentLocRDD , ['label','tokens'])

### Os DataFrames permitem a utilização de comandos SQL tradicionais:

In [None]:
labeledData = df.select(df.label, df.tokens).where('label >= 0')

### Finalmente, utilizaremos o `Pipeline` para definir a sequência de processamento de nossa RDD (que pode ser aplicado em outras RDDs de mesma estrutura).

### Nosso Pipeline consistirá de:

* Tokenizar o texto com `Tokenizer`
* Criar vetor TF com `HashingTF`
* Aplicar um algoritmo de Regressão Logística para aprender a classificar nossos dados com `LogisticRegression`

In [None]:
tokenizer = Tokenizer(inputCol="tokens", outputCol="words")
hashingTF = HashingTF(numFeatures=1000, inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(labeledData)

### Uma vez que o modelo foi treinado, podemos aplicar esse mesmo Pipeline para predizer novos elementos com o método `transform`.

In [None]:
predictionsDF = model.transform(df)
for row in predictionsDF.take(10):
    print row[1].encode('utf-8'), row[5]