<a href="https://colab.research.google.com/github/pablohenrique93/projeto-treino-etl-autos-ebay/blob/main/autos_ebay_kleinanzeigan_etl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instalação das Bibliotecas

In [None]:
pip install pyspark

In [None]:
pip install pandera

# Importação das Bibliotecas

In [3]:
#Importação das Bibliotecas 
from pyspark.sql import SparkSession
import pyspark.sql.functions
from pyspark.sql.functions import *
import os
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
from pyspark.sql.window import Window
from pyspark.sql.types import *
import pymongo
from pymongo import MongoClient

# Leitura da Base de Dados

In [4]:
df = pd.read_csv('/content/autos.csv')

In [None]:
# Verificação se o Dataframe foi devidamente carregado
df.head()

# Backup do Dataframe

In [6]:
# Faz-se necessário fazer um backup do Dataframe antes de qualquer manipulação de dados, em caso de um erro 
# nos processos posteriores os dados estão assegurados.

backup_df = df.copy

# Pré Análise do Dataframe

In [None]:
df.head(5)

In [None]:
df.tail(5)

In [None]:
df.dtypes

In [8]:
df.shape

(371528, 21)

# Tratamento de Dados

In [None]:
#Conferência de coluna por coluna com o objetivo de encontrar possíveis inconsistências
pd.unique(df['dateCrawled'])

In [None]:
pd.unique(df['name'])

In [None]:
pd.unique(df['seller'])

In [None]:
pd.unique(df['offerType'])

In [None]:
pd.unique(df['price'])

In [None]:
pd.unique(df['abtest'])

In [None]:
pd.unique(df['vehicleType'])

In [None]:
pd.unique(df['yearOfRegistration'])

In [None]:
pd.unique(df['gearbox'])

In [None]:
pd.unique(df['powerPS'])

In [None]:
pd.unique(df['model'])

In [None]:
df.dtypes

In [None]:
pd.unique(df['kilometer'])

In [None]:
pd.unique(df['monthOfRegistration'])

In [None]:
pd.unique(df['fuelType'])

In [None]:
pd.unique(df['brand'])

In [None]:
pd.unique(df['notRepairedDamage'])

In [None]:
pd.unique(df['dateCreated'])

In [None]:
pd.unique(df['nrOfPictures'])

In [None]:
pd.unique(df['postalCode'])

In [None]:
pd.unique(df['lastSeen'])

# Limpeza de Inconsistências e Padronização de Colunas

In [31]:
# Dropagem da coluna 'index' pois o dataframe ja possui uma númeração com a mesma característica
# Dropagem da coluna 'nrOfPictures' pois a mesma se encontra zerada

df.drop(['index'],axis=1,inplace=True)
df.drop(['nrOfPictures'],axis=1,inplace=True)

In [32]:
# Renomeação e tradução de colunas, utilizando a fonte dos dados como parâmetro de tradução

df.rename(columns={
    'dateCrawled':'data_anuncio',
    'name':'nome_carro',
    'seller':'vendedor',
    'offerType':'tipo_de_oferta',
    'price':'preco',
    'abtest':'teste_ab',
    'vehicleType':'tipo_de_veiculo',
    'yearOfRegistration':'ano_de_registro',
    'gearbox':'cambio',
    'powerPS':'potencia_cavalos',
    'model':'modelo',
    'kilometer':'quilometragem',
    'monthOfRegistration':'mes_de_registro',
    'fuelType':'tipo_de_combustivel',
    'brand':'marca',
    'notRepairedDamage':'danos_nao_reparados',
    'dateCreated':'data_lista_ebay',
    'postalCode':'codigo_postal_loc',
    'lastSeen':'data_ultima_visualizacao'
},inplace=True)

In [33]:
# Renomeando e traduzindo dados dentro das colunas com o objetivo de facilitar o entendimento das mesmas

df.replace({'vendedor':'privat'}, 'privado',regex=True, inplace=True)  
df.replace({'vendedor':'gewerblich'}, 'comercial', regex=True, inplace=True)

In [34]:
df.replace({'tipo_de_oferta':'Angebot'}, 'oferta', regex=True, inplace=True)
df.replace({'tipo_de_oferta':'Gesuch'}, 'procura', regex=True, inplace=True)

