In [None]:
def getSecret(secretName, linked_service="ifmpmvault", akv_name="ifm-vault"):
    """Return the value of a secret stored in Azure Key Vault.

    Args:
        secretName: Name of the secret to read.
        linked_service: Value passed to ``mssparkutils`` as ``linkedService``.
            Defaults to the linked service this notebook has always used.
        akv_name: Value passed to ``mssparkutils`` as ``akvName``.
            Defaults to the vault this notebook has always used.

    Returns:
        Whatever ``mssparkutils.credentials.getSecret`` returns for the
        requested secret.
    """
    # Fetch the key from Azure Key Vault
    return mssparkutils.credentials.getSecret(
        linkedService=linked_service,
        akvName=akv_name,
        secret=secretName)

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# The storage account and file system names are read from Key Vault through
# getSecret, in this order: account name first, then file system name.
data_lake_account_name, file_system_name = (
    getSecret(secret_key)
    for secret_key in ('data-lake-account-name', 'file-system-name')
)

# abfss:// URI of the file system on that storage account.
basepath = f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net"

In [2]:
# Load the raw IoT Hub telemetry (JSON). The path is built on `basepath` from
# the previous cell instead of repeating the abfss:// URI; the resulting string
# is unchanged. The three wildcards can be narrowed (original note: 2022/10/1).
telemetry_path = f'{basepath}/msdyn_iothub_generated/msdyn-iothub-ii3ywz6swv5z2/*/*/*'
telemetry = spark.read.load(telemetry_path, format='json')

# Keep the device id, the timestamp and the four sensor readings, then give
# the selected columns flat names (positional rename, same order as the select).
telemetry = telemetry.select("SystemProperties.connectionDeviceId","Body.Timestamp","Body.volt","Body.rotate","Body.pressure","Body.vibration")
cols = ['connectionDeviceId','tdatetime','volt','rotate','pressure','vibration']
telemetry = telemetry.toDF(*cols)

# Customer assets with the name of each connected IoT device; the device name
# is exposed as connectionDeviceId so it can be joined to the telemetry.
df_assets = spark.sql('''select msdyn_customerassetid, a.msdyn_name, i.msdyn_name as connectionDeviceId
from dataverse_d365env_generated_data.msdyn_customerasset as a 
left join dataverse_d365env_generated_data.connection as c on a.msdyn_customerassetid = c.record1id
left join dataverse_d365env_generated_data.msdyn_iotdevice as i on i.msdyn_iotdeviceid = c.record2id
''')

# Inner join (the default): telemetry from devices with no matching asset row
# is dropped. From here on rows are keyed by asset id instead of device id.
telemetry = telemetry.join(df_assets,on='connectionDeviceId').select('msdyn_customerassetid','tdatetime','volt','rotate','pressure','vibration')

# Collapse to one row per asset and timestamp, keeping the largest reading of
# each sensor. `max` is pyspark.sql.functions.max (star-imported at the top of
# the notebook), not the Python builtin.
telemetry = (telemetry.groupBy(['msdyn_customerassetid','tdatetime'])
             .agg(max("volt").alias("volt"),
                  max("rotate").alias("rotate"),
                  max("pressure").alias("pressure"),
                  max("vibration").alias("vibration")))

In [21]:
# telemetry = spark.sql('''select t.*, machine_guid as msdyn_customerassetid, t.datetime as msdyn_alerttime 
# from machine_telemetry_data as t inner join machine_data as m on t.machineID = m.machineID''')

In [3]:
# Settings shared by the rolling mean / standard deviation cells below.

# Telemetry columns that receive rolling statistics.
rolling_features = ['volt','rotate', 'pressure', 'vibration']

# Width of one alignment bucket: n hours = n * 3600 seconds.
time_val = 12 * 3600

# Working frame that the rolling columns are added to.
tel_mean = telemetry

# Column expression that snaps "tdatetime" to the nearest multiple of time_val
# seconds and casts the result back to a timestamp. Spark is lazy: nothing is
# evaluated here, only once the expression is used in a withColumn call.
_epoch_seconds = unix_timestamp(col("tdatetime"))
dt_truncated = (round(_epoch_seconds / time_val) * time_val).cast("timestamp")

