In [0]:
# Smoke-test cell: emits a fixed greeting.
greeting = "hello"
print(greeting)

hello


In [0]:
!pip install pydeequ

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%restart_python
# Restarts the Python process so the package installed by the previous cell
# can be imported (the pip output above recommends doing this).

In [0]:
# Declare the `input_path` notebook widget (empty default) and read back
# whatever value it currently holds.
dbutils.widgets.text("input_path", "")
input_path = dbutils.widgets.get("input_path")

# Echo the chosen location so it appears in the run output.
print("Reading data from: {}".format(input_path))

Reading data from: output/


In [0]:
# 1. SAS token for the storage container.
# The token is read from a Databricks secret scope instead of being embedded in
# the notebook, where anyone with read access (or access to an export) could
# reuse it. The scope/key names below are placeholders -- TODO: point them at
# the secret that actually holds the token. Any token that was previously
# pasted into this cell should be treated as leaked and revoked.
# A leading '?' (as copied from the Azure portal) is stripped.
sas_token = dbutils.secrets.get(scope="iot-pipeline", key="storage-sas-token").lstrip("?")

# 2. Container and account from your path
# Example: wasbs://folder@storage1srs.blob.core.windows.net/...
container_name = "folder"
storage_account_name = "storage1srs"

# 3. Configure Spark to use the SAS token for this container/account pair
spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net",
    sas_token
)

