In [0]:
dataset = spark.read.load("dbfs:/FileStore/tables/ai4i2020___ai4i2020__6_.csv", format = "csv", header= "true", inferSchema = "true")

In [0]:
from pyspark.sql.functions import col,count,when
dataset.select([count(when(col(c).isNull(), c)).alias(c) for c in dataset.columns]).show()

In [0]:
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
print(trainDF.cache().count()) # Cache because accessing training data multiple times
print(testDF.count())

In [0]:
display(trainDF)

UDI,Product_ID,Type,Air_temperature,Process_temperature,Rotational_speed,Torque,TWF,HDF,PWF,OSF,RNF,Machine_failure
1,M14860,M,298,309,1551,43,0,0,0,0,0,PASS
2,L47181,L,298,309,1408,46,0,0,0,0,0,PASS
4,L47183,L,298,309,1433,40,1,0,0,0,0,FAIL
5,L47184,L,298,309,1408,40,0,0,0,0,0,PASS
6,M14865,M,298,309,1425,42,0,1,0,0,0,FAIL
8,L47187,L,298,309,1527,40,0,0,0,0,0,PASS
10,M14869,M,299,309,1741,28,0,0,0,0,0,PASS
11,H29424,H,298,309,1782,24,0,0,0,0,0,PASS
12,H29425,H,299,309,1423,44,0,0,0,1,0,FAIL
13,M14872,M,299,309,1339,51,1,0,0,0,0,FAIL


In [0]:
display(trainDF.select("Air_temperature").summary())

summary,Air_temperature
count,8079.0
mean,300.0388661963114
stddev,2.028937596632208
min,295.0
25%,298.0
50%,300.0
75%,302.0
max,305.0


In [0]:
display(trainDF
        .groupBy("Type")
        .count()
        .sort("count", ascending=False))

Type,count
L,4822
M,2433
H,824


In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
 
categoricalCols = ["Product_ID", "Type"]
 
# The following two lines are estimators. They return functions that we will later apply to transform the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]).setHandleInvalid("keep")
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 
 
# The label column ("Machine failure") is also a string value - it has two possible values, "PASS" and "FAIL". 
# Convert it to a numeric value using StringIndexer.
labelToIndex = StringIndexer(inputCol="Machine_failure", outputCol="label")

In [0]:
stringIndexerModel = stringIndexer.fit(trainDF)
display(stringIndexerModel.transform(trainDF))

UDI,Product_ID,Type,Air_temperature,Process_temperature,Rotational_speed,Torque,TWF,HDF,PWF,OSF,RNF,Machine_failure,Product_IDIndex,TypeIndex
1,M14860,M,298,309,1551,43,0,0,0,0,0,PASS,5646.0,1.0
2,L47181,L,298,309,1408,46,0,0,0,0,0,PASS,824.0,0.0
4,L47183,L,298,309,1433,40,1,0,0,0,0,FAIL,825.0,0.0
5,L47184,L,298,309,1408,40,0,0,0,0,0,PASS,826.0,0.0
6,M14865,M,298,309,1425,42,0,1,0,0,0,FAIL,5647.0,1.0
8,L47187,L,298,309,1527,40,0,0,0,0,0,PASS,827.0,0.0
10,M14869,M,299,309,1741,28,0,0,0,0,0,PASS,5648.0,1.0
11,H29424,H,298,309,1782,24,0,0,0,0,0,PASS,0.0,2.0
12,H29425,H,299,309,1423,44,0,0,0,1,0,FAIL,1.0,2.0
13,M14872,M,299,309,1339,51,1,0,0,0,0,FAIL,5649.0,1.0


In [0]:
from pyspark.ml.feature import VectorAssembler
 
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["UDI", "Air_temperature", "Process_temperature", "Rotational_speed", "Torque","TWF",
              "HDF","PWF","OSF","RNF"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [0]:
from pyspark.ml.classification import LogisticRegression
 
lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)

