Skip to content

Este projeto se trata de um simples etl com um dataset com as variações dos preços diários do bitcoin no período de 2020-2022. Os códigos do notebook foram desenvolvidos tanto em pyspark quanto em sql, numa simulação de solucão referentes a perguntas de négocio.

Notifications You must be signed in to change notification settings

victorandradebr/ETL_DATABRICKS_BITCOIN

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 

Repository files navigation

Este projeto se trata de um simples etl com um dataset com as variações dos preços diários do bitcoin no período de 2020-2022. Os códigos do notebook foram desenvolvidos tanto em pyspark quanto em sql, numa simulação de solucão referentes a perguntas de négocio:

  • 1 - Qual foi o maior valor do bitcoin no mês de Abril de 2022? E o menor?
  • 2 - Qual a média do valor do fechamento do bitcoin no ano de 2021 em relação aos dados trabalhados?
  • 3 - Qual foi o dia e o mês apenas do ano de 2021 que o bitcoin teve sua maior alta?
  • 4 - Qual foi o dia e o mês apenas do ano de 2022 que o bitcoin teve sua maior baixa?
  • 5 - Qual foi a diferença entre o maior valor de bitcoin do ano de 2022 e o maior ano de 2020?

O dado pode ser encontrado aqui. Quanto a lógica, primeiramente o dado foi convertido da zona landing, onde mantém o seu estado original em csv para a zona processing, em parquet para ganharmos otimização, performance e compressão de volume do dado.

# Lendo o dado na sua forma bruta.

file_location = "/FileStore/tables/Binance_BTCBRL_d.csv"

df = (
    spark
    .read
    .format("csv")
    .option("InferSchema", True)
    .option("header", True)
    .option("quote", "'")
    .option("sep", ",")
    .load(file_location)
)

display(df)

# Escrevendo o dados no formato parquet na segunda camada do datalake.

file_location2 = "/PROCESSING_ZONE/Binance_file"

(
    df
    .write
    .mode("overwrite")
    .format("parquet")
    .save(file_location2)
)    

Após foi feita a leitura no parquet salvo na etapa anterior e uma tratativa de enriquecimento e melhoria na tipagem dos dados, além da criação de uma view para também realizarmos manipulações em SQL.

# Lendo o dado de uma das zonas do datalake.

file_location3 = "/PROCESSING_ZONE/Binance_file"

df1 = (
    spark
    .read
    .format("parquet")
    .load(file_location3)
)

display(df1)

from pyspark.sql.types import DecimalType
from pyspark.sql.functions import to_date, col, year, month, dayofmonth

df1 = (
    df1.withColumnRenamed('date', 'date'.upper())
        .withColumnRenamed('symbol', 'symbol'.upper())
        .withColumnRenamed('open', 'open'.upper())
        .withColumnRenamed('high', 'high'.upper())
        .withColumnRenamed('low', 'low'.upper())
        .withColumnRenamed('close', 'close'.upper()) 
        .withColumnRenamed('Volume BTC', 'Volume_BTC'.upper())
        .withColumnRenamed('Volume BRL', 'Volume_BRL'.upper())
        .withColumnRenamed('tradecount', 'trade_count'.upper())
        .withColumn('DATE', to_date('DATE'))
        .withColumn('OPEN', col('OPEN').cast(DecimalType(10, 2)))
        .withColumn('HIGH', col('HIGH').cast(DecimalType(10, 2)))
        .withColumn('LOW', col('LOW').cast(DecimalType(10, 2)))
        .withColumn('CLOSE', col('CLOSE').cast(DecimalType(10, 2)))
        .withColumn('VOLUME_BRL', col('VOLUME_BRL').cast(DecimalType(10, 2)))
        .withColumn('ANO', year('DATE'))
        .withColumn('MES', month('DATE'))
        .withColumn('DIA', dayofmonth('DATE'))
        .select('DATE', 'SYMBOL', 'OPEN', 'CLOSE', 'LOW', 'HIGH', 'VOLUME_BTC', 'VOLUME_BRL', 'TRADE_COUNT', 'ANO', 'MES', 'DIA')
)

display(df1)


# Criando uma view para trabalhar com PySpark e SQL respondendo algumas perguntas de negócios.

df1.createOrReplaceTempView('df1')

E para finalizar, foram respondidas as questões:

1 - Qual foi o maior valor do bitcoin no mês de Abril de 2022? E o menor?
from pyspark.sql.functions import max, min 

df1_new = ( 
            df1.where("MES == '4'")
             .groupBy("MES")
             .agg(max("HIGH").alias("MAX_VALUE_ABRIL_2022"), min("HIGH").alias("MIN_VALUE_ABRIL_2022"))
)

display(df1_new)

(
    df1_new
    .write
    .mode("overwrite")
    .format("parquet")
    .save("/CURATED/MIN_MAX_ABRIL_2022")
)
%sql 

