In [0]:
import dlt
from pyspark.sql.functions import *

@dlt.table
def notes_st():
    """
    Reads the raw clinical notes TXT data as a streaming source.
    """
    path = "/Volumes/hls_omop/cdm_542/landing/notes"

    schema = """
        value STRING,
        note_id STRING
    """

    return (
        spark.readStream.schema(schema)
            .format("cloudFiles")
            .option("cloudFiles.format", "text")
            .option("wholeText", True)
            .load(path)
            .withColumn("note_id", col("_metadata.file_name"))
            )

In [0]:
@dlt.table
def notes_mv():
    """
    Strips the patient name and the patient ID out of the note ID field and creates two new fields for them.
    """
    return (
    spark.read.table("notes_st")
    .withColumnRenamed("value", "note_text")
    .withColumn("patient_name", regexp_replace(regexp_substr('note_id', lit(r'^([A-Za-záéíóúÁÉÍÓÚ]*_){1,4}')), '_', ' '))
    .withColumn("patient_id", regexp_substr('note_id', lit(r'[a-z0-9]*-[a-z0-9]*-[a-z0-9]*-[a-z0-9]*-[a-z0-9]*')))
)