In [0]:
from pyspark.ml import Pipeline
 
# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex , vecAssembler, lr])
 
# Define the pipeline model.
pipelineModel = pipeline.fit(trainDF)
 
# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF)

In [0]:
display(predDF.select("features", "label", "prediction", "probability"))

features,label,prediction,probability
"Map(vectorType -> sparse, length -> 8092, indices -> List(8079, 8082, 8083, 8084, 8085, 8086), values -> List(1.0, 3.0, 298.0, 309.0, 1498.0, 49.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8100204505614224, 0.18997954943857764))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8079, 8082, 8083, 8084, 8085, 8086), values -> List(1.0, 7.0, 298.0, 309.0, 1558.0, 42.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8118392596467682, 0.18816074035323183))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8080, 8082, 8083, 8084, 8085, 8086, 8089), values -> List(1.0, 9.0, 298.0, 309.0, 1667.0, 29.0, 1.0))",1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7039133444693658, 0.2960866555306342))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8080, 8082, 8083, 8084, 8085, 8086, 8088), values -> List(1.0, 14.0, 299.0, 309.0, 1742.0, 30.0, 1.0))",1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7018582787872603, 0.29814172121273974))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8080, 8082, 8083, 8084, 8085, 8086), values -> List(1.0, 20.0, 299.0, 309.0, 1632.0, 33.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8154542141328469, 0.1845457858671531))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8079, 8082, 8083, 8084, 8085, 8086), values -> List(1.0, 24.0, 299.0, 309.0, 1758.0, 26.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8182518621513506, 0.1817481378486494))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8079, 8082, 8083, 8084, 8085, 8086), values -> List(1.0, 30.0, 299.0, 309.0, 1693.0, 30.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8172047560189407, 0.18279524398105929))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8080, 8082, 8083, 8084, 8085, 8086), values -> List(1.0, 36.0, 299.0, 309.0, 1452.0, 49.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8112794227993688, 0.18872057720063118))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8079, 8082, 8083, 8084, 8085, 8086, 8088), values -> List(1.0, 46.0, 299.0, 309.0, 1489.0, 49.0, 1.0))",1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6962588748995497, 0.30374112510045026))"
"Map(vectorType -> sparse, length -> 8092, indices -> List(8080, 8082, 8083, 8084, 8085, 8086, 8089), values -> List(1.0, 47.0, 299.0, 309.0, 1843.0, 26.0, 1.0))",1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7082808227131682, 0.29171917728683183))"


