# Trabalho de Data Mining

Nome: Pablo Luiz Leon - Universidade Federal ABC

Esse notebook introduz os conceitos básicos do Spark através de sua interface com a linguagem Python. Para execução do trabalho da Disciplina Data Mining na Universidade Federal do ABC.



In [26]:
# Operação para Unir Todos os arquivos de dados em um único arquivo.

from pyspark import SparkContext
from pyspark import SparkConf
sc = SparkContext.getOrCreate()

import sys
import os
from glob import glob
import pandas as pd
import numpy as np
from os.path import isfile, join
from operator import add


def criarDados(RDD_Base, tamanho_Lista):
    """Cria uma RDD onde: 
    id: Identificador
    lista: quantidade de itens do RDD base como itens de lista
    valor da lista + 1: valor posterior do RDD base 

    Args:
        RDD_Base: Base de Dados.
        tamanho_list: qunantidade de valores para cada lista de cada item 

    Returns:
        RDD novo contedo: id / lista de valores / valor posterior da lista de dados do RDD de origem depois da criação da lista.
    """

    #Cria indicie das informações
    RDD_DadosComIndicie = RDDDadosSeparados.map(lambda x: x[1]).zipWithIndex()

    #Cria tupla (chave, valor)
    DadosListaRDD = RDD_DadosComIndicie.map(lambda x : [(i,(float(x[1]), float(x[0]))) for i in range(x[1]+1)] )

    #for i in DadosListaRDD.collect():
    #   print(i)

    #Pega todo o RDD e transforma em uma lista única de itens
    ListaUnicaTuplasRDD = DadosListaRDD.flatMap(lambda x : x)

    #Aplica GroupByKey para agrupar os dados pela chave e criar a lista de valores para cada chave.
    DadosAgrupadosPorChave = ListaUnicaTuplasRDD.groupByKey().map(lambda x: (x[0], list(x[1])))
    #Ordena os dados por chave de indice
    DadosAgrupadosPorChaveOrdenados = DadosAgrupadosPorChave.sortBy(lambda x: x[0])

    #for x in DadosAgrupadosPorChaveOrdenados.collect():
    #    print(x)
    
    #Remove as chaves que não fazem parte da estrutura final dos dados transformados.
    RDD_DadosLimpos = DadosAgrupadosPorChaveOrdenados.filter(lambda x: tamanho_Lista < len(x[1]))
    
    #for x in Teste.collect():
    #    print(x)
    #Monta os dados baseado na estrutura proposta (id, lista, valor)
    DadosFinal = RDD_DadosLimpos.map(lambda x: (x[0], 
                                      [x[1][i][1] for i in range(0,int(tamanho_Lista))], 
                                      x[1][int(tamanho_Lista)][1]))

    #print ('')
    #for x in DadosFinal.collect():
    #   print(x)
    
    return DadosFinal


def DistanciaEuclediana(RDDDados, DadosBase):
    """Calcula a Distancia Euclediana entre dois pontos:     
    
    distancia = ((x1 - x2)^2 + (y1 - y2)^2)^2;
    
    Args:
        RDD de Dados (id, lista de valores de distancia, valor).
        Lista com os valors de base para calculo
    
    Returns:
        RDD com a distancia euclediana calculada para todos os pontos de análise
    """

    RDD_Pre_Euclediana = RDDDados.map(lambda x:(x[0],
                                    [np.power((DadosBase[i] -  x[1][i]),2) for i in range (0, len(DadosBase))],
                                    x[2])
                         )
    
    RDDEucledianaFinal = (RDD_Pre_Euclediana.map(lambda x: (x[0],np.sqrt(sum(x[1])),x[2]))
                          .sortBy(lambda x: x[1]))

    
    return RDDEucledianaFinal