SELECT 
      MES,
      MAX(HIGH) AS MAX_VALUE_ABRIL_2022,
      MIN(HIGH) AS MIN_VALUE_ABRIL_2022
FROM 
      df1
WHERE 
      MES = 4 
GROUP BY 
      MES
2 - Qual a média do valor do fechamento do bitcoin no ano de 2021 em relação aos dados trabalhados?
from pyspark.sql.functions import avg, col
from pyspark.sql.types import DecimalType

df2_new = ( 
            df1.where("ANO == '2021'")
             .groupBy("ANO")
             .agg(avg("CLOSE"))
             .withColumn("avg(CLOSE)",col("avg(CLOSE)").cast(DecimalType(10, 2)))
             .withColumnRenamed("avg(CLOSE)", "MEAN_CLOSE_2021")
)

display(df2_new)

(
    df2_new
    .write
    .mode("overwrite")
    .format("parquet")
    .save("/CURATED/AVG_CLOSE_2021")
)
%sql 

SELECT 
     ANO,
     CAST(AVG(CLOSE) AS DECIMAL(10, 2)) AS MEAN_CLOSE_2021
FROM 
      df1
WHERE 
      ANO = 2021
GROUP BY 
      ANO  
3 - Qual foi o dia e o mês apenas do ano de 2021 que o bitcoin teve sua maior alta?
from pyspark.sql.functions import max 


df3_new = (
            df1.where("ANO == '2021'")
            .groupBy("MES", "DIA")
            .agg(max("HIGH").alias("MAX_HIGH"))
            .orderBy("MAX_HIGH", ascending = False)
            .limit(1)
            .select("DIA", "MES")

)

display(df3_new)

(
    df3_new
    .write
    .mode("overwrite")
    .format("parquet")
    .save("/CURATED/DIA_MES_MAX_HIGH_2021")
)
%sql 

WITH tmp as (
  SELECT 
        DIA,
        MES,
        MAX(HIGH) as max_tmp
  FROM df1
  WHERE ANO = 2021 
  GROUP BY DIA, MES
  ORDER BY max_tmp DESC
  LIMIT(1)
)

SELECT 
     DIA, MES
FROM tmp
4 - Qual foi o dia e o mês apenas do ano de 2022 que o bitcoin teve sua maior baixa?
from pyspark.sql.functions import min


df4_new = (
            df1.where("ANO == '2022'")
            .groupBy("MES", "DIA")
            .agg(min("LOW").alias("MIN_LOW"))
            .orderBy("MIN_LOW", ascending = False)
            .limit(1)
            .select("DIA", "MES")

)

display(df4_new)

(
    df4_new
    .write
    .mode("overwrite")
    .format("parquet")
    .save("/CURATED/DIA_MES_MIN_LOW_2022")
)
%sql 

WITH tmp as (
  SELECT 
        DIA,
        MES,
        MIN(LOW) as min_tmp
  FROM df1
  WHERE ANO = 2022 
  GROUP BY DIA, MES
  ORDER BY min_tmp DESC
  LIMIT(1)
)

SELECT 
     DIA, MES
FROM tmp
5 - Qual foi a diferença entre o maior valor de bitcoin do ano de 2022 e o maior ano de 2020?
from pyspark.sql.functions import max, min, col

df5_new = (
        df1.where("ANO != '2021'")
        .groupBy("ANO")
        .agg(max("HIGH").alias("MAX_PER_YEAR"))
        .orderBy("ANO")
        .agg(max("MAX_PER_YEAR").alias("MAX_PER_YEAR_2022"), min("MAX_PER_YEAR").alias("MAX_PER_YEAR_2020"))
        .withColumn("DIFF", col("MAX_PER_YEAR_2022") - col("MAX_PER_YEAR_2020"))
        .select("DIFF")

)

display(df5_new)


(
    df5_new
    .write
    .mode("overwrite")
    .format("parquet")
    .save("/CURATED/MAX_HIGH_DIFF_2022_2020")
)
%sql

with ano_2022 as (
    SELECT 
          MAX(HIGH) AS MAX_2022
    FROM 
          df1 
    WHERE 
          ANO = 2022
), 

ano_2020 as (
    SELECT 
          MAX(HIGH) AS MAX_2020
    FROM 
          df1 
    WHERE 
          ANO = 2020

)

SELECT 
    (a.MAX_2022 - b.MAX_2020) as DIFF
FROM 
    ano_2022 a
CROSS JOIN 
    ano_2020 b

Segue abaixo os resultados das manipulações salvos:

print

Espero que gostem, se tiverem dúvidas entrem em contato pelo LinkedIn

About

Este projeto se trata de um simples etl com um dataset com as variações dos preços diários do bitcoin no período de 2020-2022. Os códigos do notebook foram desenvolvidos tanto em pyspark quanto em sql, numa simulação de solucão referentes a perguntas de négocio.

Topics

Resources

Stars

Watchers

Forks