CI/CD for Databricks Pipelines: Git Integration & Automated Deployment
Demo Scenario:

Objective: Show how to automate testing and deployment of a Delta Live Tables (DLT) pipeline using GitHub Actions and the Databricks CLI.

Steps:

Set up a Git repo with a DLT pipeline (e.g., the e-commerce ETL pipeline from Topic 1).

Configure CI/CD:

Use databricks bundle to deploy pipelines as code. The repo has two branches: main and prd.

Add a GitHub Actions workflow to run unit tests (e.g., validate schema, data quality checks) on PR merges.

Promote changes: Deploy from dev to prod using workspace-specific configs.
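
The promotion step can be sketched as a small helper that a CI job might call to pick a bundle target from the branch name. This is a minimal sketch under stated assumptions: the mapping of main to a "dev" target and prd to a "prod" target is hypothetical, and both target names are assumed to be defined in the project's databricks.yml. The helper only builds the `databricks bundle validate` and `databricks bundle deploy` command lines; it does not run them.

```python
# Hypothetical branch-to-target mapping. The target names "dev" and "prod"
# are assumptions and must match targets declared in databricks.yml.
BRANCH_TO_TARGET = {"main": "dev", "prd": "prod"}


def bundle_commands(branch):
    """Return the CLI command lines to validate and deploy the bundle for a branch."""
    try:
        target = BRANCH_TO_TARGET[branch]
    except KeyError:
        raise ValueError(f"No deployment target configured for branch '{branch}'") from None
    return [
        ["databricks", "bundle", "validate", "-t", target],
        ["databricks", "bundle", "deploy", "-t", target],
    ]


# Show what a CI job would execute for each known branch.
for branch in BRANCH_TO_TARGET:
    for command in bundle_commands(branch):
        print(branch, "->", " ".join(command))
```

In a workflow, each returned list could be passed to `subprocess.run(command, check=True)` so that a failed validation stops the deployment. Unknown branches raise an error rather than silently deploying somewhere.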

Show & Tell Focus:

Compare manual vs. automated deployments.

Demo rollback on failure (e.g., broken schema change).
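
A broken schema change can be caught before deployment with a simple schema comparison. The sketch below is one possible check, not the demo's actual implementation: `schema_diff` is a hypothetical helper that compares two plain `{column name: type name}` mappings. With Spark, such a mapping could be obtained from `dict(df.dtypes)`. The column names are borrowed from the babynames example used in this notebook.

```python
def schema_diff(expected, actual):
    """Compare two {column_name: type_name} mappings and list the differences."""
    problems = []
    # Columns that were dropped or whose type changed.
    for name, expected_type in expected.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif actual[name] != expected_type:
            problems.append(f"type changed: {name} ({expected_type} -> {actual[name]})")
    # Columns that appeared without being part of the agreed schema.
    for name in actual:
        if name not in expected:
            problems.append(f"unexpected column: {name}")
    return problems


expected = {"Year": "int", "First Name": "string", "Count": "int"}
broken = {"Year": "string", "First Name": "string"}

print(schema_diff(expected, expected))  # prints []
print(schema_diff(expected, broken))
# prints ['type changed: Year (int -> string)', 'missing column: Count']
```

A CI step could fail the build whenever the returned list is non-empty, which keeps the previous deployment in place instead of requiring a rollback afterwards.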

Unit tests play a vital role for several reasons:

Verification of Correctness: Unit tests verify the functionality of individual units of code, ensuring they behave as expected under various conditions.

Early Bug Detection: By identifying bugs early in the development process, developers can address them promptly, reducing the probability of issues propagating to other parts of the system.

Refactoring and Maintenance: Unit tests act as a safety net during code refactoring and maintenance, allowing developers to make changes confidently while ensuring consistent behavior.

Regression Testing: Unit tests serve as regression tests, ensuring that new changes or features do not break existing functionality, thereby maintaining system stability.

Run Function (myfunctions.py)

In [0]:
%pip install pytest

In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pandas as pd

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
  return spark.catalog.tableExists(f"{dbName}.{tableName}")

# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
  return columnName in dataFrame.columns

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
  df = dataFrame.filter(col(columnName) == columnValue)

  return df.count()

In [0]:
# envName     = "env_dev"
tableName   = "babynames"
dbName      = "leodb"
columnName  = "First Name"
columnValue = "CHLOE"

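# If the table exists in the specified database...
if tableExists(tableName, dbName):

  # df = spark.sql(f"SELECT * FROM {envName}.{dbName}.{tableName}")
  # header=True is needed so that "First Name" is read as a column name
  # instead of as part of the first data row.
  df = spark.read.csv("/Volumes/env_dev/leodb/babynames/babynames.csv", header=True)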

  # And the specified column exists in that table...
  if columnExists(df, columnName):
    # Then report the number of rows for the specified value in that column.
    numRows = numRowsInColumnForValue(df, columnName, columnValue)

    print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
  else:
    print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
  print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.") 

In [0]:
df1 = spark.read.csv("/Volumes/env_dev/leodb/babynames/babynames.csv",
    header=True,
    inferSchema=True,
    sep=",")
df1.display()

In [0]:
df = spark.createDataFrame(pd.read_csv('/Volumes/env_dev/leodb/babynames/babynames.csv'))
df

In [0]:
df.display()

In [0]:
import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

tableName   = "babynames"
dbName      = "leodb"
columnName  = "First Name"
columnValue = "CHLOE"

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([
  StructField("Year",       IntegerType(), True),
  StructField("First Name", StringType(),  True),
  StructField("County",     StringType(),  True),
  StructField("Sex",        StringType(),  True),
  StructField("Count",      IntegerType(), True),
])

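# The fake data must contain at least one row matching columnValue ("CHLOE"),
# otherwise test_numRowsInColumnForValue fails.
data = [ (2025, "Leo",   "Albany", "M", 10),
         (2024, "Alex",  "Albany", "M", 8 ),
         (2023, "CHLOE", "Albany", "F", 5 ) ]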

df = spark.createDataFrame(data, schema)

# Does the table exist?
def test_tableExists():
  assert tableExists(tableName, dbName) is True

# Does the column exist?
def test_columnExists():
  assert columnExists(df, columnName) is True

# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
  assert numRowsInColumnForValue(df, columnName, columnValue) > 0
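
Note that test_tableExists still queries the real catalog, so it only passes where leodb.babynames exists. One way to keep that check independent of any workspace is sketched below. It assumes a hypothetical variant, `table_exists`, that receives the Spark session as a parameter (the version in myfunctions.py uses a module-level session instead), which allows the session to be replaced with a `unittest.mock.MagicMock`.

```python
from unittest.mock import MagicMock


def table_exists(spark, table_name, db_name):
    """Hypothetical variant of tableExists that receives the session explicitly."""
    return spark.catalog.tableExists(f"{db_name}.{table_name}")


def test_table_exists_uses_qualified_name():
    # The mock stands in for a SparkSession; no cluster or catalog is needed.
    fake_spark = MagicMock()
    fake_spark.catalog.tableExists.return_value = True

    assert table_exists(fake_spark, "babynames", "leodb") is True
    # The function should look the table up as "<database>.<table>".
    fake_spark.catalog.tableExists.assert_called_once_with("leodb.babynames")


def test_table_exists_reports_missing_table():
    fake_spark = MagicMock()
    fake_spark.catalog.tableExists.return_value = False

    assert table_exists(fake_spark, "babynames", "leodb") is False
```

Tests written this way can run in a GitHub Actions job without Databricks credentials, while checks against real tables remain a separate integration step.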