# List the container root to confirm the token grants access.
dbutils.fs.ls(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/")

[FileInfo(path='wasbs://folder@storage1srs.blob.core.windows.net/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1755238335000),
 FileInfo(path='wasbs://folder@storage1srs.blob.core.windows.net/output/', name='output/', size=0, modificationTime=1755243083000),
 FileInfo(path='wasbs://folder@storage1srs.blob.core.windows.net/part-00000-69436a35-67b2-4b62-96ac-cde31f75e264-c000.csv', name='part-00000-69436a35-67b2-4b62-96ac-cde31f75e264-c000.csv', size=520, modificationTime=1755238335000)]

In [0]:
# 4. Resolve the full wasbs:// URL of the input location from the container,
# storage account and the `input_path` widget value.
d_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{input_path}"

print(f"Reading IoT data from: {d_path}")

# The CSV itself is loaded by the next cell, which applies explicit column
# names and schema inference. The untyped `spark.read.csv(d_path)` that used to
# live here was overwritten before any use, so it has been removed.

Reading IoT data from: wasbs://folder@storage1srs.blob.core.windows.net/output/


In [0]:
# Column names applied to the header-less CSV, in file order.
columns = [
    "PlantID",
    "DeviceID",
    "Timestamp_UTC",
    "Temperature_C",
    "Humidity_pct",
    "Vibration_mm_s",
    "Pressure_bar",
    "Energy_kWh",
    "Status",
]

# Treat the first line as data (no header row) and let Spark infer column types.
csv_reader = spark.read.format("csv").options(header="false", inferSchema="true")

# Load the files and attach the names declared above.
df = csv_reader.load(d_path).toDF(*columns)

# Show the inferred schema followed by a sample of rows.
df.printSchema()
df.show()

root
 |-- PlantID: string (nullable = true)
 |-- DeviceID: string (nullable = true)
 |-- Timestamp_UTC: timestamp (nullable = true)
 |-- Temperature_C: double (nullable = true)
 |-- Humidity_pct: double (nullable = true)
 |-- Vibration_mm_s: double (nullable = true)
 |-- Pressure_bar: double (nullable = true)
 |-- Energy_kWh: double (nullable = true)
 |-- Status: string (nullable = true)

+-------+---------+-------------------+-------------+------------+--------------+------------+----------+-------+
|PlantID| DeviceID|      Timestamp_UTC|Temperature_C|Humidity_pct|Vibration_mm_s|Pressure_bar|Energy_kWh| Status|
+-------+---------+-------------------+-------------+------------+--------------+------------+----------+-------+
|PLANT_B|SENSOR_B2|2025-08-14 09:00:20|         69.5|        47.5|           2.3|         3.0|       5.0|     OK|
|PLANT_A|SENSOR_A2|2025-08-14 09:00:05|         71.9|        46.2|           2.0|         3.3|       5.5|     OK|
|PLANT_A|SENSOR_A1|2025-08-14 09:00:00

In [0]:
import os
# Environment configuration set before pydeequ is imported.
# NOTE(review): pydeequ is documented to read SPARK_VERSION to pick the matching
# Deequ build, so this assignment presumably has to stay above the pydeequ
# imports -- confirm the value matches the cluster's Spark version.
os.environ["SPARK_VERSION"] = "3.3"
# NOTE(review): PYSPARK_SUBMIT_ARGS is normally consumed when a new PySpark
# session is launched; `spark` is already in use in earlier cells, so this
# setting may have no effect here -- confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.sql.shuffle.partitions=8 pyspark-shell'
import json
from pyspark.sql import SparkSession
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

In [0]:
# Normalise the timestamp column name to one without spaces or parentheses.
# NOTE(review): the load cell already names this column "Timestamp_UTC" via
# toDF(*columns), so this rename looks like a no-op left over from reading a
# file with a header -- confirm before removing.
df = df.withColumnRenamed(existing="Timestamp (UTC)", new="Timestamp_UTC")

In [0]:
# Error-level Deequ check: the dataset must contain at least 3 rows and every
# numeric sensor reading must be non-negative.
non_negative_columns = (
    "Temperature_C",
    "Humidity_pct",
    "Vibration_mm_s",
    "Pressure_bar",
    "Energy_kWh",
)

check = Check(spark, CheckLevel.Error, "IoT Data Quality Checks")
check = check.hasSize(lambda row_count: row_count >= 3)
# Constraints are appended in the same order as the tuple above.
for sensor_column in non_negative_columns:
    check = check.isNonNegative(sensor_column)

In [0]:
# Evaluate the check defined above against the loaded DataFrame.
verification_run = VerificationSuite(spark).onData(df)
verification_run = verification_run.addCheck(check)
result = verification_run.run()


In [0]:
# Pull the per-constraint outcomes off the verification result and render
# them as pretty-printed JSON for display.
result_dict = result.checkResults
report_json = json.dumps(obj=result_dict, indent=4)

In [0]:
# Display the JSON-formatted Deequ results built in the previous cell.
print(report_json)

[
    {
        "check_status": "Success",
        "check_level": "Error",
        "constraint_status": "Success",
        "check": "IoT Data Quality Checks",
        "constraint_message": "",
        "constraint": "SizeConstraint(Size(None))"
    },
    {
        "check_status": "Success",
        "check_level": "Error",
        "constraint_status": "Success",
        "check": "IoT Data Quality Checks",
        "constraint_message": "",
        "constraint": "ComplianceConstraint(Compliance(Temperature_C is non-negative,COALESCE(CAST(Temperature_C AS DECIMAL(20,10)), 0.0) >= 0,None))"
    },
    {
        "check_status": "Success",
        "check_level": "Error",
        "constraint_status": "Success",
        "check": "IoT Data Quality Checks",
        "constraint_message": "",
        "constraint": "ComplianceConstraint(Compliance(Humidity_pct is non-negative,COALESCE(CAST(Humidity_pct AS DECIMAL(20,10)), 0.0) >= 0,None))"
    },
    {
        "check_status": "Success",
        "che

In [0]:

from pydeequ.verification import VerificationSuite, VerificationResult

In [0]:
# Persist the data-quality outcome: convert the verification result to a Spark
# DataFrame, show it untruncated, then overwrite the Delta table that stores it.
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)

quality_writer = result_df.write.format("delta").mode("overwrite")
quality_writer.saveAsTable("iot_data_quality_results")



+-----------------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|check                  |check_level|check_status|constraint                                                                                                                      |constraint_status|constraint_message|
+-----------------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|IoT Data Quality Checks|Error      |Success     |SizeConstraint(Size(None))                                                                                                      |Success          |                  |
|IoT Data Quality Checks|Error      |Success     |ComplianceConstraint(Compliance(Temperature_C is non-negative,COALESCE(CAST(Temper

In [0]:
# Silver layer: the input rows plus a Fahrenheit temperature derived from the
# Celsius reading, stored as the `iot_silver` Delta table (overwritten each run).
temperature_f = col("Temperature_C") * 9/5 + 32
silver_df = df.withColumn("Temperature_F", temperature_f)

silver_writer = silver_df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("iot_silver")

In [0]:
from pyspark.sql import functions as F

# Gold layer: one row per plant with summary metrics, using column names that
# are safe for Delta (lowercase, no spaces or special characters).
plant_metrics = [
    F.avg("Temperature_C").alias("avg_temperature_c"),
    F.max("Vibration_mm_s").alias("max_vibration_mm_s"),
    F.min("Pressure_bar").alias("min_pressure_bar"),
]
gold_df = silver_df.groupBy("PlantID").agg(*plant_metrics)

# Overwrite the managed `iot_gold` table with the fresh aggregates.
gold_writer = gold_df.write.format("delta").mode("overwrite")
gold_writer.saveAsTable("iot_gold")


In [0]:
# Export the silver and gold DataFrames as Delta directories in blob storage.
# The destination is derived from `container_name` / `storage_account_name`
# (set in the SAS configuration cell) so the account is declared in one place;
# with the current values the resulting URLs are unchanged.
# NOTE(review): these directories sit under "output/", which is also the
# `input_path` value used for the CSV load in the recorded run. A later run
# that reads "output/" may pick up these Delta files as CSV input -- confirm,
# and consider a separate prefix for exports.
output_root = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/output"
silver_path = f"{output_root}/iot_silver"
gold_path = f"{output_root}/iot_gold"

# Both writes replace any previous contents at the destination.
gold_df.write.format("delta").mode("overwrite").save(gold_path)
silver_df.write.format("delta").mode("overwrite").save(silver_path)




In [0]:
# Print a banner line followed by the JSON-formatted Deequ results.
print("=== Deequ Report ===", report_json, sep="\n")
 


=== Deequ Report ===
[
    {
        "check_status": "Success",
        "check_level": "Error",
        "constraint_status": "Success",
        "check": "IoT Data Quality Checks",
        "constraint_message": "",
        "constraint": "SizeConstraint(Size(None))"
    },
    {
        "check_status": "Success",
        "check_level": "Error",
        "constraint_status": "Success",
        "check": "IoT Data Quality Checks",
        "constraint_message": "",
        "constraint": "ComplianceConstraint(Compliance(Temperature_C is non-negative,COALESCE(CAST(Temperature_C AS DECIMAL(20,10)), 0.0) >= 0,None))"
    },
    {
        "check_status": "Success",
        "check_level": "Error",
        "constraint_status": "Success",
        "check": "IoT Data Quality Checks",
        "constraint_message": "",
        "constraint": "ComplianceConstraint(Compliance(Humidity_pct is non-negative,COALESCE(CAST(Humidity_pct AS DECIMAL(20,10)), 0.0) >= 0,None))"
    },
    {
        "check_status": "S

In [0]:
import json
import os
from pydeequ.verification import VerificationResult

# Convert the verification result to a Spark DataFrame.
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)

# Collect to a Python list of dicts (one entry per evaluated constraint).
result_list = [row.asDict() for row in result_df.collect()]
result_dict = {"result": result_list}

# Write a single JSON file to DBFS through the local /dbfs/ mount. The location
# is declared once; the dbfs:/ form shown in the log line is derived from it.
local_path = "/dbfs/FileStore/shared_uploads/traininguser5@sudosu.ai/deequ_result.json"
dbfs_uri = "dbfs:" + local_path[len("/dbfs"):]

# Make sure the target folder exists so the write does not fail on a fresh workspace.
os.makedirs(os.path.dirname(local_path), exist_ok=True)
with open(local_path, "w") as f:
    json.dump(result_dict, f, indent=4)

print(f" JSON saved at {dbfs_uri}")

# Read the file back as text to confirm what was written.
with open(local_path, "r") as f:
    print(f.read())

# Quality gate: any check whose status is not "Success" fails the notebook.
# A regular exception is raised instead of SystemExit, which interactive
# kernels may intercept rather than report as a failed cell; the message
# lists the offending constraints to make the failure actionable.
failed_results = [r for r in result_list if r.get("check_status") != "Success"]
if failed_results:
    failed_constraints = "; ".join(str(r.get("constraint")) for r in failed_results)
    raise RuntimeError(f"Data quality checks failed: {failed_constraints}")

print("Deequ checks passed ")

 JSON saved at dbfs:/FileStore/shared_uploads/traininguser5@sudosu.ai/deequ_result.json
{
    "result": [
        {
            "check": "IoT Data Quality Checks",
            "check_level": "Error",
            "check_status": "Success",
            "constraint": "SizeConstraint(Size(None))",
            "constraint_status": "Success",
            "constraint_message": ""
        },
        {
            "check": "IoT Data Quality Checks",
            "check_level": "Error",
            "check_status": "Success",
            "constraint": "ComplianceConstraint(Compliance(Temperature_C is non-negative,COALESCE(CAST(Temperature_C AS DECIMAL(20,10)), 0.0) >= 0,None))",
            "constraint_status": "Success",
            "constraint_message": ""
        },
        {
            "check": "IoT Data Quality Checks",
            "check_level": "Error",
            "check_status": "Success",
            "constraint": "ComplianceConstraint(Compliance(Humidity_pct is non-negative,COALESCE(CA