def kNN(k, RDDTreino, Dados):
    """Aplica o Algoritmo de kNN - K Nearst Neirbohrs:     
    
    Args:
        k ==> Amostra de K próximo.
        RDDTreino ==> (id, lista de valores de distancia, valor).
        Dados ==> Dado de base para calculo de kNN (Ultimo valor da série de Dados)
    
    Returns:
        Valor predito para o kNN indicado.
    """
    #Calcula Distancia Euclediana dos pontos Retorna um RDD com os pontos calculados por Indice
    RDDDadosCalculados = DistanciaEuclediana(RDDTreino, Dados)
    
    #Ordena os dados Calculados para discubrir a distancia menor (Ordenação pelos valores de distancia euclediana).
    RDDDadosCalculadosOrd = RDDDadosCalculados.sortBy(lambda x: x[1])
    
    #for x in RDDDadosCalculadosOrd.collect():
    #    print(x)
    
    valor_k_Dados = RDDDadosCalculadosOrd.take(2)
    #Pega o Identificador para filtrar os dados
    pFiltro = valor_k_Dados[1][0]
    
    #pega o último item do RDD de valores
    RDDlastItem = RDDTreino.filter(lambda x: x[2] == 0)
    
    
    #pega o Valor predito pelo kNN
    pChangeValue = valor_k_Dados[1][2]
    
    #Cria um RDD com o valor da chave a ser predito colocando o valor da predição.
    pRDDInvertido = RDDlastItem.map(lambda x: (x[0],x[1], float(x[2]+pChangeValue)))
    
    #Aplica Join dos RDD para devolver o valor com indice da predição calculada.
    RDDUnion = RDDTreino.union(pRDDInvertido)
    
    #for x in RDDUnion.collect():
    #    print(x)
    
    #Aplica filtro para excluir chave de registro que tem valor zero.
    RDDDadosFinal = RDDUnion.filter(lambda x: x[2] != 0)
    
    for x in RDDDadosFinal.collect():
        print(x)
    
    return RDDDadosFinal.top(1)

arquivo = os.path.join('Data', 'C:/UFABC/DADOS/ETFs/Data/aaxj_teste.txt') 
 
# lê o arquivo com os dados e carrega em um RDD        
DadosRDD1 = (sc.textFile(arquivo, 8).collect())

DadosRDD = sc.parallelize(DadosRDD1,8)
#Separa os dados do Arquivo (ativo, Data e Valor)
RDDDadosSeparados = DadosRDD.map(lambda x: x.split(";"))


k = 1
tam_lista = 2

RDD_Teste = criarDados(RDDDadosSeparados, tam_lista)

print("Dados de Analise")

for x in RDD_Teste.collect():
    print(x)



print("\n\tAplicando k-NN")

print("\t\tValor de K informado: "+ str(k))
print("\t\tTamanho da lista de dados informado: "+str(tam_lista))

print("\t\tTotal de Registros da base de Treino: "+ str(RDD_Teste.count()))
print("\t\tk Sugerido baseado no tamanho da base de dados: {:.2f}" .format(np.log(RDD_Teste.count())))


plistDado = RDD_Teste.top(1)

ResultadoRDDkNN = kNN(k, RDD_Teste, plistDado[0][1])
print("\n\n\t\tValor de kNN: "+str(ResultadoRDDkNN))

Dados de Analise
(0, [44886.0, 43875.0], 43283.0)
(1, [43875.0, 43283.0], 43892.0)
(2, [43283.0, 43892.0], 44071.0)
(3, [43892.0, 44071.0], 43248.0)
(4, [44071.0, 43248.0], 43.66)
(5, [43248.0, 43.66], 44457.0)
(6, [43.66, 44457.0], 44475.0)
(7, [44457.0, 44475.0], 44171.0)
(8, [44475.0, 44171.0], 43875.0)
(9, [44171.0, 43875.0], 42235.0)
(10, [43875.0, 42235.0], 41153.0)
(11, [42235.0, 41153.0], 41188.0)
(12, [41153.0, 41188.0], 42.29)
(13, [41188.0, 42.29], 41.08)
(14, [42.29, 41.08], 0.0)

	Aplicando k-NN
		Valor de K informado: 1
		Tamanho da lista de dados informado: 2
		Total de Registros da base de Treino: 15
		k Sugerido baseado no tamanho da base de dados: 2.71
(0, [44886.0, 43875.0], 43283.0)
(1, [43875.0, 43283.0], 43892.0)
(2, [43283.0, 43892.0], 44071.0)
(3, [43892.0, 44071.0], 43248.0)
(4, [44071.0, 43248.0], 43.66)
(5, [43248.0, 43.66], 44457.0)
(6, [43.66, 44457.0], 44475.0)
(7, [44457.0, 44475.0], 44171.0)
(8, [44475.0, 44171.0], 43875.0)
(9, [44171.0, 43875.0], 42235.