<a href="https://colab.research.google.com/github/swati-git/SQL/blob/main/UseMovingAveragesToFindAirQuality.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, avg, hour, to_timestamp


spark = SparkSession.builder.appName("AirQualityMovingAverages").getOrCreate()

hourly_data = [
    ("2023-05-01 00:00:00", 12.3),
    ("2023-05-01 01:00:00", 14.1),
    ("2023-05-01 02:00:00", 15.6),
    ("2023-05-01 03:00:00", 17.2),
    ("2023-05-01 04:00:00", 19.8),
    ("2023-05-01 05:00:00", 22.5),
    ("2023-05-01 06:00:00", 28.7),
    ("2023-05-01 07:00:00", 35.2),
    ("2023-05-01 08:00:00", 42.1),
    ("2023-05-01 09:00:00", 38.6),
    ("2023-05-01 10:00:00", 32.4),
    ("2023-05-01 11:00:00", 26.5),
    ("2023-05-01 12:00:00", 22.3),
    ("2023-05-01 13:00:00", 18.9),
    ("2023-05-01 14:00:00", 15.6),
    ("2023-05-01 15:00:00", 14.2),
    ("2023-05-01 16:00:00", 16.8),
    ("2023-05-01 17:00:00", 21.3),
    ("2023-05-01 18:00:00", 25.7),
    ("2023-05-01 19:00:00", 28.9),
    ("2023-05-01 20:00:00", 24.6),
    ("2023-05-01 21:00:00", 20.3),
    ("2023-05-01 22:00:00", 17.1),
    ("2023-05-01 23:00:00", 14.8),
    ("2023-05-02 00:00:00", 13.2)
]

df = spark.createDataFrame(hourly_data, ["timestamp", "pm25"])
df.show()


+-------------------+----+
|          timestamp|pm25|
+-------------------+----+
|2023-05-01 00:00:00|12.3|
|2023-05-01 01:00:00|14.1|
|2023-05-01 02:00:00|15.6|
|2023-05-01 03:00:00|17.2|
|2023-05-01 04:00:00|19.8|
|2023-05-01 05:00:00|22.5|
|2023-05-01 06:00:00|28.7|
|2023-05-01 07:00:00|35.2|
|2023-05-01 08:00:00|42.1|
|2023-05-01 09:00:00|38.6|
|2023-05-01 10:00:00|32.4|
|2023-05-01 11:00:00|26.5|
|2023-05-01 12:00:00|22.3|
|2023-05-01 13:00:00|18.9|
|2023-05-01 14:00:00|15.6|
|2023-05-01 15:00:00|14.2|
|2023-05-01 16:00:00|16.8|
|2023-05-01 17:00:00|21.3|
|2023-05-01 18:00:00|25.7|
|2023-05-01 19:00:00|28.9|
+-------------------+----+
only showing top 20 rows



In [2]:
df.dtypes


[('timestamp', 'string'), ('pm25', 'double')]

In [3]:
df = df.withColumn("timestamp", to_timestamp("timestamp"))

df.dtypes

[('timestamp', 'timestamp'), ('pm25', 'double')]

In [8]:
# Convert timestamp to long (milliseconds since epoch)
from pyspark.sql.functions import unix_timestamp
df = df.withColumn("timestamp_ms", unix_timestamp("timestamp").cast("long") * 1000)

# Define window on the long column
window_8hr = Window.orderBy("timestamp_ms").rangeBetween(
    -(8 * 60 * 60 * 1000),  # 8 hours in milliseconds
    0                       # current row
)

# Apply the window function
df_with_8hr_avg = df.withColumn("8hr_moving_avg", avg("pm25").over(window_8hr))

In [10]:
df_with_8hr_avg

DataFrame[timestamp: timestamp, pm25: double, timestamp_ms: bigint, 8hr_moving_avg: double]

In [11]:
df_with_8hr_avg.select(["timestamp", "pm25", "8hr_moving_avg"]).show()

