## **Instalando o PySpark**

In [None]:
!pip install pyspark
!pip install gcsfs
!pip install pymongo[srv]

## **Importando as bibliotecas**

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
import pandas as pd
from google.cloud import storage
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank
from pyspark.sql.functions import rank
import pymongo
from pymongo import MongoClient

## **Configurando o acesso do Colab ao bucket.**

In [None]:
from  google.colab import drive
drive.mount('/content/drive')

In [None]:
serviceAccount = '/content/drive/[LINK DA CHAVE DO BUCKET]'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

In [None]:
client = storage.Client()
bucket = client.get_bucket('projeto-individual-sc')
bucket.blob('marketing_campaign.csv')
path = 'gs://[LINK DO ARQUIVO NO BUCKET]'

## **Configurando o PySpark**

In [None]:
spark = (
    SparkSession.builder
    .master('local')
    .appName('ProjetoIndividual')
    .config('spark.ui.port','4050')
    .config("spark.jars", 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
    .getOrCreate()
)

In [None]:
dfs = (spark.read
           .format('csv')
           .option('header', 'true')
           .option('inferschema', 'true')
           .option('delimiter', ',')
           .load(path)
      )

In [None]:
dfs.show(5)

## **Montando o Schema com os StructType**

In [None]:
schema = ( StructType ([
   StructField("ID", IntegerType(), True),
   StructField("Year_Birth", StringType(), True),
   StructField("Education", StringType(), True),
   StructField("Marital_Status", StringType(), True),
   StructField("Income", IntegerType(), True),
   StructField("Kidhome", IntegerType(), True),
   StructField("Teenhome", IntegerType(), True),
   StructField("Dt_Customer", StringType(), True),
   StructField("Recency", IntegerType(), True),
   StructField("MntWines", IntegerType(), True),
   StructField("MntFruits", IntegerType(), True),
   StructField("MntMeatProducts", IntegerType(), True),
   StructField("MntFishProducts", IntegerType(), True),
   StructField("MntSweetProducts", IntegerType(), True),
   StructField("MntGoldProds", IntegerType(), True),
   StructField("NumDealsPurchases",IntegerType(), True),
   StructField("NumWebPurchases", IntegerType(), True),
   StructField("NumCatalogPurchases", IntegerType(), True),
   StructField("NumStorePurchases", IntegerType(), True),
   StructField("NumWebVisitsMonth", IntegerType(), True),
   StructField("AcceptedCmp3", IntegerType(), True),
   StructField("AcceptedCmp4", IntegerType(), True),
   StructField("AcceptedCmp5", IntegerType(), True),
   StructField("AcceptedCmp1", IntegerType(), True),
   StructField("AcceptedCmp2", IntegerType(), True),
   StructField("Complain", IntegerType(), True),
   StructField("Z_CostContact", IntegerType(), True),
   StructField("Z_Revenue", IntegerType(), True),
   StructField("Response", IntegerType(), True),      
   ])
)



## **Criando um backup para o dataframe**

In [None]:
dfs1 = dfs
dfs2 = dfs1
dfs3 = dfs2

## **Traduzindo as colunas**

In [None]:
dfs = dfs.withColumnRenamed('Year_Birth', 'Ano_de _Nascimento').withColumnRenamed('Education', "Escolaridade").withColumnRenamed('Marital_Status', "Estado_Civil").withColumnRenamed('Income', "Rendimento").withColumnRenamed('Kidhome', 'Qtd_de_crianças').withColumnRenamed('Teenhome', 'Qtd_de_adolescentes').withColumnRenamed('Dt_Customer', 'Data_de_inscricao').withColumnRenamed('Recency', "Qtd_dias_ultima_compra").withColumnRenamed('MntWines', 'Despesa_vinho').withColumnRenamed('MntFruits', "Despesa_frutas").withColumnRenamed('MntMeatProducts', 'Despesa_carne').withColumnRenamed('MntFishProducts', "Despesa_peixe").withColumnRenamed('MntSweetProducts', 'Despesa_doces').withColumnRenamed('MntGoldProds', "Despesa_Ouro").withColumnRenamed('NumDealsPurchases', 'Qtd_desconto_compras').withColumnRenamed('NumWebPurchases', "Qtd_compras_web").withColumnRenamed('NumCatalogPurchases', 'Qtd_compras_catalogo').withColumnRenamed('NumStorePurchases', "Qtd_compras_loja").withColumnRenamed('NumWebVisitsMonth', 'Qtd_visita_site_mes').withColumnRenamed('AcceptedCmp1', "Tentativa1").withColumnRenamed('AcceptedCmp2', 'Tentativa2').withColumnRenamed('AcceptedCmp3', "Tentativa3").withColumnRenamed('AcceptedCmp4', 'Tentativa4').withColumnRenamed('AcceptedCmp5', "Tentativa5").withColumnRenamed('Complain', 'Criticas').withColumnRenamed('Z_CostContact', "Custos_de _contato").withColumnRenamed('Response', 'Resposta').withColumnRenamed('Z_Revenue', "Revenda")

In [None]:
#Renomeando a coluna para retirar o "ç" da palavra
dfs = dfs.withColumnRenamed('Qtd_de_crianças', 'Qtd_de_criancas')

## **Traduzindo os valores das colunas [Estado_civil] e [Escolaridade]**

In [None]:
dfs = dfs.replace({'Single':"Solteiro"}, subset=['Estado_Civil'])
dfs = dfs.replace({'Together':"Uniao_estavel"}, subset=['Estado_Civil'])
dfs = dfs.replace({'Married':'Casado'}, subset=['Estado_Civil'])
dfs = dfs.replace({'Divorced':'Divorciado'}, subset=['Estado_Civil'])
dfs = dfs.replace({'Widow':'Viúvo(a)'}, subset=['Estado_Civil'])
dfs = dfs.replace({'Graduation':'Graduação'}, subset=['Escolaridade'])
dfs = dfs.replace({'Master':'Mestrado'}, subset=['Escolaridade'])
dfs = dfs.replace({'Basic':'Fundamental'}, subset=['Escolaridade'])
dfs = dfs.replace({'2n Cycle':'Pós graduação'}, subset=['Escolaridade'])

## **Tratamento e limpezas**

In [None]:
#Trocando os valores de YOLO, Alone e Absurd para "0", pois o Mongo Db apresenta erro quando estamos subindo com valores nulos.
dfs = dfs.replace({'YOLO': "0"}, subset=['Estado_Civil'])
dfs = dfs.replace({'Absurd': "0"}, subset=['Estado_Civil'])
dfs = dfs.replace({'Alone': "0"}, subset=['Estado_Civil'])

In [None]:
#Percorrendo as colunas para identificar se tem valores nulos
for c in dfs.columns:
  print(c, dfs.filter(F.col(c).isNull()).count())

ID 0
Ano_de _Nascimento 0
Escolaridade 0
Estado_Civil 0
Rendimento 24
Qtd_de_criancas 0
Qtd_de_adolescentes 0
Data_de_inscricao 0
Qtd_dias_ultima_compra 0
Despesa_vinho 0
Despesa_frutas 0
Despesa_carne 0
Despesa_peixe 0
Despesa_doces 0
Despesa_Ouro 0
Qtd_desconto_compras 0
Qtd_compras_web 0
Qtd_compras_catalogo 0
Qtd_compras_loja 0
Qtd_visita_site_mes 0
Tentativa3 0
Tentativa4 0
Tentativa5 0
Tentativa1 0
Tentativa2 0
Criticas 0
Custos_de _contato 0
Revenda 0
Resposta 0


In [None]:
#Verificando a contagem geral dos valores no dataframe
dfs.count()

2240

In [None]:
#Verificando a contagem geral dos valores no dataframe sem os valores nulos
dfs.dropna().count()

2216

In [None]:
#Efetuando a limpeza dos valores nulos
dfs = dfs.dropna()

In [None]:
#Verificando a contagem geral dos valores para confirmar a operação
dfs.dropna().count()

2216

***Dropando uma coluna***

In [None]:
dfs = dfs.drop('Revenda')

In [None]:
dfs.show(3)

***Realizando a mudança de nome das colunas***

Alteramos o nome das seguintes colunas:

[Ano_de _nascimento] para [Ano_nascimento] 
Foi retirado o " _de", encurtando o nome da coluna.

[Custos_de _contato] para [Custos_de_contato]
Foi retirado o " _de", encurtando o nome da coluna.

In [None]:
dfs = dfs.withColumnRenamed("Ano_de _Nascimento", "Ano_Nascimento").withColumnRenamed("Custos_de _contato", "Custos_contato")

In [None]:
dfs.show(2)

*Criando novas colunas com Funções de Agrupamento, Agregação ou Joins*

In [None]:
#Criando uma nova coluna chamada 'SOMA_TENTATIVAS", com a soma das colunas: Tentativa1, Tentativa2, Tentativa3, Tentativa4, Tentativa5
dfs = dfs.withColumn("soma_Tentativas", F.col("Tentativa3") + F.col("Tentativa2") + F.col("Tentativa1") + F.col("Tentativa4") + F.col("Tentativa5"))
#Criando uma nova coluna chamada 'SOMA_DESPESAS_COMIDAS', com a soma das colunas: Despesa_vinho, Despesa_frutas, Despesa_carne, Despesa_peixe, Despesa_doces, Despesa_Ouro
dfs = dfs.withColumn("soma_Despesas_Comidas", F.col("Despesa_Ouro") + F.col("Despesa_doces") + F.col("Despesa_peixe") + F.col("Despesa_carne") + F.col("Despesa_frutas") + F.col("Despesa_vinho"))

In [None]:
dfs.show(2)

*Filtros, ordenação e agrupamento*

In [None]:
#EXIBIR A REGIÃO, ESTADO E ÓBITOS ACUMULADOS DA REGIÃO SUDESTE
dfs.select(F.col('Escolaridade'), F.col("Estado_Civil"), F.col("Rendimento"))\
.show()

*Window Functions*

In [None]:
win0 = Window.partitionBy('Escolaridade').orderBy(F.desc('Rendimento'))
dfs30 = dfs.withColumn('Rendimento_Fx_Escolaridade', F.rank().over(win0))
dfs30.show(10)

In [None]:
dfs31 = dfs30.withColumn("dense_rank",dense_rank().over(win0)).show()

# **SparkSQL**

In [None]:
#Analisando se as maiores escolaridades estão relacionadas a quem tem maiores rendimentos em relação ao estado civil

dfs.createOrReplaceTempView('Spark_SQL')
dfs21 = spark.sql('SELECT Escolaridade, Estado_Civil, Rendimento from Spark_SQL order by Estado_Civil')
dfs21.show(100)


In [None]:
#Analisando o gasto de doces em relação a quantidade de criança
dfs.createOrReplaceTempView('Spark_SQL')
dfs22 = spark.sql('SELECT Estado_Civil, Qtd_de_criancas, Despesa_doces from Spark_SQL order by Despesa_doces')
dfs22.show(100)

In [None]:
#Analisando o gasto de carnes em relação a quantidade de adolescentes
dfs.createOrReplaceTempView('Spark_SQL')
dfs23 = spark.sql('SELECT Estado_Civil, Qtd_de_adolescentes, Despesa_carne from Spark_SQL order by Despesa_carne')
dfs23.show(100)

In [None]:
#Analisando se as maiores escolaridades estão relacionadas a quem tem maiores rendimentos em relação ao estado civil

dfs.createOrReplaceTempView('Spark_SQL')
dfs24 = spark.sql('SELECT Escolaridade, Qtd_compras_web, Qtd_compras_catalogo, Qtd_compras_loja from Spark_SQL order by Escolaridade')
dfs24.show(100)

In [None]:
#Analisando qual o estado civil que consome mais vinho 
dfs.createOrReplaceTempView('Spark_SQL')
dfs25 = spark.sql('SELECT Estado_Civil, Despesa_vinho from Spark_SQL order by Estado_Civil')
dfs25.show(100)

## **Convertendo o dataframe tratado em PySpark para o Pandas, para subirmos ele para o Mongodb**

In [None]:
psdf = dfs.toPandas()
print(psdf)


In [None]:
psdf.head()

In [None]:
client = MongoClient('[LINK DE CONEXÃO COM O BANCO DE DADOS MONGODB]')

In [None]:
db = client['Projteste']
collection = db['ProjtestePspark']

In [None]:
df_dict = psdf.to_dict("records")
collection.insert_many(df_dict)

<pymongo.results.InsertManyResult at 0x7f88d9cf7370>

## **Subindo o arquivo para a bucket**

In [None]:
dfs.write.format('csv').save('[CAMINHO DA PASTA PARA SALVAR O ARQUIVO]')