#### Importação de Pacotes

In [0]:
from pyspark.sql.functions import *

#### Definição do Unity Catalog e Schema

In [0]:
%sql
USE CATALOG bees;
USE SCHEMA brewery;

In [0]:
%sql
CREATE TABLE IF NOT EXISTS brewery_silver(
  id STRING,
  name STRING,
  brewery_type STRING,
  address_1 STRING,
  address_2 STRING,
  address_3 STRING,
  city STRING,
  state_province STRING,
  postal_code STRING,
  country STRING,
  longitude DOUBLE,
  latitude DOUBLE,
  phone STRING,
  website_url STRING,
  state STRING
)
PARTITIONED BY (country)
COMMENT 'Tabela Silver dos dados de cervejarias consumidos da brewery_bronze.
Particionada por Country'

#### Importação da tabela Bronze

In [0]:
df_bronze = spark.read.table('brewery_bronze')

#### Transformação de Dados  
Ações Realizadas:
* Identificado que alguns registros da coluna "name" possuem problemas de Encoding.  
Realizado tratamento através da função UDF fix_encode

* Identificado que coluna "address_1" e "street" possuem os mesmos dados.  
Antes de deletar a coluna "street", ela foi aplicada em um coalesce com "address_1",  
para prevenir casos futuros de preenchimento apenas na coluna "street"

In [0]:
@udf()
def fix_encode(s):
    "Função UDF para solucionar problema de encode"
    try:
        return s.encode('LATIN1').decode('UTF-8')
    except:
        return s

In [0]:
df_transform = (df_bronze.withColumn("name", fix_encode(col('name')))
                         .withColumn('address_1', coalesce('address_1', 'street'))
                         .drop('street')
)

#### Data Quality  
Aplicadas verificações de valores nulos nos campos da tabela.  
* "is_address_missing" verifica valores nulos em endereços e código postal
* "is_location_missing" verifica valores nulos nos campos de localização: Cidade, Estado, País
* "is_lat_long_missing" verifica valores nulos nos campos de Latitude e Longitude
* "is_phone_missing" verifica o preenchimento do campo de telefone
* "is_website_missing" verifica o preenchimento do campo de url do website

In [0]:
df_quality = (df_transform.withColumn('is_address_missing',
                              (col('address_1').isNull() &
                              col('address_2').isNull() &
                              col('address_3').isNull()) |
                              col('postal_code').isNull()
                              )
                  .withColumn('is_location_missing', 
                              col('city').isNull() |
                              col('state_province').isNull() |
                              col('country').isNull()
                              )
                  .withColumn('is_lat_long_missing',
                              col('longitude').isNull() |
                              col('latitude').isNull()
                            )
                  .withColumn('is_phone_missing',col('phone').isNull())
                  .withColumn('is_website_missing',col('website_url').isNull())
                  .withColumn('data_quality',
                              struct(
                                'is_address_missing',
                                'is_location_missing',
                                'is_lat_long_missing',
                                'is_phone_missing',
                                'is_website_missing'
                              )
                            )
                  .drop('is_address_missing', 'is_location_missing',
                        'is_lat_long_missing', 'is_phone_missing',
                        'is_website_missing'
                        )
)

#### Output tabela Silver

In [0]:
(df_quality.write.format("delta")
                 .mode("overwrite")
                 .option("mergeSchema", True)
                 .saveAsTable("brewery_silver")
)

#### Criação de View para acompanhar integridade dos Dados

In [0]:
%sql
CREATE OR REPLACE VIEW vw_brewery_silver_quality
COMMENT "
View para acompanhamento de data quality da tabela breweries_silver
Coluna:
-check_quality indica o tipo de check realizado
-valor indica a quantidade de registros econtrados para o check realizado
-percent_tabela indica a porcentagem de registros econtrados para o check realizado em relação ao total de linhas da tabela"
AS
SELECT 
  check_quality,
  sum(valor) count,
  ROUND(100 * SUM(valor) / (SELECT count(1) FROM brewery_silver),1) percent_tabela
FROM (
  SELECT 
    stack(5,
    'is_address_missing', cast(data_quality.is_address_missing as int),
    'is_location_missing', cast(data_quality.is_location_missing as int),
    'is_lat_long_missing', cast(data_quality.is_lat_long_missing as int),
    'is_phone_missing', cast(data_quality.is_phone_missing as int),
    'is_website_missing', cast(data_quality.is_website_missing as int)
  ) AS (check_quality, valor)
  FROM
    brewery_silver
)
GROUP BY
  check_quality
ORDER BY 
  count DESC