In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
import json
from azure.storage.blob import BlobServiceClient


In [0]:
# Initialize Spark session
spark = SparkSession.builder.appName("IoTSensor").getOrCreate()

# Azure Blob Storage configuration
storage_account_name = "datastorage4zims"
storage_account_key = "RZswKvPTf1D45dEOu1mBDMi206vsDxw0eGRvgT9/kwfTSLoNFx+jMa36pwumSubp7J0Z0Wa/no/i+ASt83SUGQ=="
connection_string = (f"DefaultEndpointsProtocol=https;"
                     f"AccountName={storage_account_name};"
                     f"AccountKey={storage_account_key};"
                     f"EndpointSuffix=core.windows.net")
container_name = "sensor-data"
output_container_name = "sensorinsights"

# Set Spark configuration for Azure Blob Storage access
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

# Initialize BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(connection_string)



In [0]:
# Function to list files in a container
def list_files(container_name):
    return blob_service_client.get_container_client(container_name).list_blobs()

# Function to load data from a file path
def load_data(file_path):
    df = spark.read.format("csv").option("header", "true").load(file_path)
    df.printSchema()  # Check the schema to confirm the data structure
    return df

# Function to cast all columns except 'sensor_id', 'timestamp', 'location' to numeric and calculate averages
def aggregate_metrics(df):
    # Identify columns to average, ignoring 'sensor_id', 'timestamp', and 'location'
    numeric_columns = [col for col, dtype in df.dtypes 
                       if col not in ("sensor_id", "timestamp", "location")]

    # Cast selected columns to numeric type and calculate averages
    for col in numeric_columns:
        df = df.withColumn(col, df[col].cast("float"))
    
    # Prepare aggregation for numeric columns only
    avg_columns = [avg(col).alias(f"avg_{col}") for col in numeric_columns]

    # Group by 'location' and 'timestamp' and calculate averages
    aggregated_df = df.groupBy("location", "timestamp").agg(*avg_columns)
    
    # Ensure 'sensor_id', 'timestamp', and 'location' are retained
    final_df = aggregated_df.select("location", "timestamp", *[f"avg_{col}" for col in numeric_columns])
    final_df.show(5)  # Show sample output for verification
    return final_df

# Function to save data to Azure Blob Storage
def save_to_blob(data, blob_name):
    json_data = [json.loads(item) for item in data]
    blob_client = blob_service_client.get_blob_client(container=output_container_name, blob=blob_name)
    blob_client.upload_blob(json.dumps(json_data, indent=2), overwrite=True)

In [0]:
# Main processing logic
for blob in list_files(container_name):
    file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{blob.name}"
    df = load_data(file_path)

    # Check if 'location' and 'timestamp' exist for grouping
    if "location" in df.columns and "timestamp" in df.columns:
        metrics_df = aggregate_metrics(df)
        if metrics_df is not None:
            output_file = f"avg_{blob.name.replace('.csv', '')}.json"
            save_to_blob(metrics_df.toJSON().collect(), output_file)
        else:
            print(f"Skipping {blob.name} as no numeric columns were found.")
    else:
        print(f"Skipping {blob.name} as it does not contain 'location' and 'timestamp' columns.")


root
 |-- sensor_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- location: string (nullable = true)
 |-- air_quality_index: string (nullable = true)
 |-- pm2_5: string (nullable = true)
 |-- pm10: string (nullable = true)

+-------------+-------------------+---------------------+------------------+------------------+
|     location|          timestamp|avg_air_quality_index|         avg_pm2_5|          avg_pm10|
+-------------+-------------------+---------------------+------------------+------------------+
|    Docklands|2024-09-01 02:00:00|                 58.0|15.300000190734863|30.200000762939453|
|    Footscray|2024-09-01 04:00:00|                 90.0|              25.0|50.099998474121094|
|      Fitzroy|2024-09-01 01:00:00|                 65.0|              18.0|              35.5|
|Melbourne CBD|2024-09-01 00:00:00|                 45.0|              10.5|              25.0|
|     St Kilda|2024-09-01 03:00:00|                 72.0|20.100000381469727|  