<a href="https://colab.research.google.com/github/pablohenrique93/projeto-treino-etl-pyspark-phone-prices/blob/main/spark_etl_project_phones_prices.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instalação das Bibliotecas


In [None]:
pip install pyspark

In [None]:
pip install pymongo

# Importação das Biliotecas

In [None]:
# Importando pandas
import pandas as pd

# Importando pyspark
import pyspark

# Importando a SparkSession
from pyspark.sql import SparkSession

from pyspark.sql import Row
from pyspark.sql.types import * # O * significa que será utilizado todo tipo de dado (int, float e etc)
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# Importando pymongo
import pymongo
from pymongo import MongoClient

# Configurando e Verificando a Spark Session

In [None]:
# Configurando a variável do ambiente da sessão spark
spark = (
    SparkSession.builder
                .master('local')
                .appName('Projeto ETL phone prices')
                .config('spark.ui.port','4050')
                .getOrCreate()
)

In [None]:
spark

# Importação e criação do Dataframe

In [None]:
# Criando Dataframe
df = (
    spark.createDataFrame(pd.read_csv('/content/cleaned_all_phones.csv'))

)

In [None]:
# Visualizando o Dataframe
df.show(truncate = True)

# Pré Visualização

In [None]:
# Retornando as 5 primeiras linhas do DataFrame

df.head(5)

In [None]:
# Exibindo o esquema do DataFrame, mostrando os nomes das colunas e seus tipos de dados.
df.printSchema()

In [None]:
# Exibindo estatísticas descritivas das colunas numéricas do DataFrame, como média, desvio padrão, mínimo, máximo e etc
df.describe().show()

In [None]:
# Exibindo as colunas do Dataframe
df.columns

In [None]:
# Exibindo o número total de linhas
df.count()

# Definição do Schema

In [None]:
schema = StructType([
                StructField('phone_name', StringType()),
                StructField('brand', StringType()),
                StructField('os', StringType()),
                StructField('inches', FloatType()),
                StructField('resolution', StringType()),
                StructField('battery', IntegerType()),
                StructField('battery_type', StringType()),
                StructField('ram(GB)', IntegerType()),
                StructField('announcement_date', DateType()),
                StructField('weight(g)', FloatType()),
                StructField('storage(GB)', IntegerType()),
                StructField('video_720p', BooleanType()),
                StructField('video_1080p', BooleanType()),
                StructField('video_4K', BooleanType()),
                StructField('video_8K', BooleanType()),
                StructField('video_30fps', BooleanType()),
                StructField('video_60fps', BooleanType()),
                StructField('video_120fps', IntegerType()),
                StructField('video_240fps', BooleanType()),
                StructField('video_480fps', BooleanType()),
                StructField('video_960fps', BooleanType()),
                StructField('price(USD)', FloatType())

])

# Variável para receber o caminho do Dataframe(localmente) inferindo o schema
df2 = spark.read.csv("/content/cleaned_all_phones.csv", header=True, schema=schema)

In [None]:
# Conferindo a tipagem das colunas após inferência do Schema
df2.dtypes

In [None]:
df2.show()

# Manipulação do Dataframe

In [None]:
# Renomeação e tradução das colunas
df3 = df2.toDF(*['nome_celular', 'marca', 'so', 'tamanho_tela_polegada', 'resolucao', 'bateria', 'tipo_bateria',
                 'ram(GB)', 'data_anuncio', 'peso(gramas)', 'armazenamento(GB)', 'video_720p', 'video_1080p','video_4K',
                 'video_8K', 'video_30fps', 'video_60fps', 'video_120fps', 'video_240fps', 'video_480fps', 'video_960fps', 'preco(USD)'])

In [None]:
# Conferindo dataframe após tradução e renomeação de colunas
df3.show()

In [None]:
# Verificando se o dataframe possui linhas duplicadas


# Número de linhas antes de remover duplicatas
total_linhas_antes = df3.count()

# Removendo as linhas duplicadas
df_sem_duplicatas = df3.dropDuplicates()

# Número de linhas após remover duplicatas
total_linhas_depois = df_sem_duplicatas.count()

# Comparando o número de linhas antes e depois para verificar se há duplicatas
if total_linhas_antes > total_linhas_depois:
    print("Existem linhas duplicadas no DataFrame.")
else:
    print("Não há linhas duplicadas no DataFrame.")