+-------------------+----+------------------+
|          timestamp|pm25|    8hr_moving_avg|
+-------------------+----+------------------+
|2023-05-01 00:00:00|12.3|              12.3|
|2023-05-01 01:00:00|14.1|              13.2|
|2023-05-01 02:00:00|15.6|              14.0|
|2023-05-01 03:00:00|17.2|              14.8|
|2023-05-01 04:00:00|19.8|              15.8|
|2023-05-01 05:00:00|22.5|16.916666666666668|
|2023-05-01 06:00:00|28.7|18.599999999999998|
|2023-05-01 07:00:00|35.2|20.674999999999997|
|2023-05-01 08:00:00|42.1|23.055555555555554|
|2023-05-01 09:00:00|38.6|25.977777777777778|
|2023-05-01 10:00:00|32.4| 28.01111111111111|
|2023-05-01 11:00:00|26.5| 29.22222222222222|
|2023-05-01 12:00:00|22.3| 29.78888888888889|
|2023-05-01 13:00:00|18.9|29.688888888888886|
|2023-05-01 14:00:00|15.6|28.922222222222224|
|2023-05-01 15:00:00|14.2|27.311111111111114|
|2023-05-01 16:00:00|16.8|25.266666666666666|
|2023-05-01 17:00:00|21.3|22.955555555555556|
|2023-05-01 18:00:00|25.7|21.52222

In [13]:
window_24hr = Window.orderBy("timestamp_ms").rangeBetween(-(24 * 60 * 60), 0)  # 24 hours in seconds
result_df = df_with_8hr_avg.withColumn("24hr_moving_avg", avg("pm25").over(window_24hr))

display_df = result_df.select(
    "timestamp",
    col("pm25").alias("Hourly_PM25"),
    col("8hr_moving_avg").alias("8hr_Avg_PM25"),
    col("24hr_moving_avg").alias("24hr_Avg_PM25")
)

def pm25_to_aqi_category(pm25):
    if pm25 <= 12.0:
        return "Good"
    elif pm25 <= 35.4:
        return "Moderate"
    elif pm25 <= 55.4:
        return "Unhealthy for Sensitive Groups"
    elif pm25 <= 150.4:
        return "Unhealthy"
    elif pm25 <= 250.4:
        return "Very Unhealthy"
    else:
        return "Hazardous"

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
aqi_category_udf = udf(pm25_to_aqi_category, StringType())

final_df = display_df.withColumn("Hourly_AQI_Category", aqi_category_udf("Hourly_PM25")) \
                    .withColumn("8hr_AQI_Category", aqi_category_udf("8hr_Avg_PM25")) \
                    .withColumn("24hr_AQI_Category", aqi_category_udf("24hr_Avg_PM25"))

final_df.show(10, False)

from pyspark.sql.functions import when
warnings_df = final_df.withColumn(
    "Air_Quality_Warning",
    when(col("24hr_AQI_Category") == "Good", "No warning") \
    .when(col("24hr_AQI_Category") == "Moderate", "Unusually sensitive individuals should consider reducing prolonged outdoor exertion") \
    .when(col("24hr_AQI_Category") == "Unhealthy for Sensitive Groups", "Active children and adults, and people with respiratory disease should limit prolonged outdoor exertion") \
    .when(col("24hr_AQI_Category") == "Unhealthy", "Everyone may begin to experience health effects; sensitive groups should avoid all outdoor exertion") \
    .otherwise("Health alert: everyone may experience more serious health effects")
)

warnings_df.select("timestamp", "24hr_Avg_PM25", "24hr_AQI_Category", "Air_Quality_Warning").show(5, False)

+-------------------+-----------+------------------+-------------+------------------------------+----------------+------------------------------+
|timestamp          |Hourly_PM25|8hr_Avg_PM25      |24hr_Avg_PM25|Hourly_AQI_Category           |8hr_AQI_Category|24hr_AQI_Category             |
+-------------------+-----------+------------------+-------------+------------------------------+----------------+------------------------------+
|2023-05-01 00:00:00|12.3       |12.3              |12.3         |Moderate                      |Moderate        |Moderate                      |
|2023-05-01 01:00:00|14.1       |13.2              |14.1         |Moderate                      |Moderate        |Moderate                      |
|2023-05-01 02:00:00|15.6       |14.0              |15.6         |Moderate                      |Moderate        |Moderate                      |
|2023-05-01 03:00:00|17.2       |14.8              |17.2         |Moderate                      |Moderate        |Moderate  