In [0]:
display(pipelineModel.stages[-1], predDF.drop("prediction", "rawPrediction", "probability"), "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.4235082639529082
0.0,0.0666666666666666,0.4235082639529082
0.0,0.1333333333333333,0.3223338305953623
0.0,0.2,0.3166097368624835
0.0,0.2666666666666666,0.3161909471723105
0.0,0.3333333333333333,0.3059539341760187
0.0,0.4,0.2986552598275716
0.0,0.4666666666666667,0.2978833764314473
0.0,0.5333333333333333,0.2978272420730511
0.0,0.6,0.2973811399843016


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
 
bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")
 
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
 
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

In [0]:
# Create a 3-fold CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism = 4)
 
# Run cross validations. This step takes a few minutes and returns the best model found from the cross validation.
cvModel = cv.fit(trainDF)

In [0]:
# Use the model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)
 
# Evaluate the model's performance based on area under the ROC curve and accuracy 
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")

In [0]:
cvPredDF.createOrReplaceTempView("finalPredictions")

In [0]:
%sql
SELECT Type, prediction, count(*) AS count
FROM finalPredictions
GROUP BY Type, prediction
ORDER BY Type

Type,prediction,count
H,1.0,14
H,0.0,165
L,0.0,1084
L,1.0,94
M,1.0,62
M,0.0,502


In [0]:
%sql
SELECT Product_ID, prediction, count(*) AS count
FROM finalPredictions
WHERE prediction = 1
GROUP BY Product_ID, prediction
ORDER BY Product_ID

Product_ID,prediction,count
H30339,1.0,1
H32504,1.0,1
H33492,1.0,1
H33769,1.0,1
H34462,1.0,1
H35973,1.0,1
H38089,1.0,1
H38549,1.0,1
H38822,1.0,1
H39210,1.0,1


In [0]:
%sql
SELECT UDI, prediction, count(*) AS count
FROM finalPredictions
WHERE prediction = 1
GROUP BY UDI, prediction
ORDER BY UDI

UDI,prediction,count
46,1.0,1
70,1.0,1
161,1.0,1
377,1.0,1
417,1.0,1
480,1.0,1
582,1.0,1
761,1.0,1
812,1.0,1
825,1.0,1


In [0]:
%scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.eventhubs._
import com.microsoft.azure.eventhubs._

val namespaceName = "azuretrial"
val eventHubName = "final"
val sasKeyName = "root"
val sasKey = "ryNdDPw2AaX8BvMYjyAWZMUQV8z+bh1W4SEkKaLuRQg="
val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
            .setNamespaceName(namespaceName)
            .setEventHubName(eventHubName)
            .setSasKeyName(sasKeyName)
            .setSasKey(sasKey)

val customEventhubParameters =
  EventHubsConf(connStr.toString())
  .setMaxEventsPerTrigger(50)

val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

val messages =
  incomingStream
  .withColumn("Offset", $"offset".cast(LongType))
  .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
  .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
  .withColumn("Data", $"body".cast(StringType))
  .select("Data")

//messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

messages.writeStream.format("delta").outputMode("append").option("checkpointLocation","/data/events/_checkpoints/data_file_1").table("test")

In [0]:
%scala
val messages = spark.readStream.table("e3").select("Data").writeStream.outputMode("append")

In [0]:
rdd4 = spark.sql("""select 
          substring(Data,1,charindex(' ',data)-1) as UDI, 
          substring(data,4,charindex(' ',data)+4) as Product_ID, 
          substring(data,11,charindex(' ',data)-1) as Type, 
          substring(data,14,charindex(' ',data)+0) as Air_temperature,
          substring(data,17,charindex(' ',data)+1) as Process_temperature,
          substring(data,21,charindex(' ',data)+2) as Rotational_speed,
          substring(data,26,charindex(' ',data)+0) as Torque,
          substring(data,29,charindex(' ',data)-1) as TWF,
          substring(data,31,charindex(' ',data)-1) as HDF,
          substring(data,33,charindex(' ',data)-1) as PWF,
          substring(data,35,charindex(' ',data)-1) as OSF,
          substring(data,37,charindex(' ',data)-1) as RNF from test""").rdd

In [0]:
df = rdd4.toDF()
from pyspark.sql.types import DecimalType, IntegerType
df1 = df.withColumn("UDI", df["UDI"].cast(DecimalType())).withColumn("Air_temperature", df["Air_temperature"].cast(DecimalType())).withColumn("Process_temperature", df["Process_temperature"].cast(DecimalType())).withColumn("Rotational_speed", df["Rotational_speed"].cast(DecimalType())).withColumn("Torque", df["Torque"].cast(DecimalType())).withColumn("TWF", df["TWF"].cast(IntegerType())).withColumn("HDF", df["HDF"].cast(IntegerType())).withColumn("PWF", df["PWF"].cast(IntegerType())).withColumn("OSF", df["OSF"].cast(IntegerType())).withColumn("RNF", df["RNF"].cast(IntegerType()))

In [0]:
df1.show()

In [0]:
test = cvModel.transform(df1)
test.createOrReplaceTempView("finalPredictions")

In [0]:
spark.sql("describe extended finalPredictions").show()

In [0]:
spark.sql("select UDI, probability, prediction from finalPredictions").show(200)