***This is a Snowflake Notebook. Import it through the Snowflake Notebooks UI to run the BRONZE ingestion pipeline.***

# Overview

This notebook guides you through the setup and execution of a BRONZE ingestion pipeline using Snowflake and Iceberg tables. It covers the following steps:

- Importing required Python packages and establishing a Snowflake session.
- Setting up user-specific variables, roles, databases, and schemas.
- Creating file formats for ingesting different types of files such as CSV and Parquet for batch data ingestion.
- Exploring the different ingestion patterns Snowflake supports for loading data into Iceberg tables.
- Creating and loading raw data tables (locations, schedules, cancel codes, and movements) using Iceberg.
- Providing SQL and Python code examples for data ingestion and exploration.

Follow the instructions and code cells to complete the ingestion pipeline and prepare your data for further analysis.

In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()


# Set User Number

Update the cell below with your unique user number for this lab. This ensures that all resources you create are isolated and do not conflict with those of other users.

For example: **usernum = '999'** or **usernum = '001'**

In [None]:
usernum = str('<INSERT USER NUMBER HERE>')
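
Because the user number is interpolated into role, database, and storage path names later in the notebook, it can help to validate it as soon as it is set. The check below is a minimal sketch: the helper name `validate_usernum` and the three-digit rule are assumptions based on the examples above, not part of the lab setup.

```python
import re

def validate_usernum(value: str) -> str:
    """Return the user number unchanged if it looks like '001' or '999'.

    Assumption: lab user numbers are exactly three ASCII digits, as in the
    examples above. Adjust the pattern if your lab uses another format.
    """
    if not re.fullmatch(r"[0-9]{3}", value):
        raise ValueError(f"usernum must be three digits such as '001', got {value!r}")
    return value

# A well-formed number passes through; leftover placeholder text would raise.
print(validate_usernum("001"))
```

Calling `validate_usernum(usernum)` right after the assignment would surface a forgotten placeholder before any SQL cell runs.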

In [None]:



SET USERNAME = 'HOL_USER_' || '{{usernum}}';
SELECT $USERNAME;


In [None]:
SET HOLROLE = $USERNAME || '_FULL_ROLE';
SET DB_NAME = $USERNAME || '_DB';
SET SCHEMANAME = 'BRONZE';

In [None]:
USE ROLE IDENTIFIER($HOLROLE);
USE DATABASE IDENTIFIER($DB_NAME);
USE SCHEMA IDENTIFIER($SCHEMANAME);
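
The three SQL cells above derive every object name from the user number. The same derivation can be sketched in plain Python, which is handy when a Python cell needs those names. The function name `hol_resource_names` is hypothetical; the naming rules are copied from the `SET` statements above.

```python
def hol_resource_names(usernum: str) -> dict:
    """Mirror the SQL SET statements above in plain Python."""
    username = "HOL_USER_" + usernum          # SET USERNAME = 'HOL_USER_' || usernum
    return {
        "USERNAME": username,
        "HOLROLE": username + "_FULL_ROLE",   # SET HOLROLE = $USERNAME || '_FULL_ROLE'
        "DB_NAME": username + "_DB",          # SET DB_NAME = $USERNAME || '_DB'
        "SCHEMANAME": "BRONZE",               # SET SCHEMANAME = 'BRONZE'
    }

names = hol_resource_names("001")
for key, value in names.items():
    print(key, "=", value)
```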

# Create File Formats for Data Ingestion

The following cell creates a file format named `CSV_FORMAT` in Snowflake. It is configured to parse the header row, use a comma as the field delimiter, trim surrounding spaces, and treat both empty fields and the literal string `null` as NULL values. This format will be used for ingesting CSV data into Snowflake tables.

In [None]:
CREATE OR REPLACE FILE FORMAT CSV_FORMAT
TYPE = CSV
PARSE_HEADER = TRUE
FIELD_DELIMITER = ','
TRIM_SPACE = TRUE
NULL_IF = ( 'null')
EMPTY_FIELD_AS_NULL = TRUE;
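
To make the effect of these options concrete, the sketch below approximates them with Python's standard `csv` module. It is an illustration only and not Snowflake's parser; the function name and the sample rows are made up for the example.

```python
import csv
import io

def parse_like_csv_format(text: str) -> list:
    """Approximate the CSV_FORMAT options with the standard library."""
    reader = csv.reader(io.StringIO(text), delimiter=",")   # FIELD_DELIMITER = ','
    header = [name.strip() for name in next(reader)]        # PARSE_HEADER = TRUE
    rows = []
    for record in reader:
        cleaned = []
        for field in record:
            field = field.strip()                           # TRIM_SPACE = TRUE
            if field == "" or field == "null":              # EMPTY_FIELD_AS_NULL / NULL_IF
                cleaned.append(None)
            else:
                cleaned.append(field)
        rows.append(dict(zip(header, cleaned)))
    return rows

# Illustrative sample data, not taken from the lab files.
sample = "tiploc,crs,notes\n ABDARE , ABA ,\nABDVY,null,request stop\n"
for row in parse_like_csv_format(sample):
    print(row)
```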

# Create File Format for Parquet Data Ingestion

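The following cell creates a file format named `parquet_format` in Snowflake. It enables the vectorized Parquet scanner, honors Parquet logical types, and reads binary columns that have no logical type as text. This format will be used for ingesting Parquet data into Snowflake Iceberg tables.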

In [None]:
CREATE OR REPLACE FILE FORMAT parquet_format
  TYPE = PARQUET
  USE_VECTORIZED_SCANNER = TRUE
  USE_LOGICAL_TYPE = TRUE
  BINARY_AS_TEXT = TRUE;

# Creating a Snowflake Stage for Raw File Ingestion

A Snowflake stage is created to facilitate the ingestion of raw files that are pushed to the landing zone. The stage uses a storage integration, which provides a secure and managed access channel to an external S3 bucket. This ensures that data can be ingested efficiently and securely without exposing sensitive credentials.

**Note:** The storage integration (`s3_iceberg_hol_int`) has already been pre-created as part of the environment setup, so you do not need to configure it manually.

For more details on storage integrations in Snowflake, refer to the official documentation: [Snowflake Storage Integration](https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration)

In [None]:
--create external stage
CREATE OR REPLACE STAGE ICEBRG_HOL_STG
  STORAGE_INTEGRATION = s3_iceberg_hol_int
  URL = 's3://sf-iceberg-hol-landing/';

# Creating the LOCATIONS_RAW Table

The following step creates an Iceberg table named `LOCATIONS_RAW` in Snowflake. This table is designed to store raw location data, which will be ingested from CSV files. 
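
The cell below builds the table's `BASE_LOCATION` by concatenating `usernum` into the SQL text. As a hedged alternative, the path can be built and checked in a small helper first. The name `base_location` and the allowed-character rule are assumptions made for this sketch.

```python
import re

def base_location(usernum: str, table_name: str) -> str:
    """Build the per-user storage path used in BASE_LOCATION.

    Assumption: both parts should contain only letters, digits, and
    underscores, so a stray quote cannot leak into the SQL string.
    """
    for label, value in (("usernum", usernum), ("table_name", table_name)):
        if not re.fullmatch(r"[A-Za-z0-9_]+", value):
            raise ValueError(f"{label} contains unexpected characters: {value!r}")
    return f"hol_user_{usernum}/{table_name}/"

print(base_location("001", "locations_raw"))
```

The returned string can then be interpolated into the `CREATE ICEBERG TABLE` statement in place of the manual concatenation.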

In [None]:
# create locations bronze table. 
session.sql("""
create or replace iceberg table locations_raw (
location_id varchar,
name varchar,
description varchar,
tiploc varchar,
crs varchar,
nlc varchar,
stanox varchar,
notes varchar,
longitude varchar,
latitude varchar,
isOffNetwork varchar,
timingPointType varchar
)
EXTERNAL_VOLUME = 'iceberg_hol_bronze_vol'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'hol_user_""" + usernum + """/locations_raw/'
catalog_sync = iceberg_hol_oc_int;
""").show()

In [None]:
--Data is loaded from csv files in landing stage

COPY INTO locations_raw
from @ICEBRG_HOL_STG/openraildata-talk-carl-partridge-ukrail_locations.csv
FILE_FORMAT = CSV_FORMAT
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
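
`MATCH_BY_COLUMN_NAME` loads each file column into the table column with the same name rather than relying on column order. The sketch below shows the idea in plain Python. It is a simplified model, not Snowflake's implementation, and the file header used in the example is hypothetical.

```python
def match_columns(file_columns, table_columns, case_sensitive=False):
    """Map each file column to the table column it would load into.

    Returns (mapping, unmatched_file_columns).
    """
    key = (lambda name: name) if case_sensitive else str.upper
    lookup = {key(col): col for col in table_columns}
    mapping, unmatched = {}, []
    for col in file_columns:
        target = lookup.get(key(col))
        if target is None:
            unmatched.append(col)
        else:
            mapping[col] = target
    return mapping, unmatched

# Unquoted identifiers are stored in upper case by Snowflake.
table_cols = ["LOCATION_ID", "NAME", "ISOFFNETWORK"]
file_cols = ["location_id", "name", "isOffNetwork"]   # hypothetical CSV header

print(match_columns(file_cols, table_cols))                       # all three match
print(match_columns(file_cols, table_cols, case_sensitive=True))  # none match
```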

# Creating the SCHEDULE_RAW Table

The following step creates an Iceberg table named `SCHEDULE_RAW` in Snowflake. This table is designed to store raw schedule data ingested from Parquet files. 

**Note:** The `SCHEDULE_RAW` table contains nested columns (such as arrays and objects) to accommodate complex schedule data structures. These nested columns will be flattened and transformed in subsequent pipeline steps to facilitate easier querying and analysis.

In [None]:
--create schedule table

CREATE OR REPLACE ICEBERG TABLE SCHEDULE_RAW (
	CIF_BANK_HOLIDAY_RUNNING STRING,
	CIF_STP_INDICATOR STRING,
	CIF_TRAIN_UID STRING,
	APPLICABLE_TIMETABLE STRING,
	ATOC_CODE STRING,
	NEW_SCHEDULE_SEGMENT OBJECT(traction_class STRING, uic_code STRING),
	SCHEDULE_DAYS_RUNS STRING,
	SCHEDULE_END_DATE DATE,
	SCHEDULE_SEGMENT OBJECT(signalling_id STRING, CIF_train_category STRING, CIF_headcode STRING, CIF_course_indicator STRING, CIF_train_service_code STRING, CIF_business_sector STRING, CIF_power_type STRING, CIF_timing_load STRING, CIF_speed STRING, CIF_operating_characteristics STRING, CIF_train_class STRING, CIF_sleepers STRING, CIF_reservations STRING, CIF_connection_indicator STRING, CIF_catering_code STRING, CIF_service_branding STRING, schedule_location ARRAY(OBJECT(arrival STRING, departure STRING, engineering_allowance STRING, line STRING, pass STRING, path STRING, location_type STRING, pathing_allowance STRING, performance_allowance STRING, platform STRING, public_arrival STRING, public_departure STRING, record_identity STRING, tiploc_code STRING, tiploc_instance STRING))),
	SCHEDULE_START_DATE DATE,
	TRAIN_STATUS STRING,
	TRANSACTION_TYPE STRING
)
 EXTERNAL_VOLUME = 'iceberg_hol_bronze_vol'
 CATALOG = 'SNOWFLAKE'
 BASE_LOCATION = 'hol_user_{{usernum}}/SCHEDULE_RAW/'
 catalog_sync = iceberg_hol_oc_int;
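
To make the nested shape concrete, the sketch below flattens one schedule record into one row per entry of `schedule_location`. It is a plain-Python illustration of the idea and not the transformation used in later pipeline steps; the function name and all sample values are made up.

```python
def flatten_schedule(record: dict) -> list:
    """Return one flat row per entry in SCHEDULE_SEGMENT.schedule_location."""
    segment = record.get("SCHEDULE_SEGMENT") or {}
    rows = []
    for position, stop in enumerate(segment.get("schedule_location") or []):
        rows.append({
            "CIF_TRAIN_UID": record.get("CIF_TRAIN_UID"),
            "stop_index": position,
            "tiploc_code": stop.get("tiploc_code"),
            "arrival": stop.get("arrival"),
            "departure": stop.get("departure"),
        })
    return rows

# Hypothetical record using a few of the fields declared in SCHEDULE_RAW.
example = {
    "CIF_TRAIN_UID": "X00001",
    "SCHEDULE_SEGMENT": {
        "schedule_location": [
            {"tiploc_code": "EXAMPLE1", "departure": "0900"},
            {"tiploc_code": "EXAMPLE2", "arrival": "0930"},
        ]
    },
}
for row in flatten_schedule(example):
    print(row)
```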

## Understanding the `load_mode` Parameter in the `COPY INTO` Command

The `load_mode` parameter in the `COPY INTO` command controls how files are ingested into Snowflake-managed Iceberg tables. With the default, `FULL_INGEST`, Snowflake scans the source files and rewrites the data as new Parquet files under the table's base location. Setting `load_mode = add_files_copy` instead performs a server-side copy of the original Parquet files into the base location and registers them with the table, which makes ingestion lighter and more cost-effective.

**Key Benefits of `ADD_FILES_COPY`:**
- **Lightweight Ingestion:** The Parquet files are copied as-is and registered with the table rather than being scanned and rewritten, which makes the ingestion process much faster.
- **Cost Effective:** Skipping the scan-and-rewrite step reduces compute usage. The files are still copied into the table's base location, so they do occupy storage there.
- **Efficient for Large Datasets:** Ideal for scenarios where large volumes of Parquet files need to be ingested quickly into Iceberg tables.

This approach is especially useful for data lakes and big data workflows, where minimizing data processing and optimizing resource usage are critical.

In [None]:

COPY INTO SCHEDULE_RAW
from @ICEBRG_HOL_STG/train_schedule/
FILE_FORMAT = parquet_format
load_mode = add_files_copy
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;
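
The `COPY INTO` cell above pairs `load_mode = add_files_copy` with a Parquet file format that uses the vectorized scanner and with case-sensitive column matching. The sketch below checks for that combination before a load is attempted. It assumes these three settings are the relevant prerequisites, so verify them against the current Snowflake `COPY INTO` documentation; the function name and the dictionary layout are hypothetical.

```python
def add_files_copy_issues(file_format: dict, copy_options: dict) -> list:
    """Return a list of settings that look incompatible with ADD_FILES_COPY.

    Assumption: the mode expects a PARQUET file format with
    USE_VECTORIZED_SCANNER = TRUE and MATCH_BY_COLUMN_NAME = CASE_SENSITIVE.
    """
    issues = []
    if str(file_format.get("TYPE", "")).upper() != "PARQUET":
        issues.append("file format TYPE should be PARQUET")
    if file_format.get("USE_VECTORIZED_SCANNER") is not True:
        issues.append("USE_VECTORIZED_SCANNER should be TRUE")
    if str(copy_options.get("MATCH_BY_COLUMN_NAME", "")).upper() != "CASE_SENSITIVE":
        issues.append("MATCH_BY_COLUMN_NAME should be CASE_SENSITIVE")
    return issues

# Settings as used in this notebook.
notebook_format = {"TYPE": "PARQUET", "USE_VECTORIZED_SCANNER": True}
notebook_copy = {"LOAD_MODE": "ADD_FILES_COPY", "MATCH_BY_COLUMN_NAME": "CASE_SENSITIVE"}
print(add_files_copy_issues(notebook_format, notebook_copy))
```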

# Reading Delta Tables In-Place with Snowflake

Snowflake supports reading Delta Lake tables directly in place, without requiring any data copy or movement. This capability allows you to query and analyze data stored in Delta format on external storage, leveraging Snowflake's compute and security features.

**Key Points:**
- **No Data Copy Required:** Snowflake can read Delta tables in place, eliminating the need for time-consuming and costly data duplication.
- **External Volume & Catalog Integrations:** The required external volume and catalog integrations have already been created as part of the environment setup.
- **Iceberg Metadata Generation:** When the external volume is created with the `ALLOW_WRITES` property set to `TRUE`, Snowflake can automatically generate Iceberg metadata for Delta tables, enabling seamless interoperability.

For more information, refer to the official documentation:
- [External Volumes in Snowflake](https://docs.snowflake.com/en/user-guide/data-external-volumes)
- [Catalog Integrations in Snowflake](https://docs.snowflake.com/en/user-guide/catalog-integration)

In [None]:

CREATE OR REPLACE ICEBERG TABLE CANCEL_CODE_REFERENCES_RAW
  CATALOG = ICEBERG_HOL_DELTA_INT
  EXTERNAL_VOLUME = iceberg_hol_bronze_vol
  BASE_LOCATION = 'cancel_code_references_raw/cancel_codes/'
  AUTO_REFRESH = TRUE;

In [None]:
select * from CANCEL_CODE_REFERENCES_RAW limit 10;

# Creating MOVEMENTS_RAW_000X Tables

The following cell creates three Iceberg tables: `MOVEMENTS_RAW_0001`, `MOVEMENTS_RAW_0002`, and `MOVEMENTS_RAW_0003`. These tables are designed to store different types of raw train event data:

- **MOVEMENTS_RAW_0001:** Holds train activation events.
- **MOVEMENTS_RAW_0002:** Holds train cancellation events.
- **MOVEMENTS_RAW_0003:** Holds train movement events.

While the table structures are defined in the next cell, the actual data will be loaded into these tables using the Snowflake Openflow service, which enables efficient and scalable ingestion of streaming event data.

The source data for these tables originates from a Kafka environment hosted within Snowpark Container Services. A producer program pulls events from the Network Rail message queue and pushes the messages to the Kafka broker, from where they are ingested into the respective tables.
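
Each table stores the raw message as text in a single `VALUE` column. The sketch below shows how such a message could be parsed and associated with one of the three tables. It assumes the message is JSON with a `header.msg_type` field whose values line up with the table suffixes; that payload shape is an assumption for illustration, and this is not how Openflow itself routes messages.

```python
import json

TABLE_BY_MSG_TYPE = {
    "0001": "MOVEMENTS_RAW_0001",  # train activations
    "0002": "MOVEMENTS_RAW_0002",  # train cancellations
    "0003": "MOVEMENTS_RAW_0003",  # train movements
}

def target_table(value: str):
    """Return the table a raw message would belong to, or None if unknown.

    Assumption: the message type lives under header.msg_type.
    """
    message = json.loads(value)
    msg_type = message.get("header", {}).get("msg_type")
    return TABLE_BY_MSG_TYPE.get(msg_type)

# Hypothetical payload; only the msg_type field matters for this sketch.
raw_value = json.dumps({"header": {"msg_type": "0002"}, "body": {}})
print(target_table(raw_value))
```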

In [None]:


create or replace ICEBERG TABLE MOVEMENTS_RAW_0001 (
	"VALUE" VARCHAR
)
EXTERNAL_VOLUME = 'iceberg_hol_bronze_vol'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'hol_user_{{usernum}}/movements_raw_001/'
catalog_sync = iceberg_hol_oc_int;

create or replace ICEBERG TABLE MOVEMENTS_RAW_0002 (
    "VALUE" VARCHAR
)
EXTERNAL_VOLUME = 'iceberg_hol_bronze_vol'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'hol_user_{{usernum}}/movements_raw_002/'
catalog_sync = iceberg_hol_oc_int;

create or replace ICEBERG TABLE MOVEMENTS_RAW_0003 (
   "VALUE" VARCHAR
)
EXTERNAL_VOLUME = 'iceberg_hol_bronze_vol'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'hol_user_{{usernum}}/movements_raw_003/'
catalog_sync = iceberg_hol_oc_int;
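
The three statements above differ only in the table suffix and storage path, so they can also be generated in a loop from a Python cell. The sketch below only builds the SQL text; the function name `movements_ddl` is hypothetical, and the statements would still need to be executed, for example with `session.sql(statement).collect()` inside the notebook.

```python
def movements_ddl(usernum: str) -> list:
    """Build the three CREATE ICEBERG TABLE statements for the movements tables."""
    statements = []
    for suffix in ("0001", "0002", "0003"):
        # The storage paths above use three digits (movements_raw_001), so
        # the leading zero of the table suffix is dropped for the path.
        path_suffix = suffix[1:]
        statements.append(
            f"create or replace ICEBERG TABLE MOVEMENTS_RAW_{suffix} (\n"
            '    "VALUE" VARCHAR\n'
            ")\n"
            "EXTERNAL_VOLUME = 'iceberg_hol_bronze_vol'\n"
            "CATALOG = 'SNOWFLAKE'\n"
            f"BASE_LOCATION = 'hol_user_{usernum}/movements_raw_{path_suffix}/'\n"
            "catalog_sync = iceberg_hol_oc_int"
        )
    return statements

for statement in movements_ddl("001"):
    print(statement)
    print()
```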

# Accessing the Openflow Service

To continue with the lab, please open a new browser tab and navigate to the Openflow service URL provided in your lab login instructions. 

**Instructions:**
- Use the same credentials you used to log in to Snowflake.
- The Openflow service URL is included in your lab materials or login instructions.
- Follow the guidelines provided in the README file under the `bronze/openflow` directory to run the Openflow pipeline and ingest data.
