#Análise e mapeamento de persona de clientes
<center>Neste documento, vamos fazer uma <b>criação e análise de persona de clientes de um banco</b>, utilizando <b>PySpark</b> e um <b>algorítimo de Clusterização de Machine Learning</b>.<center>

<img src="https://img.shields.io/badge/Python-3.10.0%2B-orange" alt="drawing" width="80"/>

[![](https://img.shields.io/badge/Apache-License-green)](https://github.com/rgizsilva/Analise-de-dados-acidentes-rodoviarios/blob/main/LICENSE)

[![](https://img.shields.io/badge/Linkedin-Reginaldo-blue )](https://www.linkedin.com/in/rgiz/)

<center><img src="https://raw.githubusercontent.com/rgizsilva/Clusterizacao_clientes/main/persona.jpg" alt="drawing" width="850"/><center>

-----

A equipe de **marketing e produtos**, solicitou à equipe de dados que fizessem uma **coleta e análise de dados** dos clientes do banco em questão, separando as pessoas em grupos para serem oferecidos o produto e campanha ideal para cada tipo de pessoa.

Para isso, vamos utilizar um **algoritmo de clusterização não supervisionado** de *machine learning* utlizando o **K-Means**, agrupando as pessoas em *clusters* de acordo com suas familiaridades e assim criar uma **persona** de cada grupo.

Para nossa tarefa, será utilizado as seguintes ferramentas:
* <b>Hitachi Pentaho</b>  - Extração e limpeza dos dados (ETL)
* <b>PySpark</b>  - Manipulação e análise dos dados
* <b>Pandas</b>  - Manipulação dos dados
* <b>Databricks</b>  - Ambiente de desenvolvimento
* <b>Matplotlib</b>  -Geração de gráficos
* <b>Seaborn</b>  - Geração de gráficos
* <b>Scikit learning</b>  - Criação do algoritmo

----

##Extração e limpeza dos dados (ETL)

A nossa base de dados está no formato **Json** e vamos utilizar o *Hitachi Pentaho* para fazer uma primeira transformação e exportação dos dados, esse passo pode ser visto na imagem a baixo:

<center><img src="https://raw.githubusercontent.com/rgizsilva/Clusterizacao_clientes/main/pantaho_1.JPG" width="400"/><center>
<center><img src="https://raw.githubusercontent.com/rgizsilva/Clusterizacao_clientes/main/pentaho2.JPG" width="400"/><center>

Aqui foi feito um primeiro tratamento dos dados, onde fizemos *uploud* dos dados em formato *Json*, fizemos a correção dos tipos de cada atributo, e exportamos para o formato de *csv*.

##Importação dos dados

Após pré-tratado e exportado, vamos importar os dados hospedado na internet para nosso projeto.Por questão de compatibilidade, primeiro vamos usar a biblioteca pandas para importar o arquivo e depois converter nosso *dataset* para um *dataframe* do **pyspark**

In [0]:
#importando biblioteca pandas
import pandas as pd

#importando arquivo csv
df_pd = pd.read_csv('https://raw.githubusercontent.com/rgizsilva/Clusterizacao_clientes/main/Clientes_csv',sep=',',encoding='utf-8')
df_pd.head(5)

Unnamed: 0,idade,sexo,dependentes,escolaridade,estado_civil,salario_anual,tipo_cartao,meses_de_relacionamento,qtd_produtos,iteracoes_12meses,meses_inativo_12meses,limite_credito,valor_transacoes_12meses,qtd_transacoes_12meses
0,45,M,3,ensino medio,casado,$60K - $80K,blue,39,5,3,1,12691.51,1144.9,42
1,49,F,5,mestrado,solteiro,menos que $40K,blue,44,6,2,1,8256.96,1291.45,33
2,51,M,3,mestrado,casado,$80K - $120K,blue,36,4,0,1,3418.56,1887.72,20
3,40,F,4,ensino medio,na,menos que $40K,blue,34,3,1,4,3313.03,1171.56,20
4,40,M,3,sem educacao formal,casado,$60K - $80K,blue,21,5,0,1,4716.22,816.08,28


In [0]:
#importando biblioteca pyspark
from pyspark.sql import SparkSession

#criando uma sessão spark para trabalhar com pyspark
spark = SparkSession.builder.master('local').appName('Databricks').getOrCreate()

#converter data set para arquivo dataframe do pyspark
df= spark.createDataFrame(df_pd)

#conferindo o dataframe
df.show(5)

+-----+----+-----------+-------------------+------------+--------------+-----------+-----------------------+------------+-----------------+---------------------+--------------+------------------------+----------------------+
|idade|sexo|dependentes|       escolaridade|estado_civil| salario_anual|tipo_cartao|meses_de_relacionamento|qtd_produtos|iteracoes_12meses|meses_inativo_12meses|limite_credito|valor_transacoes_12meses|qtd_transacoes_12meses|
+-----+----+-----------+-------------------+------------+--------------+-----------+-----------------------+------------+-----------------+---------------------+--------------+------------------------+----------------------+
|   45|   M|          3|       ensino medio|      casado|   $60K - $80K|       blue|                     39|           5|                3|                    1|      12691.51|                  1144.9|                    42|
|   49|   F|          5|           mestrado|    solteiro|menos que $40K|       blue|                

In [0]:
#conferindo os tipos de variaveis do schema do daframe
df.printSchema()

root
 |-- idade: long (nullable = true)
 |-- sexo: string (nullable = true)
 |-- dependentes: long (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- estado_civil: string (nullable = true)
 |-- salario_anual: string (nullable = true)
 |-- tipo_cartao: string (nullable = true)
 |-- meses_de_relacionamento: long (nullable = true)
 |-- qtd_produtos: long (nullable = true)
 |-- iteracoes_12meses: long (nullable = true)
 |-- meses_inativo_12meses: long (nullable = true)
 |-- limite_credito: double (nullable = true)
 |-- valor_transacoes_12meses: double (nullable = true)
 |-- qtd_transacoes_12meses: long (nullable = true)



######Vamos corrigir substituindo os tipos das variáveis do dataframe

In [0]:
from pyspark.sql.types import (IntegerType,FloatType)
df2 = df = df \
  .withColumn("idade",df["idade"].cast(IntegerType()))   \
  .withColumn("dependentes",df["dependentes"].cast(IntegerType()))    \
  .withColumn("meses_de_relacionamento",df["meses_de_relacionamento"].cast(IntegerType()))   \
  .withColumn("qtd_produtos",df["qtd_produtos"].cast(IntegerType()))    \
  .withColumn("iteracoes_12meses",df["iteracoes_12meses"].cast(IntegerType()))   \
  .withColumn("limite_credito",df["limite_credito"].cast(FloatType()))    \
  .withColumn("meses_inativo_12meses",df["meses_inativo_12meses"].cast(IntegerType()))   \
  .withColumn("valor_transacoes_12meses",df["valor_transacoes_12meses"].cast(FloatType()))    \
  .withColumn("qtd_transacoes_12meses",df["qtd_transacoes_12meses"].cast(IntegerType()))
df= df2
df.printSchema()

root
 |-- idade: integer (nullable = true)
 |-- sexo: string (nullable = true)
 |-- dependentes: integer (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- estado_civil: string (nullable = true)
 |-- salario_anual: string (nullable = true)
 |-- tipo_cartao: string (nullable = true)
 |-- meses_de_relacionamento: integer (nullable = true)
 |-- qtd_produtos: integer (nullable = true)
 |-- iteracoes_12meses: integer (nullable = true)
 |-- meses_inativo_12meses: integer (nullable = true)
 |-- limite_credito: float (nullable = true)
 |-- valor_transacoes_12meses: float (nullable = true)
 |-- qtd_transacoes_12meses: integer (nullable = true)



In [0]:
#conferindo a quantidade de linhas
df.count()



Out[39]: 10127

In [0]:
#conferindo se existe algum dado duplicado
df.distinct().count()

Out[40]: 10127

######Agora, vamos corrigir valores nulos ou fora dos padrões

In [0]:
#conferindo se existem valores nulos
import pyspark.sql.functions as F
df.select(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-----+----+-----------+------------+------------+-------------+-----------+-----------------------+------------+-----------------+---------------------+--------------+------------------------+----------------------+
|idade|sexo|dependentes|escolaridade|estado_civil|salario_anual|tipo_cartao|meses_de_relacionamento|qtd_produtos|iteracoes_12meses|meses_inativo_12meses|limite_credito|valor_transacoes_12meses|qtd_transacoes_12meses|
+-----+----+-----------+------------+------------+-------------+-----------+-----------------------+------------+-----------------+---------------------+--------------+------------------------+----------------------+
|    0|   0|          0|           0|           0|            0|          0|                      0|           0|                0|                    0|             0|                       0|                     0|
+-----+----+-----------+------------+------------+-------------+-----------+-----------------------+------------+-----------------+-

In [0]:
#idade maáxima e idade mínima dos clientes
from pyspark.sql.functions import max,min
df_max = df.select(max('idade')).show(1)
df_min = df.select(min('idade')).show(1)
print(df_max)
print(df_min)



+----------+
|max(idade)|
+----------+
|        73|
+----------+

+----------+
|min(idade)|
+----------+
|        26|
+----------+

None
None


In [0]:
#conferindo se existem dados fora do padrão da coluna / output precisa retornar colunas sem os dados para estarem corretos
from pyspark.sql.functions import col
df_idade = df.filter(col("idade").rlike("^[a-z,A-Z]*$")).show()
df_sexo = df.filter(col("sexo").rlike("^[0-9]*$")).show()
df_dependentes = df.filter(col("dependentes").rlike("^[a-z,A-Z]*$")).show()
df_escolaridade = df.filter(col("escolaridade").rlike("^[0-9]*$")).show()
df_estado_civil = df.filter(col("estado_civil").rlike("^[0-9]*$")).show()
df_salario_anual = df.filter(col("salario_anual").rlike("^[0-9]*$")).show()
df_tipo_cartao = df.filter(col("tipo_cartao").rlike("^[0-9]*$")).show()
df_meses_de_relacionamento = df.filter(col("meses_de_relacionamento").rlike("^[a-z,A-Z]*$")).show()
df_qtd_produtos = df.filter(col("qtd_produtos").rlike("^[a-z,A-Z]*$")).show()
df_iteracoes_12meses = df.filter(col("iteracoes_12meses").rlike("^[a-z,A-Z]*$")).show()
df_meses_inativo_12meses = df.filter(col("meses_inativo_12meses").rlike("^[a-z,A-Z]*$")).show()
df_limite_credito = df.filter(col("limite_credito").rlike("^[a-z,A-Z]*$")).show()
df_valor_transacoes_12meses = df.filter(col("valor_transacoes_12meses").rlike("^[a-z,A-Z]*$")).show()
df_qtd_transacoes_12meses = df.filter(col("qtd_transacoes_12meses").rlike("^[a-z,A-Z]*$")).show()


+-----+----+-----------+------------+------------+-------------+-----------+-----------------------+------------+-----------------+---------------------+--------------+------------------------+----------------------+
|idade|sexo|dependentes|escolaridade|estado_civil|salario_anual|tipo_cartao|meses_de_relacionamento|qtd_produtos|iteracoes_12meses|meses_inativo_12meses|limite_credito|valor_transacoes_12meses|qtd_transacoes_12meses|
+-----+----+-----------+------------+------------+-------------+-----------+-----------------------+------------+-----------------+---------------------+--------------+------------------------+----------------------+
+-----+----+-----------+------------+------------+-------------+-----------+-----------------------+------------+-----------------+---------------------+--------------+------------------------+----------------------+

+-----+----+-----------+------------+------------+-------------+-----------+-----------------------+------------+-----------------+

In [0]:
#conferindo as variaveis string individualmente e substituir valores não informados("na")
df.groupBy('sexo').count().show()
df.groupBy('escolaridade').count().show()
df.groupBy('estado_civil').count().show()
df.groupBy('salario_anual').count().show()
df.groupBy('tipo_cartao').count().show()


+----+-----+
|sexo|count|
+----+-----+
|   F| 5358|
|   M| 4769|
+----+-----+

+-------------------+-----+
|       escolaridade|count|
+-------------------+-----+
|          doutorado|  967|
|           mestrado| 3128|
|       ensino medio| 2013|
|sem educacao formal| 1487|
|          graduacao| 1013|
|                 na| 1519|
+-------------------+-----+

+------------+-----+
|estado_civil|count|
+------------+-----+
|    solteiro| 3943|
|      casado| 4687|
|  divorciado|  748|
|          na|  749|
+------------+-----+

+--------------+-----+
| salario_anual|count|
+--------------+-----+
|       $120K +|  727|
|   $60K - $80K| 1402|
|  $80K - $120K| 1535|
|   $40K - $60K| 1790|
|menos que $40K| 3561|
|            na| 1112|
+--------------+-----+

+-----------+-----+
|tipo_cartao|count|
+-----------+-----+
|     silver|  555|
|       gold|  116|
|   platinum|   20|
|       blue| 9436|
+-----------+-----+



Podemos observar alguns valores **na**(não respondido) em alguns dados. Para tratar isso nesse caso, vamos substituir esses valores pelo valor mais frequente da coluna.
No nosso caso, a coluna **escolaridade** receberá **mestrado** , a coluna **estado_civil** receberá **casado** e a coluna **salario_anual** receberá  **menos que $40K** nos valores ausentes.

In [0]:
#importando biblioteca regexp_replace para fazer a troca de valores
from pyspark.sql.functions import regexp_replace
df2 = df.replace("na", "mestrado", "escolaridade")
df3 = df2.replace("na", "casado", "estado_civil")
df4 = df3.replace("na", "menos que $40K", "salario_anual")
df=df4



###Transformando variáveis categóricas em variáveis numéricas
Agora, precisamos transformar nossas variáveis de textos para números, para fazer o processo de **clusterização**.

Para essa técnica, vamos usar a biblioteca **StringIndexer** para transformar nossas variáveis categóricas em números. Depois vamos aplicar a técnica de **One hot encoding** para transformar os números em um vetor.

In [0]:
#importando biblioteca pipeline para criar uma pipeline a ser feita
from pyspark.ml import Pipeline
#importando bibliotexa StringIndexer para criar novas colunas do tipo inteiro representando as colunas categóricas
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['idade','dependentes','meses_de_relacionamento', 'qtd_produtos','iteracoes_12meses','meses_inativo_12meses','limite_credito','valor_transacoes_12meses','qtd_transacoes_12meses'])) ]

pipeline = Pipeline(stages=indexers)
df_index = pipeline.fit(df).transform(df)

df_index.select('salario_anual_index','tipo_cartao_index','sexo_index','estado_civil_index','escolaridade_index').show(5)



+-------------------+-----------------+----------+------------------+------------------+
|salario_anual_index|tipo_cartao_index|sexo_index|estado_civil_index|escolaridade_index|
+-------------------+-----------------+----------+------------------+------------------+
|                3.0|              0.0|       1.0|               0.0|               1.0|
|                0.0|              0.0|       0.0|               1.0|               0.0|
|                2.0|              0.0|       1.0|               0.0|               0.0|
|                0.0|              0.0|       0.0|               0.0|               1.0|
|                3.0|              0.0|       1.0|               0.0|               2.0|
+-------------------+-----------------+----------+------------------+------------------+
only showing top 5 rows



Agora vamos utilizar a técnica de **One hot enconding** para transformar as novas colunas númericas em vetores.

In [0]:
# importando biblioteca one hot encoder
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=["salario_anual_index","tipo_cartao_index","sexo_index","estado_civil_index","escolaridade_index"],
                                 outputCols=["salario_anual_vec","tipo_cartao_vec","sexo_vec","estado_civil_vec","escolaridade_vec"],dropLast=False)
model = encoder.fit(df_index)
df_ohe = model.transform(df_index)
df_ohe.select("salario_anual_vec","tipo_cartao_vec","sexo_vec","estado_civil_vec","escolaridade_vec").show(5)


+-----------------+---------------+-------------+----------------+----------------+
|salario_anual_vec|tipo_cartao_vec|     sexo_vec|estado_civil_vec|escolaridade_vec|
+-----------------+---------------+-------------+----------------+----------------+
|    (5,[3],[1.0])|  (4,[0],[1.0])|(2,[1],[1.0])|   (3,[0],[1.0])|   (5,[1],[1.0])|
|    (5,[0],[1.0])|  (4,[0],[1.0])|(2,[0],[1.0])|   (3,[1],[1.0])|   (5,[0],[1.0])|
|    (5,[2],[1.0])|  (4,[0],[1.0])|(2,[1],[1.0])|   (3,[0],[1.0])|   (5,[0],[1.0])|
|    (5,[0],[1.0])|  (4,[0],[1.0])|(2,[0],[1.0])|   (3,[0],[1.0])|   (5,[1],[1.0])|
|    (5,[3],[1.0])|  (4,[0],[1.0])|(2,[1],[1.0])|   (3,[0],[1.0])|   (5,[2],[1.0])|
+-----------------+---------------+-------------+----------------+----------------+
only showing top 5 rows



Agora, vamos usar uma técnica para unir todas as nossas variáveis em uma única coluna de vetores. Para isso vamos usar a ferramente  **Vector Assembler**

In [0]:
# importando a biblioteca VectorsAssembler 
from pyspark.ml.feature import VectorAssembler

assembler= VectorAssembler(
    inputCols=["idade","dependentes","meses_de_relacionamento","qtd_produtos","iteracoes_12meses","meses_inativo_12meses","limite_credito",
              "valor_transacoes_12meses","qtd_transacoes_12meses","salario_anual_vec","tipo_cartao_vec","sexo_vec","estado_civil_vec","escolaridade_vec"],
outputCol = 'Features')

output= assembler.transform(df_ohe)

df_features= output
df_features.select("Features").show(5)


+--------------------+
|            Features|
+--------------------+
|(28,[0,1,2,3,4,5,...|
|(28,[0,1,2,3,4,5,...|
|(28,[0,1,2,3,5,6,...|
|(28,[0,1,2,3,4,5,...|
|(28,[0,1,2,3,5,6,...|
+--------------------+
only showing top 5 rows



###Padronização dos dados
Agora  que temos uma coluna com todas as nossas variáveis juntas em um vetor, precisamos fazer a **padronização dos dados**, diminuir a variança dos dados quando temos valores diferentes. No nosso caso, como estamos trabalhando com a variável **limite_crédito**, temos uma variança grande dos valores e por isso vamos deixar esses dados na mesma escala das outras variáveis utilizando a biblioteca **StandardScaler** para fazer esse processo.

In [0]:
# importando a biblioteca StandarScaler
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='Features',outputCol='Scaled_features',
                      withMean=True,withStd=True)
scalerModel = scaler.fit(df_features)
scaled_df = scalerModel.transform(df_features)
scaled_df.select("Scaled_features").show(5)


+--------------------+
|     Scaled_features|
+--------------------+
|[-0.1653974133307...|
|[0.33355391368761...|
|[0.58302957719681...|
|[-0.7890865721037...|
|[-0.7890865721037...|
+--------------------+
only showing top 5 rows



###PCA
Vamos utilizar agora a técnica de **PCA**(Análise de componentes principais) para converter um conjunto de observações de variáveis possivelmente correlacionadas num conjunto de valores de variáveis linearmente não correlacionadas chamadas de componentes principais. Isso ira reduzir a dimensão dos nossos dados em 2 componentes facilitando a predição do nosso modelo.

In [0]:
#importando biblioteca PCA
from pyspark.ml.feature import PCA 

pca = PCA(k=2, inputCol="Scaled_features", outputCol="pca")
model = pca.fit(scaled_df)
df_pca = model.transform(scaled_df)
df_pca.select("pca").show(5)

+--------------------+
|                 pca|
+--------------------+
|[1.75037190356745...|
|[-1.7995031247212...|
|[1.41507938819494...|
|[-1.9739377045107...|
|[1.35416606461328...|
+--------------------+
only showing top 5 rows



###Calculando número de clusters para utilizar no algorítimo K-means
Vamos realizar um teste do nosso modelo de clusterização, para saber quantos clusters(grupos) serão mais performáticos para nossa utilização. Utilizando a **Silhoutte Score**  e a **distância Euclidiana**, vamos descobrir qual o número exato de clusters para o nosso algorítimo de clusterização. A classificação é de **-1**(para o pior desempenho e) e **1**(para o melhor desempenho)

In [0]:
#importando algoritimo Kmeans e ClusteringEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
silhouette_score=[]
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='pca', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')
for i in range(2,10):
    
    KMeans_algo=KMeans(featuresCol='pca', k=i)
    
    KMeans_fit=KMeans_algo.fit(df_pca)
    
    output = KMeans_fit.transform(df_pca)

    score=evaluator.evaluate(output)
    
    silhouette_score.append(score)
    
    print("Silhouette Score:",score)

Silhouette Score: 0.694245181861431
Silhouette Score: 0.7920153613722272
Silhouette Score: 0.7003426124344374
Silhouette Score: 0.6681162730152383
Silhouette Score: 0.5813268298424676
Silhouette Score: 0.5748051716232486
Silhouette Score: 0.5882270024478965
Silhouette Score: 0.5791334385197523


###Criando a clusterização com K-means
Como vimos anteriormente, nosso algoritmo performou melhor com **3 clusters**, com isso vamos criar 3 grupos diferentes de clientes para a entrega do nosso projeto.

In [0]:
kmeans= KMeans(featuresCol="pca",predictionCol="cluster",k=3)
model=kmeans.fit(df_pca)
df_predict= model.transform(df_pca)
df_predict.select("idade","sexo","cluster").show(10)




+-----+----+-------+
|idade|sexo|cluster|
+-----+----+-------+
|   45|   M|      1|
|   49|   F|      0|
|   51|   M|      1|
|   40|   F|      0|
|   40|   M|      1|
|   44|   M|      1|
|   51|   M|      2|
|   32|   M|      2|
|   37|   M|      1|
|   48|   M|      1|
+-----+----+-------+
only showing top 10 rows