In [4]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, unix_timestamp, round
from pyspark.sql.functions import datediff
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

# Rolling window lengths (the original note calls them 12, 24 and 36 hours).
# The windows below count rows, so this presumes one reading per hour per
# asset — TODO confirm.
lags = [12, 24, 36]

# Add a rolling mean and a rolling standard deviation per feature and window.
for window_rows in lags:
    # Current row plus the window_rows - 1 rows before it, per asset, by time.
    trailing_rows = (Window.partitionBy('msdyn_customerassetid')
                     .orderBy('tdatetime')
                     .rowsBetween(1 - window_rows, 0))
    for feature in rolling_features:
        reading = col(feature)
        tel_mean = (tel_mean
                    .withColumn(feature + '_rollingmean_' + str(window_rows),
                                F.avg(reading).over(trailing_rows))
                    .withColumn(feature + '_rollingstd_' + str(window_rows),
                                F.stddev(reading).over(trailing_rows)))

# Inputs of the per-bucket aggregation, listed in the column order of the
# result. Each aggregate keeps the name of the column it reads.
_mean_inputs = [
    'volt_rollingmean_12', 'rotate_rollingmean_12',
    'pressure_rollingmean_12', 'vibration_rollingmean_12',
    'volt_rollingmean_24', 'rotate_rollingmean_24',
    'pressure_rollingmean_24', 'vibration_rollingmean_24',
    'volt_rollingmean_36', 'vibration_rollingmean_36',
    'rotate_rollingmean_36', 'pressure_rollingmean_36',
]
_std_inputs = [
    'volt_rollingstd_12', 'rotate_rollingstd_12',
    'pressure_rollingstd_12', 'vibration_rollingstd_12',
    'volt_rollingstd_24', 'rotate_rollingstd_24',
    'pressure_rollingstd_24', 'vibration_rollingstd_24',
    'volt_rollingstd_36', 'rotate_rollingstd_36',
    'pressure_rollingstd_36', 'vibration_rollingstd_36',
]
_bucket_aggs = ([F.mean(name).alias(name) for name in _mean_inputs]
                + [F.stddev(name).alias(name) for name in _std_inputs])

# Bucket every row with dt_truncated, drop the raw readings, zero-fill nulls,
# then reduce each (asset, bucket) group: mean of the rolling means, standard
# deviation of the rolling standard deviations.
telemetry_feat = (tel_mean
                  .withColumn("dt_truncated", dt_truncated)
                  .drop('volt', 'rotate', 'pressure', 'vibration')
                  .fillna(0)
                  .groupBy("msdyn_customerassetid","dt_truncated")
                  .agg(*_bucket_aggs))

In [25]:
# errors = spark.sql('select e.* from machine_errors_data as e inner join machine_data as m on e.machineID = m.machineID')

In [5]:
# IoT alerts per customer asset. `component` is the last '_'-separated piece
# of the connected IoT device name; msdyn_alerttime comes from the alert row.
_errors_query = '''select a.msdyn_customerassetid, reverse(split(i.msdyn_name,'_'))[0] as component, msdyn_alerttime from dataverse_d365env_generated_data.msdyn_customerasset as a 
left join dataverse_d365env_generated_data.connection as c on a.msdyn_customerassetid = c.record1id
left join dataverse_d365env_generated_data.msdyn_iotdevice as i on i.msdyn_iotdeviceid = c.record2id
inner join dataverse_d365env_generated_data.msdyn_iotalert as ia on ia.msdyn_customerasset = a.msdyn_customerassetid
'''
errors = spark.sql(_errors_query)

In [6]:
# Count alerts per asset and alert time, pivoted into one column per component
# value; the second groupBy then collapses those rows into a single row per
# asset and alert time with one sum per component.
error_ind = (errors.groupBy("msdyn_customerassetid","msdyn_alerttime","component").pivot('component')
             .agg(F.count('msdyn_customerassetid').alias('dummy')).drop('component').fillna(0)
             .groupBy("msdyn_customerassetid","msdyn_alerttime")
             .agg(F.sum('1').alias('error1sum'),
                  F.sum('2').alias('error2sum'),
                  # Fixed: this read F.sum('2'), so error3sum duplicated error2sum.
                  F.sum('3').alias('error3sum'),
                  F.sum('4').alias('error4sum')))

