In [180]:
%%pyspark
df = spark.read.option("multiline","true").json('abfss://vgdlg2container@vgdlg2.dfs.core.windows.net/sensor_data.json')
display(df)

StatementMeta(vgsparkpool, 1, 181, Finished, Available)

SynapseWidget(Synapse.DataFrame, 16a87480-caee-43da-b671-3fdb8ffced99)

In [181]:
df.printSchema()

StatementMeta(vgsparkpool, 1, 182, Finished, Available)

root
 |-- concentration: double (nullable = true)
 |-- gas_type: string (nullable = true)
 |-- light_intensity: long (nullable = true)
 |-- location: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- calibration_date: string (nullable = true)
 |    |-- manufacturer: string (nullable = true)
 |    |-- model: string (nullable = true)
 |    |-- serial_number: string (nullable = true)
 |-- power_consumption: long (nullable = true)
 |-- pressure: double (nullable = true)
 |-- reading: string (nullable = true)
 |-- sensor_id: string (nullable = true)
 |-- sensor_type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- vibration_intensity: double (nullable = true)



In [182]:
df.describe().show()

StatementMeta(vgsparkpool, 1, 183, Finished, Available)

+-------+-----------------+--------+------------------+------------+-----------------+------------------+--------------------+---------+-----------+--------+--------------------+--------------------+-------------------+
|summary|    concentration|gas_type|   light_intensity|    location|power_consumption|          pressure|             reading|sensor_id|sensor_type|  status|           timestamp|                unit|vibration_intensity|
+-------+-----------------+--------+------------------+------------+-----------------+------------------+--------------------+---------+-----------+--------+--------------------+--------------------+-------------------+
|  count|               14|      16|                13|         129|                1|                13|                  72|      129|        129|       2|                 129|                 127|                 14|
|   mean|3.178571428571428|    null| 536.1538461538462|        null|           1250.0|100.23846153846155|  125.375281690

In [183]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, LongType

schema = StructType([
    StructField("concentration", DoubleType(), True),
    StructField("gas_type", StringType(), True),
    StructField("light_intensity", LongType(), True),
    StructField("location", StringType(), True),
    StructField("metadata", StructType([
        StructField("calibration_date", StringType(), True),
        StructField("manufacturer", StringType(), True),
        StructField("model", StringType(), True),
        StructField("serial_number", StringType(), True)
    ]), True),
    StructField("power_consumption", LongType(), True),
    StructField("pressure", DoubleType(), True),
    StructField("reading", StringType(), True),
    StructField("sensor_id", StringType(), True),
    StructField("sensor_type", StringType(), True),
    StructField("status", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("unit", StringType(), True),
    StructField("vibration_intensity", DoubleType(), True)
])


StatementMeta(vgsparkpool, 1, 184, Finished, Available)

In [184]:
structured_df = df.select(
    df["concentration"].cast(DoubleType()),
    df["gas_type"],
    df["light_intensity"].cast(LongType()),
    df["location"],
    df["metadata"],
    df["power_consumption"].cast(LongType()),
    df["pressure"].cast(DoubleType()),
    df["reading"],
    df["sensor_id"],
    df["sensor_type"],
    df["status"],
    df["timestamp"].cast(TimestampType()),
    df["unit"],
    df["vibration_intensity"].cast(DoubleType())
)


StatementMeta(vgsparkpool, 1, 185, Finished, Available)

In [186]:
from pyspark.sql.functions import col

# Selecting all columns and expanding the metadata struct
expanded_df = df.select(
    "*",
    col("metadata.calibration_date").alias("calibration_date"),
    col("metadata.manufacturer").alias("manufacturer"),
    col("metadata.model").alias("model"),
    col("metadata.serial_number").alias("serial_number")
)

# Dropping the original metadata column
expanded_df = expanded_df.drop("metadata")

StatementMeta(vgsparkpool, 1, 187, Finished, Available)

In [187]:
#To sort the columns in the database
column_order = [
    "sensor_id",
    "sensor_type",
    "location",
    "timestamp",
    "reading",
    "unit",
    "concentration",
    "gas_type",
    "light_intensity",
    "power_consumption",
    "pressure",
    "status",
    "vibration_intensity" ,
    "calibration_date", 
    "manufacturer",
     "model",
      "serial_number"
]

# Selecting columns in the specified order
arranged_df = expanded_df.select(column_order)
display(arranged_df)

StatementMeta(vgsparkpool, 1, 188, Finished, Available)

SynapseWidget(Synapse.DataFrame, 87a97b63-a9e7-4b57-ad83-80c65c974ac2)

In [188]:
# Separating the DataFrame based on sensor types


temperature_df = arranged_df.filter(arranged_df["sensor_type"] == "temperature")
# Remove columns with all rows as undefined in the temperature DataFrame
columns_to_drop = [col_name for col_name in temperature_df.columns if temperature_df.filter((col(col_name) != "undefined") & (col(col_name) != "")).count() == 0]
temperature_df = temperature_df.drop(*columns_to_drop)
# Show the contents of the temperature DataFrame
display(temperature_df)

StatementMeta(vgsparkpool, 1, 189, Finished, Available)

SynapseWidget(Synapse.DataFrame, ec48bbb6-6019-4433-a909-071077dbf70a)

In [189]:
humidity_df = arranged_df.filter(arranged_df["sensor_type"] == "humidity")
# Remove columns with all rows as undefined in the humidity DataFrame
columns_to_drop = [col_name for col_name in humidity_df.columns if humidity_df.filter((col(col_name) != "undefined") & (col(col_name) != "")).count() == 0]
humidity_df = humidity_df.drop(*columns_to_drop)
# Show the contents of the humidity DataFrame
display(humidity_df)

StatementMeta(vgsparkpool, 1, 190, Finished, Available)

SynapseWidget(Synapse.DataFrame, 59360ac0-1dc5-4ca1-9f80-9a7b547d2eb4)

In [190]:
pressure_df = arranged_df.filter(arranged_df["sensor_type"] == "pressure")
# Remove columns with all rows as undefined in the pressure DataFrame
columns_to_drop = [col_name for col_name in pressure_df.columns if pressure_df.filter((col(col_name) != "undefined") & (col(col_name) != "")).count() == 0]
pressure_df = pressure_df.drop(*columns_to_drop)
# Show the contents of the pressure DataFrame
display(pressure_df)

StatementMeta(vgsparkpool, 1, 191, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1dc3a7a5-e2bd-4278-b8c7-7ab77ec9bd1e)

In [191]:
temperature_df.write.csv("abfss://vgdlg2container@vgdlg2.dfs.core.windows.net/temperature_data.csv", header=True, mode="overwrite")
humidity_df.write.csv("abfss://vgdlg2container@vgdlg2.dfs.core.windows.net/humidity_data.csv", header=True, mode="overwrite")
pressure_df.write.csv("abfss://vgdlg2container@vgdlg2.dfs.core.windows.net/pressure_data.csv", header=True, mode="overwrite")

StatementMeta(vgsparkpool, 1, 192, Finished, Available)