###  3.2.1 Dados de Vendas - Census Bureau
Criado em: 12/01/2025

Responsável: Mateus Moraes

In [0]:
import os

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import current_date
from pyspark.sql.functions import current_timestamp



In [0]:

# Reuse the active Spark session if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("API_Dados_Varejo")
    .getOrCreate()
)

def get_data_for_year(year, api_key=None, timeout=30):
    """Fetch one year of the Census Bureau EITS ``mtis`` time series as a Spark DataFrame.

    Args:
        year: Year to request; it is sent to the API as the ``time`` parameter
            and also stored in the extra ``year`` column of the result.
        api_key: Census API key. When omitted, the ``CENSUS_API_KEY``
            environment variable is used; if that is unset as well, the
            request is sent without a key.
        timeout: Seconds to wait for the HTTP response before ``requests``
            raises a timeout error.

    Returns:
        A DataFrame whose columns are the header row returned by the API plus
        a literal ``year`` column, or ``None`` when the response status is not
        200 or the payload carries no data rows.
    """
    url = "https://api.census.gov/data/timeseries/eits/mtis"
    params = {
        "get": "data_type_code,time_slot_id,seasonally_adj,category_code,cell_value,error_data",
        "for": "us:*",  # national-level geography (the `us` column of the result)
        "time": str(year),  # restrict the series to the requested year
    }

    # Credentials must not live in the notebook source: take the key from the
    # caller or the environment, and leave it out entirely when none is set
    # (the Census API documents a daily quota for keyless calls).
    if api_key is None:
        api_key = os.environ.get("CENSUS_API_KEY")
    if api_key:
        params["key"] = api_key

    # A timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.get(url, params=params, timeout=timeout)

    if response.status_code != 200:
        print(f"Erro ao obter dados para {year}: {response.status_code}, {response.text}")
        return None

    data = response.json()

    # The first element is the header row. Spark cannot infer a schema from
    # zero rows, so a header-only (or empty) payload is treated as "no data".
    if not data or len(data) < 2:
        return None

    columns, rows = data[0], data[1:]
    df = spark.createDataFrame(rows, columns)

    # Tag every row with the requested year so stacked frames stay traceable.
    return df.withColumn("year", lit(year))

# Collect the 2010-2013 data, one request per year; years whose request
# returned nothing (None) are skipped.
dfs = []

for year in range(2010, 2014):
    df_year = get_data_for_year(year)
    if df_year is not None:
        dfs.append(df_year)

# Fail here with a clear message instead of leaving `final_df` undefined,
# which would only surface later as a NameError in the next cell.
if not dfs:
    raise RuntimeError("Nenhum dado retornado pela API do Census para 2010-2013.")

# Stack the yearly frames, matching columns by name rather than by position.
final_df = dfs[0]
for year_df in dfs[1:]:
    final_df = final_df.unionByName(year_df)

# Show the combined data
display(final_df)

data_type_code,time_slot_id,seasonally_adj,category_code,cell_value,error_data,time,us,year
E_IM,0,no,TOTBUS,0.5,yes,2010-12,1,2010
IM,0,yes,TOTBUS,1367670.0,no,2010-06,1,2010
IM,0,yes,TOTBUS,1450371.0,no,2010-12,1,2010
E_IM,0,yes,TOTBUS,0.5,yes,2010-06,1,2010
E_IM,0,yes,TOTBUS,0.5,yes,2010-12,1,2010
IR,0,no,TOTBUS,1.21,no,2010-06,1,2010
E_SM,0,no,WHLSLR,1.0,yes,2010-07,1,2010
E_SM,0,no,WHLSLR,1.1,yes,2010-12,1,2010
SM,0,yes,WHLSLR,356706.0,no,2010-06,1,2010
SM,0,yes,WHLSLR,380210.0,no,2010-12,1,2010


In [0]:

# Stamp every row with the timestamp at which this ingestion ran
final_df_with_ingestion_timestamp = final_df.withColumn(
    "ingestion_timestamp",
    current_timestamp(),
)

# Preview the frame including the new column
display(final_df_with_ingestion_timestamp)

# Persist as a Parquet-backed table, replacing any previous contents
(
    final_df_with_ingestion_timestamp.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("3_2_Sales_data_census_bureau")
)


data_type_code,time_slot_id,seasonally_adj,category_code,cell_value,error_data,time,us,year,ingestion_timestamp
E_IM,0,no,TOTBUS,0.5,yes,2010-12,1,2010,2025-01-12T22:18:57.185+0000
IM,0,yes,TOTBUS,1367670.0,no,2010-06,1,2010,2025-01-12T22:18:57.185+0000
IM,0,yes,TOTBUS,1450371.0,no,2010-12,1,2010,2025-01-12T22:18:57.185+0000
E_IM,0,yes,TOTBUS,0.5,yes,2010-06,1,2010,2025-01-12T22:18:57.185+0000
E_IM,0,yes,TOTBUS,0.5,yes,2010-12,1,2010,2025-01-12T22:18:57.185+0000
IR,0,no,TOTBUS,1.21,no,2010-06,1,2010,2025-01-12T22:18:57.185+0000
E_SM,0,no,WHLSLR,1.0,yes,2010-07,1,2010,2025-01-12T22:18:57.185+0000
E_SM,0,no,WHLSLR,1.1,yes,2010-12,1,2010,2025-01-12T22:18:57.185+0000
SM,0,yes,WHLSLR,356706.0,no,2010-06,1,2010,2025-01-12T22:18:57.185+0000
SM,0,yes,WHLSLR,380210.0,no,2010-12,1,2010,2025-01-12T22:18:57.185+0000