# Join the telemetry data with errors on asset and exact timestamp. It is a
# left join, so telemetry rows without a matching alert keep null error sums;
# they are not zero-filled here, only by the fillna(0) in error_feat below.
error_count = (telemetry.join(error_ind,
                              ((telemetry['msdyn_customerassetid'] == error_ind['msdyn_customerassetid'])
                               & (telemetry['tdatetime'] == error_ind['msdyn_alerttime'])), "left")
               .drop('volt', 'rotate', 'pressure', 'vibration')
               .drop(error_ind.msdyn_customerassetid).drop(error_ind.msdyn_alerttime))

error_features = ['error1sum','error2sum', 'error3sum', 'error4sum']

# Trailing window of 24 rows per asset (the current row and the 23 before it).
# The original note calls this "the previous 24 hours", which presumes one
# telemetry row per hour — TODO confirm.
wSpec = Window.partitionBy('msdyn_customerassetid').orderBy('tdatetime').rowsBetween(1-24, 0)
for col_name in error_features:
    # We're only interested in the errors in the trailing window.
    error_count = error_count.withColumn(col_name+'_rollingmean_24',
                                         F.avg(col(col_name)).over(wSpec))

# Bucket with dt_truncated, drop the raw sums, zero-fill nulls and average the
# rolling means per (asset, bucket).
error_feat = (error_count.withColumn("dt_truncated", dt_truncated)
              .drop('error1sum', 'error2sum', 'error3sum', 'error4sum').fillna(0)
              .groupBy("msdyn_customerassetid","dt_truncated")
              .agg(F.mean('error1sum_rollingmean_24').alias('error1sum_rollingmean_24'),
                   F.mean('error2sum_rollingmean_24').alias('error2sum_rollingmean_24'),
                   F.mean('error3sum_rollingmean_24').alias('error3sum_rollingmean_24'),
                   F.mean('error4sum_rollingmean_24').alias('error4sum_rollingmean_24')))

In [28]:
# maint = spark.sql('select * from machine_maint_data')

In [7]:
# Work orders per customer asset, with `component` taken as the last
# '_'-separated piece of the connected IoT device name.
# NOTE(review): the last join predicate equates an IoT device id with
# wo.msdyn_iotalertname — confirm this is the intended link.
_maint_query = '''select a.msdyn_customerassetid, msdyn_iotalertname, reverse(split(i.msdyn_name,'_'))[0] as component, wo.modifiedon
    from dataverse_d365env_generated_data.msdyn_customerasset as a 
    inner join dataverse_d365env_generated_data.connection as c on a.msdyn_customerassetid = c.record1id
    inner join dataverse_d365env_generated_data.msdyn_iotdevice as i on i.msdyn_iotdeviceid = c.record2id
    inner join dataverse_d365env_generated_data.msdyn_workorder as wo on wo.msdyn_customerasset = a.msdyn_customerassetid and i.msdyn_iotdeviceid = wo. msdyn_iotalertname'''
maint = spark.sql(_maint_query)

# display(maint.take(10))

In [8]:
# create a column for each component replacement
import pyspark.sql.functions as F
# Step 1: count work-order rows per asset, timestamp and component, pivoted
# into one column per component value; missing combinations become 0.
_counts_by_component = (
    maint
    .groupBy("msdyn_customerassetid","modifiedon","component")
    .pivot('component')
    .agg(F.count('msdyn_customerassetid').alias('dummy'))
    .fillna(0)
)

# Step 2: collapse to one row per asset and timestamp, one total per component.
_totals_by_timestamp = (
    _counts_by_component
    .groupBy("msdyn_customerassetid","modifiedon")
    .agg(F.sum('1').alias('component1_age'),
         F.sum('2').alias('component2_age'),
         F.sum('3').alias('component3_age'),
         F.sum('4').alias('component4_age'))
)

# Step 3: the work-order timestamp is called datetime_maint from here on.
maint_replace = _totals_by_timestamp.withColumnRenamed('modifiedon','datetime_maint')

print(maint_replace.count())
# display(maint_replace.take(2))