In [35]:
df.replace({'teste_ab':'control'}, 'controle',regex=True, inplace=True)
df.replace({'teste_ab':'test'}, 'teste',regex=True, inplace=True)

In [36]:
df.replace({'tipo_de_veiculo':'kleinwagen'}, 'carro_pequeno',regex=True, inplace=True)
df.replace({'tipo_de_veiculo':'cabrio'}, 'conversivel',regex=True, inplace=True)
df.replace({'tipo_de_veiculo':'bus'}, 'onibus',regex=True, inplace=True)
df.replace({'tipo_de_veiculo':'andere'}, 'outros',regex=True, inplace=True)

In [37]:
df.replace({'cambio':'manuell'}, 'manual',regex=True, inplace=True)
df.replace({'cambio':'automatik'}, 'automatico',regex=True, inplace=True)

In [38]:
df.replace({'modelo':'andere'}, 'outros',regex=True, inplace=True)

In [39]:
df.replace({'tipo_de_combustivel':'benzin'}, 'gasolina',regex=True, inplace=True)
df.replace({'tipo_de_combustivel':'lpg'}, 'gas',regex=True, inplace=True)
df.replace({'tipo_de_combustivel':'andere'}, 'outros',regex=True, inplace=True)
df.replace({'tipo_de_combustivel':'hybrid'}, 'hibrido',regex=True, inplace=True)
df.replace({'tipo_de_combustivel':'cng'}, 'gas_verde',regex=True, inplace=True)
df.replace({'tipo_de_combustivel':'elektro'}, 'eletrico',regex=True, inplace=True)

In [40]:
df.replace({'danos_nao_reparados':'ja'}, 'sim',regex=True, inplace=True)
df.replace({'danos_nao_reparados':'nein'}, 'nao',regex=True, inplace=True)

In [None]:
#Verificando a quantidade de valores nulos
df.isna().sum()

#Insights, filtros e Groupby

In [None]:
#Groupby para verificação dos veículos que se encontram com danos não reparados com seu respectivo plot
df.groupby(['danos_nao_reparados']).size().sort_values(ascending=False)

In [None]:
df.groupby(['danos_nao_reparados']).size().head(10).sort_values(ascending=False).plot.pie(xlabel='danos_nao_reparados', ylabel = 'quantidade', rot=90)

In [None]:
#Groupby para verificação da quantidade de cada tipo de veículo assim como sua respectiva plotagem
df.groupby(['tipo_de_veiculo']).size().sort_values(ascending=False)

In [None]:
df.groupby(['tipo_de_veiculo']).size().head(10).sort_values(ascending=False).plot.bar(xlabel='tipo_de_veiculo', ylabel = 'quantidade', rot=90)

In [None]:
# Groupby para verificação da quantidade dos tipos de vendedores assim como sua respectiva plotagem
df.groupby(['vendedor']).size().sort_values(ascending=False)

In [None]:
df.groupby(['vendedor']).size().head(10).sort_values(ascending=False).plot.bar(xlabel='vendedor', ylabel = 'quantidade', rot=90)

In [None]:
#Groupby para verificação da quantidade de listagens inclusas em teste a/b e sua respectiva plotagem
df.groupby(['teste_ab']).size().sort_values(ascending=False)

In [None]:
df.groupby(['teste_ab']).size().head(10).sort_values(ascending=False).plot.pie(xlabel='teste_ab', ylabel = 'quantidade', rot=90)

In [None]:
#Groupby para verificação da quilometragem máxima dos modelos da lista assim como sua respectiva plotagem
df.groupby(['quilometragem']).size().sort_values(ascending=False)

In [None]:
df.groupby(['quilometragem']).size().head(10).sort_values(ascending=False).plot.bar(xlabel='quilometragem', ylabel = 'quantidade', rot=90)

#SPARK (Utilização da biblioteca PySpark para dar continuidade a manipulação dos dados)

In [53]:
#Configuração de ambiente (sessão) Spark / variável de sessão que permite o uso do Spark
spark = (
    SparkSession.builder
                .master('local')
                .appName('autos')
                .getOrCreate()
)

In [None]:
#Verificando a variavel spark
spark

