# Processo de extração dos dados
Utilizando a biblioteca pymongo iremos conectar o arquivo de database "marketing_campaign.csv", armazenado em uma bucket pública na Google Cloud Storage, com o MongoDB e depois extrair esse data frame do Mongo para trabalharmos utilizando o Pandas e o PySpark.

In [None]:
pip install pymongo[srv]

In [None]:
!pip install pyspark

In [None]:
!pip install gcsfs

In [398]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql.window import Window
import pyspark.sql.types as T
from google.cloud import storage
import os

In [399]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [400]:
serviceAccount = '/content/CHAVE_JSON.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

In [401]:
client = storage.Client()

#VARIÁVEL BUCKET RECEBE O NOME DA BUCKET DO CLOUD STORAGE
bucket = client.get_bucket('mara')

#MÉTODO BLOB RETORNA O NOME DO ARQUIVO (JSON, CSV, PARQUET)
bucket.blob('marketing_campaign.csv')

#VARIÁVEL PATH RECEBE O CAMINHO DO ARQUIVO CSV
path = ('gs://mara/marketing_campaign.csv')

In [402]:
#IMPORTAÇÃO PACOTES MONGODB
import pymongo
from pymongo import MongoClient

#IMPORTAR BIBLIOTECA PANDAS
import pandas as pd