In [9]:
# Asset / bucket pairs that the component information is aligned to.
telemetry_times = (telemetry_feat
                   .select(telemetry_feat.msdyn_customerassetid, telemetry_feat.dt_truncated)
                   .withColumnRenamed('dt_truncated','datetime_tel'))

# Maintenance rows whose component 1 count equals 1.
# NOTE(review): maint_replace appears to have no 'datetime' column at this
# point (its timestamp is already called datetime_maint), so the rename below
# presumably does nothing — confirm before removing.
maint_comp1 = (maint_replace
               .filter(F.col("component1_age") == '1')
               .withColumnRenamed('datetime','datetime_maint')
               .drop('component2_age', 'component3_age', 'component4_age'))

# Pair every telemetry bucket with each earlier component 1 maintenance row
# of the same asset. The component1_age predicate repeats the filter above.
_same_asset = telemetry_times['msdyn_customerassetid'] == maint_comp1['msdyn_customerassetid']
_maint_is_earlier = telemetry_times['datetime_tel'] > maint_comp1['datetime_maint']
_is_component1 = maint_comp1['component1_age'] == '1'
maint_tel_comp1 = (telemetry_times
                   .join(maint_comp1, _same_asset & _maint_is_earlier & _is_component1)
                   .drop(maint_comp1.msdyn_customerassetid))

# Days from the maintenance timestamp to the telemetry bucket.
_days_since_maint = F.datediff(maint_tel_comp1.datetime_tel, maint_tel_comp1.datetime_maint)
comp1 = (maint_tel_comp1
         .withColumn("sincelastcomp1", _days_since_maint)
         .drop(maint_tel_comp1.datetime_maint)
         .drop(maint_tel_comp1.component1_age))

print(comp1.count())
# display(comp1.take(5))
# display(comp1.filter(comp1.machineID == '625').orderBy(comp1.datetime_tel).limit(20))

In [10]:
# Maintenance rows whose component 2 count equals 1.
# NOTE(review): maint_replace appears to have no 'datetime' column at this
# point, so the rename below presumably does nothing — confirm before removing.
maint_comp2 = (maint_replace
               .filter(F.col("component2_age") == '1')
               .withColumnRenamed('datetime','datetime_maint')
               .drop('component1_age', 'component3_age', 'component4_age'))

# Pair every telemetry bucket with each earlier component 2 maintenance row
# of the same asset. The component2_age predicate repeats the filter above.
_same_asset = telemetry_times['msdyn_customerassetid'] == maint_comp2['msdyn_customerassetid']
_maint_is_earlier = telemetry_times['datetime_tel'] > maint_comp2['datetime_maint']
_is_component2 = maint_comp2['component2_age'] == '1'
maint_tel_comp2 = (telemetry_times
                   .join(maint_comp2, _same_asset & _maint_is_earlier & _is_component2)
                   .drop(maint_comp2.msdyn_customerassetid))

# Days from the maintenance timestamp to the telemetry bucket.
_days_since_maint = F.datediff(maint_tel_comp2.datetime_tel, maint_tel_comp2.datetime_maint)
comp2 = (maint_tel_comp2
         .withColumn("sincelastcomp2", _days_since_maint)
         .drop(maint_tel_comp2.datetime_maint)
         .drop(maint_tel_comp2.component2_age))

# print(comp2.count())
# display(comp2.take(5))
# display(comp2.filter(comp2.machineID == '625').orderBy(comp2.datetime_tel).limit(5))

In [11]:
# Maintenance rows whose component 3 count equals 1.
# NOTE(review): maint_replace appears to have no 'datetime' column at this
# point, so the rename below presumably does nothing — confirm before removing.
maint_comp3 = (maint_replace
               .filter(F.col("component3_age") == '1')
               .withColumnRenamed('datetime','datetime_maint')
               .drop('component1_age', 'component2_age', 'component4_age'))

# Pair every telemetry bucket with each earlier component 3 maintenance row
# of the same asset. The component3_age predicate repeats the filter above.
_same_asset = telemetry_times['msdyn_customerassetid'] == maint_comp3['msdyn_customerassetid']
_maint_is_earlier = telemetry_times['datetime_tel'] > maint_comp3['datetime_maint']
_is_component3 = maint_comp3['component3_age'] == '1'
maint_tel_comp3 = (telemetry_times
                   .join(maint_comp3, _same_asset & _maint_is_earlier & _is_component3)
                   .drop(maint_comp3.msdyn_customerassetid))

