In [None]:
%pip install dbl-tempo

Python interpreter will be restarted.
Python interpreter will be restarted.


In [None]:
import urllib
from pyspark.sql.window import Window
import pyspark.sql.functions as f
from pyspark.sql.functions import concat,col,concat_ws,to_timestamp
from pyspark.sql.types import *
from tempo import *
import boto3
from botocore.config import Config
# Note: the `dlt` module is only available when this notebook runs as part of a Delta Live Tables
# pipeline; importing it in an interactive notebook session fails with ModuleNotFoundError.
import dlt


@dlt.create_table(
  comment="The raw machina dataset, ingested from the github sample parquet file",
  table_properties={
    "quality": "bronze",
    "pipelines.autoOptimize.managed": "true"
  }
)
def machina_raw():
    """Mount the machina S3 bucket and stream its parquet files into the bronze table.

    AWS credentials are read from an uploaded credentials CSV (the row whose
    ``User name`` is ``machina-log-user``). They are used to (re)mount the
    bucket at ``/mnt/machina-stg-parquet``. Auto Loader (``cloudFiles``) then
    streams the parquet files under that mount using an explicit schema.

    Returns:
        A streaming DataFrame with columns time, value, field, robot_id,
        run_uuid and sensor_type.

    Raises:
        ValueError: if the credentials CSV has no row for ``machina-log-user``.
    """
    # `import urllib` at file level does not guarantee the `parse` submodule is loaded.
    import urllib.parse

    credentials_path = "/FileStore/tables/new_user_credentials.csv"
    aws_user_name = "machina-log-user"
    aws_s3_bucket = "machina-stg-parquet"  # s3://machina-stg-parquet
    mount_point = "/mnt/machina-stg-parquet"

    # Unmount only if currently mounted: dbutils.fs.unmount raises on a path
    # that is not a mount point, which would fail the very first run.
    if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
        dbutils.fs.unmount(mount_point)

    # Credentials CSV has a header row and is comma-delimited.
    # NOTE(review): consider a Databricks secret scope instead of a CSV in FileStore.
    aws_keys_df = (spark.read.format("csv")
                   .option("header", "true")
                   .option("sep", ",")
                   .load(credentials_path))

    # Fetch both keys with a single Spark job instead of two separate collects.
    rows = (aws_keys_df
            .where(col("User name") == aws_user_name)
            .select("Access key ID", "Secret access key")
            .collect())
    if not rows:
        raise ValueError(f"No credentials for user '{aws_user_name}' in {credentials_path}")
    access_key = rows[0]["Access key ID"]
    secret_key = rows[0]["Secret access key"]

    # The secret is embedded in the mount URL, so characters such as '/' must be percent-encoded.
    encoded_secret_key = urllib.parse.quote(string=secret_key, safe="")
    # NOTE(review): s3n is the legacy Hadoop S3 connector; s3a (or an instance profile) may be preferable.
    source_url = "s3n://{0}:{1}@{2}".format(access_key, encoded_secret_key, aws_s3_bucket)
    dbutils.fs.mount(source_url, mount_point)

    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("mergeSchema", "true")
            .schema("time STRING, value DOUBLE, field STRING, robot_id BIGINT, run_uuid DOUBLE, sensor_type STRING")
            .load(mount_point))

