In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Notebook parameters, exposed as Databricks widgets.
# "environment" selects the catalog/volume prefix; "microbatch_seconds" is the
# streaming trigger interval consumed by the write cell.
dbutils.widgets.dropdown("environment", "prd", ["prd"], "1-Environment")
env = dbutils.widgets.get("environment")

dbutils.widgets.text("microbatch_seconds", "0", "2-Microbatch Seconds")
microbatch_seconds = int(dbutils.widgets.get("microbatch_seconds"))

# Source: landed files for one Ticketmaster feed and country.
source = "events"
country = "US"
source_path = f"/Volumes/{env}_bronze/landing/ticketmaster/{source}/{country}"
source_schema = f"/Volumes/{env}/pipeline/schema/api_ticketmaster/{source}"

# Target: the bronze catalog is derived from the selected environment, the same
# way source_path is, so a non-prd run can never write into the prd table.
target_table = f"{env}_bronze.api_ticketmaster.{source}"
target_checkpoint = f"/Volumes/{env}/pipeline/checkpoint/bronze/api_ticketmaster/{source}"

streaming_queue_name = f"Ingestion {source}"
source_filter = "*.json"

# Echo the resolved configuration so it is visible in the run output.
print(f"Source: {source}")
print(f"Source Path: {source_path}")
print(f"Source Schema: {source_schema}")
print(f"Target Table: {target_table}")
print(f"Target Checkpoint: {target_checkpoint}")
print(f"Streaming Queue: {streaming_queue_name}")

Source: events
Source Path: /Volumes/prd_bronze/landing/ticketmaster/events/US
Source Schema: /Volumes/prd/pipeline/schema/api_ticketmaster/events
Target Table: prd_bronze.api_ticketmaster.events
Target Checkpoint: /Volumes/prd/pipeline/checkpoint/bronze/api_ticketmaster/events
Streaming Queue: Ingestion events


In [0]:
# Streaming read of the landed JSON files through Auto Loader ("cloudFiles").
# Column types are not inferred (cloudFiles.inferColumnTypes = false) and the
# tracked schema under source_schema evolves by adding new columns.
df = (spark
        .readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", source_schema)
        .option("cloudFiles.inferColumnTypes", "false")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("readerCaseSensitive", "false")
        # The glob must be passed as a plain string under the documented key
        # "pathGlobFilter"; wrapping it in braces would build a Python set.
        .option("pathGlobFilter", source_filter)
        .load(f"{source_path}/")
    )

In [0]:
# Schema of a single venue object (used for the entries of an event's
# `_embedded.venues` array). Built with chained StructType.add() calls;
# every field keeps the default nullable=True.
venue_schema = (
    StructType()
    .add("name", StringType())
    .add("type", StringType())
    .add("id", StringType())
    .add("test", BooleanType())
    .add("locale", StringType())
    .add("postalCode", StringType())
    .add("timezone", StringType())
    .add("city", StructType().add("name", StringType()))
    .add("state", StructType()
        .add("name", StringType())
        .add("stateCode", StringType()))
    .add("country", StructType()
        .add("name", StringType())
        .add("countryCode", StringType()))
    .add("address", StructType().add("line1", StringType()))
    # Coordinates are declared as strings, not numeric types.
    .add("location", StructType()
        .add("longitude", StringType())
        .add("latitude", StringType()))
    .add("upcomingEvents", StructType()
        .add("archtics", IntegerType())
        .add("ticketmaster", IntegerType())
        .add("_total", IntegerType())
        .add("_filtered", IntegerType()))
    .add("_links", StructType()
        .add("self", StructType().add("href", StringType())))
    .add("images", ArrayType(
        StructType()
        .add("ratio", StringType())
        .add("url", StringType())
        .add("width", IntegerType())
        .add("height", IntegerType())
        .add("fallback", BooleanType())))
    .add("url", StringType())
    .add("markets", ArrayType(
        StructType()
        .add("name", StringType())
        .add("id", StringType())))
    .add("dmas", ArrayType(StructType().add("id", IntegerType())))
    .add("social", StructType()
        .add("twitter", StructType().add("handle", StringType())))
    .add("boxOfficeInfo", StructType()
        .add("openHoursDetail", StringType())
        .add("acceptedPaymentDetail", StringType())
        .add("willCallDetail", StringType()))
    .add("parkingDetail", StringType())
    .add("accessibleSeatingDetail", StringType())
    .add("generalInfo", StructType()
        .add("generalRule", StringType())
        .add("childRule", StringType()))
    .add("ada", StructType()
        .add("adaPhones", StringType())
        .add("adaCustomCopy", StringType())
        .add("adaHours", StringType()))
)

