## Importação de Bibliotecas

Nesta célula, são importadas as bibliotecas necessárias para manipulação de dados e funções do Spark.

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

### A seguir será usado o comando SQL `USE CATALOG` e `USE SCHEMA` para definir o catálogo e o schema ativos.

In [0]:
spark.sql("USE CATALOG mvp")
spark.sql("USE SCHEMA bronze")

### Lendo o arquivo CSV das estações com Spark e exibindo as 10 primeiras linhas, incluindo o cabeçalho

In [0]:
df_stations = spark.read.option("header", True).csv("/Volumes/mvp/staging/dataset/stations.csv")
display(df_stations.limit(10))

### Escrita da tabela `stations` em formato Delta

Nesta célula, o DataFrame `df_stations` é persistido no metastore do Databricks como uma **tabela Delta**, utilizando sobrescrita completa.
- A tabela `stations` passa a existir como uma tabela Delta
- Os dados ficam persistidos e versionados
- A tabela pode ser consultada diretamente por SQL, notebooks ou ferramentas de BI conectadas ao Databricks

In [0]:
df_stations.write.format("delta").mode("overwrite").saveAsTable("stations")

### Comentário da tabela `mvp.bronze.stations`

Nesta célula, é definido o comentário descritivo da tabela `mvp.bronze.stations`, documentando seu propósito e principais informações armazenadas no metastore para facilitar entendimento e uso futuro.


In [0]:
spark.sql("""
    comment on table mvp.bronze.stations is
    'The table contains information about various stations, including their geographical locations and operational details. It can be used for mapping station locations, analyzing regional coverage, and tracking the operational history of each station. Key data points include the region, state, city, and latitude/longitude coordinates.'
""")

### Comentários das colunas da tabela `mvp.bronze.stations`

Nesta célula, são definidos comentários descritivos para cada coluna da tabela `mvp.bronze.stations`, documentando o significado e a finalidade de cada campo diretamente no metastore.


In [0]:
COLUMN_COMMENTS = [
    ("region", "The geographical area or zone where the station is located"),
    ("state", "State where the station is located"),
    ("city_station", "The city where the station is located"),
    ("id_station", "Unique identifier assigned to each station"),
    ("lat", "Latitude coordinate of the station location"),
    ("lon", "Longitude coordinate of the station location"),
    ("lvl", "Altitude of the station"),
    ("record_first", "Date when data recording began for the station"),
    ("record_last", "Date of the most recent station record."),
]

for column, comment in COLUMN_COMMENTS:
    spark.sql(f"comment on column mvp.bronze.stations.{column} is '{comment}'")

### Inspeção do schema e metadata da tabela `mvp.bronze.stations`

Nesta célula, é realizada a inspeção detalhada da tabela `mvp.bronze.stations` utilizando comandos `DESCRIBE`.

Primeiro, o comando `DESCRIBE EXTENDED` é executado para obter schema e metadados completos da tabela. Em seguida, é criado um identificador auxiliar (`_id`) para permitir navegar pelas linhas do resultado e extrair o bloco de informações a partir da seção **Catalog**, exibindo apenas um subconjunto relevante dessas informações.

Por fim, o comando `DESCRIBE` padrão é executado para exibir exclusivamente o schema da tabela, com foco nas colunas e seus tipos de dados.


In [0]:
df_describe = spark.sql("describe extended mvp.bronze.stations")
df_describe = df_describe.withColumn("_id", monotonically_increasing_id())
target_id = df_describe.filter("col_name = 'Catalog'").select("_id").first()._id

table_describe = df_describe.filter(f"_id >= {target_id}").limit(9)
display(table_describe.drop("_id"))

display(spark.sql("describe mvp.bronze.stations"))

### Leitura dos dados meteorológicos

Nesta célula, os arquivos CSV de dados meteorológicos são carregados para um DataFrame Spark utilizando um padrão de nome (`weather_*_filtered.csv`), permitindo a leitura conjunta de múltiplos arquivos de forma automática. Em seguida, é exibida uma amostra dos dados para verificação do conteúdo carregado.


In [0]:
    #Abrindo CSVs
df_weather_data = (
    spark.read
    .option("header", True)
    .csv(f"/Volumes/mvp/staging/dataset/weather_*_filtered.csv")
)

display(df_weather_data.limit(10))

### Padronização dos nomes das colunas

Nesta célula, os nomes das colunas do DataFrame `df_weather_data` são padronizados por meio da substituição de caracteres especiais e espaços por sublinhados (`_`). Esse processo garante maior compatibilidade com Spark SQL, facilita consultas e evita problemas com caracteres não permitidos em nomes de colunas. Em seguida, é exibida uma amostra dos dados com os nomes das colunas já normalizados.


