### **DLT Pipeline**

#### Download CSV file
The following code creates the schema and volume if they do not already exist, then downloads a CSV file of New York baby names into the volume.

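```python
# urllib.request must be imported explicitly; `import urllib` alone does not guarantee it is loaded
import urllib.request

# Unity Catalog names: replace the placeholders with your own values
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

# Create the schema and volume if they do not already exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

# Download the CSV file into the volume
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

urllib.request.urlretrieve(download_url, volume_path + filename)
```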
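Before building the pipeline, it can help to confirm that the downloaded file has the columns the pipeline code expects. The sketch below is an assumption-based helper, not part of the tutorial: the function `missing_columns` is hypothetical, and the expected names are taken from the columns the SQL version of the pipeline selects (`Year`, `First Name`, `County`, `Sex`, `Count`). It reads only the header row with Python's standard `csv` module.

```python
import csv

# Column names the SQL version of the pipeline selects from the raw CSV.
EXPECTED_COLUMNS = {"Year", "First Name", "County", "Sex", "Count"}


def missing_columns(csv_path, expected=EXPECTED_COLUMNS):
    """Return the expected column names that are absent from the CSV header row (hypothetical helper)."""
    # "utf-8-sig" also strips a leading byte-order mark, if the file has one.
    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        header = next(csv.reader(f), [])  # an empty file yields an empty header
    # Strip whitespace so " Count" and "Count" compare equal.
    present = {name.strip() for name in header}
    return sorted(expected - present)
```

On Databricks, Unity Catalog volume paths can be opened with local file APIs, so a call such as `missing_columns(volume_path + filename)` should return an empty list if the download matches the expected layout.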

#### Create Pipeline

To configure a new pipeline, do the following:

1. In the sidebar, click **Pipelines**.
2. Click **Create pipeline**.
3. In **Pipeline name**, type a unique pipeline name.
4. Select the **Serverless** checkbox.
5. In **Destination**, select a **Catalog** and a **Schema** to configure the Unity Catalog location where tables are published.
6. In **Advanced**, click **Add configuration**, then define pipeline parameters for the catalog, schema, and volume to which you downloaded the data, using the following parameter names:
    - `my_catalog`
    - `my_schema`
    - `my_volume`
7. Click **Create**.
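The UI steps above roughly correspond to a pipeline settings document. The sketch below builds such a settings dictionary in Python and prints it as JSON, to show how each step maps to a setting. The field names (`name`, `serverless`, `catalog`, `target`, `configuration`) reflect my understanding of the Pipelines settings JSON and should be checked against the Pipelines API reference; newer API versions may use `schema` instead of `target`. The pipeline name is hypothetical.

```python
import json

settings = {
    "name": "baby-names-pipeline",   # step 3: unique pipeline name (hypothetical)
    "serverless": True,              # step 4: Serverless checkbox
    "catalog": "<catalog-name>",     # step 5: destination catalog
    "target": "<schema-name>",       # step 5: destination schema (may be "schema" in newer APIs)
    "configuration": {               # step 6: pipeline parameters, all string-valued
        "my_catalog": "<catalog-name>",
        "my_schema": "<schema-name>",
        "my_volume": "<volume-name>",
    },
}

print(json.dumps(settings, indent=2))
```

The keys under `configuration` are exactly the parameter names the pipeline code later reads with `spark.conf.get`.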

#### **DLT Code (Python)** 
##### Imports the necessary modules

```python
# Import modules
import dlt

# Brings in expr, sum, and desc used below; note that `sum` shadows Python's built-in sum
from pyspark.sql.functions import *
```

##### References parameters defined during pipeline configuration

```python
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
```
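If a parameter is missing from the pipeline configuration, the code above silently builds a broken path or fails with a less obvious error. The following is a minimal sketch, assuming you want a clear error instead: the helper `resolve_volume_path` is hypothetical and takes any callable that returns a value or `None` for a missing key. In a pipeline you could pass `lambda k: spark.conf.get(k, None)`, on the assumption that passing `None` as the default returns `None` for an unset key on your runtime.

```python
PARAM_NAMES = ("my_catalog", "my_schema", "my_volume")


def resolve_volume_path(conf_get):
    """Build the /Volumes path from pipeline parameters, failing loudly if any is unset (hypothetical helper)."""
    values = {name: conf_get(name) for name in PARAM_NAMES}
    # Treat None and empty strings as missing.
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise ValueError(f"Missing pipeline parameter(s): {', '.join(missing)}")
    return "/Volumes/{my_catalog}/{my_schema}/{my_volume}/".format(**values)
```

For local testing, a plain dictionary's `get` method stands in for the Spark configuration.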


##### Defines a streaming table named `baby_names_raw` that ingests from a volume.

```python
# Define a streaming table to ingest data from a volume
@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")  # Auto Loader
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  # Rename the column so that its name contains no spaces
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column
```
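The raw CSV header contains `First Name`, with a space. Delta Lake rejects column names containing a space or any of `,;{}()=` (and newlines or tabs) unless column mapping is enabled, which is presumably why the streaming table renames that column. The sketch below generalizes the idea: the hypothetical helper `safe_column_names` maps every column name to a version with those characters replaced by underscores. It is plain Python operating on a list of names, not a DLT or Spark API.

```python
import re

# Characters Delta Lake rejects in column names unless column mapping is enabled.
INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")


def safe_column_names(columns):
    """Map each original column name to a name with invalid characters replaced by '_' (hypothetical helper)."""
    return {name: INVALID_CHARS.sub("_", name) for name in columns}
```

To apply such a mapping in PySpark, one option is to loop over it with `df.withColumnRenamed(old, new)`; Spark 3.4 and later also provide `DataFrame.withColumnsRenamed`, which accepts the whole dictionary at once.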

##### Defines a materialized view named `baby_names_prepared` that validates ingested data.

```python
# Define a materialized view that validates data and renames a column
@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
# Record rows with a NULL first name in the data quality metrics, but keep them
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
# Fail the update if any row has a non-positive count
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )
```
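The two decorators above behave differently: `@dlt.expect` records violations in the pipeline's data quality metrics but keeps the invalid rows, while `@dlt.expect_or_fail` stops the update as soon as a row violates the constraint. The sketch below models those two semantics in plain Python on a list of dictionaries. It is an illustration only; `apply_expectations` and `ExpectationFailed` are hypothetical names, and NULL-handling subtleties of SQL predicates are not modeled.

```python
class ExpectationFailed(Exception):
    """Raised when a fail-on-violation expectation is violated (hypothetical)."""


def apply_expectations(rows, warn=None, fail=None):
    """Apply warn-style and fail-style expectations to dict rows.

    warn: {name: predicate}; violations are counted, rows are kept (like @dlt.expect).
    fail: {name: predicate}; any violation aborts the whole run (like @dlt.expect_or_fail).
    Returns (kept_rows, violation_counts).
    """
    rows = list(rows)
    warn = warn or {}
    fail = fail or {}
    counts = {name: 0 for name in warn}
    for row in rows:
        for name, check in fail.items():
            if not check(row):
                raise ExpectationFailed(f"{name} violated by {row}")
        for name, check in warn.items():
            if not check(row):
                counts[name] += 1
    return rows, counts
```

With the two constraints from the materialized view, a row with a missing name is kept and counted, while a row with `Count` of 0 aborts the run.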

##### Defines a materialized view named `top_baby_names_2021` that has a highly refined view of the data.

```python
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )
```
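Because the raw data also has `County` and `Sex` columns, the same first name presumably appears in several rows per year, which is why the view sums `Count` per name before ranking. The following pure-Python analogue of `top_baby_names_2021` (the function `top_names` is hypothetical) performs the same filter, group, sum, sort, and limit steps with `collections.Counter`, which can be handy for checking results on a small sample.

```python
from collections import Counter


def top_names(rows, year, n=10):
    """Filter rows to one year, sum Count per First_Name, and return the n largest totals."""
    totals = Counter()
    for row in rows:
        if row["Year_Of_Birth"] == year:
            totals[row["First_Name"]] += row["Count"]
    # most_common sorts by total, descending; equal totals keep first-seen order.
    return totals.most_common(n)
```

One design difference worth noting: `Counter.most_common` breaks ties by insertion order, whereas Spark's sort does not guarantee any particular order among names with equal totals.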

**OR**
#### **DLT Code (SQL)** 

```sql
-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
```
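In the SQL version, the source path uses `${my_catalog}`-style placeholders that are filled in from the pipeline parameters defined under **Advanced**. As an analogy only (this is not how DLT performs the substitution), Python's `string.Template` uses the same `${name}` syntax, so it can be used to preview the path a given set of parameter values would produce. The parameter values below are hypothetical.

```python
from string import Template

# Same placeholder syntax as the read_files() path in the SQL pipeline.
SOURCE_PATH = Template("/Volumes/${my_catalog}/${my_schema}/${my_volume}/")

# Hypothetical parameter values, as they might be set under Advanced > Add configuration.
params = {"my_catalog": "main", "my_schema": "demo", "my_volume": "raw"}

# substitute() raises KeyError for any placeholder without a value.
print(SOURCE_PATH.substitute(params))
```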

### **2. DLT Development Features**
#### 2.1 Modes
A DLT pipeline can run in one of two modes:
1. Development
2. Production

![image.png](attachment:image.png)

Use the buttons shown above, in the Pipelines UI, to switch between these two modes. By default, pipelines run in **development** mode.

##### 2.1.1 Development Mode

- **Reuses** a cluster to avoid the overhead of restarts.
- By default, the cluster keeps running for **two hours** after an update completes, so that it can be reused.
- To control cluster shutdown behavior, set `pipelines.clusterShutdown.delay` in the pipeline configuration.
- **Disables pipeline retries** so you can immediately detect and fix errors.

##### 2.1.2 Production Mode

- **Shuts down the cluster** immediately after the pipeline update completes.
- **Restarts the cluster** for specific recoverable errors, such as memory leaks and stale credentials.
- **Retries execution** in the event of specific errors, such as a failure to start a cluster.
- The default value for `pipelines.clusterShutdown.delay` is `0s`.
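The shutdown rules for the two modes can be summarized as: an explicit `pipelines.clusterShutdown.delay` setting wins, otherwise the mode default applies (two hours in development, `0s` in production). The sketch below encodes that rule over a settings dictionary. It assumes the mode is represented by a boolean `development` field in the pipeline settings JSON, which should be checked against the Pipelines API reference; the helper `shutdown_delay` and the `60s` value are illustrative, and the returned defaults are labels rather than parsed durations.

```python
import json

# Mode defaults as described for development and production mode, keyed by the "development" flag.
MODE_DEFAULTS = {True: "2 hours", False: "0s"}


def shutdown_delay(settings):
    """Return the cluster shutdown delay implied by a pipeline settings dict (hypothetical helper)."""
    override = settings.get("configuration", {}).get("pipelines.clusterShutdown.delay")
    if override is not None:
        return override  # an explicit configuration value overrides the mode default
    # Pipelines run in development mode by default, so a missing flag is treated as True.
    return MODE_DEFAULTS[settings.get("development", True)]


# A production pipeline that keeps its cluster for 60 seconds after an update.
prod_settings = {
    "development": False,
    "configuration": {"pipelines.clusterShutdown.delay": "60s"},
}
print(json.dumps(prod_settings, indent=2))
print(shutdown_delay(prod_settings))  # 60s
```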

#### 2.2 Validate

To check whether a pipeline’s **source code is valid without running a full update**, use **Validate**. Validation:

- Checks the definitions of datasets and flows.
- Does not materialize or publish any datasets.
- Reports errors found during validation, such as incorrect table or column names, in the UI.
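One kind of error that validation catches, an incorrect table name, can be found by inspecting dataset definitions alone, without reading any data. The toy model below illustrates that idea: each dataset in this pipeline is listed with the datasets it reads from, and the hypothetical `validate` function reports references that do not resolve. It is a sketch of the concept, not DLT's implementation; real validation also checks column names and other properties not modeled here.

```python
def validate(datasets):
    """Report dataset references that do not match any defined dataset.

    datasets maps each dataset name to the names it reads from.
    Nothing is computed or materialized; only the definitions are inspected.
    """
    errors = []
    for name, sources in datasets.items():
        for source in sources:
            if source not in datasets:
                errors.append(f"{name}: table or view '{source}' not found")
    return errors


# The three datasets defined in this pipeline; baby_names_raw reads from a volume, not a dataset.
pipeline = {
    "baby_names_raw": [],
    "baby_names_prepared": ["baby_names_raw"],
    "top_baby_names_2021": ["baby_names_prepared"],
}
```

A typo such as `baby_name_prepared` in the last definition would be reported before any table is built.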