LinQ User Log Data Stream

[DDL LinQ User Log Info](https://dbc-2c417bf1-de85.cloud.databricks.com/?o=3218536095592070#notebook/2075143146396255/command/298267976626763)


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType, TimestampType
import pyspark.sql.functions as F
import datetime
import boto3
import pytz

# Current calendar date in the US/Eastern time zone, rendered as YYYYMMDD.
_eastern_tz = pytz.timezone('US/Eastern')
todaysDate = datetime.datetime.now(_eastern_tz).strftime("%Y%m%d")

In [None]:
# Storage locations used to build the s3:// paths for the stream output.
# NOTE(review): the angle-bracket values look like redacted placeholders —
# confirm the real values before running.
checkpoint_directory = "<checkpoint_directory>"  # prefix for the streaming checkpoint
data_directory = "<data_directory>"  # prefix for the written Delta data
s3_bucket = "<work-s3-bucket>"  # bucket that both prefixes live in

In [None]:
# Every attribute of a single log event is read as a nullable string.
_event_field_names = ("cusip", "organization", "path", "role", "source", "user")
eventSchema = StructType(
    [StructField(field_name, StringType(), True) for field_name in _event_field_names]
)

# The reader expects a top-level "Event" array whose elements are event structs.
finalSchema = StructType([StructField("Event", ArrayType(eventSchema), True)])

# Streaming read of the raw JSON log files through the "cloudFiles" source.
# NOTE(review): "wholeText" is documented for the text source; confirm it is
# intended (and has any effect) for JSON input.
_raw_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("wholeText", "true")
    .option("ignoreMissingFiles", "true")
    .schema(finalSchema)
    .load("/mnt/Raw_Bucket/application_data/DataLogs/")
)

# Keep the parsed payload and surface the source-file metadata beside it.
_with_file_metadata = _raw_stream.select(
    "*",
    "_metadata.file_path",
    "_metadata.file_name",
    "_metadata.file_modification_time",
)

incoming = (
    _with_file_metadata
    # Derive the date first, while file_modification_time still holds the
    # original metadata value ...
    .withColumn("file_modification_date", F.to_date("file_modification_time"))
    # ... then overwrite that column with its HH:mm:ss rendering.
    .withColumn("file_modification_time", F.date_format("file_modification_time", "HH:mm:ss"))
    .withColumn("utc_processing_time", F.current_timestamp())
    # One output row per element of the Event array.
    .withColumn("Event", F.explode("Event"))
)


In [None]:
# Flatten the exploded Event struct into top-level columns and keep the
# file-level bookkeeping columns alongside them.
_event = incoming["Event"]

# (struct field, output column name) pairs that lead the projection.
_leading_event_fields = (
    ("cusip", "cusip"),
    ("organization", "org"),
    ("role", "role"),
    ("source", "source"),
    ("user", "user"),
)

exploded_incoming = incoming.select(
    *[_event[field].alias(column_name) for field, column_name in _leading_event_fields],
    "file_modification_date",
    "file_modification_time",
    "utc_processing_time",
    # "path" is deliberately emitted after the timing columns, as before.
    _event["path"].alias("path"),
    "file_name",
    "file_path",
)


In [None]:
# Start the streaming write of the flattened events to a Delta location on S3.
#
# The checkpoint path uses `todaysDate`, the date string defined at the top of
# the notebook. The previous reference to `todays_date` named a variable that
# is never defined and raised NameError before the stream could start.
#
# NOTE(review): because the checkpoint path embeds the run date, a run on a
# new day starts from an empty checkpoint directory — confirm this is intended,
# since progress recorded on earlier days is then not reused.
# NOTE(review): "cloudFiles.inferColumnTypes" is an Auto Loader reader option;
# it is kept here unchanged, but confirm it has any effect on a Delta writer.
exploded_incoming.writeStream \
    .format("delta") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("checkpointLocation", f"s3://{s3_bucket}/{checkpoint_directory}/{todaysDate}") \
    .start(f's3://{s3_bucket}/{data_directory}/')

Out[4]: <pyspark.sql.streaming.query.StreamingQuery at 0xffff786addf0>