# Days from the maintenance timestamp to the telemetry bucket.
_days_since_maint = F.datediff(maint_tel_comp3.datetime_tel, maint_tel_comp3.datetime_maint)
comp3 = (maint_tel_comp3
         .withColumn("sincelastcomp3", _days_since_maint)
         .drop(maint_tel_comp3.datetime_maint)
         .drop(maint_tel_comp3.component3_age))


# print(comp3.count())
# comp3.filter(comp3.machineID == '625').orderBy(comp3.datetime_tel).limit(5).toPandas().head(5)

In [12]:
# Maintenance rows whose component 4 count equals 1.
# NOTE(review): maint_replace appears to have no 'datetime' column at this
# point, so the rename below presumably does nothing — confirm before removing.
maint_comp4 = (maint_replace
               .filter(F.col("component4_age") == '1')
               .withColumnRenamed('datetime','datetime_maint')
               .drop('component1_age', 'component2_age', 'component3_age'))

# Pair every telemetry bucket with each earlier component 4 maintenance row
# of the same asset. The component4_age predicate repeats the filter above.
_same_asset = telemetry_times['msdyn_customerassetid'] == maint_comp4['msdyn_customerassetid']
_maint_is_earlier = telemetry_times['datetime_tel'] > maint_comp4['datetime_maint']
_is_component4 = maint_comp4['component4_age'] == '1'
maint_tel_comp4 = (telemetry_times
                   .join(maint_comp4, _same_asset & _maint_is_earlier & _is_component4)
                   .drop(maint_comp4.msdyn_customerassetid))

# Days from the maintenance timestamp to the telemetry bucket.
_days_since_maint = F.datediff(maint_tel_comp4.datetime_tel, maint_tel_comp4.datetime_maint)
comp4 = (maint_tel_comp4
         .withColumn("sincelastcomp4", _days_since_maint)
         .drop(maint_tel_comp4.datetime_maint)
         .drop(maint_tel_comp4.component4_age))

# print(comp4.count())
# comp4.filter(comp4.machineID == '625').orderBy(comp4.datetime_tel).limit(5).toPandas().head(5)

In [13]:
# The four per-component tables are left-joined on asset and bucket, working
# from components 3 and 4 outwards to component 1. After each join the
# right-hand copies of the two key columns are dropped.

# Components 3 and 4.
_on_3_and_4 = ((comp3['msdyn_customerassetid'] == comp4['msdyn_customerassetid'])
               & (comp3['datetime_tel'] == comp4['datetime_tel']))
comp3_4 = (comp3.join(comp4, _on_3_and_4, "left")
           .drop(comp4.msdyn_customerassetid)
           .drop(comp4.datetime_tel))

# Component 2 with 3 and 4.
_on_2_and_rest = ((comp2['msdyn_customerassetid'] == comp3_4['msdyn_customerassetid'])
                  & (comp2['datetime_tel'] == comp3_4['datetime_tel']))
comp2_3_4 = (comp2.join(comp3_4, _on_2_and_rest, "left")
             .drop(comp3_4.msdyn_customerassetid)
             .drop(comp3_4.datetime_tel))

# Component 1 with 2, 3 and 4, reduced to one row per asset and bucket.
# NOTE(review): F.max keeps the largest day count per bucket. If several
# earlier maintenance rows exist for a bucket, that is the oldest one rather
# than the most recent — confirm this matches the "since last" intent.
_on_1_and_rest = ((comp1['msdyn_customerassetid'] == comp2_3_4['msdyn_customerassetid'])
                  & (comp1['datetime_tel'] == comp2_3_4['datetime_tel']))
_since_cols = ['sincelastcomp1', 'sincelastcomp2', 'sincelastcomp3', 'sincelastcomp4']
comps_feat = (comp1.join(comp2_3_4, _on_1_and_rest, "left")
              .drop(comp2_3_4.msdyn_customerassetid)
              .drop(comp2_3_4.datetime_tel)
              .groupBy("msdyn_customerassetid", "datetime_tel")
              .agg(*[F.max(name).alias(name) for name in _since_cols])
              .fillna(0))

