In [0]:
#Aplicação para detecção de anomalias 

Para a construção de um modelo utilizando o Spark e MLlib são necessários 6 passos:

#1) Construir e iniciar a seção SPARK
#2) Implementar o carregamento dos dados para o spark: Carregar o arquivo, especificar o formato desejado e lê os dados como um Dataframe do Spark
#3) Identificar as características a serem utilizadas para treinamento e teste do modelo
#4) Instanciar as classes e os objetos dos algoritmos a serem utilizados
#5) Utilizar o método fit() para realizar o treinamento do modelo
#6) Avaliar o modelo

In [0]:
from pyspark.sql import SparkSession #importa a biblioteca que cria a seção do spark

In [0]:
#inicia a seção para a utilização do spark
spark = SparkSession.builder.appName("DeteccaoAnomalias").getOrCreate() #cria a seção caso não exista ou obtém a já criada

In [0]:
%fs ls /FileStore/tables

path,name,size
dbfs:/FileStore/tables/Mall_Customers.csv,Mall_Customers.csv,4286
dbfs:/FileStore/tables/designation.json,designation.json,400
dbfs:/FileStore/tables/digitsNew.csv,digitsNew.csv,76775041
dbfs:/FileStore/tables/healthcare_dataset_stroke_data.csv,healthcare_dataset_stroke_data.csv,316971
dbfs:/FileStore/tables/iris_bezdekIris.csv,iris_bezdekIris.csv,4551
dbfs:/FileStore/tables/regressaoLinear-1.csv,regressaoLinear-1.csv,564
dbfs:/FileStore/tables/regressaoLinear.csv,regressaoLinear.csv,564
dbfs:/FileStore/tables/salary.json,salary.json,361
dbfs:/FileStore/tables/temperature-1.csv,temperature-1.csv,13971171
dbfs:/FileStore/tables/temperature.csv,temperature.csv,13971171


In [0]:
diretorioDataset="/FileStore/tables/temperature.csv"  #diretório que contém o arquivo a ser utilizado

In [0]:
data = spark.read.format("csv").options(header="true", inferschema="true").load(diretorioDataset)  #realiza a leitura do dataset

In [0]:
data.show(5,False)

In [0]:
data.columns #mostra as colunas do dataset

In [0]:
data.printSchema()

In [0]:
data.count()  #conta a quantidade de registros

In [0]:
#selecionando apenas o datatime e a coluna da cidade de Vancouver
dataAnalise=data.select('datetime','Vancouver')

In [0]:
dataAnalise.show(5)

Tratando os dados

In [0]:
#filtrando apenas os dados que não possuem valores nulos 
from pyspark.sql.functions import col
dataNotNull=dataAnalise.filter(col('Vancouver').isNotNull())

In [0]:
dataNotNull.show(5)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id  #biblioteca para a construção dos índices
df_plots = dataNotNull.withColumn("indice", monotonically_increasing_id())  #cria os indices para realizar o plot

In [0]:
df_plots.show(5)

In [0]:
#mostrando os dados
display(df_plots.select("indice","Vancouver"))

indice,Vancouver
0,284.63
1,284.62904131
2,284.626997923
3,284.624954535
4,284.622911147
5,284.620867759
6,284.618824371
7,284.616780983
8,284.614737595
9,284.612694207


Aplicando o histograma para detecção de anomalias

In [0]:
#encontrando a média 
import numpy as np  #biblioteca utilizada para tratar vetores e matrizes
from pyspark.sql.functions import mean, stddev  #funções para encontrar a média e desvio padrão


In [0]:
list_stats = dataNotNull.select(mean(col('Vancouver')).alias('media'),stddev(col('Vancouver')).alias('desvioPadrao')).collect() #cria uma lista com os valores

In [0]:
media = list_stats[0]['media']
desvio = list_stats[0]['desvioPadrao']
print("Média: ", media)
print("Desvio Padrão: ", desvio)

In [0]:
df_stats = dataNotNull.select(mean(col('Vancouver')).alias('media'),stddev(col('Vancouver')).alias('desvioPadrao')) #cria o dataset com a média e o desvio padrão

In [0]:
df_stats.show()

In [0]:
#utilizando a função describe
dataNotNull.describe().show()

In [0]:
#definindo a funcao distancia
def distancia(x):
  media=283.8626
  desvio=6.6401
  return ((x - media)/desvio)

#definindo a funcao para verificar anomalias mais do que 2 desvios padrões (95% dos dados)
def anomalias(x):
  desvio=6.6401
  if (x>2):
    return 1
  else:
    return 0

#definindo as funções a serem utilizadas (registrando)
from pyspark.sql.types import DoubleType, IntegerType
distancia_udf_double = udf(lambda z: distancia(z), DoubleType())
anomalia_udf_int = udf(lambda z: anomalias(z), IntegerType())

In [0]:
data_new=dataNotNull.select('Vancouver',distancia_udf_double('Vancouver').alias('distancia'))
data_new.show()

