## Setup

First, let's make sure we are using the correct catalog and that the 00_landing schema exists.



In [0]:
SELECT current_catalog(), current_schema()


In [0]:
USE CATALOG smart_claims_dev

In [0]:
SELECT current_catalog(), current_schema()


In [0]:
USE SCHEMA 00_landing

In [0]:
SELECT current_catalog(), current_schema()
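
The text above mentions making sure the `00_landing` schema exists, while the cells only switch to it. As a minimal sketch, the statement that creates the schema when it is missing could be built as below, with backtick-quoted identifiers as a defensive choice for names that start with a digit. The helper names `quote_ident` and `create_schema_sql` are hypothetical. In this SQL-default notebook the cell would start with `%python`, and the string would be passed to `spark.sql`.

```python
def quote_ident(name: str) -> str:
    """Wrap an identifier in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def create_schema_sql(catalog: str, schema: str) -> str:
    """Build an idempotent CREATE SCHEMA statement for catalog.schema."""
    return f"CREATE SCHEMA IF NOT EXISTS {quote_ident(catalog)}.{quote_ident(schema)}"


statement = create_schema_sql("smart_claims_dev", "00_landing")
print(statement)
# In a Databricks notebook (assumption: `spark` is the session provided by the runtime):
# spark.sql(statement)
```

Building the string separately keeps the sketch runnable outside Databricks; only the commented `spark.sql` call depends on the notebook runtime.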


## Explore the Data Source Files



List the files inside the volume.

In [0]:
LIST '/Volumes/smart_claims_dev/00_landing/sql_server'

The same listing using Python.

In [0]:
%python
files = dbutils.fs.ls(
    '/Volumes/smart_claims_dev/00_landing/sql_server'
)
display(files)
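
When a volume holds more than one kind of file, it can help to narrow the listing to CSV files before querying them. The sketch below assumes each entry exposes `path` and `name` attributes, as the objects returned by `dbutils.fs.ls` do. The `csv_files` helper and the `notes.txt` entry are hypothetical, and `SimpleNamespace` stands in for the real entries so the code runs outside Databricks.

```python
from types import SimpleNamespace


def csv_files(entries):
    """Return the paths of entries whose name ends in .csv (case-insensitive)."""
    return [e.path for e in entries if e.name.lower().endswith(".csv")]


# Stand-in entries for illustration; in the notebook, pass the result of
# dbutils.fs.ls('/Volumes/smart_claims_dev/00_landing/sql_server') instead.
sample = [
    SimpleNamespace(
        path="/Volumes/smart_claims_dev/00_landing/sql_server/claims.csv",
        name="claims.csv",
    ),
    SimpleNamespace(
        path="/Volumes/smart_claims_dev/00_landing/sql_server/notes.txt",
        name="notes.txt",
    ),
]
print(csv_files(sample))
```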

Query the CSV file directly using the volume path.

In [0]:
SELECT *
FROM csv.`/Volumes/smart_claims_dev/00_landing/sql_server/claims.csv`

## Batch Data Ingestion with CTAS

CTAS (CREATE TABLE AS SELECT) is an efficient way to ingest data in bulk. It always rewrites the entire table rather than loading incrementally.

Within Unity Catalog, we can create Delta tables from files stored in volumes.




In [0]:
SELECT *
FROM read_files(
  '/Volumes/smart_claims_dev/00_landing/sql_server/claims.csv',
  format => 'csv'
)
LIMIT 10;


In [0]:
--drop the table if it exists for demonstration purposes
DROP TABLE IF EXISTS smart_claims_dev.01_bronze.claims;

--create the table using the CREATE TABLE AS statement
CREATE TABLE smart_claims_dev.01_bronze.claims
AS SELECT *
FROM read_files(
  '/Volumes/smart_claims_dev/00_landing/sql_server/claims.csv',
  format => 'csv'
)
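
Because CTAS always rewrites the whole table, the DROP followed by CREATE above can also be written as a single `CREATE OR REPLACE TABLE ... AS SELECT` statement. The sketch below only builds that statement as a string. The `ctas_sql` helper is hypothetical, and running the statement assumes a Databricks notebook where `spark` is available.

```python
def ctas_sql(table: str, source_path: str) -> str:
    """Build a CREATE OR REPLACE TABLE ... AS SELECT over a CSV file."""
    return (
        f"CREATE OR REPLACE TABLE {table}\n"
        "AS SELECT *\n"
        "FROM read_files(\n"
        f"  '{source_path}',\n"
        "  format => 'csv'\n"
        ")"
    )


statement = ctas_sql(
    "smart_claims_dev.01_bronze.claims",
    "/Volumes/smart_claims_dev/00_landing/sql_server/claims.csv",
)
print(statement)
# In a Databricks notebook (assumption: `spark` is provided by the runtime):
# spark.sql(statement)
```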

In [0]:
SELECT *
FROM smart_claims_dev.01_bronze.claims limit 10

In [0]:
DESCRIBE TABLE EXTENDED smart_claims_dev.01_bronze.claims;

## Python Ingestion

In [0]:
DROP TABLE IF EXISTS smart_claims_dev.01_bronze.claims;

In [0]:
-- Expected to fail: the table was dropped in the previous cell
SELECT * FROM smart_claims_dev.01_bronze.claims limit 10

In [0]:
%python
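# Read the CSV; without inferSchema every column is loaded as a string
df = (
    spark.read
    .format("csv")
    .option("header", True)  # first row holds the column names
    .load("/Volumes/smart_claims_dev/00_landing/sql_server/claims.csv")
)

# Overwrite the bronze table with the contents of the DataFrame
df.write.mode("overwrite").saveAsTable("smart_claims_dev.01_bronze.claims")

# Read the table back to confirm the write
claims_table = spark.table("smart_claims_dev.01_bronze.claims")

display(claims_table)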

## COPY INTO

COPY INTO always loads data incrementally: files that were already loaded are skipped when the command runs again.

_Incremental batch ingestion_

Databricks now recommends Auto Loader and streaming tables over COPY INTO for incremental ingestion, so it is treated here as a legacy option.

**We will use batch or streaming ingestion with Auto Loader + Lakeflow instead.**

_aula04/notebooks/04_create_streaming_tables_Auto_Loader_

In [0]:
DROP TABLE IF EXISTS smart_claims_dev.01_bronze.claims;

CREATE OR REPLACE TABLE smart_claims_dev.01_bronze.claims (
    claim_no STRING,
    policy_no STRING,
    claim_date STRING,
    months_as_customer STRING,
    injury STRING,
    property STRING,
    vehicle STRING,
    total STRING,
    collision_type STRING,
    number_of_vehicles_involved STRING,
    age STRING,
    insured_relationship STRING,
    license_issue_date STRING,
    date STRING,        -- `date` is also a SQL type keyword; quote it with backticks if it causes a conflict
    hour STRING,
    type STRING,
    severity STRING,
    number_of_witnesses STRING,
    suspicious_activity STRING
)
COMMENT 'Landing raw claims data (CSV → Bronze, raw strings)';


In [0]:
COPY INTO smart_claims_dev.01_bronze.claims
FROM '/Volumes/smart_claims_dev/00_landing/sql_server/claims.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header'='true')
COPY_OPTIONS ('mergeSchema'='false');
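
The incremental behavior of COPY INTO can be pictured with a toy model: the loader remembers which files it has already ingested and skips them on later runs. This is a conceptual sketch only, not how Databricks implements the command. The `IncrementalLoader` class and the sample rows are hypothetical.

```python
class IncrementalLoader:
    """Toy model of incremental file ingestion: each file is loaded at most once."""

    def __init__(self):
        self.loaded_files = set()  # names of files already ingested
        self.rows = []             # stand-in for the target table

    def copy_into(self, files):
        """Load rows from files not seen before; return how many files were loaded."""
        new_files = [name for name in files if name not in self.loaded_files]
        for name in new_files:
            self.rows.extend(files[name])
            self.loaded_files.add(name)
        return len(new_files)


loader = IncrementalLoader()
source = {"claims.csv": [("c1",), ("c2",)]}  # hypothetical file contents
first = loader.copy_into(source)   # loads claims.csv
second = loader.copy_into(source)  # same file again: skipped
print(first, second, len(loader.rows))  # prints: 1 0 2
```

Running the load twice leaves the row count unchanged, which is the property that makes re-running the COPY INTO cell safe.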


In [0]:
%python
df = spark.read.csv("/Volumes/smart_claims_dev/00_landing/sql_server/claims.csv", header=True)
print(df.columns)
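
Printing the columns makes it possible to compare the CSV header with the table definition by eye. A minimal sketch of automating that comparison with the standard library follows. The expected names are copied from the CREATE TABLE statement above. The `header_mismatch` helper and the sample header line are hypothetical, since the real file contents are not shown here.

```python
import csv
import io

# Column names taken from the CREATE TABLE statement for the bronze table
EXPECTED_COLUMNS = [
    "claim_no", "policy_no", "claim_date", "months_as_customer", "injury",
    "property", "vehicle", "total", "collision_type",
    "number_of_vehicles_involved", "age", "insured_relationship",
    "license_issue_date", "date", "hour", "type", "severity",
    "number_of_witnesses", "suspicious_activity",
]


def header_mismatch(csv_text, expected=EXPECTED_COLUMNS):
    """Return (missing, unexpected) column names for the first row of csv_text."""
    header = next(csv.reader(io.StringIO(csv_text)))
    missing = [c for c in expected if c not in header]
    unexpected = [c for c in header if c not in expected]
    return missing, unexpected


# Hypothetical header: the last expected column is replaced by an extra one
sample = ",".join(EXPECTED_COLUMNS[:-1]) + ",extra_col\n"
print(header_mismatch(sample))
```

In the notebook, `df.columns` could be compared against `EXPECTED_COLUMNS` the same way, without reparsing the file.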


In [0]:
SELECT * FROM smart_claims_dev.01_bronze.claims limit 10