# Rebind the bucket expression so that it reads "datetime_tel".
# NOTE(review): this replaces the notebook-level dt_truncated, which earlier
# cells build on "tdatetime"; re-running those cells after this one would
# bucket on the wrong column.
_tel_seconds = unix_timestamp(col("datetime_tel"))
dt_truncated = (round(_tel_seconds / time_val) * time_val).cast("timestamp")

# Average the day counts per (asset, bucket) and expose them as component ages.
_age_columns = [('sincelastcomp1', 'component1_age'),
                ('sincelastcomp2', 'component2_age'),
                ('sincelastcomp3', 'component3_age'),
                ('sincelastcomp4', 'component4_age')]
maint_feat = (comps_feat
              .withColumn("dt_truncated", dt_truncated)
              .groupBy("msdyn_customerassetid","dt_truncated")
              .agg(*[F.mean(source).alias(target) for source, target in _age_columns]))

# print(maint_feat.count())
# display(maint_feat.limit(10))

In [36]:
# machines = spark.sql('select * from machine_data')

In [14]:
# Refresh the customer-asset table before it is queried.
spark.sql('refresh table dataverse_d365env_generated_data.msdyn_customerasset')

_asset_rows = spark.sql('select msdyn_customerassetid,msdyn_name, msdyn_productname, modifiedon from dataverse_d365env_generated_data.msdyn_customerasset')

# "age" is the number of whole years (365.25 days each) between the current
# timestamp and the row's modifiedon value.
_years_since_modified = floor(datediff(F.current_timestamp(), F.col("modifiedon")) / 365.25)
machines = _asset_rows.withColumn("age", _years_since_modified)
# display(machines)

In [15]:
# The asset table is used as the machine feature set without further changes.
machines_feat = machines

In [16]:
# Error features, left-joined with the component maintenance features on
# asset and bucket; the right-hand key columns are dropped afterwards.
_error_maint_on = ((error_feat['msdyn_customerassetid'] == maint_feat['msdyn_customerassetid'])
                   & (error_feat['dt_truncated'] == maint_feat['dt_truncated']))
error_maint = (error_feat.join(maint_feat, _error_maint_on, "left")
               .drop(maint_feat.msdyn_customerassetid)
               .drop(maint_feat.dt_truncated))

# Add the machine features, matched on the asset id alone.
_asset_match = error_maint['msdyn_customerassetid'] == machines_feat['msdyn_customerassetid']
error_maint_feat = (error_maint.join(machines_feat, _asset_match, "left")
                    .drop(machines_feat.msdyn_customerassetid))

# Keep every column except the raw error sums, if any are present.
_raw_error_cols = {'error1sum', 'error2sum', 'error3sum', 'error4sum', 'error5sum'}
_kept_cols = [c for c in error_maint_feat.columns if c not in _raw_error_cols]
error_maint_feat = error_maint_feat.select(_kept_cols)

# Final feature matrix: telemetry features left-joined with the combined
# error / maintenance / machine features on asset and bucket.
_final_on = ((telemetry_feat['msdyn_customerassetid'] == error_maint_feat['msdyn_customerassetid'])
             & (telemetry_feat['dt_truncated'] == error_maint_feat['dt_truncated']))
final_feat = (telemetry_feat.join(error_maint_feat, _final_on, "left")
              .drop(error_maint_feat.msdyn_customerassetid)
              .drop(error_maint_feat.dt_truncated))

In [17]:
# Show the column names and types of the assembled feature matrix.
final_feat.dtypes

In [41]:
# failures = spark.sql('select * from machine_failures_data')