In [0]:
from  pyspark.sql.functions import abs   #biblioteca necessária para cálculo do valor absoluto
data_new=data_new.select('Vancouver','distancia', abs(col('distancia')).alias("distanciaABS"))
data_new.show()

In [0]:
data_new=data_new.select('Vancouver','distancia', "distanciaABS", anomalia_udf_int("distanciaABS").alias("isAnomaly"))
data_new.show()

In [0]:
data_new.filter(col("isAnomaly")>0).show()

In [0]:
#visualizando o histograma
display(dataNotNull.select("Vancouver"))

Vancouver
284.63
284.62904131
284.626997923
284.624954535
284.622911147
284.620867759
284.618824371
284.616780983
284.614737595
284.612694207


Boxplot

In [0]:
#boxplot
display(dataNotNull.select("Vancouver"))

Vancouver
284.63
284.62904131
284.626997923
284.624954535
284.622911147
284.620867759
284.618824371
284.616780983
284.614737595
284.612694207


In [0]:
spark.sparkContext.parallelize( [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([3.0, 30.0, 300.0])])  # an RDD of Vectors

In [0]:
# Estatísticas com MLlib
from pyspark.mllib.stat import Statistics
coluna=dataNotNull.select("Vancouver")  #seleciona a coluna
coluna1= coluna.rdd.map(lambda x: [int (x[0])]) #aplica o map para transformar em vetor
estatistica=Statistics.colStats(coluna1) #aplica a estatística
print("Média: ",estatistica.mean())  # média
print("Variância: ", estatistica.variance())  # variância
print("Valores não nulos: ",estatistica.numNonzeros())  # numero de valores não zero

Aplicando o K-means

In [0]:
%fs ls /FileStore/tables	

path,name,size
dbfs:/FileStore/tables/Mall_Customers.csv,Mall_Customers.csv,4286
dbfs:/FileStore/tables/designation.json,designation.json,400
dbfs:/FileStore/tables/digitsNew.csv,digitsNew.csv,76775041
dbfs:/FileStore/tables/healthcare_dataset_stroke_data.csv,healthcare_dataset_stroke_data.csv,316971
dbfs:/FileStore/tables/iris_bezdekIris.csv,iris_bezdekIris.csv,4551
dbfs:/FileStore/tables/regressaoLinear-1.csv,regressaoLinear-1.csv,564
dbfs:/FileStore/tables/regressaoLinear.csv,regressaoLinear.csv,564
dbfs:/FileStore/tables/salary.json,salary.json,361
dbfs:/FileStore/tables/temperature-1.csv,temperature-1.csv,13971171
dbfs:/FileStore/tables/temperature.csv,temperature.csv,13971171


In [0]:
diretorioDataset="/FileStore/tables/worldcities.csv"  #diretório que contém o arquivo a ser utilizado

In [0]:
cities_df = spark.read.format("csv").options(header="true", inferschema="true").load(diretorioDataset)  #realiza a leitura do dataset

In [0]:
cities_df.printSchema()

In [0]:
#mostrando o dataset
cities_df.show()

In [0]:
#filtrando algumas cidades
cities_BR =cities_df.where(col("country")=="Brazil")
cities_BR.show(5)

In [0]:
cities_MX =cities_df.where(col("country")=="Mexico")
cities_MX.show(5)

In [0]:
cities_EUA =cities_df.where(col("country")=="United States")
cities_EUA.show(5)

In [0]:
#criando um novo dataset através da função join
df_concat = cities_BR.union(cities_MX)
df_concat.show(5)

In [0]:
#contando a quantidade de países diferentes
df_concat.groupby("country").count().show()

In [0]:
#adicionando cidades do Japão (nossas anomalias)
cities_JP =cities_df.where((col("city")=="Tokyo") | (col("city")=="Ōsaka"))
cities_JP.show()

In [0]:
#criando o dataset final
df_final = df_concat.union(cities_JP)

In [0]:
df_final.show()

In [0]:
from pyspark.ml.evaluation import ClusteringEvaluator  #biblioteca utilizada para a avaliação em cada um dos clusters
from pyspark.ml.clustering import KMeans #biblioteca utilizada para a criação do modelo de clusterização utilizando o K-means

In [0]:
from pyspark.ml.feature import VectorAssembler  #transformando os dados em vetores de características

vecAssembler = VectorAssembler(inputCols=["lat","lng"], outputCol="features") #utilizada para transformar os dados em um vetor (define o objeto)
new_df = vecAssembler.transform(df_final) #Aplico a transformação
new_df.show()

In [0]:
#aplica o processo de clusterização
kmeans = KMeans(k=3, seed=1)  # declara o objeto - 3 clusters 
model = kmeans.fit(new_df.select('features')) #aplica o treinamento

In [0]:
#cria o dataset com a indicação sobre qual cluster cada conjunto de dados foi adicionado
df_final = model.transform(new_df)
df_final.show() 

In [0]:
df_final.groupby('prediction').count().show()

In [0]:
df_final.where(col("prediction")=='2').show()