In [403]:
#CRIANDO SESSÃO NO SPARK
spark = (
    SparkSession.builder
    .master('local')
    .appName('trabalho_marina')
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [404]:
#LINK DO ARQUIVO NA BUCKET PÚBLICA
df = pd.read_csv("https://storage.googleapis.com/mara/marketing_campaign.csv",sep=",",parse_dates=["Dt_Customer"],dayfirst=True)

#Ou:
#df = pd.read_csv(path, sep=",",parse_dates=["Dt_Customer"],dayfirst=True)

In [405]:
#PRE-VISUALIZAR df
df

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
0,5524,1957,Graduation,Single,58138.0,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7,0,0,0,0,0,0,3,11,1
1,2174,1954,Graduation,Single,46344.0,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5,0,0,0,0,0,0,3,11,0
2,4141,1965,Graduation,Together,71613.0,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4,0,0,0,0,0,0,3,11,0
3,6182,1984,Graduation,Together,26646.0,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6,0,0,0,0,0,0,3,11,0
4,5324,1981,PhD,Married,58293.0,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5,0,0,0,0,0,0,3,11,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2235,10870,1967,Graduation,Married,61223.0,0,1,2013-06-13,46,709,43,182,42,118,247,2,9,3,4,5,0,0,0,0,0,0,3,11,0
2236,4001,1946,PhD,Together,64014.0,2,1,2014-06-10,56,406,0,30,0,0,8,7,8,2,5,7,0,0,0,1,0,0,3,11,0
2237,7270,1981,Graduation,Divorced,56981.0,0,0,2014-01-25,91,908,48,217,32,12,24,1,2,3,13,6,0,1,0,0,0,0,3,11,0
2238,8235,1956,Master,Together,69245.0,0,1,2014-01-24,8,428,30,214,80,30,61,2,6,5,10,3,0,0,0,0,0,0,3,11,0


In [406]:
#CONECTOR DO MONGO ATLAS
cliente = pymongo.MongoClient("CHAVE_MONGO")

In [407]:
#DATABASE 'trabalhomarina' CRIADA NO MONGO
db = cliente['trabalhomarina']
#COLECAO 'marketingOriginal' CRIADA COM BASE NO db
dataOriginal = db.marketingOriginal

In [408]:
#ATENÇÃO: NÃO RODAR NOVAMENTE, POIS UMA VEZ CRIADA A COLEÇÃO SE ESTA FOR INSERIDA DE NOVO TEREMOS LINHAS DUPLICADAS
'''
#CARREGANDO DATAFRAME ORIGINAL PARA MONGODB
dfOriginal = df.to_dict("records")
#INSERINDO NA COLEÇÃO
dataOriginal.insert_many(dfOriginal)
'''

'\n#CARREGANDO DATAFRAME ORIGINAL PARA MONGODB\ndfOriginal = df.to_dict("records")\n#INSERINDO NA COLEÇÃO\ndataOriginal.insert_many(dfOriginal)\n'

In [409]:
#CONTA QUANTOS DOCUMENTOS (LINHAS) TEM NA COLEÇÃO
dataOriginal.count_documents({})

2240

In [410]:
#EXTRAIR ARQUIVO DO MONGODB E TRAZER PARA SER TRABALHADO COM PANDAS (PD)
extrator = dataOriginal.find({})
dfmongo = pd.DataFrame(list(extrator))

# Processo de transformação, limpeza e filtragem dos dados usando Pandas
Agora que já temos a coleção criada no MongoDB: 'marketingOriginal' iniciaremos o processo de transformação dos dados buscando primeiramente encontrar inconsistências, valores nulos, dropar colunas que não serão utilizadas (valores repetidos), traduzir informações do inglês para o português. 

In [411]:
#EXIBIR TODAS AS COLUNAS
pd.set_option('max_columns', None)

In [412]:
#EXIBIR 5 PRIMEIRAS LINHAS DO DATAFRAME
dfmongo.head()


Unnamed: 0,_id,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
0,6234bb09f7f67e3c3f82c84b,5524,1957,Graduation,Single,58138.0,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7,0,0,0,0,0,0,3,11,1
1,6234bb09f7f67e3c3f82c84c,2174,1954,Graduation,Single,46344.0,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5,0,0,0,0,0,0,3,11,0
2,6234bb09f7f67e3c3f82c84d,4141,1965,Graduation,Together,71613.0,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4,0,0,0,0,0,0,3,11,0
3,6234bb09f7f67e3c3f82c84e,6182,1984,Graduation,Together,26646.0,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6,0,0,0,0,0,0,3,11,0
4,6234bb09f7f67e3c3f82c84f,5324,1981,PhD,Married,58293.0,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5,0,0,0,0,0,0,3,11,0


In [413]:
#EXIBIR O TIPO DAS COLUNAS
dfmongo.dtypes

_id                            object
ID                              int64
Year_Birth                      int64
Education                      object
Marital_Status                 object
Income                        float64
Kidhome                         int64
Teenhome                        int64
Dt_Customer            datetime64[ns]
Recency                         int64
MntWines                        int64
MntFruits                       int64
MntMeatProducts                 int64
MntFishProducts                 int64
MntSweetProducts                int64
MntGoldProds                    int64
NumDealsPurchases               int64
NumWebPurchases                 int64
NumCatalogPurchases             int64
NumStorePurchases               int64
NumWebVisitsMonth               int64
AcceptedCmp3                    int64
AcceptedCmp4                    int64
AcceptedCmp5                    int64
AcceptedCmp1                    int64
AcceptedCmp2                    int64
Complain    

In [414]:
#DIMENSÃO DO DATAFRAME: QUANTIDADE DE LINHAS E COLUNAS
dfmongo.shape

(2240, 30)

In [415]:
#EXIBE NOME DAS COLUNAS
dfmongo.columns

Index(['_id', 'ID', 'Year_Birth', 'Education', 'Marital_Status', 'Income',
       'Kidhome', 'Teenhome', 'Dt_Customer', 'Recency', 'MntWines',
       'MntFruits', 'MntMeatProducts', 'MntFishProducts', 'MntSweetProducts',
       'MntGoldProds', 'NumDealsPurchases', 'NumWebPurchases',
       'NumCatalogPurchases', 'NumStorePurchases', 'NumWebVisitsMonth',
       'AcceptedCmp3', 'AcceptedCmp4', 'AcceptedCmp5', 'AcceptedCmp1',
       'AcceptedCmp2', 'Complain', 'Z_CostContact', 'Z_Revenue', 'Response'],
      dtype='object')

In [416]:
#TRADUZIR COLUNAS
dfmongo.rename(columns={'Year_Birth': 'Ano_de_nascimento', 'Education': 'Escolaridade',
                        'Marital_Status':'Estado_civil', 'Income':'Renda_anual',
                        'Kidhome':'Nro_Filhos_criancas', 'Teenhome':'Nro_Filhos_adolescentes',
                        'Dt_Customer':'Data_inscricao_cliente', 'Recency':'Nro_dias_ultima_compra',
                        'Complain':'Reclamacoes', 'MntWines': 'Gastos_vinho',
                        'MntFruits':'Gastos_frutas', 'MntMeatProducts':'Gastos_carnes',
                        'MntFishProducts': 'Gastos_peixes', 'MntSweetProducts':'Gastos_doces',
                        'MntGoldProds':'Gastos_ouro', 'NumDealsPurchases':'Compras_promocao',
                        'NumWebPurchases':'Compras_site', 'NumCatalogPurchases':'Compras_catalogo',
                        'NumStorePurchases':'Compras_loja_fisica', 'NumWebVisitsMonth':'Nro_visitas_site',
                        'AcceptedCmp3':'Aceitou_oferta3', 'AcceptedCmp4':'Aceitou_oferta4',
                        'AcceptedCmp5':'Aceitou_oferta5', 'AcceptedCmp1':'Aceitou_oferta1',
                        'AcceptedCmp2':'Aceitou_oferta2', 'Response':'Aceitou_ultima_oferta',
                        'Z_CostContact': 'Z_Custo_contato', 'Z_Revenue':'Z_Receita',

                        }, inplace=True)

In [417]:
#CONFERIR SE COLUNAS FORAM TRADUZIDAS
dfmongo.columns

Index(['_id', 'ID', 'Ano_de_nascimento', 'Escolaridade', 'Estado_civil',
       'Renda_anual', 'Nro_Filhos_criancas', 'Nro_Filhos_adolescentes',
       'Data_inscricao_cliente', 'Nro_dias_ultima_compra', 'Gastos_vinho',
       'Gastos_frutas', 'Gastos_carnes', 'Gastos_peixes', 'Gastos_doces',
       'Gastos_ouro', 'Compras_promocao', 'Compras_site', 'Compras_catalogo',
       'Compras_loja_fisica', 'Nro_visitas_site', 'Aceitou_oferta3',
       'Aceitou_oferta4', 'Aceitou_oferta5', 'Aceitou_oferta1',
       'Aceitou_oferta2', 'Reclamacoes', 'Z_Custo_contato', 'Z_Receita',
       'Aceitou_ultima_oferta'],
      dtype='object')

In [418]:
#VERIFICAR OS VALORES ÚNICOS EXISTENTES EM 'Z_Custo_contato'
dfmongo['Z_Custo_contato'].unique()

array([3])

In [419]:
#VERIFICAR OS VALORES ÚNICOS EXISTENTES EM 'Z_Receita'
dfmongo['Z_Custo_contato'].unique()

array([3])

In [420]:
#DROPAR COLUNAS 'Z_Custo_contato' e 'Z_Receita' pois as mesmas possuem, respectivamente, apenas
#valores 3 e 11, sendo assim um valor repetido que representa um ruído no Dataset
dfmongo.drop(['Z_Custo_contato','Z_Receita'],axis=1,inplace=True) 

In [421]:
#DROPAR COLUNA '_id' que foi gerada automaticamente pelo MongoDB
dfmongo.drop(['_id'],axis=1,inplace=True) 

In [422]:
dfmongo.head()

Unnamed: 0,ID,Ano_de_nascimento,Escolaridade,Estado_civil,Renda_anual,Nro_Filhos_criancas,Nro_Filhos_adolescentes,Data_inscricao_cliente,Nro_dias_ultima_compra,Gastos_vinho,Gastos_frutas,Gastos_carnes,Gastos_peixes,Gastos_doces,Gastos_ouro,Compras_promocao,Compras_site,Compras_catalogo,Compras_loja_fisica,Nro_visitas_site,Aceitou_oferta3,Aceitou_oferta4,Aceitou_oferta5,Aceitou_oferta1,Aceitou_oferta2,Reclamacoes,Aceitou_ultima_oferta
0,5524,1957,Graduation,Single,58138.0,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7,0,0,0,0,0,0,1
1,2174,1954,Graduation,Single,46344.0,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5,0,0,0,0,0,0,0
2,4141,1965,Graduation,Together,71613.0,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4,0,0,0,0,0,0,0
3,6182,1984,Graduation,Together,26646.0,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6,0,0,0,0,0,0,0
4,5324,1981,PhD,Married,58293.0,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5,0,0,0,0,0,0,0


In [423]:
#VERIFICAR OS VALORES ÚNICOS EXISTENTES EM 'Escolaridade'
dfmongo['Escolaridade'].unique()

array(['Graduation', 'PhD', 'Master', 'Basic', '2n Cycle'], dtype=object)

In [424]:
#VERIFICAR OS VALORES ÚNICOS EXISTENTES EM 'Aceitou_oferta2'
dfmongo['Aceitou_oferta2'].unique()

array([0, 1])

In [425]:
#VERIFICAR OS VALORES ÚNICOS EXISTENTES EM 'Estado_civil'
dfmongo['Estado_civil'].unique()

array(['Single', 'Together', 'Married', 'Divorced', 'Widow', 'Alone',
       'Absurd', 'YOLO'], dtype=object)

In [426]:
#TRADUZIR VALORES DAS C0LUNAS
dfmongo['Escolaridade'] = dfmongo['Escolaridade'].replace(['Graduation', 'PhD', 'Master', 'Basic', '2n Cycle'],
                          ['Graduacao', 'PhD', 'Mestrado', 'Ensino_Medio', 'Pos_Graduacao' ])
dfmongo['Estado_civil'] = dfmongo['Estado_civil'].replace(['Single', 'Together', 'Married', 'Divorced', 'Widow', 'Alone', 'Absurd', 'YOLO'],
                          ['Solteiro(a)', 'Em Uniao Estavel', 'Casado(a)', 'Divorciado(a)', 'Viuvo(a)', 'Solteiro(a)','Absurd (Outro)', 'YOLO (Outro)'])

#Observação 1: Não se traduz PhD pois este é um tipo de doutorado específico

#Observação 2: Achou-se melhor não traduzir "Absurd" e "YOLO" em sua totalidade,
#pois os mesmos não apresentam um sentido concreto de Estado Civil

In [427]:
#CONFERIR TRADUÇÕES NO DATAFRAME
dfmongo.head()

Unnamed: 0,ID,Ano_de_nascimento,Escolaridade,Estado_civil,Renda_anual,Nro_Filhos_criancas,Nro_Filhos_adolescentes,Data_inscricao_cliente,Nro_dias_ultima_compra,Gastos_vinho,Gastos_frutas,Gastos_carnes,Gastos_peixes,Gastos_doces,Gastos_ouro,Compras_promocao,Compras_site,Compras_catalogo,Compras_loja_fisica,Nro_visitas_site,Aceitou_oferta3,Aceitou_oferta4,Aceitou_oferta5,Aceitou_oferta1,Aceitou_oferta2,Reclamacoes,Aceitou_ultima_oferta
0,5524,1957,Graduacao,Solteiro(a),58138.0,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7,0,0,0,0,0,0,1
1,2174,1954,Graduacao,Solteiro(a),46344.0,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5,0,0,0,0,0,0,0
2,4141,1965,Graduacao,Em Uniao Estavel,71613.0,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4,0,0,0,0,0,0,0
3,6182,1984,Graduacao,Em Uniao Estavel,26646.0,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6,0,0,0,0,0,0,0
4,5324,1981,PhD,Casado(a),58293.0,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5,0,0,0,0,0,0,0


In [428]:
#REORGANIZAR COLUNAS

dfmongo = dfmongo[['ID', 'Ano_de_nascimento', 'Escolaridade', 'Estado_civil',
       'Renda_anual', 'Nro_Filhos_criancas', 'Nro_Filhos_adolescentes',
       'Data_inscricao_cliente', 'Nro_dias_ultima_compra', 'Gastos_vinho',
       'Gastos_frutas', 'Gastos_carnes', 'Gastos_peixes', 'Gastos_doces',
       'Gastos_ouro', 'Compras_promocao', 'Compras_site', 'Compras_catalogo',
       'Compras_loja_fisica', 'Nro_visitas_site', 'Aceitou_ultima_oferta', 'Aceitou_oferta1',
       'Aceitou_oferta2', 'Aceitou_oferta3', 'Aceitou_oferta4',
       'Aceitou_oferta5', 'Reclamacoes']]

In [429]:
#CONFERINDO REORGANIZAÇÃO

dfmongo.head()

Unnamed: 0,ID,Ano_de_nascimento,Escolaridade,Estado_civil,Renda_anual,Nro_Filhos_criancas,Nro_Filhos_adolescentes,Data_inscricao_cliente,Nro_dias_ultima_compra,Gastos_vinho,Gastos_frutas,Gastos_carnes,Gastos_peixes,Gastos_doces,Gastos_ouro,Compras_promocao,Compras_site,Compras_catalogo,Compras_loja_fisica,Nro_visitas_site,Aceitou_ultima_oferta,Aceitou_oferta1,Aceitou_oferta2,Aceitou_oferta3,Aceitou_oferta4,Aceitou_oferta5,Reclamacoes
0,5524,1957,Graduacao,Solteiro(a),58138.0,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7,1,0,0,0,0,0,0
1,2174,1954,Graduacao,Solteiro(a),46344.0,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5,0,0,0,0,0,0,0
2,4141,1965,Graduacao,Em Uniao Estavel,71613.0,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4,0,0,0,0,0,0,0
3,6182,1984,Graduacao,Em Uniao Estavel,26646.0,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6,0,0,0,0,0,0,0
4,5324,1981,PhD,Casado(a),58293.0,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5,0,0,0,0,0,0,0


In [430]:
#MOSTRA OS VALORES NULOS
dfmongo.isna().sum()

#Observação: Apenas a coluna 'Renda_anual' retornou valores nulos

ID                          0
Ano_de_nascimento           0
Escolaridade                0
Estado_civil                0
Renda_anual                24
Nro_Filhos_criancas         0
Nro_Filhos_adolescentes     0
Data_inscricao_cliente      0
Nro_dias_ultima_compra      0
Gastos_vinho                0
Gastos_frutas               0
Gastos_carnes               0
Gastos_peixes               0
Gastos_doces                0
Gastos_ouro                 0
Compras_promocao            0
Compras_site                0
Compras_catalogo            0
Compras_loja_fisica         0
Nro_visitas_site            0
Aceitou_ultima_oferta       0
Aceitou_oferta1             0
Aceitou_oferta2             0
Aceitou_oferta3             0
Aceitou_oferta4             0
Aceitou_oferta5             0
Reclamacoes                 0
dtype: int64

In [431]:
#RENDA MEDIANA ANUAL
renda_mediana = dfmongo.Renda_anual.median()
renda_mediana

51381.5

In [432]:
#PARA REDUZIR A DISCREPÂNCIA NA MÉDIA DA 'Renda_anual' IREMOS SUBSTITUIR OS VALORES NULOS PELA MEDIANA DAS RENDAS NÃO NULAS
dfmongo['Renda_anual'] = dfmongo['Renda_anual'].fillna(dfmongo['Renda_anual'].median())

In [433]:
#MOSTRA OS VALORES NULOS
dfmongo.Renda_anual.isna().sum()

#Observação: agora no local dos valores nulos temos a mediana dos valores não nulos da coluna

0

In [434]:
#OUTRA FORMA DE VERIFICAR SE EXISTEM VALORES NULOS
dfmongo.info()

#Observação: Todas as colunas retornam 2240 linhas não nulas, que é o total atual de linhas da tabela

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2240 entries, 0 to 2239
Data columns (total 27 columns):
 #   Column                   Non-Null Count  Dtype         
---  ------                   --------------  -----         
 0   ID                       2240 non-null   int64         
 1   Ano_de_nascimento        2240 non-null   int64         
 2   Escolaridade             2240 non-null   object        
 3   Estado_civil             2240 non-null   object        
 4   Renda_anual              2240 non-null   float64       
 5   Nro_Filhos_criancas      2240 non-null   int64         
 6   Nro_Filhos_adolescentes  2240 non-null   int64         
 7   Data_inscricao_cliente   2240 non-null   datetime64[ns]
 8   Nro_dias_ultima_compra   2240 non-null   int64         
 9   Gastos_vinho             2240 non-null   int64         
 10  Gastos_frutas            2240 non-null   int64         
 11  Gastos_carnes            2240 non-null   int64         
 12  Gastos_peixes            2240 non-

In [435]:
#DIMENSÃO DO DATAFRAME: QUANTIDADE DE LINHAS E COLUNAS
dfmongo.shape

(2240, 27)

In [436]:
#APAGA LINHAS DUPLICADAS DO DF
dfmongo = dfmongo.drop_duplicates()

#Observação: Não foram encontradas linhas duplicadas

In [437]:
#VERIFICA SE A COLUNA ID POSSUI VALORES UNICOS
dfmongo.ID.is_unique

#Observação: A coluna não possui valores repetidos, pois tivemos "True" como retorno

True

In [438]:
dfmongo.head()

Unnamed: 0,ID,Ano_de_nascimento,Escolaridade,Estado_civil,Renda_anual,Nro_Filhos_criancas,Nro_Filhos_adolescentes,Data_inscricao_cliente,Nro_dias_ultima_compra,Gastos_vinho,Gastos_frutas,Gastos_carnes,Gastos_peixes,Gastos_doces,Gastos_ouro,Compras_promocao,Compras_site,Compras_catalogo,Compras_loja_fisica,Nro_visitas_site,Aceitou_ultima_oferta,Aceitou_oferta1,Aceitou_oferta2,Aceitou_oferta3,Aceitou_oferta4,Aceitou_oferta5,Reclamacoes
0,5524,1957,Graduacao,Solteiro(a),58138.0,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7,1,0,0,0,0,0,0
1,2174,1954,Graduacao,Solteiro(a),46344.0,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5,0,0,0,0,0,0,0
2,4141,1965,Graduacao,Em Uniao Estavel,71613.0,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4,0,0,0,0,0,0,0
3,6182,1984,Graduacao,Em Uniao Estavel,26646.0,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6,0,0,0,0,0,0,0
4,5324,1981,PhD,Casado(a),58293.0,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5,0,0,0,0,0,0,0


In [439]:
#A FUNÇÃO DESCRIBE NOS APRESENTA UM CONJUNTO DE ESTATÍSTICAS DESCRETIVAS DAS VARIÁVEIS:
#a quantidade de valores, a média, o desvio padrão, o valor mínimo, os quartis da distribuição e o valor máximo.

dfmongo.describe()

Unnamed: 0,ID,Ano_de_nascimento,Renda_anual,Nro_Filhos_criancas,Nro_Filhos_adolescentes,Nro_dias_ultima_compra,Gastos_vinho,Gastos_frutas,Gastos_carnes,Gastos_peixes,Gastos_doces,Gastos_ouro,Compras_promocao,Compras_site,Compras_catalogo,Compras_loja_fisica,Nro_visitas_site,Aceitou_ultima_oferta,Aceitou_oferta1,Aceitou_oferta2,Aceitou_oferta3,Aceitou_oferta4,Aceitou_oferta5,Reclamacoes
count,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0,2240.0
mean,5592.159821,1968.805804,52237.975446,0.444196,0.50625,49.109375,303.935714,26.302232,166.95,37.525446,27.062946,44.021875,2.325,4.084821,2.662054,5.790179,5.316518,0.149107,0.064286,0.013393,0.072768,0.074554,0.072768,0.009375
std,3246.662198,11.984069,25037.955891,0.538398,0.544538,28.962453,336.597393,39.773434,225.715373,54.628979,41.280498,52.167439,1.932238,2.778714,2.923101,3.250958,2.426645,0.356274,0.245316,0.114976,0.259813,0.262728,0.259813,0.096391
min,0.0,1893.0,1730.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,2828.25,1959.0,35538.75,0.0,0.0,24.0,23.75,1.0,16.0,3.0,1.0,9.0,1.0,2.0,0.0,3.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,5458.5,1970.0,51381.5,0.0,0.0,49.0,173.5,8.0,67.0,12.0,8.0,24.0,2.0,4.0,2.0,5.0,6.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,8427.75,1977.0,68289.75,1.0,1.0,74.0,504.25,33.0,232.0,50.0,33.0,56.0,3.0,6.0,4.0,8.0,7.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
max,11191.0,1996.0,666666.0,2.0,2.0,99.0,1493.0,199.0,1725.0,259.0,263.0,362.0,15.0,27.0,28.0,13.0,20.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0


# Processo transformação, limpeza e filtragem dos dados usando PySpark
Neste passo vamos criar colunas, agrupar informações relevantes, utilizar Window Functions e realizar demais drops que por ventura não tenham ainda sido feitas no nível Pandas.

In [440]:
#CRIANDO UM SCHEMA PARA MEU DATAFRAME EM SPARK
esquema = (
    StructType([
        StructField("ID",IntegerType(), True),
        StructField("Ano_de_nascimento", IntegerType(), True),
        StructField("Escolaridade", StringType(), True),
        StructField("Estado_civil", StringType(), True),
        StructField("Renda_anual", FloatType(), True),
        StructField("Nro_Filhos_criancas", IntegerType(), True),
        StructField("Nro_Filhos_adolescentes", IntegerType(), True),
        StructField('Data_inscricao_cliente', DateType(), True),
        StructField('Nro_dias_ultima_compra', IntegerType(), True),
        StructField('Gastos_vinho', IntegerType(), True),
        StructField('Gastos_frutas', IntegerType(), True), 
        StructField('Gastos_carnes', IntegerType(), True),
        StructField('Gastos_peixes', IntegerType(), True),
        StructField('Gastos_doces', IntegerType(), True),
        StructField('Gastos_ouro', IntegerType(), True),
        StructField('Compras_promocao', IntegerType(), True),
        StructField('Compras_site', IntegerType(), True),
        StructField('Compras_catalogo', IntegerType(), True),
        StructField('Compras_loja_fisica', IntegerType(), True),
        StructField('Nro_visitas_site', IntegerType(), True), 
        StructField('Aceitou_ultima_oferta', IntegerType(), True),
        StructField('Aceitou_oferta1', IntegerType(), True),
        StructField('Aceitou_oferta2', IntegerType(), True),
        StructField('Aceitou_oferta3', IntegerType(), True),
        StructField('Aceitou_oferta4', IntegerType(), True),
        StructField('Aceitou_oferta5', IntegerType(), True),
        StructField('Reclamacoes', IntegerType(), True),
    ])
)

In [441]:
#CRIAR UM DATAFRAME DO SPARK A PARTIR DO DF DO PANDAS
dfspark = spark.createDataFrame(dfmongo, schema=esquema)

In [442]:
#VISUALIZANDO O DATAFRAME
dfspark.show()

+----+-----------------+-------------+----------------+-----------+-------------------+-----------------------+----------------------+----------------------+------------+-------------+-------------+-------------+------------+-----------+----------------+------------+----------------+-------------------+----------------+---------------------+---------------+---------------+---------------+---------------+---------------+-----------+
|  ID|Ano_de_nascimento| Escolaridade|    Estado_civil|Renda_anual|Nro_Filhos_criancas|Nro_Filhos_adolescentes|Data_inscricao_cliente|Nro_dias_ultima_compra|Gastos_vinho|Gastos_frutas|Gastos_carnes|Gastos_peixes|Gastos_doces|Gastos_ouro|Compras_promocao|Compras_site|Compras_catalogo|Compras_loja_fisica|Nro_visitas_site|Aceitou_ultima_oferta|Aceitou_oferta1|Aceitou_oferta2|Aceitou_oferta3|Aceitou_oferta4|Aceitou_oferta5|Reclamacoes|
+----+-----------------+-------------+----------------+-----------+-------------------+-----------------------+-----------------

In [None]:
#VERIFICA OS TIPOS DAS COLUNAS
dfspark.printSchema()

In [444]:
#SELECIONANDO APENAS ALGUMAS COLUNAS PARA SEREM EXIBIDAS
dfspark.select("Ano_de_nascimento","Estado_civil","Renda_anual").show()

+-----------------+----------------+-----------+
|Ano_de_nascimento|    Estado_civil|Renda_anual|
+-----------------+----------------+-----------+
|             1957|     Solteiro(a)|    58138.0|
|             1954|     Solteiro(a)|    46344.0|
|             1965|Em Uniao Estavel|    71613.0|
|             1984|Em Uniao Estavel|    26646.0|
|             1981|       Casado(a)|    58293.0|
|             1967|Em Uniao Estavel|    62513.0|
|             1971|   Divorciado(a)|    55635.0|
|             1985|       Casado(a)|    33454.0|
|             1974|Em Uniao Estavel|    30351.0|
|             1950|Em Uniao Estavel|     5648.0|
|             1983|       Casado(a)|    51381.5|
|             1976|       Casado(a)|     7500.0|
|             1959|   Divorciado(a)|    63033.0|
|             1952|   Divorciado(a)|    59354.0|
|             1987|       Casado(a)|    17323.0|
|             1946|     Solteiro(a)|    82800.0|
|             1980|       Casado(a)|    41850.0|
|             1946|E

In [445]:
#CRIANDO NOVA COLUNA COM A IDADE DOS CLIENTES EM 2014
dfspark = dfspark.withColumn('Idade em 2014', 2014 - F.col('Ano_de_nascimento'))

#Observação: O último registro de cadastro de cliente neste dataframe é de 2014,
#por isso a escolha deste ano específico para inferir as idades dos clientes

In [446]:
#CRIAR UMA NOVA COLUNA COM A SOMA DE "Nro_Filhos_criancas" + "Nro_Filhos_adolescentes"
#PARA EXIBIR QUANTOS FILHOS DECLARADOS E MENORES DE IDADE O CLIENTE POSSUI

dfspark = dfspark.withColumn("Nro total filhos", F.col("Nro_Filhos_criancas") + F.col("Nro_Filhos_adolescentes"))

In [447]:
#RENOMEAR COLUNAS COM OBJETIVO DE PADRONIZAR NOMECLATURAS
#SUBSTITUINDO "ESPAÇO" POR "_"

dfspark = dfspark.withColumnRenamed("Idade em 2014", "Idade_em_2014").withColumnRenamed("Nro total filhos", "Nro_total_filhos")

In [448]:
#CRIANDO UMA NOVA COLUNA COM BASE EM DUAS CONDIÇÕES:
#POSSUIR OU NÃO FILHOS

dfspark = (dfspark.withColumn("Possui_filhos", F.when(F.col("Nro_total_filhos") > 0, F.lit("Sim"))
                    .otherwise(F.lit("Nao"))))

In [449]:
#CRIANDO UMA NOVA COLUNA PARA SABER SE O CLIENTE ACEITOU ALGUMA OFERTA:

dfspark = (dfspark.withColumn("Aceitou_alguma_oferta", F.when((F.col('Aceitou_oferta1') > 0) | (F.col('Aceitou_oferta2') > 0)
| (F.col('Aceitou_oferta3') > 0) | (F.col('Aceitou_oferta4') > 0) | (F.col('Aceitou_oferta5') > 0)
| (F.col('Aceitou_ultima_oferta') > 0), F.lit("Sim"))
                                .otherwise(F.lit('Nao'))))

In [450]:
#CRIANDO UMA NOVA COLUNA PARA SABER O TOTAL DE GASTOS DO CLIENTE EM COMPRAS NA LOJA:

dfspark = dfspark.withColumn("Gastos_total", F.col("Gastos_vinho") + F.col("Gastos_frutas") + F.col("Gastos_carnes")
+ F.col("Gastos_peixes") + F.col("Gastos_doces") + F.col("Gastos_ouro"))

In [451]:
dfspark.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Ano_de_nascimento: integer (nullable = true)
 |-- Escolaridade: string (nullable = true)
 |-- Estado_civil: string (nullable = true)
 |-- Renda_anual: float (nullable = true)
 |-- Nro_Filhos_criancas: integer (nullable = true)
 |-- Nro_Filhos_adolescentes: integer (nullable = true)
 |-- Data_inscricao_cliente: date (nullable = true)
 |-- Nro_dias_ultima_compra: integer (nullable = true)
 |-- Gastos_vinho: integer (nullable = true)
 |-- Gastos_frutas: integer (nullable = true)
 |-- Gastos_carnes: integer (nullable = true)
 |-- Gastos_peixes: integer (nullable = true)
 |-- Gastos_doces: integer (nullable = true)
 |-- Gastos_ouro: integer (nullable = true)
 |-- Compras_promocao: integer (nullable = true)
 |-- Compras_site: integer (nullable = true)
 |-- Compras_catalogo: integer (nullable = true)
 |-- Compras_loja_fisica: integer (nullable = true)
 |-- Nro_visitas_site: integer (nullable = true)
 |-- Aceitou_ultima_oferta: integer (nullable = t

In [452]:
#REORGANIZAR COLUNAS

dfspark = dfspark[['ID', 'Ano_de_nascimento', 'Idade_em_2014', 'Escolaridade', 'Estado_civil',
       'Renda_anual', 'Nro_Filhos_criancas', 'Nro_Filhos_adolescentes', 'Nro_total_filhos', 'Possui_filhos',
       'Data_inscricao_cliente', 'Nro_dias_ultima_compra','Gastos_total','Gastos_vinho',
       'Gastos_frutas', 'Gastos_carnes', 'Gastos_peixes', 'Gastos_doces',
       'Gastos_ouro', 'Compras_promocao', 'Compras_site', 'Compras_catalogo',
       'Compras_loja_fisica', 'Nro_visitas_site', 'Aceitou_ultima_oferta', 'Aceitou_oferta1',
       'Aceitou_oferta2', 'Aceitou_oferta3', 'Aceitou_oferta4',
       'Aceitou_oferta5', 'Aceitou_alguma_oferta','Reclamacoes']]

In [453]:
#CONFERINDO SE A ORGANIZAÇÃO DEU CERTO
dfspark.show(5)

+----+-----------------+-------------+------------+----------------+-----------+-------------------+-----------------------+----------------+-------------+----------------------+----------------------+------------+------------+-------------+-------------+-------------+------------+-----------+----------------+------------+----------------+-------------------+----------------+---------------------+---------------+---------------+---------------+---------------+---------------+---------------------+-----------+
|  ID|Ano_de_nascimento|Idade_em_2014|Escolaridade|    Estado_civil|Renda_anual|Nro_Filhos_criancas|Nro_Filhos_adolescentes|Nro_total_filhos|Possui_filhos|Data_inscricao_cliente|Nro_dias_ultima_compra|Gastos_total|Gastos_vinho|Gastos_frutas|Gastos_carnes|Gastos_peixes|Gastos_doces|Gastos_ouro|Compras_promocao|Compras_site|Compras_catalogo|Compras_loja_fisica|Nro_visitas_site|Aceitou_ultima_oferta|Aceitou_oferta1|Aceitou_oferta2|Aceitou_oferta3|Aceitou_oferta4|Aceitou_oferta5|Aceit

In [454]:
#VERIFICAR SE EXISTEM PESSOAS COM MAIS DE 90 ANOS
#Objetivo: descobrir se existem outliers

dfspark.where(F.col("Idade_em_2014") > 90).show()

#Resultado: existem pelo menos 3 linhas contendo clientes que declararam ter mais de 100 de idade
#como é bastante imporovável que estes possuam de fato essa idade, já que a pessoa mais velha do mundo tem 119 anos,
#essas linhas de outliers serão dropadas para não atrapalhar outras métricas

+-----+-----------------+-------------+-------------+----------------+-----------+-------------------+-----------------------+----------------+-------------+----------------------+----------------------+------------+------------+-------------+-------------+-------------+------------+-----------+----------------+------------+----------------+-------------------+----------------+---------------------+---------------+---------------+---------------+---------------+---------------+---------------------+-----------+
|   ID|Ano_de_nascimento|Idade_em_2014| Escolaridade|    Estado_civil|Renda_anual|Nro_Filhos_criancas|Nro_Filhos_adolescentes|Nro_total_filhos|Possui_filhos|Data_inscricao_cliente|Nro_dias_ultima_compra|Gastos_total|Gastos_vinho|Gastos_frutas|Gastos_carnes|Gastos_peixes|Gastos_doces|Gastos_ouro|Compras_promocao|Compras_site|Compras_catalogo|Compras_loja_fisica|Nro_visitas_site|Aceitou_ultima_oferta|Aceitou_oferta1|Aceitou_oferta2|Aceitou_oferta3|Aceitou_oferta4|Aceitou_oferta5|A

In [455]:
#DROPAR LINHAS QUE CONTÊM OUTLIERS (valores atípicos)

dfspark = dfspark.where(F.col("ID") != 7829).drop()
dfspark = dfspark.where(F.col("ID") != 11004).drop()
dfspark = dfspark.where(F.col("ID") != 1150).drop()

In [456]:
#VERIFICAR SE LINHAS FORAM APAGADAS

dfspark.where(F.col("Idade_em_2014") > 90).show()

#Observação: Linhas com outliers apagadas com sucesso

+---+-----------------+-------------+------------+------------+-----------+-------------------+-----------------------+----------------+-------------+----------------------+----------------------+------------+------------+-------------+-------------+-------------+------------+-----------+----------------+------------+----------------+-------------------+----------------+---------------------+---------------+---------------+---------------+---------------+---------------+---------------------+-----------+
| ID|Ano_de_nascimento|Idade_em_2014|Escolaridade|Estado_civil|Renda_anual|Nro_Filhos_criancas|Nro_Filhos_adolescentes|Nro_total_filhos|Possui_filhos|Data_inscricao_cliente|Nro_dias_ultima_compra|Gastos_total|Gastos_vinho|Gastos_frutas|Gastos_carnes|Gastos_peixes|Gastos_doces|Gastos_ouro|Compras_promocao|Compras_site|Compras_catalogo|Compras_loja_fisica|Nro_visitas_site|Aceitou_ultima_oferta|Aceitou_oferta1|Aceitou_oferta2|Aceitou_oferta3|Aceitou_oferta4|Aceitou_oferta5|Aceitou_alguma_

In [457]:
#VERIFICA NÚMERO DE LINHAS DO DATAFRAME

dfspark.count()

2237

In [458]:
#FILTRO 1: PERFIL DE GASTOS DO CONSUMIDOR POR IDADE
df1 = ( dfspark.groupBy(F.col("Idade_em_2014"))
      .agg(F.max("Gastos_total"), #gastos máximos
           F.min("Gastos_total"), #gastos mínimos
           F.avg("Gastos_total")) #média de gastos
)

#Ordenar por idade
df1.orderBy("Idade_em_2014").show(100)


+-------------+-----------------+-----------------+------------------+
|Idade_em_2014|max(Gastos_total)|min(Gastos_total)| avg(Gastos_total)|
+-------------+-----------------+-----------------+------------------+
|           18|              122|               16|              69.0|
|           19|             1435|               23|             761.2|
|           20|             1990|             1004|1457.3333333333333|
|           21|             1258|             1127|            1191.2|
|           22|             2077|               29| 685.5384615384615|
|           23|             2525|               14|1112.3333333333333|
|           24|             1722|               41|             532.0|
|           25|             2069|               18| 466.1666666666667|
|           26|             2524|               17| 720.0689655172414|
|           27|             1901|               15| 617.1851851851852|
|           28|             2157|               10|             523.5|
|     

In [459]:
#FILTRO 2: PERFIL DE GASTOS DO CONSUMIDOR POR ESTADO CIVIL
df2 = ( dfspark.groupBy(F.col("Estado_civil"))
      .agg(F.max("Gastos_total"), #gastos máximos
           F.min("Gastos_total"), #gastos mínimos
           F.avg("Gastos_total")) #média de gastos
)

#Ordenar por média de gastos
df2.orderBy(F.avg("Gastos_total").desc()).show()

+----------------+-----------------+-----------------+-----------------+
|    Estado_civil|max(Gastos_total)|min(Gastos_total)|avg(Gastos_total)|
+----------------+-----------------+-----------------+-----------------+
|  Absurd (Outro)|             1216|             1169|           1192.5|
|        Viuvo(a)|             2088|               15|738.8181818181819|
|   Divorciado(a)|             2074|                6| 612.991341991342|
|Em Uniao Estavel|             2524|                8|606.2383419689119|
|     Solteiro(a)|             2525|                5|605.5186721991702|
|       Casado(a)|             2486|                8|590.8020833333334|
|    YOLO (Outro)|              424|              424|            424.0|
+----------------+-----------------+-----------------+-----------------+



In [460]:
#FILTRO 3: PERFIL DE GASTOS DO CONSUMIDOR QUE TEM OU NÃO TEM FILHOS EM CASA
df3 = ( dfspark.groupBy(F.col("Possui_filhos"))
      .agg(F.max("Gastos_total"), #gastos máximos
           F.min("Gastos_total"), #gastos mínimos
           F.avg("Gastos_total")) #média de gastos
)

#Ordenar por média de gastos
df3.orderBy(F.avg("Gastos_total").desc()).show()

#Observação: Quem não possui filhos tem uma média de gastos total superior a quem tem filhos

+-------------+-----------------+-----------------+-----------------+
|Possui_filhos|max(Gastos_total)|min(Gastos_total)|avg(Gastos_total)|
+-------------+-----------------+-----------------+-----------------+
|          Nao|             2525|                6|1104.857142857143|
|          Sim|             2194|                5|        407.03375|
+-------------+-----------------+-----------------+-----------------+



In [461]:
#VERIFICA QUANTAS PESSOAS SE DECLARARAM "YOLO"
dfspark.where(F.col("Estado_civil") == 'YOLO (Outro)').show()

#Observação: possível linha duplicada, pois todas as informações das duas linhas são iguais
#e as únicas colunas diferentes são: "Aceitou_ultima_oferta" e "Aceitou_alguma_oferta"
#Iremos dropar a linha de ID = 11133 para diminuir o ruído no dataset

+-----+-----------------+-------------+------------+------------+-----------+-------------------+-----------------------+----------------+-------------+----------------------+----------------------+------------+------------+-------------+-------------+-------------+------------+-----------+----------------+------------+----------------+-------------------+----------------+---------------------+---------------+---------------+---------------+---------------+---------------+---------------------+-----------+
|   ID|Ano_de_nascimento|Idade_em_2014|Escolaridade|Estado_civil|Renda_anual|Nro_Filhos_criancas|Nro_Filhos_adolescentes|Nro_total_filhos|Possui_filhos|Data_inscricao_cliente|Nro_dias_ultima_compra|Gastos_total|Gastos_vinho|Gastos_frutas|Gastos_carnes|Gastos_peixes|Gastos_doces|Gastos_ouro|Compras_promocao|Compras_site|Compras_catalogo|Compras_loja_fisica|Nro_visitas_site|Aceitou_ultima_oferta|Aceitou_oferta1|Aceitou_oferta2|Aceitou_oferta3|Aceitou_oferta4|Aceitou_oferta5|Aceitou_alg

In [462]:
#DROPAR LINHA QUE CONTÉM OUTLIER (valor duplicado)

dfspark = dfspark.where(F.col("ID") != 11133).drop()

In [463]:
#VERIFICANDO SE LINHA DUPLICADA FOI APAGADA

dfspark.where(F.col("Estado_civil") == 'YOLO (Outro)').show()

+---+-----------------+-------------+------------+------------+-----------+-------------------+-----------------------+----------------+-------------+----------------------+----------------------+------------+------------+-------------+-------------+-------------+------------+-----------+----------------+------------+----------------+-------------------+----------------+---------------------+---------------+---------------+---------------+---------------+---------------+---------------------+-----------+
| ID|Ano_de_nascimento|Idade_em_2014|Escolaridade|Estado_civil|Renda_anual|Nro_Filhos_criancas|Nro_Filhos_adolescentes|Nro_total_filhos|Possui_filhos|Data_inscricao_cliente|Nro_dias_ultima_compra|Gastos_total|Gastos_vinho|Gastos_frutas|Gastos_carnes|Gastos_peixes|Gastos_doces|Gastos_ouro|Compras_promocao|Compras_site|Compras_catalogo|Compras_loja_fisica|Nro_visitas_site|Aceitou_ultima_oferta|Aceitou_oferta1|Aceitou_oferta2|Aceitou_oferta3|Aceitou_oferta4|Aceitou_oferta5|Aceitou_alguma_

In [464]:
#NESTE MOMENTO IREMOS REDUZIR NOSSO DATAFRAME PARA TRABALHARMOS COM APENAS ALGUNS ATRIBUTOS DO MESMO

dfreduzido = dfspark[['ID', 'Idade_em_2014', 'Escolaridade', 'Estado_civil',
       'Renda_anual', 'Nro_total_filhos', 'Possui_filhos',
       'Gastos_total', 'Aceitou_alguma_oferta','Reclamacoes']]

In [465]:
#WINDOW FUNCTION QUE PERMITE OBSERVAR A MÉDIA DE RENDA ANUAL DO CLIENTE ORGANIZADO POR ESCOLARIDADE
#E ORDENADO POR QUEM POSSUI A MENOR RENDA ANUAL DECLARADA EM DIANTE

w0 = Window.partitionBy('Escolaridade')

filtro2 = dfreduzido.withColumn('Lista_renda_anual', F.collect_list(F.col("Renda_anual")).over(w0)) \
.withColumn("Media_renda", F.avg(F.col("Renda_anual")).over(w0))

filtro2.show()

#Observação: utilizando a window function teremos a criação de mais duas novas colunas
#"Lista_renda_anual" e "Media_renda" que serão adicionadas em um novo dataframe
#com informações sobre a Média de renda anual dos clientes de acordo com sua escolaridade

+----+-------------+------------+----------------+-----------+----------------+-------------+------------+---------------------+-----------+--------------------+-----------------+
|  ID|Idade_em_2014|Escolaridade|    Estado_civil|Renda_anual|Nro_total_filhos|Possui_filhos|Gastos_total|Aceitou_alguma_oferta|Reclamacoes|   Lista_renda_anual|      Media_renda|
+----+-------------+------------+----------------+-----------+----------------+-------------+------------+---------------------+-----------+--------------------+-----------------+
| 387|           38|Ensino_Medio|       Casado(a)|     7500.0|               0|          Nao|          61|                  Nao|          0|[7500.0, 24594.0,...|20306.25925925926|
|8373|           35|Ensino_Medio|Em Uniao Estavel|    24594.0|               1|          Sim|          29|                  Nao|          0|[7500.0, 24594.0,...|20306.25925925926|
|5342|           38|Ensino_Medio|   Divorciado(a)|     9548.0|               1|          Sim|       

In [466]:
#UTILIZANDO GROUP BY PARA VISUALIZAR A MÉDIA DE RENDA ANUAL DO CLIENTE ORGANIZADO POR ESCOLARIDADE
filtro3 = dfreduzido.groupBy("Escolaridade").agg(
    F.expr("collect_list(Renda_anual)").alias("Lista_renda_anual"),
    F.expr("avg(Renda_anual)").alias("Media_renda_anual"))

filtro3.show()

#Observação: Utilizando Groupby fica mais fácil agrupar os atributos desejados e
#com esta análise preliminar observamos que pessoas graduadas ou pós-graduadas possuem renda anual de mais que o dobro
#daqueles clientes que possuem apenas Ensino Médio

+-------------+--------------------+-----------------+
| Escolaridade|   Lista_renda_anual|Media_renda_anual|
+-------------+--------------------+-----------------+
| Ensino_Medio|[7500.0, 24594.0,...|20306.25925925926|
|Pos_Graduacao|[33812.0, 23718.0...|47681.39552238806|
|    Graduacao|[58138.0, 46344.0...|52707.30567879326|
|          PhD|[58293.0, 33454.0...|56055.45351239669|
|     Mestrado|[62513.0, 59354.0...|52896.77702702703|
+-------------+--------------------+-----------------+



In [467]:
#WINDOW FUNCTION QUE PARTICIONA O DATAFRAME POR ESTADO_CIVIL E ORDENA POR OS RESULTADOS PELA RENDA ANUAL

w1 = Window.partitionBy(F.col('Estado_civil')).orderBy('Renda_anual')

filtro4 = dfreduzido.withColumn('Nro_da_linha', F.row_number().over(w1)).show()


+-----+-------------+-------------+--------------+-----------+----------------+-------------+------------+---------------------+-----------+------------+
|   ID|Idade_em_2014| Escolaridade|  Estado_civil|Renda_anual|Nro_total_filhos|Possui_filhos|Gastos_total|Aceitou_alguma_oferta|Reclamacoes|Nro_da_linha|
+-----+-------------+-------------+--------------+-----------+----------------+-------------+------------+---------------------+-----------+------------+
| 4369|           57|     Mestrado|Absurd (Outro)|    65487.0|               0|          Nao|        1169|                  Nao|          0|           1|
| 7734|           21|    Graduacao|Absurd (Outro)|    79244.0|               0|          Nao|        1216|                  Sim|          0|           2|
| 5376|           35|    Graduacao|     Casado(a)|     2447.0|               1|          Sim|        1730|                  Nao|          0|           1|
| 9931|           51|          PhD|     Casado(a)|     4023.0|              

# Consultas usando SparkSQL
Agora iremos consultar nosso dataframe a partir de Queries diversas que buscam investigar mais sobre os dados. Iremos também explicar a sintaxe de cada consulta.

In [468]:
dfspark.createOrReplaceTempView('marketing')

In [469]:
#1) Qual é o Estado Civil mais comum do consumidor da empresa?

#Conta quantos clientes da tabela marketing possuem estado civil igual as condições descritas abaixo
spark.sql('SELECT COUNT(*) FROM marketing WHERE Estado_civil == "Solteiro(a)"').show()
spark.sql('SELECT COUNT(*) FROM marketing WHERE Estado_civil == "Casado(a)"').show()
spark.sql('SELECT COUNT(*) FROM marketing WHERE Estado_civil == "Divorciado(a)"').show()
spark.sql('SELECT COUNT(*) FROM marketing WHERE Estado_civil == "Em Uniao Estavel"').show()
spark.sql('SELECT COUNT(*) FROM marketing WHERE Estado_civil == "Viuvo(a)"').show()
spark.sql('SELECT COUNT(*) FROM marketing WHERE Estado_civil LIKE "%(Outro)"').show()

#Resultado: 482 Solteiros, 864 Casados, 231 Divorciados, 579 Em união estável, 77 Viuvos, 3 Outros
#Resposta: A empresa possui mais clientes com estado civil "Casado"(864) e em "União Estável"(579)

+--------+
|count(1)|
+--------+
|     482|
+--------+

+--------+
|count(1)|
+--------+
|     864|
+--------+

+--------+
|count(1)|
+--------+
|     231|
+--------+

+--------+
|count(1)|
+--------+
|     579|
+--------+

+--------+
|count(1)|
+--------+
|      77|
+--------+

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [470]:
#2) Quais os gastos totais agrupados pelo estado civil dos clientes?

#Seleciona a soma dos gastos totais por estado civil da tabela marketing agrupando por estado civil e 
#ordenando do maior para o menor  os valores em gastos
spark.sql('SELECT Estado_civil, SUM(Gastos_total) AS Gastos_total FROM marketing GROUP BY Estado_civil ORDER BY Gastos_total DESC').show()

#Resultado: Essa consulta aliada a anterior consolida que os Casados e os em União Estável são
#o maior grupo consumidor e também o que gasta mais em produtos da empresa

+----------------+------------+
|    Estado_civil|Gastos_total|
+----------------+------------+
|       Casado(a)|      510453|
|Em Uniao Estavel|      351012|
|     Solteiro(a)|      291860|
|   Divorciado(a)|      141601|
|        Viuvo(a)|       56889|
|  Absurd (Outro)|        2385|
|    YOLO (Outro)|         424|
+----------------+------------+



In [471]:
#3) Houve um aumento no número de clientes cadastrados entre 2012 e 2013?

#Conta na tabela marketing quando o ano for igual a 2012 e na outra consulta quando o ano for igual a 2013
spark.sql('SELECT COUNT(*) FROM marketing WHERE year(Data_inscricao_cliente) == 2012').show()
spark.sql('SELECT COUNT(*) FROM marketing WHERE year(Data_inscricao_cliente) == 2013').show()

#Resposta: Sim, o número de clientes cadastrados cresceu de um ano para o outro

+--------+
|count(1)|
+--------+
|     493|
+--------+

+--------+
|count(1)|
+--------+
|    1187|
+--------+



In [472]:
#4) Existem mais clientes com filhos ou sem?

#Em "Estado_civil" da tabela marketing conta quantos clientes "Possui_filhos" == "Sim"
# e na consulta abaixo quantos "Possui_filhos" == "Não"
spark.sql('SELECT Estado_civil, COUNT(Possui_filhos) FROM marketing WHERE Possui_filhos == "Sim" GROUP BY Estado_civil').show()
spark.sql('SELECT Estado_civil, COUNT(Possui_filhos) FROM marketing WHERE Possui_filhos == "Nao" GROUP BY Estado_civil').show()


#Resposta: Em todos os estados civis (exceto Absurd) existem mais pessoas com filhos do que sem


+----------------+--------------------+
|    Estado_civil|count(Possui_filhos)|
+----------------+--------------------+
|   Divorciado(a)|                 173|
|Em Uniao Estavel|                 426|
|       Casado(a)|                 634|
|        Viuvo(a)|                  50|
|    YOLO (Outro)|                   1|
|     Solteiro(a)|                 315|
+----------------+--------------------+

+----------------+--------------------+
|    Estado_civil|count(Possui_filhos)|
+----------------+--------------------+
|   Divorciado(a)|                  58|
|Em Uniao Estavel|                 153|
|       Casado(a)|                 230|
|        Viuvo(a)|                  27|
|  Absurd (Outro)|                   2|
|     Solteiro(a)|                 167|
+----------------+--------------------+



In [473]:
#5) Clientes com menos filhos possuem média de renda anual maior ou menor do que aqueles clientes que têm filhos?

#Seleciona do "Nro_total_filhos" a média(AVG) da "Renda_anual" da tabela marketing, agrupando pelo "Nro_total_filhos" em ordem da
#"Media_renda_anual" maior para a menor
spark.sql('SELECT Nro_total_filhos, AVG(Renda_anual) AS Media_renda_anual FROM marketing GROUP BY Nro_total_filhos ORDER BY Media_renda_anual DESC').show()

#Resultado: Clientes sem filhos possuem renda anual maior do que os clientes que possuem filhos.

+----------------+------------------+
|Nro_total_filhos| Media_renda_anual|
+----------------+------------------+
|               0| 65565.14678178963|
|               1| 47747.25111111111|
|               3| 46943.29245283019|
|               2|44692.704275534445|
+----------------+------------------+



In [474]:
#6) Qual o recorde de maior gasto total de um uníco cliente e qual o menor gasto total feito por um cliente?

#Seleciona o valor máximo(max) de "Gastos_total" e o valor mínimo(min) de "Gastos_total" da tabela marketing
spark.sql('SELECT MAX(Gastos_total) AS Maior_gasto, MIN(Gastos_total) AS Menor_gasto FROM marketing').show()

#Resultado: O maior conjunto de compras (total de gastos) foi no valor de 2525,00 e o menor conjunto de compras foi no valor de 5,00

+-----------+-----------+
|Maior_gasto|Menor_gasto|
+-----------+-----------+
|       2525|          5|
+-----------+-----------+



# Carregar arquivo tratado para a Cloud Storage




In [475]:
#CONVERTENDO DF DE SPARK PRA PANDAS
df = dfspark.toPandas()


In [476]:
#FAZENDO CÓPIA NA BUCKET DO ARQUIVO TRATADO
client = storage.Client()
bucket = client.get_bucket('mara')

bucket.blob('marketing_campaign_tratado.csv').upload_from_string(df.to_csv(), 'text/csv')

# Carregar arquivo tratado para o MONGODB

In [477]:
#FAZER CAST POIS O MONGO NÃO ACEITA O FORMATO DATE TIME
df['Data_inscricao_cliente'] = df['Data_inscricao_cliente'].astype(str)

In [478]:
#DATABASE 'trabalhomarina' CRIADA NO MONGO
db = cliente['trabalhomarina']
#COLECAO 'marketingTratado' CRIADA COM BASE NO db
dataTratada = db.marketingTratado

In [479]:
#ATENÇÃO: NÃO RODAR NOVAMENTE, POIS UMA VEZ CRIADA A COLEÇÃO SE ESTA FOR INSERIDA DE NOVO TEREMOS LINHAS DUPLICADAS

'''
#CARREGANDO DATAFRAME TRATADO PARA MONGODB
df = df.to_dict("records")

#INSERINDO NA COLEÇÃO
dataTratada.insert_many(df)
'''

'\n#CARREGANDO DATAFRAME TRATADO PARA MONGODB\ndf = df.to_dict("records")\n\n#INSERINDO NA COLEÇÃO\ndataTratada.insert_many(df)\n'