In [2]:
!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless -qq
!pip install pyspark==3.5 delta-spark fastapi "uvicorn[standard]" -q

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 126333 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u442-b06~us1-0ubuntu1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u442-b06~us1-0ubuntu1~22.04) ...
Selecting previously unselected package openjdk-8-jdk-headless:amd64.
Preparing to unpack .../openjdk-8-jdk-headless_8u442-b06~us1-0ubuntu1~22.04_amd64.deb ...
Unpacking openjdk-8-jdk-headless:amd64 (8u442-b06~us1-0ubuntu1~22.04) ...
Setting up libxtst6:amd64 (2:1.2.3-1build4) ...
Setting up openjdk-8-jre-headless:amd64 (8u442-b06~us1

In [3]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from pyspark.sql.functions import to_date, col, current_timestamp
from pyspark.sql.types import IntegerType, LongType

builder = SparkSession.builder \
    .appName("SmartFieldPoc") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

source_path = "/content/predictive_maintenance_dataset.csv"
df = spark.read.csv(source_path, header=True, inferSchema=True)

In [6]:
# @title Bronze Layer

bronze_path = "/tmp/delta/bronze_pmd"
df.write.format("delta").mode("overwrite").save(bronze_path)
bronze_count = spark.read.format("delta").load(bronze_path).count()
print(f"Bronze table written to {bronze_path} with {bronze_count} rows")

Bronze table written to /tmp/delta/bronze_pmd with 124494 rows


In [None]:
# @title Silver Layer & Transformations

silver_df = spark.read.format("delta").load(bronze_path)
# 1. Parse date and cast key columns to proper types
silver_df = (
    silver_df
    .withColumn("event_date", to_date(col("date"), "M/d/yyyy"))
    .withColumn("failure", col("failure").cast(IntegerType()))
    .withColumn("metric1", col("metric1").cast(LongType()))
    .withColumn("metric2", col("metric2").cast(IntegerType()))
    .withColumn("metric3", col("metric3").cast(IntegerType()))
    .withColumn("metric4", col("metric4").cast(IntegerType()))
    .withColumn("metric5", col("metric5").cast(IntegerType()))
    .withColumn("metric6", col("metric6").cast(LongType()))
    .withColumn("metric7", col("metric7").cast(IntegerType()))
    .withColumn("metric8", col("metric8").cast(IntegerType()))
    .withColumn("metric9", col("metric9").cast(IntegerType()))
)
# 2. Identify & print duplicate records before dropping
from pyspark.sql.functions import count

dup_df = silver_df.groupBy("device","event_date").count().filter("count > 1")
dup_count = dup_df.count()
if dup_count > 0:
    print(f"Found {dup_count} duplicate device-date entries:")
    dup_df.show(truncate=False)
    # Show sample duplicate rows for inspection
    silver_df.join(
        dup_df.select("device","event_date"), ["device","event_date"]
    ).orderBy("device","event_date").show(truncate=False)
else:
    print("No duplicate device-date entries found.")


    # 3. Drop duplicates and remove rows with critical nulls
silver_df = silver_df.dropDuplicates(["device","event_date"]) \
    .filter(col("device").isNotNull() & col("event_date").isNotNull())

# 4. Filter out unrealistic extreme values (e.g., negative metrics) Filter out unrealistic extreme values (e.g., negative metrics)
silver_df = silver_df.filter(
    (col("metric1") >= 0) & (col("metric2") >= 0) & (col("metric6") >= 0)
)
# 5. Add metadata columns for auditing (e.g., ingest timestamp)
silver_df = silver_df.withColumn("ingest_time", current_timestamp())

# 5. Write Silver table partitioned by event_date for performance & allow schema evolution
silver_path = "/tmp/delta/silver_pmd"
silver_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .option("mergeSchema", "true") \
    .partitionBy("event_date") \
    .save(silver_path)
silver_count = spark.read.format("delta").load(silver_path).count()
print(f"Silver table written with partitions to {silver_path} with {silver_count} rows")

Found 1 duplicate device-date entries:
+--------+----------+-----+
|device  |event_date|count|
+--------+----------+-----+
|S1F0R4Q8|2015-07-10|2    |
+--------+----------+-----+

+--------+----------+---------+-------+---------+-------+-------+-------+-------+-------+-------+-------+-------+
|device  |event_date|date     |failure|metric1  |metric2|metric3|metric4|metric5|metric6|metric7|metric8|metric9|
+--------+----------+---------+-------+---------+-------+-------+-------+-------+-------+-------+-------+-------+
|S1F0R4Q8|2015-07-10|7/10/2015|0      |192721392|0      |0      |0      |8      |213700 |0      |0      |0      |
|S1F0R4Q8|2015-07-10|7/10/2015|0      |192721392|0      |0      |0      |8      |213700 |0      |0      |0      |
+--------+----------+---------+-------+---------+-------+-------+-------+-------+-------+-------+-------+-------+

Silver table written with partitions to /tmp/delta/silver_pmd with 124493 rows


In [None]:
# @title Data Validation & Quality Checks

from pyspark.sql.functions import count, when, col

silver = spark.read.format("delta").load(silver_path)
total_silver = silver.count()
print(f"Total rows in Silver: {total_silver}")

# 6.0 Bronze vs Silver row count validation
if bronze_count == total_silver:
    print("All bronze rows have been processed into Silver.")
else:
    diff = bronze_count - total_silver
    print(f"Warning: Bronze vs Silver row count mismatch. Bronze has {bronze_count} rows; Silver has {total_silver} rows.")
    print(f"Total rows dropped or filtered during transformation: {diff}")

# 6.1 Null checks for critical columns
null_counts = silver.select(
    *[count(when(col(c).isNull(), True)).alias(c + '_nulls')
      for c in ['device','event_date','failure','metric1','metric2','metric6']]
)
null_counts.show()
assert all(v == 0 for v in null_counts.collect()[0]), "Null values found in critical columns"
print("No nulls in critical columns.")

# 6.2 Duplicate check: ensure each device-date pair is unique
dup_count = silver.groupBy("device","event_date").count().filter("count > 1").count()
assert dup_count == 0, f"Found {dup_count} duplicate device-date entries"
print("No duplicate device-date records.")

# 6.3 Range checks for metrics: negative values indicate errors
range_issues = silver.select(
    count(when(col('metric1') < 0, True)).alias('metric1_neg'),
    count(when(col('metric2') < 0, True)).alias('metric2_neg'),
    count(when(col('metric6') < 0, True)).alias('metric6_neg')
)
range_issues.show()
assert range_issues.collect()[0]['metric1_neg'] == 0, "Negative values found in metric1"
print("No negative metric values.")

# 6.4 Failure flag sanity: values should only be 0 or 1
invalid_fail = silver.filter(~col('failure').isin([0,1])).count()
assert invalid_fail == 0, f"Invalid failure flags: {invalid_fail}"
print("Failure flags are valid (0 or 1).")

print("Data quality validation completed successfully.")


Total rows in Silver: 124493
Total rows dropped or filtered during transformation: 1
+------------+----------------+-------------+-------------+-------------+-------------+
|device_nulls|event_date_nulls|failure_nulls|metric1_nulls|metric2_nulls|metric6_nulls|
+------------+----------------+-------------+-------------+-------------+-------------+
|           0|               0|            0|            0|            0|            0|
+------------+----------------+-------------+-------------+-------------+-------------+

No nulls in critical columns.
No duplicate device-date records.
+-----------+-----------+-----------+
|metric1_neg|metric2_neg|metric6_neg|
+-----------+-----------+-----------+
|          0|          0|          0|
+-----------+-----------+-----------+

No negative metric values.
Failure flags are valid (0 or 1).
Metric1 p1: 0.0, p99: 244140480.0
Metric2 p1: 0.0, p99: 64968.0
Data quality validation completed successfully.


In [None]:
# @title Feature Engineering
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, max, dayofmonth, month, lag, col

# Time-series features: rolling avg of metric1 over past 3 days
window_spec = Window.partitionBy("device").orderBy("event_date").rowsBetween(-3, 0)
silver_ts = silver_df.withColumn("rolling_avg_metric1_3d", avg("metric1").over(window_spec))
# Lag feature: previous day's metric2
silver_ts = silver_ts.withColumn("lag_metric2_1d", lag("metric2", 1).over(Window.partitionBy("device").orderBy("event_date")))

# Categorical features: extract month and day
silver_ts = silver_ts.withColumn("event_month", month(col("event_date"))) \
                   .withColumn("event_day", dayofmonth(col("event_date")))

# Aggregate to feature table
features_df = (
    silver_ts
    .groupBy("device")
    .agg(
        avg("rolling_avg_metric1_3d").alias("avg_roll3d_metric1"),
        avg("lag_metric2_1d").alias("avg_lag1d_metric2"),
        avg("metric1").alias("avg_metric1"),
        max("metric2").alias("max_metric2")
    )
)
# Join back a categorical example
features_df = features_df.join(
    silver_ts.select("device","event_month","event_day").distinct(), on="device", how="left"
)

features_df.show(5)
features_path = "/tmp/delta/features_pmd"
features_df.write.format("delta").mode("overwrite").save(features_path)
print(f"Feature table written to {features_path}")


+--------+--------------------+-----------------+--------------------+-----------+-----------+---------+
|  device|  avg_roll3d_metric1|avg_lag1d_metric2|         avg_metric1|max_metric2|event_month|event_day|
+--------+--------------------+-----------------+--------------------+-----------+-----------+---------+
|S1F01085|1.2712392355555554E8|             55.8|1.1593295066666667E8|         56|          1|        3|
|S1F01085|1.2712392355555554E8|             55.8|1.1593295066666667E8|         56|          1|        1|
|S1F01085|1.2712392355555554E8|             55.8|1.1593295066666667E8|         56|          1|        2|
|S1F01085|1.2712392355555554E8|             55.8|1.1593295066666667E8|         56|          1|        4|
|S1F01085|1.2712392355555554E8|             55.8|1.1593295066666667E8|         56|          1|        5|
+--------+--------------------+-----------------+--------------------+-----------+-----------+---------+
only showing top 5 rows

Feature table written to /tmp/

In [None]:
# @title Sample API to ML

pandas_df = features_df.toPandas()
print("Converted features to Pandas DataFrame:", pandas_df.shape)

# Convert pandas data to dict for API
feature_dict = pandas_df.to_dict(orient='list')

Converted features to Pandas DataFrame: (124493, 7)


In [None]:
api_code = f'''
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd

# Load features into memory
features = pd.DataFrame({feature_dict})

app = FastAPI(title="Predictive Maintenance Feature API", description="Serves features for ML models", version="1.0")

class FeatureResponse(BaseModel):
    device: str
    avg_roll3d_metric1: float
    avg_lag1d_metric2: float
    avg_metric1: float
    max_metric2: float
    event_month: int
    event_day: int

@app.get("/features/{{device}}", response_model=FeatureResponse, tags=["Features"])
def get_features(device: str):
    df = features[features["device"] == device]
    if df.empty:
        raise HTTPException(status_code=404, detail="Device not found")
    record = df.iloc[0].to_dict()
    return record

# Run with: uvicorn app:app --reload --host 0.0.0.0 --port 8000
'''
with open("app.py","w") as f:
    f.write(api_code)
print("FastAPI app created: app.py (Swagger available at /docs)")

# Cell 10: (Optional) Start API server
# !uvicorn app:app --reload --host 0.0.0.0 --port 8000

FastAPI app created: app.py (Swagger available at /docs)
