In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pyspark.sql.functions as F


# Get (or create) the Spark session used by every cell of this pipeline
spark = (
    SparkSession.builder
    .appName("WaterQualityPipeline")
    .getOrCreate()
)

In [None]:
# Read the Bronze-layer water quality data (stored in Delta format)
bronze_df = spark.read.load("/mnt/datalake/bronze/water_quality", format="delta")

In [None]:
# Number of rows loaded from the Bronze layer
bronze_df.count()

Checking data types

In [None]:
# describe() only builds a lazy summary DataFrame; .show() is the action that
# actually computes and prints the per-column statistics
# (count, mean, stddev, min, max).
bronze_df.describe().show()

In [None]:
# Print the column names and data types of the Bronze data
bronze_df.printSchema()

Checking Null values

In [None]:
def count_missings(spark_df,sort=True):
    """
    Count the missing values in each inspected column of a Spark DataFrame.

    A value counts as missing when it is null or, for float/double columns,
    NaN. Columns of type timestamp, string and date are not inspected.

    Parameters
    ----------
    spark_df : pyspark.sql.DataFrame
        DataFrame to inspect.
    sort : bool, default True
        If True, return the counts transposed (one row per inspected column,
        a single 'count' column) sorted in descending order. If False, return
        the raw single-row pandas DataFrame (one column per inspected column).

    Returns
    -------
    pandas.DataFrame or None
        The missing-value counts. None is returned (after printing a message)
        when there is no column to inspect or no inspected column contains a
        missing value.
    """
    skipped_types = ('timestamp', 'string', 'date')
    nan_capable_types = ('float', 'double')

    missing_counts = []
    for c, c_type in spark_df.dtypes:
        if c_type in skipped_types:
            continue

        is_missing = F.isnull(c)
        # Only floating-point columns can hold NaN. Restricting isnan() to them
        # gives the same counts for other numeric types and avoids Spark
        # rejecting the query for types isnan() does not accept (e.g. boolean).
        if c_type in nan_capable_types:
            is_missing = F.isnan(c) | is_missing

        # The second argument of when() is the column *name* used as a non-null
        # marker literal, so count() tallies exactly the rows that are missing.
        missing_counts.append(F.count(F.when(is_missing, c)).alias(c))

    if not missing_counts:
        # Nothing to inspect: every column has one of the skipped types.
        print("There are no any missing values!")
        return None

    df = spark_df.select(missing_counts).toPandas()

    # The aggregation always yields a single row of counts, so "no missing
    # values" means every count in that row is zero (not that the frame is empty).
    if df.empty or (df.iloc[0] == 0).all():
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return df

In [None]:
# Per-column missing-value counts for the Bronze data
count_missings(bronze_df)

Dropping unwanted columns

In [None]:
# Data Cleaning: columns that are not carried over into the Silver layer
columns_to_drop = [
    "monitoringSiteIdentifierScheme",
    "observedPropertyDeterminandCode",
    "procedureAnalyticalMethod",
    "parameterSampleDepth",
    "resultObservationStatus",
    "remarks",
    "metadata_beginLifeSpanVersion",
    "metadata_statusCode",
    "metadata_observationStatus",
    "metadata_statements",
    "metadata_versionId",
    "resultStandardDeviationValue",
]
silver_df = bronze_df.drop(*columns_to_drop)

Renaming columns

In [None]:
# Rename columns for better understanding.
# Keys are the source column names, values the names used from here on;
# the renames are applied one by one in the order listed.
column_renames = {
    "countryCode": "Country_Code",
    "monitoringSiteIdentifier": "Monitoring_SiteID",
    "parameterWaterBodyCategory": "Water_Body",
    "observedPropertyDeterminandLabel": "Determinand_Label",
    "procedureAnalysedMatrix": "Analyzed_Matrix",
    "resultUom": "Result_Unit",
    "phenomenonTimeReferenceYear": "Reference_Year",
    "parameterSamplingPeriod": "Sampling_Period",
    "procedureLOQValue": "LOQ_Value",
    "resultNumberOfSamples": "Num_of_Samples",
    "resultQualityNumberOfSamplesBelowLOQ": "Quality_Samples",
    "resultQualityMinimumBelowLOQ": "Quality_MinimumValue",
    "resultMinimumValue": "Minimum_Value",
    "resultQualityMeanBelowLOQ": "Quality_Meanvalue",
    "resultMeanValue": "Mean_Value",
    "resultQualityMaximumBelowLOQ": "Quality_MaximumValue",
    "resultMaximumValue": "Maximum_value",
    "resultQualityMedianBelowLOQ": "Quality_MedianValue",
    "resultMedianValue": "Median_Value",
}

for old_name, new_name in column_renames.items():
    silver_df = silver_df.withColumnRenamed(old_name, new_name)

Mapping Country Codes to Country Names