In [55]:
#Montagem do esquema de colunas contendo a tipagem de cada coluna via StructType
esquema = (
    StructType([
        StructField('data_anuncio', StringType()),
        StructField('nome_carro', StringType()),
        StructField('vendedor', StringType()),
        StructField('tipo_de_oferta', StringType()),
        StructField('preco', IntegerType()),
        StructField('teste_ab', StringType()),
        StructField('tipo_de_veiculo', StringType()),
        StructField('ano_de_registro', IntegerType()),
        StructField('cambio', StringType()),
        StructField('potencia_cavalos', IntegerType()),
        StructField('modelo', StringType()),
        StructField('quilometragem', IntegerType()),
        StructField('mes_de_registro', IntegerType()),
        StructField('tipo_de_combustivel', StringType()),
        StructField('marca', StringType()),
        StructField('danos_nao_reparados', StringType()),
        StructField('data_lista_ebay', StringType()),
        StructField('codigo_postal_loc', IntegerType()),
        StructField('data_ultima_visualizacao', StringType())
        
    ])
)

In [None]:
# Conversão do df, Pandas Para PySpark inferindo o schema
df_spark = spark.createDataFrame(df, schema=esquema)
df_spark.printSchema()
df_spark.show()

In [57]:
#Antes de iniciar a manipulação de dados faz-se necessário realizar um backup, para em caso de um possível erro, os dados estarem assegurados
df_spark_bckup = df_spark

#Pré Análise

In [None]:
#Verificação das colunas bem como seus tipos
df_spark.printSchema()

In [None]:
#Verificação do Dataset
df_spark.show(truncate=False)

In [None]:
# Verificação dos dados presentes em cada coluna
df_spark.summary().show()

In [None]:
#Contagem de linhas distintas
df_spark_distinct = df_spark.distinct()
print('Distinct ns.: '+str(df_spark_distinct.count()))

#Tratamento de Dados

In [None]:
#Contagem de linhas totais do dataset
df_spark.count()

In [None]:
#Dropagem de linhas duplicadas 
df1 = df_spark.dropDuplicates()
df1.count()

In [None]:
#Verificando todas as colunas do data frame
df1.select('mes_de_registro').distinct().show(100,truncate = False)

In [65]:
#Alterando valores na coluna 'mes_de_registro' com o objetivo de padroniza-la visando uma possível 
#concatenação com a coluna 'ano_de_registro'

df2 = ( df1.withColumn('mes_de_registro', F.when(F.col('mes_de_registro') == '0', F.regexp_replace(F.col('mes_de_registro'), '0', '-00'))
                                .when(F.col('mes_de_registro') == '5', F.regexp_replace(F.col('mes_de_registro'), '5', '-05'))
                                .when(F.col('mes_de_registro') == '8', F.regexp_replace(F.col('mes_de_registro'), '8', '-08'))
                                .when(F.col('mes_de_registro') == '6', F.regexp_replace(F.col('mes_de_registro'), '6', '-06'))
                                .when(F.col('mes_de_registro') == '7', F.regexp_replace(F.col('mes_de_registro'), '7', '-07'))
                                .when(F.col('mes_de_registro') == '2', F.regexp_replace(F.col('mes_de_registro'), '2', '-02'))
                                .when(F.col('mes_de_registro') == '3', F.regexp_replace(F.col('mes_de_registro'), '3', '-03'))
                                .when(F.col('mes_de_registro') == '1', F.regexp_replace(F.col('mes_de_registro'), '1', '-01'))
                                .when(F.col('mes_de_registro') == '9', F.regexp_replace(F.col('mes_de_registro'), '9', '-09'))
                                .when(F.col('mes_de_registro') == '10', F.regexp_replace(F.col('mes_de_registro'), '10', '-10'))
                                .when(F.col('mes_de_registro') == '11', F.regexp_replace(F.col('mes_de_registro'), '11', '-11'))
                                .when(F.col('mes_de_registro') == '12', F.regexp_replace(F.col('mes_de_registro'), '12', '-12'))
                                .when(F.col('mes_de_registro') == '4', F.regexp_replace(F.col('mes_de_registro'), '4', '-04'))
                                   
                       )
)

In [None]:
#Verificação da coluna após alteração
df2.select('mes_de_registro').distinct().show(100,truncate = False)

