# Notebook Trusted API Coingecko

In [1]:
%idle_timeout 10
%glue_version 5.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
from pyspark.sql.functions import col, lit
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 10 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 2
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Idle Timeout: 10
Session ID: 7d8358e9-2a01-4a51-ad90-9efafabf7c96
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 7d8358e9-2a01-4a51-ad90-9efafabf7c96 to get into ready status...
Session 7d8358e9-2a01-4a51-ad90-9efafabf7c96 has be

In [2]:
# Data atual UTC
now = datetime.utcnow()

# Caminho de leitura raw
raw_path = f"s3://arquitetura-software-datalake/raw/coingecko/{now.year}/{now.month:02d}/{now.day:02d}/{now.hour:02d}/"

# Leitura dos JSONs
df = spark.read.option("multiLine", True).json(raw_path)

# Seleção e renomeação das colunas desejadas
df_trusted = df.select(
    col("id").alias("ID_MOEDA"),
    col("symbol").alias("SIMBOLO"),
    col("name").alias("NOME"),
    col("current_price").alias("PRECO_ATUAL"),
    col("market_cap").alias("VALOR_MERCADO"),
    col("market_cap_rank").alias("RANKING_MERCADO"),
    col("total_volume").alias("VOLUME_TOTAL"),
    col("price_change_24h").alias("VARIACAO_PRECO_24H"),
    col("price_change_percentage_24h").alias("PERCENTUAL_VARIACAO_PRECO_24H"),
    col("circulating_supply").alias("CIRCULACAO"),
    col("total_supply").alias("FORNECIMENTO_TOTAL"),
    col("max_supply").alias("FORNECIMENTO_MAXIMO"),
    col("ath").alias("PRECO_MAXIMO_HISTORICO"),
    col("ath_date").alias("DATA_MAXIMO_HISTORICO"),
    col("atl").alias("PRECO_MINIMO_HISTORICO"),
    col("atl_date").alias("DATA_MINIMO_HISTORICO"),
    col("ath_change_percentage").alias("VARIACAO_DESDE_MAXIMO"),
    col("atl_change_percentage").alias("VARIACAO_DESDE_MINIMO"),
    col("high_24h").alias("PRECO_MAXIMO_24H"),
    col("low_24h").alias("PRECO_MINIMO_24H"),
    col("market_cap_change_24h").alias("VARIACAO_VALOR_MERCADO_24H"),
    col("market_cap_change_percentage_24h").alias("PERCENTUAL_VARIACAO_VALOR_MERCADO_24H"),
    col("fully_diluted_valuation").alias("VALOR_TOTAL_DILUIDO"),
    col("last_updated").alias("ULTIMA_ATUALIZACAO")
)

# Adiciona a coluna DAT_CARGA no formato "YYYY-MM-DD:HH"
dat_carga_str = now.strftime("%Y-%m-%d:%H")
df_trusted = df_trusted.withColumn("DAT_CARGA", lit(dat_carga_str))

# Caminho de destino trusted
dest_path = f"s3://arquitetura-software-datalake/trusted/coingecko/{now.year}/{now.month:02d}/{now.day:02d}/{now.hour:02d}/"

# Escreve no S3 com overwrite por pasta
df_trusted.write.mode("overwrite").parquet(dest_path)