In [None]:
# Lookup table: two-letter country code -> country name
country_mapping = dict(
    AT="Austria",
    CZ="Czech Republic",
    DE="Germany",
    BE="Belgium",
    ES="Spain",
    SK="Slovakia",
    SE="Sweden",
    DK="Denmark",
    IE="Ireland",
    CH="Switzerland",
    RO="Romania",
    EL="Greece",
    NO="Norway",
    BG="Bulgaria",
    SI="Slovenia",
    FR="France",
    IT="Italy",
    PL="Poland",
    LT="Lithuania",
    HR="Croatia",
    RS="Serbia",
    LV="Latvia",
    CY="Cyprus",
    AL="Albania",
    MK="North Macedonia",
    BA="Bosnia and Herzegovina",
    MT="Malta",
    FI="Finland",
    XK="Kosovo",
    EE="Estonia",
    TR="Turkey",
    LU="Luxembourg",
    HU="Hungary",
    PT="Portugal",
    NL="Netherlands",
    IS="Iceland",
    LI="Liechtenstein",
    ME="Montenegro",
)


In [None]:
# Create the when-otherwise conditions for each country code.
# The branches are generated from `country_mapping` (defined in the cell above)
# instead of being spelled out a second time, so the codes and names cannot
# drift apart.
country_code = col("Country_Code")
country_name_expr = None
for code, name in country_mapping.items():
    if country_name_expr is None:
        # The first branch starts the chain with functions.when(...)
        country_name_expr = when(country_code == code, name)
    else:
        # Later branches extend it with Column.when(...)
        country_name_expr = country_name_expr.when(country_code == code, name)
country_name_expr = country_name_expr.otherwise("Unknown")  # Default value for unmapped codes


# Add the 'Country_Name' column to the DataFrame
silver_df = silver_df.withColumn("Country_Name", country_name_expr)


# Show the results
silver_df.show(1)


Water body mapping

In [None]:
# Define the full mapping of water body codes to full names
# NOTE(review): in EEA/WFD code lists "TW" and "CW" usually denote transitional
# and coastal waters - confirm that "Tap Water" / "Canal Water" are intended.
water_body_mapping = {
    "GW": "Ground Water",
    "RW": "River Water",
    "LW": "Lake Water",
    "TW": "Tap Water",
    "CW": "Canal Water"
}

# Create the when-otherwise conditions for each water body code.
# The branches are generated from `water_body_mapping` so the mapping is
# written down only once.
water_body_name_expr = None
for code, name in water_body_mapping.items():
    if water_body_name_expr is None:
        water_body_name_expr = when(col("water_body") == code, name)
    else:
        water_body_name_expr = water_body_name_expr.when(col("water_body") == code, name)
water_body_name_expr = water_body_name_expr.otherwise("Unknown")  # Default value for unmapped codes


# Replace the water body codes in place with their full names (no new column
# is added). Because the same column is overwritten, running this cell a
# second time on the already-mapped data turns every value into "Unknown".
# NOTE(review): the column was renamed to "Water_Body" earlier in this
# notebook; the lowercase "water_body" used here only resolves to it while
# Spark's column resolution is case-insensitive (spark.sql.caseSensitive is
# false by default) - confirm whether the casing should be unified.
silver_df = silver_df.withColumn("water_body", water_body_name_expr)


# Show the results
silver_df.display()


Analyzed matrix mapping

In [None]:
# Define the full mapping of Analyzed_Matrix codes to full names
analyzed_matrix_mapping = {
    "W": "Water",
    "W-DIS": "Water - Dissolved",
    "W-SPM": "Water - Suspended Particulate Matter"
}


# Create the when-otherwise conditions for each Analyzed_Matrix code.
# The branches are generated from `analyzed_matrix_mapping` so the mapping is
# written down only once.
analyzed_matrix_name_expr = None
for code, name in analyzed_matrix_mapping.items():
    if analyzed_matrix_name_expr is None:
        analyzed_matrix_name_expr = when(col("Analyzed_Matrix") == code, name)
    else:
        analyzed_matrix_name_expr = analyzed_matrix_name_expr.when(col("Analyzed_Matrix") == code, name)
analyzed_matrix_name_expr = analyzed_matrix_name_expr.otherwise("Unknown")  # Default value for unmapped codes


# Replace the Analyzed_Matrix codes in place with their full names (no new
# column is added). Because the same column is overwritten, running this cell
# a second time on the already-mapped data turns every value into "Unknown".
silver_df = silver_df.withColumn("Analyzed_Matrix", analyzed_matrix_name_expr)


# Show the results
silver_df.display()


In [None]:
# Row count of the transformed data before Country_Code is dropped
silver_df.count()

In [None]:
# Remove the raw country code; the Country_Name column added earlier is kept
silver_df = silver_df.drop("Country_Code")

In [None]:
# Row count after the drop; removing a column does not remove rows, so this
# should equal the count taken before the drop
silver_df.count()

In [None]:
# Render the final Silver DataFrame
# NOTE(review): .display() is presumably the Databricks notebook helper - confirm
# this notebook only runs in an environment that provides it.
silver_df.display()

In [None]:
# Persist the cleaned data as the Silver-layer Delta table, replacing any
# previous contents and allowing the schema to be merged on write
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save("/mnt/datalake/silver/water_quality_cleaned")
)