# Data engineering with Databricks - Building our Manufacturing IOT platform

Building an IOT platform requires ingesting multiple data sources.  

It's a complex process requiring both batch loads and streaming ingestion to support real-time insights, such as real-time monitoring.

Ingesting, transforming and cleaning data to create clean SQL tables for our downstream users (Data Analysts and Data Scientists) is complex.

<link href="https://fonts.googleapis.com/css?family=DM Sans" rel="stylesheet"/>
<div style="width:300px; text-align: center; float: right; margin: 30px 60px 10px 10px;  font-family: 'DM Sans'">
  <div style="height: 300px; width: 300px;  display: table-cell; vertical-align: middle; border-radius: 50%; border: 25px solid #fcba33ff;">
    <div style="font-size: 70px;  color: #70c4ab; font-weight: bold">
      73%
    </div>
    <div style="color: #1b5162;padding: 0px 30px 0px 30px;">of enterprise data goes unused for analytics and decision making</div>
  </div>
  <div style="color: #bfbfbf; padding-top: 5px">Source: Forrester</div>
</div>

<br>

## <img src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/refs/heads/main/images/john.png" style="float:left; margin: -35px 0px 0px 0px" width="80px"> John, as a Data Engineer, spends immense time...


* Hand-coding data ingestion & transformations and dealing with technical challenges:<br>
  *Supporting streaming and batch, handling concurrent operations, small file issues, GDPR requirements, complex DAG dependencies...*<br><br>
* Building custom frameworks to enforce quality and tests<br><br>
* Building and maintaining scalable infrastructure, with observability and monitoring<br><br>
* Managing incompatible governance models from different systems
<br style="clear: both">

This results in **operational complexity** and overhead, requiring expert profiles and ultimately **putting data projects at risk**.



# Simplify Ingestion and Transformation with Delta Live Tables

<img style="float: right" width="500px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/refs/heads/main/images/manufacturing/lakehouse-iot-turbine/team_flow_john.png" />

In this notebook, we'll work as a Data Engineer to build our IOT platform. <br>
We'll ingest and clean our raw data sources to prepare the tables required for our BI & ML workloads.

Databricks simplifies this task with Delta Live Tables (DLT), making Data Engineering accessible to all.

DLT allows Data Analysts to create advanced pipelines with plain SQL.

## Delta Live Tables: A simple way to build and manage data pipelines for fresh, high quality data!

<div>
  <div style="width: 45%; float: left; margin-bottom: 10px; padding-right: 45px">
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-accelerate.png"/> 
      <strong>Accelerate ETL development</strong> <br/>
      Enable analysts and data engineers to innovate rapidly with simple pipeline development and maintenance 
    </p>
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-complexity.png"/> 
      <strong>Remove operational complexity</strong> <br/>
      By automating complex administrative tasks and gaining broader visibility into pipeline operations
    </p>
  </div>
  <div style="width: 48%; float: left">
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-trust.png"/> 
      <strong>Trust your data</strong> <br/>
      With built-in quality controls and quality monitoring to ensure accurate and useful BI, Data Science, and ML 
    </p>
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-stream.png"/> 
      <strong>Simplify batch and streaming</strong> <br/>
      With self-optimization and auto-scaling data pipelines for batch or streaming processing 
    </p>
</div>
</div>

<br style="clear:both">

<img src="https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-logo.png" style="float: right;" width="200px">

## Delta Lake

All the tables we'll create in the Data Intelligence Platform will be stored as Delta Lake tables. Delta Lake is an open storage framework for reliability and performance.<br>
It provides many functionalities (ACID transactions, DELETE/UPDATE/MERGE, zero-copy clone, Change Data Capture...)<br>
For more details on Delta Lake, run `dbdemos.install('delta-lake')`



## Building a Delta Live Tables pipeline to ingest IOT sensor data and detect faulty equipment

In this example, we'll implement an end-to-end DLT pipeline consuming our Wind Turbine sensor data. <br/>
We'll use the medallion architecture, but we could build a star schema, a data vault or any other data model.

We'll incrementally load new data with Auto Loader, enrich this information and then load a model from MLflow to perform our predictive maintenance analysis.

This information will then be used to build our AI/BI dashboards to track our wind turbine farm status, faulty equipment impact and recommendations to reduce potential downtime.

### Dataset:

* <strong>Turbine metadata</strong>: Turbine ID, location (1 row per turbine)
* <strong>Turbine sensor stream</strong>: Real-time streaming flow from wind turbine sensors (vibration, energy produced, speed, etc.)
* <strong>Turbine status</strong>: Historical turbine status, used to analyze which part is faulty (used as label in our ML model)


Let's implement the following flow: 
 
<div><img width="1100px" src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-full.png"/></div>

*Note that we're including the ML model our [Data Scientist built]($../04-Data-Science-ML/04.1-automl-predictive-maintenance-turbine) using Databricks AutoML to predict turbine failures. We'll cover that in the next section.*

Your DLT Pipeline has been installed and started for you! Open the <a dbdemos-pipeline-id="dlt-iot-wind-turbine" href="#joblist/pipelines/e6653220-a089-4b36-9474-c84a5328706d" target="_blank">IOT Wind Turbine Delta Live Tables pipeline</a> to see it in action.<br/>

*(Note: The pipeline will automatically start once the initialization job is completed with dbdemos, this might take a few minutes... Check installation logs for more details)*

In [0]:
-- %python #uncomment to scan the data from the notebook
-- display(spark.read.json('/Volumes/main/dbdemos_iot_turbine_team3_fs/turbine_raw_landing/turbine'))
-- display(spark.read.json('/Volumes/main/dbdemos_iot_turbine_team3_fs/turbine_raw_landing/historical_turbine_status')) #Historical turbine status analyzed

In [0]:
-- %python #uncomment to scan the data from the notebook
-- display(spark.read.parquet('/Volumes/main/dbdemos_iot_turbine_team3_fs/turbine_raw_landing/incoming_data'))


## 1/ Ingest data: ingest data using Auto Loader (cloudFiles)

<div><img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-1.png" width="700px" style="float: right"/></div>

Ingesting data from a streaming source can be challenging. In this example we'll incrementally load the files from our cloud storage, only picking up new ones (in near real-time or triggered every X hours).

Note that while our streaming data lands in cloud storage, we could just as easily ingest it directly from Kafka: `.format("kafka")`

Auto Loader provides:

- Schema inference and evolution
- Scalability, handling millions of files
- Simplicity: just define your ingestion folder and Databricks takes care of the rest!
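
Conceptually, Auto Loader remembers which files it has already ingested (it tracks them in its checkpoint location), so each run only picks up new arrivals. The stdlib-only sketch below illustrates that idea under simplifying assumptions: an in-memory set stands in for the checkpoint and a plain directory listing stands in for file discovery. `discover_new_files` is a hypothetical helper, not an Auto Loader API; the real service persists its state and can also use cloud file notifications instead of listing.

```python
import os
from typing import List, Set


def discover_new_files(landing_dir: str, processed: Set[str]) -> List[str]:
    """Return files in landing_dir that no previous call has returned.

    `processed` plays the role of the checkpoint: it is updated in place,
    so the next call only sees files that arrived since this one.
    """
    new_files = []
    for name in sorted(os.listdir(landing_dir)):
        path = os.path.join(landing_dir, name)
        # Skip sub-directories and anything already ingested.
        if os.path.isfile(path) and path not in processed:
            new_files.append(path)
    processed.update(new_files)
    return new_files
```

Calling it twice on an unchanged folder returns an empty list the second time, which is the "only get the new files" behavior described above.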

For more details on Auto Loader, run `dbdemos.install('auto-loader')`

Let's use it in our pipeline to ingest the raw JSON & Parquet data being delivered to our volume `/Volumes/main/dbdemos_iot_turbine_team3_fs/turbine_raw_landing/...`. 

In [0]:
CREATE STREAMING TABLE turbine (
  CONSTRAINT correct_schema EXPECT (_rescued_data IS NULL)
)
COMMENT "Turbine details, with location, wind turbine model type etc"
AS SELECT * FROM cloud_files("/Volumes/main/dbdemos_iot_turbine_team3_fs/turbine_raw_landing/turbine", "json", map("cloudFiles.inferColumnTypes" , "true"))

In [0]:
CREATE STREAMING TABLE sensor_bronze (
  CONSTRAINT correct_schema EXPECT (_rescued_data IS NULL),
  CONSTRAINT correct_energy EXPECT (energy IS NOT NULL and energy > 0) ON VIOLATION DROP ROW
)
COMMENT "Raw sensor data coming from parquet files ingested incrementally with Auto Loader: vibration, energy produced etc. 1 point every X sec per sensor."
AS SELECT * FROM cloud_files("/Volumes/main/dbdemos_iot_turbine_team3_fs/turbine_raw_landing/incoming_data", "parquet", map("cloudFiles.inferColumnTypes" , "true"))
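
The two constraints in `sensor_bronze` behave differently: a plain `EXPECT` keeps violating rows and only records them in the pipeline's data quality metrics, while `ON VIOLATION DROP ROW` removes them from the target table (DLT also supports `ON VIOLATION FAIL UPDATE`). `_rescued_data` is the column where Auto Loader stores values that don't match the inferred schema, so `correct_schema` flags such rows. The plain-Python sketch below mimics those semantics on a list of dicts; `Expectation` and `apply_expectations` are hypothetical names for illustration, not DLT APIs.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


@dataclass
class Expectation:
    name: str
    check: Callable[[Row], bool]
    drop_on_violation: bool = False  # False mimics a plain EXPECT (keep row, count it)


def apply_expectations(rows: List[Row], expectations: List[Expectation]) -> Tuple[List[Row], Dict[str, int]]:
    """Return the kept rows and a violation count per expectation."""
    kept: List[Row] = []
    violations = {e.name: 0 for e in expectations}
    for row in rows:
        keep = True
        for e in expectations:
            if not e.check(row):
                violations[e.name] += 1
                if e.drop_on_violation:
                    keep = False
        if keep:
            kept.append(row)
    return kept, violations


# Same rules as the sensor_bronze table above.
sensor_expectations = [
    Expectation("correct_schema", lambda r: r.get("_rescued_data") is None),
    Expectation(
        "correct_energy",
        lambda r: r.get("energy") is not None and r["energy"] > 0,
        drop_on_violation=True,
    ),
]
```

A row with rescued data but a valid energy value is therefore kept and counted, while a row with a missing or non-positive energy value is dropped.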

In [0]:
CREATE STREAMING TABLE historical_turbine_status (
  CONSTRAINT correct_schema EXPECT (_rescued_data IS NULL)
)
COMMENT "Turbine status to be used as label in our predictive maintenance model (to know which turbine is potentially faulty)"
AS SELECT * FROM cloud_files("/Volumes/main/dbdemos_iot_turbine_team3_fs/turbine_raw_landing/historical_turbine_status", "json", map("cloudFiles.inferColumnTypes" , "true"))

In [0]:
CREATE STREAMING TABLE parts 
COMMENT "Turbine parts from our manufacturing system"
AS SELECT * FROM cloud_files("/Volumes/main/dbdemos_iot_turbine_team3_fs/turbine_raw_landing/parts", "json", map("cloudFiles.inferColumnTypes" , "true"))


## 2/ Compute aggregations: merge sensor data at an hourly level

<div><img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-2.png" width="700px" style="float: right"/></div>

To analyze our data, we'll compute statistical metrics on an hourly basis, such as the standard deviation and percentiles.

*Note that we'll recompute the whole table to keep this example simple. We could instead UPSERT the current hour with a stateful aggregation.*
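
As an illustration of the stateful alternative, the sketch below keeps running statistics per `(turbine_id, hour)` and folds each new micro-batch into them instead of re-scanning the whole table. It uses Welford's online algorithm for the population standard deviation (the `stddev_pop` definition). It assumes Unix-second timestamps truncated to the hour in UTC, and `RunningStats` / `upsert_batch` are hypothetical names; this is not how DLT implements aggregations. Percentiles are left out because exact quantiles can't be maintained this cheaply; streaming systems typically rely on approximate sketches for them.

```python
import math
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass
class RunningStats:
    """Welford accumulator: count, mean and sum of squared deviations (m2)."""
    count: int = 0
    mean: float = 0.0
    m2: float = 0.0

    def update(self, x: float) -> None:
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (x - self.mean)

    def stddev_pop(self) -> float:
        # Population standard deviation, same definition as SQL stddev_pop.
        return math.sqrt(self.m2 / self.count) if self.count else float("nan")


# State keyed by (turbine_id, hour start in Unix seconds); only touched keys change.
State = Dict[Tuple[str, int], RunningStats]


def upsert_batch(state: State, batch: Iterable[Tuple[str, int, float]]) -> None:
    """Fold a micro-batch of (turbine_id, unix_ts, value) into the hourly state."""
    for turbine_id, ts, value in batch:
        key = (turbine_id, ts - ts % 3600)  # truncate to the hour (UTC)
        state.setdefault(key, RunningStats()).update(value)
```

Splitting the same hour across two batches gives the same result as computing it in one pass, which is what makes the UPSERT approach possible.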

In [0]:
CREATE MATERIALIZED VIEW sensor_hourly (
  CONSTRAINT turbine_id_valid EXPECT (turbine_id IS not NULL)  ON VIOLATION DROP ROW,
  CONSTRAINT timestamp_valid EXPECT (hourly_timestamp IS not NULL)  ON VIOLATION DROP ROW
)
COMMENT "Hourly sensor stats, used to describe signal and detect anomalies"
AS
SELECT turbine_id,
      date_trunc('hour', from_unixtime(timestamp)) AS hourly_timestamp, 
      avg(energy)          as avg_energy,
      stddev_pop(sensor_A) as std_sensor_A,
      stddev_pop(sensor_B) as std_sensor_B,
      stddev_pop(sensor_C) as std_sensor_C,
      stddev_pop(sensor_D) as std_sensor_D,
      stddev_pop(sensor_E) as std_sensor_E,
      stddev_pop(sensor_F) as std_sensor_F,
      percentile_approx(sensor_A, array(0.1, 0.3, 0.6, 0.8, 0.95)) as percentiles_sensor_A,
      percentile_approx(sensor_B, array(0.1, 0.3, 0.6, 0.8, 0.95)) as percentiles_sensor_B,
      percentile_approx(sensor_C, array(0.1, 0.3, 0.6, 0.8, 0.95)) as percentiles_sensor_C,
      percentile_approx(sensor_D, array(0.1, 0.3, 0.6, 0.8, 0.95)) as percentiles_sensor_D,
      percentile_approx(sensor_E, array(0.1, 0.3, 0.6, 0.8, 0.95)) as percentiles_sensor_E,
      percentile_approx(sensor_F, array(0.1, 0.3, 0.6, 0.8, 0.95)) as percentiles_sensor_F
  FROM LIVE.sensor_bronze GROUP BY hourly_timestamp, turbine_id
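
To make the `sensor_hourly` aggregation concrete, here is a plain-Python equivalent for a single sensor column over in-memory rows. It assumes `timestamp` is in Unix seconds (as implied by `from_unixtime`) and truncates to the hour in UTC, whereas Spark's `date_trunc` uses the session time zone. The nearest-rank percentile is exact and may differ slightly from `percentile_approx`, which is approximate by design. `sensor_hourly` and `nearest_rank` here are hypothetical helpers, not part of the pipeline.

```python
import math
import statistics
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Tuple

PERCENTILES = (0.1, 0.3, 0.6, 0.8, 0.95)  # same levels as the SQL above


def nearest_rank(sorted_values: List[float], p: float) -> float:
    """Exact nearest-rank percentile of an already sorted, non-empty list."""
    idx = max(math.ceil(p * len(sorted_values)) - 1, 0)
    return sorted_values[idx]


def sensor_hourly(rows: Iterable[dict], sensor: str = "sensor_A") -> Dict[Tuple[str, datetime], dict]:
    groups = defaultdict(list)
    for r in rows:
        # Equivalent of date_trunc('hour', from_unixtime(timestamp)), evaluated in UTC.
        hour = datetime.fromtimestamp(r["timestamp"], tz=timezone.utc).replace(
            minute=0, second=0, microsecond=0
        )
        groups[(r["turbine_id"], hour)].append(r)

    out = {}
    for key, grp in groups.items():
        values = sorted(g[sensor] for g in grp)
        out[key] = {
            "avg_energy": statistics.fmean(g["energy"] for g in grp),
            f"std_{sensor}": statistics.pstdev(values),  # population stddev, like stddev_pop
            f"percentiles_{sensor}": [nearest_rank(values, p) for p in PERCENTILES],
        }
    return out
```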


## 3/ Build our table used by ML Engineers: join sensor aggregates with wind turbine metadata and historical status

<div><img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-3.png" width="700px" style="float: right"/></div>

Next, we'll build a final table joining the sensor aggregates with our turbine metadata and historical status.

This table will contain all the data required to be able to infer potential turbine failure.

In [0]:
CREATE MATERIALIZED VIEW turbine_training_dataset 
COMMENT "Training dataset: hourly sensor stats joined with turbine metadata and historical status (used as labels)"
AS
SELECT CONCAT(t.turbine_id, '-', s.start_time) AS composite_key, array(std_sensor_A, std_sensor_B, std_sensor_C, std_sensor_D, std_sensor_E, std_sensor_F) AS sensor_vector, * except(t._rescued_data, s._rescued_data, m.turbine_id) FROM LIVE.sensor_hourly m
    INNER JOIN LIVE.turbine t USING (turbine_id)
    INNER JOIN LIVE.historical_turbine_status s ON m.turbine_id = s.turbine_id AND from_unixtime(s.start_time) < m.hourly_timestamp AND from_unixtime(s.end_time) > m.hourly_timestamp
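
The join with `historical_turbine_status` is a range (interval) join: an hourly row is kept only if it falls strictly between a status period's `start_time` and `end_time` for the same turbine, and hours without a matching status disappear because both joins are `INNER`. The nested-loop sketch below shows those semantics in plain Python, assuming all times are Unix seconds and using a hypothetical `status` column for the label; Spark plans this join very differently at scale.

```python
from typing import Iterable, List


def join_with_status(hourly: Iterable[dict], statuses: List[dict]) -> List[dict]:
    """Inner-join each hourly row with the status periods that strictly contain it."""
    out = []
    for m in hourly:
        for s in statuses:
            if (
                s["turbine_id"] == m["turbine_id"]
                and s["start_time"] < m["hourly_timestamp"] < s["end_time"]
            ):
                out.append({
                    **m,
                    "status": s["status"],  # hypothetical label column
                    # Mirrors CONCAT(t.turbine_id, '-', s.start_time)
                    "composite_key": f"{m['turbine_id']}-{s['start_time']}",
                })
    return out
```

Because the bounds are strict, an hour that starts exactly at `end_time` is not matched to that period.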


## 4/ Get model from registry and flag faulty turbines

<div><img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-4.png" width="700px" style="float: right"/></div>

Our Data Science team read data from our previous table, built a predictive maintenance model using AutoML and saved it into the Databricks Model Registry (we'll see how to do that next).

One of the key values of the Lakehouse is that we can easily load this model and predict faulty turbines directly in our pipeline.

Note that we don't have to worry about the model framework (sklearn or other): MLflow abstracts that for us.

All we have to do is load the model and call it as a SQL function (or a Python function).
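
The reason the framework doesn't matter is that a loaded MLflow model exposes a uniform `predict` interface, so the pipeline only has to wrap that interface as a function. The plain-Python sketch below shows the wrapping pattern with a hypothetical `ThresholdModel` standing in for the trained model; in Databricks this step is typically done with `mlflow.pyfunc.spark_udf` and registering the result as a SQL function. In this demo, `predict_maintenance` is defined in the 01.2-DLT-Wind-Turbine-SQL-UDF notebook.

```python
from typing import Callable, Protocol, Sequence


class Model(Protocol):
    """Anything with a batch predict() method, whatever framework trained it."""

    def predict(self, rows: Sequence[Sequence[float]]) -> list: ...


class ThresholdModel:
    """Hypothetical stand-in model: flags a row if any feature exceeds a threshold."""

    def __init__(self, threshold: float):
        self.threshold = threshold

    def predict(self, rows):
        return ["faulty" if max(r) > self.threshold else "ok" for r in rows]


def as_row_function(model: Model) -> Callable[..., str]:
    """Wrap a batch predict() into a scalar, SQL-UDF-like function of the features."""
    def predict_row(*features: float) -> str:
        return model.predict([list(features)])[0]
    return predict_row
```

Swapping `ThresholdModel` for any other object with the same `predict` signature leaves the calling code unchanged, which is the property the pipeline relies on.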

In [0]:
-- specify all the fields to declare the primary key (primary key constraints are informational, not enforced)
CREATE MATERIALIZED VIEW turbine_current_features
 (
    turbine_id STRING NOT NULL,
    hourly_timestamp TIMESTAMP,
    avg_energy DOUBLE,
    std_sensor_A DOUBLE,
    std_sensor_B DOUBLE,
    std_sensor_C DOUBLE,
    std_sensor_D DOUBLE,
    std_sensor_E DOUBLE,
    std_sensor_F DOUBLE,
    country STRING,
    lat STRING,
    location STRING,
    long STRING,
    model STRING,
    state STRING,
   CONSTRAINT turbine_current_features_pk PRIMARY KEY (turbine_id))
COMMENT "Wind turbine features based on model prediction"
AS
WITH latest_metrics AS (
  -- partition by turbine only, so that row_number = 1 is each turbine's latest hour
  SELECT *, ROW_NUMBER() OVER(PARTITION BY turbine_id ORDER BY hourly_timestamp DESC) AS row_number FROM LIVE.sensor_hourly
)
SELECT * EXCEPT(m.row_number,_rescued_data, percentiles_sensor_A,percentiles_sensor_B, percentiles_sensor_C, percentiles_sensor_D, percentiles_sensor_E, percentiles_sensor_F) 
FROM latest_metrics m
   INNER JOIN LIVE.turbine t USING (turbine_id)
   WHERE m.row_number=1 and turbine_id is not null
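
For the `turbine_id` primary key to hold, the window must partition by `turbine_id` alone and order by `hourly_timestamp DESC`, so that `row_number = 1` selects each turbine's most recent hour. The plain-Python sketch below shows that "latest row per key" logic; `latest_per_turbine` is a hypothetical helper, and ties on the timestamp keep the first row seen, whereas `ROW_NUMBER` breaks ties arbitrarily.

```python
from typing import Dict, Iterable


def latest_per_turbine(rows: Iterable[dict]) -> Dict[str, dict]:
    """Keep the most recent hourly row per turbine_id (like ROW_NUMBER() ... = 1)."""
    latest: Dict[str, dict] = {}
    for r in rows:
        if r["turbine_id"] is None:
            continue  # mirrors `turbine_id is not null`
        current = latest.get(r["turbine_id"])
        if current is None or r["hourly_timestamp"] > current["hourly_timestamp"]:
            latest[r["turbine_id"]] = r
    return latest
```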

In [0]:
-- Note: The AI model predict_maintenance is loaded from the 01.2-DLT-Wind-Turbine-SQL-UDF notebook
CREATE MATERIALIZED VIEW turbine_current_status 
COMMENT "Wind turbine last status based on model prediction"
AS
SELECT *, 
    predict_maintenance(hourly_timestamp, avg_energy, std_sensor_A, std_sensor_B, std_sensor_C, std_sensor_D, std_sensor_E, std_sensor_F, location, model, state) as prediction 
  FROM LIVE.turbine_current_features


## Conclusion
Our <a dbdemos-pipeline-id="dlt-iot-wind-turbine" href="#joblist/pipelines/e6653220-a089-4b36-9474-c84a5328706d" target="_blank">DLT Data Pipeline</a> is now ready, built purely in SQL. We have an end-to-end cycle, and our ML model has been integrated seamlessly by our Data Engineering team.


For more details on model training, open the [model training notebook]($../04-Data-Science-ML/04.1-automl-iot-turbine-predictive-maintenance)

Our final dataset includes our ML prediction for our Predictive Maintenance use-case. 

We are now ready to build our <a dbdemos-dashboard-id="turbine-analysis" href="/sql/dashboardsv3/01f02600455d1bdaaeb15ed8fab3e23f">AI/BI Dashboard</a> to track the main KPIs and status of our entire Wind Turbine Farm, and to build a complete <a dbdemos-dashboard-id="turbine-predictive" href="/sql/dashboardsv3/01f02600455d1bdaaeb15ed8fab3e23f">Predictive maintenance AI/BI Dashboard</a>.


<img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-dashboard-1.png" width="1000px">