#################################################################################################################################################################################
#################################################################################################################################################################################
@dlt.create_table(
  comment="Machina raw data set processed for data quality issues.",
  spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"},
  table_properties={
    "quality": "silver",
    "pipelines.autoOptimize.managed": "true"
  },
  # The schema is declared explicitly because DLT does not infer the output
  # schema of the pivot below. Pivot issue: "I was able to get around this by
  # specifying the table schema in the table decorator."
  # https://community.databricks.com/s/question/0D58Y00009AIhACSA1/delta-live-tables-not-inferring-table-schema-properly
  # https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/
  # https://sparkbyexamples.com/pyspark/pyspark-sql-types-datatype-with-examples/
  schema=StructType([
    StructField("run_uuid", DoubleType(), False),
    StructField("unix_milliseconds", DoubleType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("fx_1", DoubleType(), True),
    StructField("fx_2", DoubleType(), True),
    StructField("fy_1", DoubleType(), True),
    StructField("fy_2", DoubleType(), True),
    StructField("fz_1", DoubleType(), True),
    StructField("fz_2", DoubleType(), True),
    StructField("x_1", DoubleType(), True),
    StructField("x_2", DoubleType(), True),
    StructField("y_1", DoubleType(), True),
    StructField("y_2", DoubleType(), True),
    StructField("z_1", DoubleType(), True),
    StructField("z_2", DoubleType(), True)
  ]),
  partition_cols=["run_uuid"]
)
@dlt.expect("valid_run_uuid_and_timestamp", "run_uuid IS NOT NULL AND timestamp IS NOT NULL")
def machina_cleaned():
    """Pivot the long-format raw readings into one wide row per (run, timestamp).

    Each raw row carries one reading (``value``) for one ``field`` of one
    ``robot_id``. The two are joined as ``<field>_<robot_id>`` (e.g. ``fx_1``)
    and pivoted into one column per combination. After exact duplicate rows
    are dropped, readings sharing ``run_uuid``, ``unix_milliseconds`` and
    ``timestamp`` are summed.
    """
    df = (dlt.read("machina_raw")
          .withColumn("field_robot_id", concat_ws("_", col("field"), col("robot_id")))
          .withColumn("timestamp", to_timestamp(col("time")))
          .drop("time")
          .withColumn("unix_milliseconds", col("timestamp").cast("double") * 1000)
          .dropDuplicates())
    # No orderBy before the aggregation: groupBy/pivot does not preserve input
    # order, so sorting here only added a full shuffle.

    # DLT does not support pivot() when Spark must eagerly scan the input to
    # discover the distinct pivot values (and hence the output schema).
    # Workaround: pass the pivot values explicitly.
    # https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-python-ref.html
    # https://community.databricks.com/s/question/0D58Y000098j9FrSAI/why-does-dlttable-from-a-table-give-different-results-than-from-a-view
    pivot_values = ["fx_1", "fx_2", "fy_1", "fy_2", "fz_1", "fz_2",
                    "x_1", "x_2", "y_1", "y_2", "z_1", "z_2"]
    return (df.groupBy("run_uuid", "unix_milliseconds", "timestamp")
              .pivot("field_robot_id", pivot_values)
              .sum("value"))

#################################################################################################################################################################################
#################################################################################################################################################################################
# Create another silver table and keep the cleaned but otherwise unprocessed data in machina_cleaned.
# The new silver table adds new features, deals with null values, and resamples the time series.
@dlt.create_table(
  comment="Take transpose data in machina_cleaned, deal with null values, resample to 10 milliseconds, and add new feature columns.",
  spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "false"},
  table_properties={
    "quality": "silver",
    "pipelines.autoOptimize.managed": "true"
  },
  # https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/
  # https://sparkbyexamples.com/pyspark/pyspark-sql-types-datatype-with-examples/
  # Pivot issue: "I was able to get around this by specifying the table schema in the table decorator."
  # https://community.databricks.com/s/question/0D58Y00009AIhACSA1/delta-live-tables-not-inferring-table-schema-properly
  schema=StructType([
    StructField("run_uuid", DoubleType(), False),
    StructField("unix_milliseconds", DoubleType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("fx_1", DoubleType(), True),
    StructField("fx_2", DoubleType(), True),
    StructField("fy_1", DoubleType(), True),
    StructField("fy_2", DoubleType(), True),
    StructField("fz_1", DoubleType(), True),
    StructField("fz_2", DoubleType(), True),
    StructField("x_1", DoubleType(), True),
    StructField("x_2", DoubleType(), True),
    StructField("y_1", DoubleType(), True),
    StructField("y_2", DoubleType(), True),
    StructField("z_1", DoubleType(), True),
    StructField("z_2", DoubleType(), True),
    StructField("vx_1", DoubleType(), True),
    StructField("vy_1", DoubleType(), True),
    StructField("vz_1", DoubleType(), True),
    StructField("vx_2", DoubleType(), True),
    StructField("vy_2", DoubleType(), True),
    StructField("vz_2", DoubleType(), True),
    StructField("ax_1", DoubleType(), True),
    StructField("ay_1", DoubleType(), True),
    StructField("az_1", DoubleType(), True),
    StructField("ax_2", DoubleType(), True),
    StructField("ay_2", DoubleType(), True),
    StructField("az_2", DoubleType(), True),
    StructField("v1", DoubleType(), True),
    StructField("v2", DoubleType(), True),
    StructField("a1", DoubleType(), True),
    StructField("a2", DoubleType(), True),
    StructField("f1", DoubleType(), True),
    StructField("f2", DoubleType(), True)
  ]),
  partition_cols=["run_uuid"]
)
@dlt.expect("valid_run_uuid_and_timestamp", "run_uuid IS NOT NULL AND timestamp IS NOT NULL")
def machina_model_v1():
    """Resample each run to 10 ms, fill gaps, and derive velocity/acceleration features.

    Steps:
      1. Resample the 12 force/position columns of ``machina_cleaned`` into
         10 ms buckets (mean) per ``run_uuid`` with tempo. Gaps are
         forward-filled, then back-filled.
      2. Per run, ordered by timestamp, compute for each position axis:
         velocity = change in position / elapsed ms, and
         acceleration = change in velocity / elapsed ms.
         Velocities are in position units per ms; accelerations in units per ms².
      3. Add placeholder columns v1, v2, a1, a2, f1, f2 (all 0.0) until their
         formulas are defined.

    Derived values that cannot be computed are set to 0.0. This covers the
    first row of a run and any row with a non-positive elapsed time. Raw
    sensor columns are not zero-filled, so they keep a null only where the
    interpolation could not fill it.
    """
    sensor_cols = ["fx_1", "fx_2", "fy_1", "fy_2", "fz_1", "fz_2",
                   "x_1", "x_2", "y_1", "y_2", "z_1", "z_2"]
    # Ordered to match the vx_1..vz_2 / ax_1..az_2 columns of the table schema.
    position_cols = ["x_1", "y_1", "z_1", "x_2", "y_2", "z_2"]
    velocity_cols = ["v" + c for c in position_cols]   # vx_1, vy_1, vz_1, vx_2, ...
    accel_cols = ["a" + c for c in position_cols]      # ax_1, ay_1, az_1, ax_2, ...

    # unix_milliseconds is recomputed from the resampled timestamps below.
    source = dlt.read("machina_cleaned").drop("unix_milliseconds")
    tsdf = TSDF(source, ts_col="timestamp", partition_cols=["run_uuid"])

    # Resample to 10 ms buckets (mean) and forward-fill missing intervals/nulls,
    # then back-fill anything still missing (e.g. leading nulls with no earlier value).
    ffilled = tsdf.interpolate(freq="10 ms", func="mean", target_cols=sensor_cols, method="ffill")
    filled = ffilled.interpolate(freq="10 ms", func="mean", target_cols=sensor_cols, method="bfill")

    out = (filled.df
           .withColumn("unix_milliseconds", col("timestamp").cast("double") * 1000)
           .select("run_uuid", "unix_milliseconds", "timestamp", *sensor_cols))

    window = Window.partitionBy("run_uuid").orderBy("timestamp")
    elapsed_ms = f.col("unix_milliseconds") - f.lag("unix_milliseconds", 1).over(window)

    def rate(name):
        """Change in column `name` since the previous row, per elapsed millisecond.

        Null on the first row of a run and whenever elapsed_ms is not positive.
        Guarding avoids relying on x/0 returning null, which raises under ANSI mode.
        """
        delta = f.col(name) - f.lag(name, 1).over(window)
        return f.when(elapsed_ms > 0, delta / elapsed_ms)

    out = (out.select("*", *[rate(p).alias(v) for p, v in zip(position_cols, velocity_cols)])
              .fillna(0.0, subset=velocity_cols))
    # Acceleration is the rate of change of velocity, not velocity / elapsed time.
    out = (out.select("*", *[rate(v).alias(a) for v, a in zip(velocity_cols, accel_cols)])
              .fillna(0.0, subset=accel_cols))

    # Placeholder columns until the formulas are figured out.
    placeholders = ["v1", "v2", "a1", "a2", "f1", "f2"]
    return out.select("*", *[f.lit(0.0).alias(p) for p in placeholders])

#################################################################################################################################################################################
#################################################################################################################################################################################
@dlt.create_table(
  comment="Create stats per run id.",
  spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"},
  table_properties={
    "quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  },
  schema=StructType([
    StructField("run_uuid", DoubleType(), False),
    StructField("min_timestamp", TimestampType(), True),
    StructField("max_timestamp", TimestampType(), True),
    StructField("min_unix_milliseconds", DoubleType(), True),
    StructField("max_unix_milliseconds", DoubleType(), True),
    StructField("total_runtime_seconds", LongType(), True),
    StructField("total_runtime_ms", DoubleType(), True),
    StructField("total_distance_traveled", DoubleType(), True),
  ]),
)
@dlt.expect("valid_run_uuid", "run_uuid IS NOT NULL")
def machina_run_uuid_stats():
    """Aggregate machina_model_v1 into one summary row per run_uuid.

    Output columns:
      - min/max timestamp and min/max unix_milliseconds for the run.
      - total_runtime_seconds: difference of the two timestamps after each is
        cast to whole epoch seconds (sub-second parts are truncated first).
      - total_runtime_ms: max minus min unix_milliseconds.
      - total_distance_traveled: placeholder, always 0.0.
    """
    # Explicit aliases instead of renaming Spark's auto-generated "min(timestamp)"-style names.
    stats = (dlt.read("machina_model_v1")
             .groupBy("run_uuid")
             .agg(f.min("timestamp").alias("min_timestamp"),
                  f.max("timestamp").alias("max_timestamp"),
                  f.min("unix_milliseconds").alias("min_unix_milliseconds"),
                  f.max("unix_milliseconds").alias("max_unix_milliseconds")))

    return (stats
            .withColumn("total_runtime_seconds",
                        col("max_timestamp").cast("long") - col("min_timestamp").cast("long"))
            .withColumn("total_runtime_ms",
                        col("max_unix_milliseconds") - col("min_unix_milliseconds"))
            .withColumn("total_distance_traveled", f.lit(0.0)))

[0;31m---------------------------------------------------------------------------[0m
[0;31mModuleNotFoundError[0m                       Traceback (most recent call last)
[0;32m<command-180995071469317>[0m in [0;36m<cell line: 17>[0;34m()[0m
[1;32m     15[0m [0;32mimport[0m [0mboto3[0m[0;34m[0m[0;34m[0m[0m
[1;32m     16[0m [0;32mfrom[0m [0mbotocore[0m[0;34m.[0m[0mconfig[0m [0;32mimport[0m [0mConfig[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 17[0;31m [0;32mimport[0m [0mdlt[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     18[0m [0;34m[0m[0m
[1;32m     19[0m [0;34m[0m[0m

[0;32m/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py[0m in [0;36mimport_patch[0;34m(name, globals, locals, fromlist, level)[0m
[1;32m    169[0m             [0;31m# Import the desired module. If you’re seeing this while debugging a failed import,[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m    170[0m             [0;31m# look at prec