In [67]:
#Concatenando as colunas 'ano_de_registro' e 'mes_de_regsitro' resultando numa nova coluna chamada 'ano_mes_registro'
df3 = df2.withColumn("ano_mes_registro", concat(df2["ano_de_registro"], df2["mes_de_registro"]))

In [None]:
#Verificando a concatenação da coluna
df3.select("ano_mes_registro").distinct().show(100,truncate = False)

In [69]:
#Realizando o drop das colunas 'ano_de_registro' e 'mes_de_registro' pois foram concatenadas
df4 = df3.drop('ano_de_registro','mes_de_registro')

In [70]:
#Criando uma nova coluna excluindo os dados de horários pois os mesmos se apresentavam zerados
df5 = df4.withColumn("data_ebay_lista", date_format("data_lista_ebay", "yyyy-MM-dd"))

In [71]:
#Drop da coluna de origem pois já foi modificada com a criação de uma nova coluna
df6 = df5.drop("data_lista_ebay")

In [72]:
#Ordenação de Colunas do Dataframe com o objetivo de facilitar o entendimento dos dados
df7 = df6.select(col("data_anuncio"), col("nome_carro"), col("ano_mes_registro"), col("modelo"), col("cambio"), col("marca"), 
                 col("tipo_de_veiculo"), col("potencia_cavalos"), col("quilometragem"), col("tipo_de_combustivel"), col("danos_nao_reparados"), 
                 col("preco"), col("codigo_postal_loc"), col("vendedor"), col("tipo_de_oferta"), col("teste_ab"), col("data_ebay_lista"), col("data_ultima_visualizacao"))

In [73]:
#Conversão da coluna "data_ebay_lista" para uma nova coluna chamada data_lista_ebay, já no formato date
df8 = df7.withColumn("data_lista_ebay", to_date("data_ebay_lista", "yyyy-MM-dd"))

In [74]:
#Drop da coluna "data_ebay_lista" pois ja foi modificada com a criação da coluna "data_lista_ebay" no formato date
df9 = df8.drop("data_ebay_lista")

In [75]:
#Criação de Colunas no formato timestamp utilizando as colunas de origem
df10 = df9.withColumn("anuncio_data", to_timestamp("data_anuncio", "yyyy-MM-dd HH:mm:ss"))
df11 = df10.withColumn("data_ultima_visu", to_timestamp("data_ultima_visualizacao", "yyyy-MM-dd HH:mm:ss"))

In [77]:
#Drop das colunas de origem pois as mesmas ja foram modificadas
df12 = df11.drop("data_anuncio","data_ultima_visualizacao")

In [78]:
#Reordenação de Colunas novamente pois o dataframe sofreu modificações adicionais
df13 = df12.select(col("anuncio_data"), col("nome_carro"), col("ano_mes_registro"), col("modelo"), col("cambio"), col("marca"), 
                 col("tipo_de_veiculo"), col("potencia_cavalos"), col("quilometragem"), col("tipo_de_combustivel"), col("danos_nao_reparados"), 
                 col("preco"), col("codigo_postal_loc"), col("vendedor"), col("tipo_de_oferta"), col("teste_ab"), col("data_lista_ebay"), col("data_ultima_visu"))

#Possíveis Análises (Groupby e Filtros)

In [None]:
#Visualização da quantidade de registros de cada carro
df13.groupBy('nome_carro').count().orderBy(F.col('count').desc()).show(20)

In [None]:
#Verificação da quantidade de carros automáticos e manuais 
df13.groupBy('cambio').count().orderBy(F.col('count').desc()).show(100)

In [None]:
#Verificação da quantidade dos tipos de veículos
df13.groupBy('tipo_de_veiculo').count().orderBy(F.col('count').desc()).show(100)

In [None]:
#Verificação da quantidade de anúncio por data
df13.groupBy('anuncio_data').count().orderBy(F.col('count').desc()).show()

In [None]:
#Verificação dos carros mais caros da lista, organizando do maior para o menor, com retorno do nome do carro, modelo e preço
df13.select(F.col('nome_carro'), F.col('modelo'), F.col('preco')).orderBy(F.col('preco').desc()).show()

