## Aplicação de Esquema no Delta Lake

A aplicação de esquema é um recurso essencial do Delta Lake que garante a qualidade dos dados, evitando que dados inválidos sejam gravados em uma tabela. Esse mecanismo atua como um guardião, verificando se os dados que estão sendo gravados estão em conformidade com o esquema definido para a tabela. Essa verificação ocorre durante as operações de gravação, garantindo que a integridade dos dados seja mantida em todos os momentos.

Vejamos os seguintes aspectos da aplicação de esquema:

* **O que é um esquema?** Um esquema é essencialmente um modelo que define a estrutura dos dados. Ele especifica os tipos de dados para cada coluna, os nomes das colunas e os metadados associados aos dados. No contexto do Delta Lake, esse esquema é armazenado em formato JSON dentro do log de transações, fornecendo um registro transparente e acessível da estrutura da tabela.

* **Como funciona a aplicação de esquema?** O Delta Lake emprega validação de esquema em gravações, o que significa que sempre que novos dados são gravados na tabela, eles são verificados quanto à compatibilidade com o esquema da tabela de destino. Se houver alguma discrepância, a operação de gravação é rejeitada e uma exceção é gerada, alertando o usuário sobre a incompatibilidade. Esse processo garante que apenas dados que estejam em conformidade com o esquema definido sejam persistidos na tabela Delta Lake.

* **Regras de aplicação de esquema**:
    * **Colunas Adicionais**: O Delta Lake proíbe a inclusão de colunas adicionais que não estejam definidas no esquema da tabela. Se os dados recebidos contiverem colunas extras, a operação de gravação será rejeitada.
    * **Menos Colunas**: O Delta Lake permite a gravação de dados que contenham menos colunas do que o definido no esquema da tabela. As colunas ausentes serão preenchidas com valores nulos. No entanto, é importante observar que, embora essa flexibilidade seja permitida, o esquema da tabela de destino permanece inalterado.
    * **Tipos de Dados Incompatíveis**: Cada coluna no esquema da tabela Delta Lake possui um tipo de dados associado. Se os dados recebidos contiverem um tipo de dados diferente para uma coluna, a aplicação de esquema gerará uma exceção, evitando que a gravação seja concluída. Por exemplo, se uma coluna na tabela Delta Lake for definida como String, mas os dados recebidos contiverem um valor inteiro para essa coluna, a operação de gravação será rejeitada.

* **Benefícios da Aplicação de Esquema**:
    * **Garantia da Qualidade dos Dados**: Ao impor regras rígidas sobre a estrutura dos dados, a aplicação de esquema atua como um mecanismo de controle de qualidade crucial, garantindo que apenas dados formatados corretamente entrem na tabela Delta Lake.
    * **Confiabilidade para Sistemas de Produção**: Para sistemas de produção, especialmente aqueles que alimentam algoritmos de aprendizado de máquina, painéis ou ferramentas de análise e visualização de dados, a aplicação de esquema é inestimável. Ela garante que os dados usados por esses sistemas sejam confiáveis e livres de problemas relacionados a tipos de dados e estrutura.

* **Cenários do Mundo Real**: A aplicação de esquema se assemelha à verificação de identidade que você encontra ao entrar nas instalações da sua empresa. Assim como a verificação de identidade garante que apenas pessoas autorizadas acessem as instalações, a aplicação de esquema garante que apenas dados que atendam aos critérios especificados sejam permitidos na tabela Delta Lake. É uma medida de segurança para manter a integridade e confiabilidade dos dados.


In [0]:
spark.conf.set("fs.azure.account.auth.type.deltadbstgpina.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.deltadbstgpina.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.deltadbstgpina.dfs.core.windows.net", "39113859-cd89-41ee-af19-bedb601eac67")
spark.conf.set("fs.azure.account.oauth2.client.secret.deltadbstgpina.dfs.core.windows.net", "U6D8Q~EI-8UPcYENtNQ9GFpKRqS~81_U8oBzXcNX")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.deltadbstgpina.dfs.core.windows.net", "https://login.microsoftonline.com/63c8a3d9-530b-4409-afe8-69c29700ddff/oauth2/token")

In [0]:
source = "abfss://test@deltadbstgpina.dfs.core.windows.net/"


## Reading data with More Columns


In [0]:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType,DateType,FloatType,DoubleType

schema1 = StructType([
    StructField('Education_Level',StringType()),
    StructField('Line_Number',IntegerType()),
    StructField('Employed',IntegerType()),
    StructField('Unemployed',IntegerType()),
    StructField('Industry',StringType()),
    StructField('Gender',StringType()),
    StructField('Date_Inserted',StringType()),
    StructField('dense_rank',IntegerType()),
    StructField('Max_Salary_USD',IntegerType())
])


Atenção: O "`df_morecols`" possui uma coluna adicional chamada "`Max_salary_USD`".

In [0]:
df_moreCols = (spark.read.format('csv')
                        .schema(schema1)
                        .option('header','true')
                        .load(f'{source}/SchemaEvol/SchemaMoreCols.csv'))

In [0]:
df_moreCols.printSchema()

In [0]:
df_moreCols.write.format('delta').mode('append').saveAsTable('`delta`.deltaspark')


## Source with Less Columns

In [0]:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType,DateType,FloatType,DoubleType

schema = StructType([
    StructField('Education_Level',StringType()),
    StructField('Line_Number',IntegerType()),
    StructField('Employed',IntegerType()),
    StructField('Unemployed',IntegerType()),
    StructField('Industry',StringType()),
    StructField('Gender',StringType())
])

In [0]:
df_lessCols = (spark.read.format('csv')
                        .schema(schema)
                        .option('header','true')
                        .load(f'{source}/SchemaEvol/SchemaLessCols.csv'))

In [0]:
df_lessCols.write.format('delta').mode('append').saveAsTable('`delta`.deltaspark')

In [0]:
%sql
SELECT * FROM `delta`.deltaspark


## Source data with different data type

Para esse exemplo não houve a declaração do schema.

In [0]:
df_diff = (spark.read.format('csv')
            .option('header','true')
            .load(f'{source}/files/*.csv'))

In [0]:
df_diff.write.format('delta').mode('append').saveAsTable('`delta`.deltaspark')