In [0]:
# Substituindo caracteres especiais não desejados
df_replaced_weather_data = df_weather_data.toDF(*[col.replace(' ', '_').replace('(', '_').replace(')', '_').replace('-', '_').replace('.', '_').replace('°', '_').replace('(', '_').replace(')', '_').replace(',', '_') for col in df_weather_data.columns])

display(df_replaced_weather_data.limit(10))

### Escrita da tabela `weather_data` em formato Delta

In [0]:
df_replaced_weather_data.write.format("delta").mode("overwrite").saveAsTable("weather_data")

### Comentário da tabela `mvp.bronze.weather_data`

In [0]:
spark.sql("""
    comment on table mvp.bronze.weather_data is
    'The table contains hourly weather data collected from various stations. It includes information such as temperature, humidity, precipitation, and wind conditions. Possible use cases include analyzing weather patterns, conducting climate research, and supporting agricultural planning by understanding local weather conditions.'
""")

### Comentários das colunas da tabela `mvp.bronze.weather_data`


In [0]:
COLUMN_COMMENTS = [
    ("DATA__YYYY_MM_DD_", "Date of the observation recorded in YYYY-MM-DD format."),
    ("Hora_UTC", "Hour of the observation recorded in Coordinated Universal Time (UTC)"),
    ("PRECIPITAÇÃO_TOTAL__HORÁRIO__mm_", "Total hourly precipitation measured in millimeters"),
    ("PRESSAO_ATMOSFERICA_AO_NIVEL_DA_ESTACAO__HORARIA__mB_", "Hourly atmospheric pressure at the station level, measured in millibars (mB)"),
    ("PRESSÃO_ATMOSFERICA_MAX_NA_HORA_ANT___AUT___mB_", "Maximum atmospheric pressure measured in the previous hour, recorded automatically, given in millibars (mB)"),
    ("PRESSÃO_ATMOSFERICA_MIN__NA_HORA_ANT___AUT___mB_", "Minimum atmospheric pressure recorded during the previous hour, measured in millibars."),
    ("RADIACAO_GLOBAL__KJ/m²_", "Amount of global radiation recorded in kilojoules per square meter"),
    ("TEMPERATURA_DO_AR___BULBO_SECO__HORARIA___C_", "Hourly dry-bulb air temperature recorded in degrees Celsius."),
    ("TEMPERATURA_DO_PONTO_DE_ORVALHO___C_", "Dew point temperature in Celsius, representing the air temperature at which condensation occurs."),
    ("TEMPERATURA_MÁXIMA_NA_HORA_ANT___AUT____C_", "Highest air temperature recorded in the previous hour, measured in degrees Celsius"),
    ("TEMPERATURA_MÍNIMA_NA_HORA_ANT___AUT____C_", "Lowest air temperature recorded in the preceding hour, in Celsius"),
    ("TEMPERATURA_ORVALHO_MAX__NA_HORA_ANT___AUT____C_", "Highest dew point temperature recorded during the previous hour (°C)"),
    ("TEMPERATURA_ORVALHO_MIN__NA_HORA_ANT___AUT____C_", "Minimum dew point temperature recorded in the previous hour, in Celsius"),
    ("UMIDADE_REL__MAX__NA_HORA_ANT___AUT___%_", "Maximum relative humidity recorded in the previous hour as measured automatically, expressed in percent."),
    ("UMIDADE_REL__MIN__NA_HORA_ANT___AUT___%_", "Lowest relative humidity recorded automatically in the previous hour, expressed as percentage"),
    ("UMIDADE_RELATIVA_DO_AR__HORARIA__%_", "Percentage of relative humidity measured for each hour"),
    ("VENTO__DIREÇÃO_HORARIA__gr______gr__", "Hourly wind direction measured in degrees"),
    ("VENTO__RAJADA_MAXIMA__m/s_", "Highest recorded wind gust speed during the hour, measured in meters per second"),
    ("VENTO__VELOCIDADE_HORARIA__m/s_", "Measured wind speed per hour, expressed in meters per second"),
    ("ESTACAO", "Station identifier where the hourly weather data was recorded"),
]

for column, comment in COLUMN_COMMENTS:
    spark.sql(f"comment on column mvp.bronze.weather_data.`{column}` is '{comment}'")

### Inspeção do schema e metadata da tabela `mvp.bronze.weather_data`

In [0]:
df_describe = spark.sql("describe extended mvp.bronze.weather_data")
df_describe = df_describe.withColumn("_id", monotonically_increasing_id())
target_id = df_describe.filter("col_name = 'Catalog'").select("_id").first()._id

table_describe = df_describe.filter(f"_id >= {target_id}").limit(9)
display(table_describe.drop("_id"))

display(spark.sql("describe mvp.bronze.weather_data"))