In [None]:
#Realização de um groupby para verificar os valores mínimos e máximos dos preços dos carros
df13.groupBy('nome_carro').agg(
    F.round(F.sum('preco'),2).alias('preco'), 
    F.min('preco').alias('valor_min'),
    F.max('preco').alias('valor_max')
    ).show()

In [None]:
#Filtro para exibição de carros com potência maior ou igual a 150 cavalos
df13.select(F.col('nome_carro'), F.col('potencia_cavalos')).filter(F.col('potencia_cavalos') >= 150).show()

#SparkSQL

In [None]:
#Criação de um banco de dados
spark.sql('create database autos').show()

In [None]:
#Exibindo o banco de dados
spark.sql('show databases').show()

In [None]:
#Selecionando o banco de dados para trabalhar utilizando o mesmo
spark.sql('use autos')

In [89]:
#Criando e salvando o Df em tabela
df13.write.saveAsTable('autos')

In [None]:
spark.sql("SELECT * FROM autos").show()

## Queries executadas utilizando SQL

In [None]:
#Seleção de nome, modelo, marca e cambio dos carros filtrando por cambio manual
spark.sql("SELECT nome_carro, modelo, marca, cambio FROM autos WHERE cambio = 'manual'").show()

In [None]:
#Query para retorno do nome, modelo, potência em cavalos e tipo do veículo de todas as limousines 
spark.sql('SELECT nome_carro, modelo, potencia_cavalos, tipo_de_veiculo FROM autos WHERE tipo_de_veiculo = "limousine";').show(truncate=False)

In [None]:
#Query para retorno do nome do carro e marca ordenados pela marca do veículo de forma ascendente
spark.sql('SELECT nome_carro, marca FROM autos ORDER BY marca ASC;').show(truncate=False)

In [None]:
#Query para retornar os carros com a potência entre 150 e 300 cavalos
spark.sql("SELECT nome_carro, potencia_cavalos FROM autos WHERE potencia_cavalos BETWEEN 150 AND 300;").show(truncate=False)

In [None]:
#Query para retorno dos carros mais caros da tabela de forma ordenada
spark.sql("SELECT nome_carro, preco FROM autos ORDER BY preco").show(truncate=False)

In [None]:
#Query executada para retorno de todos os carros que tiveram seus anúncios postados entre '2016-03-08 12:53:50' e '2016-03-05 14:41:46'
spark.sql("SELECT * FROM autos WHERE anuncio_data BETWEEN '2016-03-08 12:53:50' AND '2016-03-05 14:41:46'").show(truncate=False)

In [None]:
#Query executada com o objetivo de retorno dos carros que tiverem entre seus nomes a palavra "Benz"
spark.sql("SELECT *  FROM autos WHERE nome_carro LIKE '%Benz%'").show(truncate=False)

In [None]:
#Query para retorno de anúncios feitos somente por vendedores comerciais
spark.sql("SELECT * FROM autos WHERE vendedor = 'comercial'").show(truncate=False)

In [None]:
#Query executada para retorno da média de preços dos carros da tabela
spark.sql("SELECT AVG(preco) AS media_preco_carros FROM autos;").show(truncate=False)

In [None]:
#Query executada para retorno do preço máximo dos carros da tabela
spark.sql("SELECT MAX(preco) AS preco_maximo_carros FROM autos;").show(truncate=False)

# Conversão para Pandas para envio para o MondoDB

In [103]:
df_final = df13.toPandas()

In [108]:
#Salvando tambem em formato Csv com objetivo de documentá-lo
df_final.to_csv('autos_treated.csv', index=False)

# Envio para o MongoDB

In [101]:
#Conectando com o MongoDB

uri = "uri_mongo"
client = MongoClient(uri,tls=True,tlsCertificateKeyFile='mongo_key')

In [107]:
#Criando coleções para enviar para o mongoDB
db = client['treated_base']
colecaoautos = db['autos_treated']

In [None]:
#Envio
data_dict = df_final.to_dict('records')
#'insert many' para inserir vários
colecaoautos.insert_many(data_dict)
print('Data Frame importado com sucesso!')

In [None]:
#Verificando se o arquivo foi enviado
colecaoautos.count_documents({})

#FINAL DO PROCESSO