#PROJETO INDIVIDUAL

#1. Bibliotecas

###1.1.Instalações de Bibliotecas

In [None]:
!pip install pyspark 
!pip install pymongo
!pip install gcsfs
!pip install pymysql

###1.2.Importação das Bibliotecas

In [None]:
import os
from urllib.parse import quote

import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from sqlalchemy import create_engine

###1.3.Conectar com Bibliotecas

####1.3.1.GCP

In [None]:
# GCP connection: expose the service-account key file path through the
# GOOGLE_APPLICATION_CREDENTIALS environment variable.
serviceAccount = '/content/key-store-via-colab.json'
os.environ.update({'GOOGLE_APPLICATION_CREDENTIALS': serviceAccount})

####1.3.2.PySpark

In [None]:
# Configure the Spark session: local master, UI on port 4050, and the GCS
# connector jar so Spark can reach the Google Cloud Storage bucket.
_gcs_connector_jar = 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar'
spark = (
    SparkSession.builder
    .master('local')
    .appName('gcsfs')
    .config('spark.ui.port', '4050')
    .config('spark.jars', _gcs_connector_jar)
    .getOrCreate()
)
   

In [None]:
# Display the SparkSession object.
spark

####1.3.3.Mongo

In [None]:
# Mongo Atlas connector: TLS connection that presents the client certificate
# file below (the URI selects the MONGODB-X509 auth mechanism).
uri = "mongodb+srv://nayyarabernardo-soulcod.mv7bzjn.mongodb.net/?authSource=%24external&authMechanism=MONGODB-X509&retryWrites=true&w=majority"
client = MongoClient(
    uri,
    tlsCertificateKeyFile='/content/drive/MyDrive/BC26-ENG DADOS-PYTHON/PROJETOS/PROJETO AQUECIMENTO/X509-cert-3376380089754927980.pem',
    tls=True,
)

#2.Load - Carregamento de dados originais - GCP e MongoDB

##2.1.Carregando os arquivos Originais para o Bucket

In [None]:
# Build a DataFrame from the raw, tab-separated campaign file
# (read_table uses the tab character as its default separator).
df_01 = pd.read_table('/content/marketing_campaign.csv')

# Send that DataFrame to the bucket as a CSV without the index column.
df_01.to_csv(path_or_buf='gs://projeto_cat/ORIGINAL/marketing_campaign.csv', index=False)

##2.2.Carregando os arquivos Brutos para o MongoDB

In [None]:
# Get a handle to the 'dados_mark' database on the Mongo client.
db = client.get_database('dados_mark')

In [None]:
# Create the ORIGINAL collection and insert the raw data, one document per row.
colecao_1 = db.get_collection('marketing_campaign-ORIGINAL')
df1_dict = df_01.to_dict(orient='records')
colecao_1.insert_many(df1_dict)

#3.Extract - Extração de dados do data lake

In [None]:
# Configure the pandas display: render up to 25 columns.
pd.options.display.max_columns = 25

In [None]:
# Extract: read the ORIGINAL CSV back from the bucket into a pandas DataFrame.
df_1 = pd.read_csv('gs://projeto_cat/ORIGINAL/marketing_campaign.csv')

#4.Transform - Tratamento utilizando Pandas e PySpark

##4.1.PANDAS

In [None]:
# Backup of the full DataFrame: an independent (deep) copy taken before any treatment.
dfbackgeral = df_1.copy(deep=True)

###Pre-analise dos dados

In [None]:
# Number of rows and columns.
df_1.shape

(2240, 29)

In [None]:
# Look at the first rows (head() returns 5 by default) to get a basic feel for the data.
df_1.head()

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,...,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
0,5524,1957,Graduation,Single,58138.0,0,0,04-09-2012,58,635,88,546,...,10,4,7,0,0,0,0,0,0,3,11,1
1,2174,1954,Graduation,Single,46344.0,1,1,08-03-2014,38,11,1,6,...,1,2,5,0,0,0,0,0,0,3,11,0
2,4141,1965,Graduation,Together,71613.0,0,0,21-08-2013,26,426,49,127,...,2,10,4,0,0,0,0,0,0,3,11,0
3,6182,1984,Graduation,Together,26646.0,1,0,10-02-2014,26,11,4,20,...,0,4,6,0,0,0,0,0,0,3,11,0
4,5324,1981,PhD,Married,58293.0,1,0,19-01-2014,94,173,43,118,...,3,6,5,0,0,0,0,0,0,3,11,0


In [None]:
# Data type of each column.
df_1.dtypes

ID                       int64
Year_Birth               int64
Education               object
Marital_Status          object
Income                 float64
Kidhome                  int64
Teenhome                 int64
Dt_Customer             object
Recency                  int64
MntWines                 int64
MntFruits                int64
MntMeatProducts          int64
MntFishProducts          int64
MntSweetProducts         int64
MntGoldProds             int64
NumDealsPurchases        int64
NumWebPurchases          int64
NumCatalogPurchases      int64
NumStorePurchases        int64
NumWebVisitsMonth        int64
AcceptedCmp3             int64
AcceptedCmp4             int64
AcceptedCmp5             int64
AcceptedCmp1             int64
AcceptedCmp2             int64
Complain                 int64
Z_CostContact            int64
Z_Revenue                int64
Response                 int64
dtype: object

In [None]:
# Number of distinct values per column.
df_1.nunique()

ID                     2240
Year_Birth               59
Education                 5
Marital_Status            8
Income                 1974
Kidhome                   3
Teenhome                  3
Dt_Customer             663
Recency                 100
MntWines                776
MntFruits               158
MntMeatProducts         558
MntFishProducts         182
MntSweetProducts        177
MntGoldProds            213
NumDealsPurchases        15
NumWebPurchases          15
NumCatalogPurchases      14
NumStorePurchases        14
NumWebVisitsMonth        16
AcceptedCmp3              2
AcceptedCmp4              2
AcceptedCmp5              2
AcceptedCmp1              2
AcceptedCmp2              2
Complain                  2
Z_CostContact             1
Z_Revenue                 1
Response                  2
dtype: int64

In [None]:
# Check whether every ID value appears only once.
df_1['ID'].is_unique

True

In [None]:
# Sorted list of the distinct education levels.
sorted(df_1['Education'].unique())

['2n Cycle', 'Basic', 'Graduation', 'Master', 'PhD']

In [None]:
# Sorted list of the distinct marital-status values.
sorted(df_1['Marital_Status'].unique())

['Absurd',
 'Alone',
 'Divorced',
 'Married',
 'Single',
 'Together',
 'Widow',
 'YOLO']

In [None]:
# Display the DataFrame.
df_1

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,...,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
0,5524,1957,Graduation,Single,58138.0,0,0,04-09-2012,58,635,88,546,...,10,4,7,0,0,0,0,0,0,3,11,1
1,2174,1954,Graduation,Single,46344.0,1,1,08-03-2014,38,11,1,6,...,1,2,5,0,0,0,0,0,0,3,11,0
2,4141,1965,Graduation,Together,71613.0,0,0,21-08-2013,26,426,49,127,...,2,10,4,0,0,0,0,0,0,3,11,0
3,6182,1984,Graduation,Together,26646.0,1,0,10-02-2014,26,11,4,20,...,0,4,6,0,0,0,0,0,0,3,11,0
4,5324,1981,PhD,Married,58293.0,1,0,19-01-2014,94,173,43,118,...,3,6,5,0,0,0,0,0,0,3,11,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2235,10870,1967,Graduation,Married,61223.0,0,1,13-06-2013,46,709,43,182,...,3,4,5,0,0,0,0,0,0,3,11,0
2236,4001,1946,PhD,Together,64014.0,2,1,10-06-2014,56,406,0,30,...,2,5,7,0,0,0,1,0,0,3,11,0
2237,7270,1981,Graduation,Divorced,56981.0,0,0,25-01-2014,91,908,48,217,...,3,13,6,0,1,0,0,0,0,3,11,0
2238,8235,1956,Master,Together,69245.0,0,1,24-01-2014,8,428,30,214,...,5,10,3,0,0,0,0,0,0,3,11,0


In [None]:
# Count of missing values per column.
df_1.isnull().sum()

ID                      0
Year_Birth              0
Education               0
Marital_Status          0
Income                 24
Kidhome                 0
Teenhome                0
Dt_Customer             0
Recency                 0
MntWines                0
MntFruits               0
MntMeatProducts         0
MntFishProducts         0
MntSweetProducts        0
MntGoldProds            0
NumDealsPurchases       0
NumWebPurchases         0
NumCatalogPurchases     0
NumStorePurchases       0
NumWebVisitsMonth       0
AcceptedCmp3            0
AcceptedCmp4            0
AcceptedCmp5            0
AcceptedCmp1            0
AcceptedCmp2            0
Complain                0
Z_CostContact           0
Z_Revenue               0
Response                0
dtype: int64

In [None]:
# Largest value in the Income column.
df_1['Income'].max()

666666.0

In [None]:
# Smallest value in the Year_Birth column.
df_1['Year_Birth'].min()

1893

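Conclusões Iniciais:
* Somente a coluna 'Income' possui valores nulos (24 registros)
* Mudar tipos: 'Year_Birth' (ano) e 'Dt_Customer' (data)
* Renomear as colunas e traduzir os valores das colunas 'Education' e 'Marital_Status'
* Dropar colunas com valor único ('Z_CostContact' e 'Z_Revenue')
* Eliminar elemento discrepante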


###Tratamento

In [None]:
# Rename the columns: original (English) name -> Portuguese name.
_novos_nomes = {
    'Year_Birth': 'ano_nascimento',
    'Education': 'escolaridade',
    'Marital_Status': 'estado_civil',
    'Income': 'renda',
    'Kidhome': 'qtd_crianca',
    'Teenhome': 'qtd_adolescente',
    'Dt_Customer': 'data_filiação',
    'Recency': 'dias_ultima_compra',
    'MntWines': 'gasto_vinho',
    'MntFruits': 'gasto_fruta',
    'MntMeatProducts': 'gasto_carne',
    'MntFishProducts': 'gasto_peixe',
    'MntSweetProducts': 'gasto_doce',
    'MntGoldProds': 'gasto_ouro',
    'NumDealsPurchases': 'compras_promoção',
    'NumWebPurchases': 'compras_website',
    'NumCatalogPurchases': 'compras_catalogo',
    'NumStorePurchases': 'compras_loja',
    'NumWebVisitsMonth': 'visitas_website',
    'AcceptedCmp1': 'campanha_1',
    'AcceptedCmp2': 'campanha_2',
    'AcceptedCmp3': 'campanha_3',
    'AcceptedCmp4': 'campanha_4',
    'AcceptedCmp5': 'campanha_5',
    'Complain': 'reclamacao',
    'Response': 'resposta',
}
df_1.rename(columns=_novos_nomes, inplace=True)

In [None]:
# List every education level present in the data.
df_1['escolaridade'].unique()

array(['Graduation', 'PhD', 'Master', 'Basic', '2n Cycle'], dtype=object)

In [None]:
# Translate the values of the 'escolaridade' column.
# The replacement is applied to that column only: a DataFrame-wide replace would
# also rewrite any equal value that happened to appear in another column.
# NOTE(review): the target labels are kept exactly as they were (e.g. 'PhD' ->
# 'pos_doutorado'); confirm they are the intended translations.
df_1['escolaridade'] = df_1['escolaridade'].replace({
    '2n Cycle': 'pos-graduação',
    'Basic': 'medio',
    'Graduation': 'superior',
    'Master': 'mestrado',
    'PhD': 'pos_doutorado',
})

In [None]:
# Translate the values of the 'estado_civil' column.
# The replacement is applied to that column only: a DataFrame-wide replace would
# also rewrite any equal value that happened to appear in another column.
df_1['estado_civil'] = df_1['estado_civil'].replace({
    'Absurd': 'absurdo',
    'Alone': 'sozinho',
    'Divorced': 'divorciado',
    'Married': 'casado',
    'Single': 'solteiro',
    'Together': 'em_relacionamento',
    'Widow': 'viuvo',
    'YOLO': 'yolo',
})

In [None]:
# Drop the 'Z_CostContact' and 'Z_Revenue' columns.
df_1.drop(columns=['Z_CostContact', 'Z_Revenue'], inplace=True)

In [None]:
# Convert 'data_filiação' to datetime and 'ano_nascimento' to str.
# The date is parsed with an explicit day-month-year format: dayfirst=True is only
# a parsing preference, whereas an explicit format makes pandas raise on any value
# that is not dd-mm-yyyy instead of guessing.
df_1['data_filiação'] = pd.to_datetime(df_1['data_filiação'], format='%d-%m-%Y')
df_1['ano_nascimento'] = df_1['ano_nascimento'].astype(str)

In [None]:
# Inspect the rows whose marital status is 'yolo'.
ftyolo = df_1['estado_civil'].eq('yolo')
df_1[ftyolo]

Unnamed: 0,ID,ano_nascimento,escolaridade,estado_civil,renda,qtd_crianca,qtd_adolescente,data_filiação,dias_ultima_compra,gasto_vinho,gasto_fruta,gasto_carne,...,compras_promoção,compras_website,compras_catalogo,compras_loja,visitas_website,campanha_3,campanha_4,campanha_5,campanha_1,campanha_2,reclamacao,resposta
2177,492,1973,pos_doutorado,yolo,48432.0,0,1,2012-10-18,3,322,3,50,...,5,7,1,6,8,0,0,0,0,0,0,0
2202,11133,1973,pos_doutorado,yolo,48432.0,0,1,2012-10-18,3,322,3,50,...,5,7,1,6,8,0,0,0,0,0,0,1


In [None]:
# Drop the row labelled 2202: it was verified to describe the same person as another row.
df_1.drop(index=[2202], inplace=True)

In [None]:
# Save the treated pandas DataFrame as a local CSV (path relative to the current
# working directory), without the index column.
df_1.to_csv('dados_marketing_geral.csv', index=False)

##4.2.PySpark

In [None]:
# Explicit schema for the treated CSV, declared as (name, type, nullable) triples
# in file order and turned into a StructType below.
# NOTE(review): the last field is named 'campanha_ultima' while the pandas stage
# names its last column 'resposta' -- presumably an intentional rename; confirm.
_campos = [
    ('ID', IntegerType(), False),
    ('ano_nascimento', IntegerType(), True),
    ('escolaridade', StringType(), True),
    ('estado_civil', StringType(), True),
    ('renda', FloatType(), True),
    ('qtd_crianca', IntegerType(), True),
    ('qtd_adolescente', IntegerType(), True),
    ('data_filiação', TimestampType(), True),
    ('dias_ultima_compra', IntegerType(), True),
    ('gasto_vinho', IntegerType(), True),
    ('gasto_fruta', IntegerType(), True),
    ('gasto_carne', IntegerType(), True),
    ('gasto_peixe', IntegerType(), True),
    ('gasto_doce', IntegerType(), True),
    ('gasto_ouro', IntegerType(), True),
    ('compras_promoção', IntegerType(), True),
    ('compras_website', IntegerType(), True),
    ('compras_catalogo', IntegerType(), True),
    ('compras_loja', IntegerType(), True),
    ('visitas_website', IntegerType(), True),
    ('campanha_3', IntegerType(), True),
    ('campanha_4', IntegerType(), True),
    ('campanha_5', IntegerType(), True),
    ('campanha_1', IntegerType(), True),
    ('campanha_2', IntegerType(), True),
    ('reclamacao', IntegerType(), True),
    ('campanha_ultima', IntegerType(), True),
]
esquema = StructType(
    [StructField(nome, tipo, anulavel) for nome, tipo, anulavel in _campos]
)

In [None]:
# Create the structured PySpark DataFrame from the CSV saved by the pandas stage,
# using the explicit schema instead of schema inference.
# The pandas stage writes 'dados_marketing_geral.csv' relative to the current
# working directory, so the same relative file is resolved to an absolute path
# here rather than assuming it lives under a fixed directory.
df_spark = (spark.read.format('csv')
              .option('header','true')
              .option('delimiter',',')
              .option('inferschema','false')
              .load(os.path.abspath('dados_marketing_geral.csv'), schema = esquema)
)

In [None]:
# Backup: bind a second name to the loaded Spark DataFrame (no data is copied here).
df2 = df_spark

###Pre analise PySpark

In [None]:
# Preview the first 2 rows.
df_spark.show(2)

In [None]:
# Print the column names and types of the Spark DataFrame.
df_spark.printSchema()

In [None]:
# Show summary statistics for the Spark DataFrame.
df_spark.summary().show()

##Tratamento

In [None]:
# Drop duplicated rows from the DataFrame.
# dropDuplicates() with no column subset already compares whole rows, which is
# what distinct() does, so a single call is enough.
df_spark = df_spark.dropDuplicates()

In [None]:
# Create derived columns: 'idade' is the reference year minus the birth year.
_ano_referencia = 2022
df_spark = df_spark.withColumn('idade', F.lit(_ano_referencia) - F.col('ano_nascimento'))

In [None]:
# 'filhos' is the number of children plus the number of teenagers at home.
df_spark = df_spark.withColumn(
    'filhos',
    df_spark['qtd_crianca'] + df_spark['qtd_adolescente'],
)

In [None]:
# Remove the child and teenager count columns.
df_spark = df_spark.drop('qtd_adolescente', 'qtd_crianca')

#5.Load

##5.1.Carregar para Google Cloud

In [None]:
# Send the treated data to the data lake as CSV with a header row.
# The output is overwritten rather than appended so that re-running the notebook
# does not add a second copy of the same rows to the TRATADOS folder.
df_spark.write.csv('gs://projeto_cat/TRATADOS', sep=',', mode='overwrite', header=True)

##5.2.Carregar para o Mongo DB

In [None]:
# Convert the PySpark DataFrame to a pandas DataFrame.
df_geral = df_spark.toPandas()

In [None]:
# Insert the treated data into the TRATADO collection, one document per row.
colecao_3 = db.get_collection('marketing_campaign-TRATADO')
df3_dict = df_geral.to_dict(orient='records')
colecao_3.insert_many(df3_dict)

<pymongo.results.InsertManyResult at 0x7ff15a1e7100>

##5.3.Carregar para o MySQL

In [None]:
# Connection settings for the MySQL server.
# Values are read from environment variables so real credentials never have to be
# written into the notebook; each fallback is the value the notebook previously
# hard-coded (the user and password fallbacks are placeholders).
servidor = os.environ.get('MYSQL_HOST', '35.223.189.162')
nome_do_banco = os.environ.get('MYSQL_DATABASE', 'DadosImoveis')
usuario = os.environ.get('MYSQL_USER', 'xxxx')
senha = os.environ.get('MYSQL_PASSWORD', 'xxxx')

# Create a SQLAlchemy engine to connect to MySQL.
# User and password are percent-encoded so characters such as '@', ':' or '/'
# cannot break the connection URL.
engine = create_engine(
    "mysql+pymysql://{user}:{pw}@{host}/{db}".format(
        user=quote(usuario, safe=''),
        pw=quote(senha, safe=''),
        host=servidor,
        db=nome_do_banco,
    )
)

# Write the DataFrame to a SQL table (the DataFrame index is stored as a column).
# if_exists='replace' lets the cell be re-run: the default would raise because the
# table already exists.
df_geral.to_sql('dados_marketing2', engine, index=True, if_exists='replace')