# Pipeline Name
**Notebook**: notebook_name.py

## Overview
Pipeline overview

## Dependencies
- **Modules**:
  - `Kaizen.commons.processing.data_process`
  - `Kaizen.commons.processing.data_entities`
  - `Kaizen.commons.config.yaml_loader`
  - `Kaizen.commons.processing.logger`
  - `Kaizen.commons.config.connections`
  - `Kaizen.commons.sql_lint.sql_linter`

- **SQL Template**: `../../../commons/sql/sql_path.sql`
- **DDL Template**: `../../../commons/ddl/ddl_path.sql`

## Table Schema
Input the table schema.

## Table Configuration
### Storage Properties
- **Format**: Delta Lake
- **Clustering**: (columns to which clustering is applied, if any)

## Process Flow
1. **Table Creation**
   - Creates table if not exists with specified schema
   - Sets clustering and Delta properties

2. **Spark Configuration**
   - Enables adaptive query optimization
   - Enables adaptive optimizer

3. **Data Processing**
   Brief description of how the data is processed.

4. **Table Maintenance**
   - Performs VACUUM operation
   - Runs OPTIMIZE command

5. **SQL Lint**
   - Detects lint errors in SQL queries
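
The Spark configuration and table maintenance steps above can be sketched in plain Python. This is only an illustration: `maintenance_statements` and `ADAPTIVE_SETTINGS` are hypothetical names, the 168-hour retention is an assumed value (it matches Delta Lake's default of 7 days), and the statements the Kaizen library actually issues may differ.

```python
def maintenance_statements(table_name: str, retention_hours: int = 168) -> list[str]:
    """Return Delta maintenance SQL for one table, in the order listed in the process flow."""
    if retention_hours < 0:
        raise ValueError("retention_hours must be non-negative")
    return [
        f"VACUUM {table_name} RETAIN {retention_hours} HOURS",  # remove files no longer referenced
        f"OPTIMIZE {table_name}",                               # compact small files
    ]


# Adaptive query execution is controlled by this Spark SQL key; whether the
# pipeline sets exactly this key is an assumption.
ADAPTIVE_SETTINGS = {"spark.sql.adaptive.enabled": "true"}

statements = maintenance_statements("table_name")
for statement in statements:
    print(statement)  # in a notebook with a live session: spark.sql(statement)
```

In a notebook, each entry of `ADAPTIVE_SETTINGS` could be applied with `spark.conf.set(key, value)` before the transformation runs.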
   


## Execution Details
- **Schedule**: Daily run
- **Processing Mode**: Spark write method
- **Data Cleanup**: Removes current day's data before processing
- **SQL Implementation**: Data transformation logic embedded in SQL template
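
Removing the current day's data before writing is what makes a daily rerun safe to repeat. A minimal sketch of such a cleanup statement follows, assuming the table has a `created_date` column (as the notebook's configuration comment suggests); `build_cleanup_statement` is a hypothetical helper, not part of `DataProcess`, and the real cleanup logic may be implemented differently.

```python
from datetime import date
from typing import Optional


def build_cleanup_statement(
    table_name: str,
    run_date: Optional[date] = None,
    date_column: str = "created_date",  # assumed column name
) -> str:
    """Build a DELETE that removes the rows written for run_date (today by default)."""
    run_date = run_date or date.today()
    return (
        f"DELETE FROM {table_name} "
        f"WHERE CAST({date_column} AS DATE) = DATE '{run_date.isoformat()}'"
    )


example = build_cleanup_statement("table_name", run_date=date(2024, 12, 1))
print(example)
```

Casting the column to `DATE` keeps the filter valid whether `created_date` is stored as a date or as a timestamp.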

## Usage Notes
Additional usage notes

---
*Last Updated: December 2024*  
*Maintainer: CI&T Data Engineering Team*

In [0]:
from Kaizen.commons.processing.data_process import DataProcess
from Kaizen.commons.processing.data_entities import DeltaDestination
from Kaizen.commons.config.yaml_loader import ConfigLoader
from Kaizen.commons.processing.logger import get_logger
from Kaizen.commons.sql_lint.sql_linter import lint_sql_files




In [0]:
# Load the Spark and logging settings from the YAML file in ../../commons/config_files
config_loader = ConfigLoader("../../commons/config_files/default.yaml")
settings = config_loader.config["settings"]
# `spark` is the session provided by the Databricks notebook; set "context" to the process name
log_settings = settings["log_settings"] | {"spark": spark, "context": "process_name"}
logger = get_logger(log_settings)
process = DataProcess(spark, settings["spark_session"], logger)
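
The `|` operator used to build `log_settings` is the dictionary union operator, which requires Python 3.9 or later. The sketch below shows its behaviour with hypothetical keys (`level` is an assumption, not a documented Kaizen setting): the right-hand operand wins on duplicate keys and the original dictionary is left unchanged.

```python
base_log_settings = {"level": "INFO", "context": "default"}  # hypothetical YAML content
runtime_overrides = {"context": "process_name"}

# Union creates a new dict; values from the right-hand side replace duplicates.
merged = base_log_settings | runtime_overrides
print(merged)
```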

In [0]:
# Configuration for the target table and for whether the table needs to be created
config_destination = {
    "location_name": "table_name",
    "mode": "overwrite",  # "overwrite" or "append"
    "create_table_from_schema": False,  # True or False
    "create_table_from_ddl": True,  # True to create the table from the DDL file
    "cleanup": True,  # True to delete the current day's data (requires a created_date column)
}
transformation_sql_path = "../../commons/sql/sql_path.sql"  # path to your transformation SQL
table_ddl_path = "../../commons/sql/ddl/ddl_path.sql"  # path to your DDL
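
Because this dictionary is edited by hand for every pipeline, a quick check before running the transformation can catch typos early. The following is a minimal sketch, assuming the allowed values are exactly those named in the inline comments; `validate_destination_config` is a hypothetical helper and is not part of the Kaizen library.

```python
ALLOWED_MODES = {"overwrite", "append"}
BOOLEAN_KEYS = ("create_table_from_schema", "create_table_from_ddl", "cleanup")


def validate_destination_config(config: dict) -> list[str]:
    """Return a list of problems; an empty list means the config looks usable."""
    problems = []
    if not config.get("location_name"):
        problems.append("location_name is missing or empty")
    if config.get("mode") not in ALLOWED_MODES:
        problems.append(f"mode must be one of {sorted(ALLOWED_MODES)}")
    for key in BOOLEAN_KEYS:
        if not isinstance(config.get(key), bool):
            problems.append(f"{key} must be True or False")
    return problems


good_config = {
    "location_name": "table_name",
    "mode": "overwrite",
    "create_table_from_schema": False,
    "create_table_from_ddl": True,
    "cleanup": True,
}
bad_config = {"location_name": "table_name", "mode": "upsert", "cleanup": "yes"}

print(validate_destination_config(good_config))
print(validate_destination_config(bad_config))
```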

In [0]:
# Data processing calls; these do not change, since the parameters are already defined in the previous cell
transformation = process.read_sql_template(transformation_sql_path)
table_ddl = process.read_sql_template(table_ddl_path)

destination = DeltaDestination(location_name=config_destination["location_name"], mode=config_destination["mode"], table_definition=table_ddl)

process.transform(destination=destination, transformation=transformation, create_table_from_schema=config_destination["create_table_from_schema"], cleanup=config_destination["cleanup"], create_table_from_ddl=config_destination["create_table_from_ddl"])

In [0]:
# Check for linting errors in SQL queries
df = lint_sql_files(
    sql_path="/Workspace/Repos/*******/kaizen/Kaizen/commons/sql",  # Path to the directory or SQL file
    dialect="databricks",  # SQL dialect to be used by SQLFluff
    save_to_table=True,  # If True, saves the results in a Delta table in Databricks
    table_name="lint_results"  # Name of the Delta table where results will be stored
)