In [None]:
# Verificando se há valores nulos em cada coluna e contando-os
contagem_nulos_por_coluna = []
for coluna in df3.columns:
    contagem_nulos = df3.filter(col(coluna).isNull()).count()
    contagem_nulos_por_coluna.append((coluna, contagem_nulos))

# Exibindo a contagem de valores nulos em cada coluna
for coluna, contagem in contagem_nulos_por_coluna:
    print(f"A coluna '{coluna}' tem {contagem} valores nulos.")

In [None]:
# Como verificado, foram encontrados 1512 valores nulos na coluna 'video_120fps'
# Desse modo, substituiremos esses valores pelo termo "Desconhecido"
df4 = df3.fillna({'video_120fps':'Desconhecido'})

# Realização de Consultas

In [None]:
# Contagem de quantos itens diferentes aparecem na coluna 'preco(USD)'
df4.select('preco(USD)').count()

In [None]:
# Filtro para identificar os celulares cujos preços são maiores que 420.0 ordenados de forma decrescente

df4.select('nome_celular', 'preco(USD)').filter(df3['preco(USD)'] > 420.0).orderBy('preco(USD)', ascending=False).show()

In [None]:
# Exibindo os celulares postados mais recentes, ordenados pela coluna "data_anuncio"
df4.select("nome_celular", "data_anuncio").orderBy(col("data_anuncio").desc()).show()

In [None]:
# Exibindo os celulares e suas características ordenados pelos seus respectivos preços
df4.select(
    "nome_celular", "marca", "so", "resolucao", "bateria",
    "ram(GB)", "armazenamento(GB)", "preco(USD)"
).orderBy(col("preco(USD)").desc()).show()


In [None]:
# Exibindo as colunas 'nome_celular', 'marca', 'so', 'video_8K' e 'preco(USD)' somente dos aparelhos que filmam em 8k

df4.select('nome_celular', 'marca', 'so', 'video_8K','preco(USD)').filter(df4['video_8K'] == True).show()

In [None]:
# Exibindo os celulares que possuem armazenamento maior que 64 GB

df4.select('nome_celular', 'marca', 'so','armazenamento(GB)').filter(df4['armazenamento(GB)'] > 64).show()

In [None]:
# Exibindo somente os celulares que possuem a memória RAM maior do que 6
df4.select('nome_celular', 'marca', 'so','ram(GB)').filter(df4['ram(GB)'] > 6).show()

# Consultas em SQL

In [None]:
# Registrando uma tabela temporária
df4.registerTempTable('phone_prices')

#Declaração de uma variável de saída para receber o comando do sql
output = spark.sql('SELECT * FROM phone_prices')
output.show()

In [None]:
# Consultando colunas específicas
output = spark.sql('SELECT nome_celular, marca FROM phone_prices')
output.show()

In [None]:
# Consulta de colunas específicas, declarando um parâmetro, neste caso onde dados da coluna "preco(USD)" passam de 350.0
output = spark.sql('SELECT nome_celular, marca, so, `ram(GB)`, `armazenamento(GB)`, `preco(USD)` FROM phone_prices WHERE `preco(USD)` > 350')
output.show()

In [None]:
# Consulta ordenando registros pelo nome_celular
output = spark.sql("SELECT * FROM phone_prices ORDER BY nome_celular")
output.show()

# Convertendo e salvando dataframe localmente

In [None]:
# Convertendo o df de spark para pandas

df4_pandas = df4.toPandas()

In [None]:
# Salvando df4 localmente
df4_pandas.to_csv('phone_prices_treated.csv', index=False)

# Envio para o MongoDB

In [None]:
# Criando conexão com o MongoDB

uri = "mongo_uri"
client = MongoClient(uri,tls=True,tlsCertificateKeyFile='mongo_key')

In [None]:
# Criando coleção para enviar para o MongoDB

db = client['phone_prices']
colecaotreated = db['phone_prices_treated']

In [None]:
# Enviando o Dataset tratado para o MongoDB

df4_pandas.reset_index(drop=True)
df01 = df4_pandas.to_dict("records")
colecaotreated.insert_many(df01)

In [None]:
# Verificando se o arquivo foi enviado com sucesso
colecaotreated.count_documents({})
# Como podemos conferir o dataset foi enviado corretamente para o MongoDB

# FIM DO PROCESSO