In [18]:
# Incidents raised from IoT alerts, per customer asset. The asset id is
# aliased to msdyn_customerassetid1, `component` is the last '_'-separated
# piece of the connected IoT device name, and incidenttime is the incident's
# modifiedon value.
_failures_query = '''select a.msdyn_customerassetid as msdyn_customerassetid1, reverse(split(i.msdyn_name,'_'))[0] as component, in.modifiedon as incidenttime from dataverse_d365env_generated_data.msdyn_customerasset as a 
left join dataverse_d365env_generated_data.connection as c on a.msdyn_customerassetid = c.record1id
left join dataverse_d365env_generated_data.msdyn_iotdevice as i on i.msdyn_iotdeviceid = c.record2id
inner join dataverse_d365env_generated_data.msdyn_iotalert as ia on ia.msdyn_customerasset = a.msdyn_customerassetid
inner join dataverse_d365env_generated_data.incident as in on in.msdyn_iotalert = ia.msdyn_iotalertid
'''
failures = spark.sql(_failures_query)

In [19]:
# Rebind the bucket expression so that it reads "incidenttime".
# NOTE(review): dt_truncated is a notebook-level name that earlier cells bind
# to other timestamp columns; re-running those cells after this one would
# bucket on the wrong column.
_incident_seconds = unix_timestamp(col("incidenttime"))
dt_truncated = (round(_incident_seconds / time_val) * time_val).cast("timestamp")

# Replace each incident's timestamp with the bucket it falls into.
_bucketed_failures = failures.withColumn("dt_truncated", dt_truncated)
fail_diff = _bucketed_failures.drop(failures.incidenttime)

In [20]:
# Map the failure data onto the final feature matrix: a left join on asset and
# bucket, so feature rows without an incident keep a null `component`.
_failure_match = ((final_feat['msdyn_customerassetid'] == fail_diff['msdyn_customerassetid1'])
                  & (final_feat['dt_truncated'] == fail_diff['dt_truncated']))

# `failure` is the component value as a double ("1" -> 1.0 ... "4" -> 4.0).
# The previous version chained four withColumn('failure', when(...)) calls
# that each recomputed the column from `component`, so only the last one took
# effect and the numeric values came from the cast that followed. The direct
# cast below yields the same values without the dead steps.
# fillna(0) then zero-fills the nulls left by the left join, including the
# `failure` of rows that have no incident.
labeled_features = (final_feat.join(fail_diff, _failure_match, "left")
                    .drop(fail_diff.msdyn_customerassetid1)
                    .drop(fail_diff.dt_truncated)
                    .withColumn('failure', col('component').cast(DoubleType()))
                    .fillna(0))

In [None]:
# # To get the frequency of each component failure 
# # df = labeled_features.select(labeled_features.failure).toPandas()
# # df['failure'].value_counts()
# labeled_features.groupBy('failure').count().show()

In [21]:
# Backfill the failure label over 7 buckets. Rows are ordered newest-first
# within each asset, so F.lag reads the value of the next later bucket.
my_window = Window.partitionBy('msdyn_customerassetid').orderBy(labeled_features.dt_truncated.desc())

# prev_value1 is `failure` shifted by one row; each following prev_valueN is
# prev_value(N-1) shifted by one more row. Nulls are zero-filled after each
# step, as before.
_shift_source = 'failure'
for _step in range(1, 8):
    _shifted_name = 'prev_value' + str(_step)
    labeled_features = (labeled_features
                        .withColumn(_shifted_name,
                                    F.lag(labeled_features[_shift_source]).over(my_window))
                        .fillna(0))
    _shift_source = _shifted_name

# label = failure + prev_value1 + ... + prev_value7, added left to right.
_label_total = labeled_features.failure
for _step in range(1, 8):
    _label_total = _label_total + labeled_features['prev_value' + str(_step)]
labeled_features = labeled_features.withColumn('label', _label_total)

# label_e caps the label at 4.0; the helper columns and the uncapped label
# are then dropped one by one.
_capped = labeled_features.withColumn('label_e',
                                      F.when(col('label') > 4, 4.0).otherwise(col('label')))
for _helper in ['prev_value1', 'prev_value2', 'prev_value3', 'prev_value4',
                'prev_value5', 'prev_value6', 'prev_value7', 'label']:
    _capped = _capped.drop(labeled_features[_helper])
labeled_features = _capped

In [22]:
# Persist the labeled feature matrix as the table 'machine_data_features';
# "overwrite" mode replaces the table's existing contents on every run.
labeled_features.write.mode("overwrite").saveAsTable('machine_data_features')

In [None]:
# #%%sql

# select label_e, count(*) from machine_data_features 
# group by label_e