# Schema of a single event object. Built with chained StructType.add() calls;
# every field keeps the default nullable=True. Date/time values are declared
# as strings, and venues reuse venue_schema.
event_schema = (
    StructType()
    .add("name", StringType())
    .add("type", StringType())
    .add("id", StringType())
    .add("test", BooleanType())
    .add("locale", StringType())
    .add("images", ArrayType(
        StructType()
        .add("ratio", StringType())
        .add("url", StringType())
        .add("width", IntegerType())
        .add("height", IntegerType())
        .add("fallback", BooleanType())))
    .add("dates", StructType()
        .add("start", StructType()
            .add("localDate", StringType())
            .add("localTime", StringType())
            .add("dateTime", StringType())
            .add("dateTBD", BooleanType())
            .add("dateTBA", BooleanType())
            .add("timeTBA", BooleanType())
            .add("noSpecificTime", BooleanType()))
        .add("end", StructType()
            .add("localDate", StringType())
            .add("localTime", StringType())
            .add("dateTime", StringType())
            .add("approximate", BooleanType())
            .add("noSpecificTime", BooleanType()))
        .add("timezone", StringType())
        .add("status", StructType().add("code", StringType()))
        .add("spanMultipleDays", BooleanType()))
    .add("ticketing", StructType()
        .add("safeTix", StructType().add("enabled", BooleanType()))
        .add("id", StringType())
        .add("allInclusivePricing", StructType().add("enabled", BooleanType())))
    .add("_links", StructType()
        .add("self", StructType().add("href", StringType()))
        .add("venues", ArrayType(StructType().add("href", StringType()))))
    .add("_embedded", StructType()
        .add("venues", ArrayType(venue_schema)))
    .add("url", StringType())
    .add("sales", StructType()
        .add("public", StructType()
            .add("startDateTime", StringType())
            .add("startTBD", BooleanType())
            .add("startTBA", BooleanType())
            .add("endDateTime", StringType()))
        .add("presales", ArrayType(
            StructType()
            .add("startDateTime", StringType())
            .add("endDateTime", StringType())
            .add("name", StringType()))))
    # Each classification level is an (id, name) pair.
    .add("classifications", ArrayType(
        StructType()
        .add("primary", BooleanType())
        .add("segment", StructType()
            .add("id", StringType())
            .add("name", StringType()))
        .add("genre", StructType()
            .add("id", StringType())
            .add("name", StringType()))
        .add("subGenre", StructType()
            .add("id", StringType())
            .add("name", StringType()))
        .add("type", StructType()
            .add("id", StringType())
            .add("name", StringType()))
        .add("subType", StructType()
            .add("id", StringType())
            .add("name", StringType()))
        .add("family", BooleanType())))
    .add("promoter", StructType()
        .add("id", StringType())
        .add("name", StringType())
        .add("description", StringType()))
    .add("promoters", ArrayType(
        StructType()
        .add("id", StringType())
        .add("name", StringType())
        .add("description", StringType())))
    .add("pleaseNote", StringType())
    .add("seatmap", StructType()
        .add("staticUrl", StringType())
        .add("id", StringType()))
    .add("accessibility", StructType()
        .add("ticketLimit", IntegerType())
        .add("id", StringType()))
    .add("ageRestrictions", StructType()
        .add("legalAgeEnforced", BooleanType())
        .add("id", StringType()))
    .add("priceRanges", ArrayType(
        StructType()
        .add("type", StringType())
        .add("currency", StringType())
        .add("min", DoubleType())
        .add("max", DoubleType())))
)

# Wrapper schema for the `_embedded` payload: an `events` array of event objects.
schema = StructType().add("events", ArrayType(event_schema))

# Column expressions, named up front so the pipeline below reads as a list of steps.
# NOTE(review): from_json assumes `_embedded` arrives as a JSON string - confirm.
embedded_parsed = from_json(col("_embedded"), schema)
# URL-decoded source file path with the leading "<source_path>/" stripped.
file_name_col = expr(f"regexp_replace(url_decode(_metadata.file_path), '^{source_path}/', '')")
# Fourth dash-separated token of the file name, parsed as yyyyMMddHHmmss.
ingestion_ts_col = to_timestamp(expr("split(__file_name, '-')[3]"), "yyyyMMddHHmmss")

# One row per event: parse `_embedded`, explode its `events` array, then
# promote the event's fields to top-level columns and attach the lineage columns.
events_exploded = (
    df.withColumn("_embedded", embedded_parsed)
    .select(explode(col("_embedded.events")).alias("event"))
)

df2 = (
    events_exploded
    .select("event.*")
    .withColumn("__file_name", file_name_col)
    .withColumn("__ingestion_timestamp", ingestion_ts_col)
)

In [0]:
# Trigger selection: any positive microbatch interval (including 1 second) runs
# with a processingTime trigger; 0 (the widget default) or a negative value
# runs with the availableNow trigger instead.
is_stream = microbatch_seconds > 0
trigger_kwargs = {"processingTime": f"{microbatch_seconds} seconds"} if is_stream else {"availableNow": True}

# Append the flattened events to the target Delta table. mergeSchema lets new
# columns be added to the table; progress is tracked in target_checkpoint.
(df2.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", target_checkpoint)
    .queryName(streaming_queue_name)
    .trigger(**trigger_kwargs)
    .outputMode("append")
    .toTable(target_table))

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f696a24d1d0>