### IMPORTING PACKAGES

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import DataFrame
import pandas as pd
from pyspark.sql.functions import (
    min, max, year, month, col, isnan, isnull, when, count, countDistinct, 
    round, desc, sum as sum_, mean, stddev, variance, skewness, kurtosis, 
    explode, split, regexp_replace, to_timestamp, to_date, lit, datediff, current_date
)
from pyspark.sql.types import StringType
import matplotlib.pyplot as plt
from functools import reduce
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.sql.window import Window
from pyspark.sql.functions import create_map, lit
from itertools import chain
from pyspark.sql.functions import avg as F_avg
from pyspark.sql.functions import sum as F_sum
from pyspark.sql.functions import count as F_count
from pyspark.sql.functions import col, round as F_round
from pyspark.sql.functions import log1p 
import seaborn as sns
from pyspark.ml.stat import Correlation
import numpy as np
import pylab as pl
from pyspark.ml.linalg import Vectors

import warnings

# Suppress all warnings
warnings.filterwarnings("ignore")

In [2]:
# Changing the directory to a specific location
%cd C:\Users\june3\Downloads\SDWA\

C:\Users\june3\Downloads\SDWA


### CREATE SPARK SESSION

In [3]:
# Start (or reuse) the Spark session used throughout the notebook
spark = (
    SparkSession.builder
    .appName("SDWA DATA ANALYSIS")
    .getOrCreate()
)

# CSV files to load, listed in the order they are later unpacked into df1, df2, df3
file_paths = [
    "SDWA_PUB_WATER_SYSTEMS.csv",
    "SDWA_SITE_VISITS.csv",
    "SDWA_VIOLATIONS_ENFORCEMENT.csv",
]


### LOADING .CSV FILES AND READING INTO SPARK DATAFRAMES

In [4]:
# Load each CSV (first row as header, column types inferred) into its own Spark DataFrame
def _read_csv(path):
    return spark.read.csv(path, header=True, inferSchema=True)

dfs = list(map(_read_csv, file_paths))

# One variable per file, in file_paths order
df1, df2, df3 = dfs

In [5]:
# defining a missing values function
def get_missing_values(df, dataframe_name):
    """Print the number of missing values in each column of a Spark DataFrame.

    A value counts as missing when it is null or, for columns on which Spark
    can evaluate ``isnan``, NaN. The per-column counts are collected to the
    driver and printed as a markdown table with one row per column.

    Args:
        df: Spark DataFrame to inspect.
        dataframe_name: Label used in the printed heading.

    Returns:
        None. The table is printed.
    """
    # isnan() takes a float/double operand. Numeric and string columns are implicitly
    # cast to it, but date, timestamp, boolean and nested columns are rejected at
    # analysis time, so those columns are checked for null only instead of failing
    # the whole report.
    nan_checkable = {"tinyint", "smallint", "int", "bigint", "float", "double", "string"}

    missing_exprs = []
    for c, dtype in df.dtypes:
        is_missing = isnull(c)
        if dtype in nan_checkable or dtype.startswith("decimal"):
            is_missing = is_missing | isnan(c)
        # when() yields a non-null value only for missing rows, so count() tallies them
        missing_exprs.append(count(when(is_missing, c)).alias(c))
    missing_df = df.select(missing_exprs)

    # Convert to Pandas for better formatting: one row per column
    missing = missing_df.toPandas().transpose().reset_index()
    missing.columns = ['Column', 'Missing_Count']

    # Display the missing values
    print(f"--- Missing Values in {dataframe_name} ---")
    print(missing.to_markdown(index=False))
    print("\n")

In [6]:
# Summary-statistics helper
def get_summary_statistics(df, dataframe_name):
    """Print the output of ``df.describe()`` as a markdown table, one row per column.

    Args:
        df: Spark DataFrame to summarise.
        dataframe_name: Label used in the printed heading.

    Returns:
        None. The table is printed.
    """
    # describe() returns one row per statistic; flip it so each column becomes a row
    described = df.describe().toPandas()
    summary = described.set_index('summary').T
    summary.index.name = 'Column'

    heading = f"--- Summary Statistics for {dataframe_name} ---"
    print(heading)
    print(summary.to_markdown())
    print("\n")

### DATAFRAME 1

In [7]:
print(f" Dataframe 1: rows: {df1.count()}, columns: {len(df1.columns)} ")

 Dataframe 1: rows: 430915, columns: 51 


In [8]:
# Drop the columns not needed for the analysis: names, contact/address details,
# reporting and monitoring dates, and category/type codes.
df1_drop = df1.drop("SUBMISSIONYEARQUARTER", "PWS_NAME", "PRIMACY_AGENCY_CODE", "EPA_REGION", "PWS_TYPE_CODE", 
                    "OWNER_TYPE_CODE", "PRIMARY_SOURCE_CODE", "DBPR_SCHEDULE_CAT_CODE", "LT2_SCHEDULE_CAT_CODE", "CDS_ID", 
                    "POP_CAT_2_CODE", "POP_CAT_3_CODE", "POP_CAT_4_CODE", "SEASON_BEGIN_DATE", "SEASON_END_DATE", 
                    "POP_CAT_5_CODE", "POP_CAT_11_CODE", "GW_SW_CODE", "ORG_NAME",  "ADMIN_NAME", "EMAIL_ADDR", "PHONE_NUMBER", 
                    "PHONE_EXT_NUMBER", "FAX_NUMBER", "ALT_PHONE_NUMBER", "ADDRESS_LINE1", "ADDRESS_LINE2", "CITY_NAME",
                    "PWS_DEACTIVATION_DATE", "PRIMACY_TYPE",  "SUBMISSION_STATUS_CODE", "FIRST_REPORTED_DATE", "LAST_REPORTED_DATE", 
                    "SOURCE_PROTECTION_BEGIN_DATE", "IS_GRANT_ELIGIBLE_IND", "IS_WHOLESALER_IND", "ZIP_CODE", "STATE_CODE",
                    "OUTSTANDING_PERFORM_BEGIN_DATE","REDUCED_MONITORING_BEGIN_DATE", "REDUCED_MONITORING_END_DATE", 
                    "SEASONAL_STARTUP_SYSTEM", "REDUCED_RTCR_MONITORING")

In [9]:
# Keep only systems whose COUNTRY_CODE is "US" (rows with a null code are dropped as well)
df1_drop = df1_drop.where(col("COUNTRY_CODE") == "US")

In [10]:
# COUNTRY_CODE holds a single value ("US") after the filter above, so it is no longer needed
df1_drop = df1_drop.drop("COUNTRY_CODE")

In [11]:
# Treat a missing outstanding-performer / source-water-protection flag as "N"
df1_drop = df1_drop.fillna(
    "N",
    subset=["OUTSTANDING_PERFORMER", "SOURCE_WATER_PROTECTION_CODE"],
)

In [12]:
# Replace SERVICE_CONNECTIONS_COUNT with its integer-cast version
connections_as_int = col("SERVICE_CONNECTIONS_COUNT").cast("integer")
df1_drop = df1_drop.withColumn("SERVICE_CONNECTIONS_COUNT", connections_as_int)


In [13]:
# Both totals are computed over the same PWSID grouping
systems_by_pwsid = df1_drop.groupBy("PWSID")

# Total population served per PWSID
population_total = F.sum("POPULATION_SERVED_COUNT").alias("TOTAL_POPULATION_SERVED_COUNT")
population_agg = systems_by_pwsid.agg(population_total)

# Total service connections per PWSID
connections_total = F.sum("SERVICE_CONNECTIONS_COUNT").alias("TOTAL_SERVICE_CONNECTIONS_COUNT")
service_agg = systems_by_pwsid.agg(connections_total)

In [14]:
# Attach both per-PWSID totals (population, then service connections) to every system row
df1_drop = (
    df1_drop
    .join(population_agg, on="PWSID", how="left")
    .join(service_agg, on="PWSID", how="left")
)


In [15]:
# The per-row counts are superseded by the TOTAL_* columns joined in the previous cell
df1_drop = df1_drop.drop("POPULATION_SERVED_COUNT", "SERVICE_CONNECTIONS_COUNT" )

In [16]:
get_summary_statistics(df1_drop, "DATAFRAME 1")

--- Summary Statistics for DATAFRAME 1 ---
| Column                          |   count |           mean |          stddev | min       | max       |
|:--------------------------------|--------:|---------------:|----------------:|:----------|:----------|
| PWSID                           |  426231 |    7.72525e+07 |     2.08861e+07 | 010106001 | WY5690006 |
| PWS_ACTIVITY_CODE               |  426231 |                |                 | A         | P         |
| IS_SCHOOL_OR_DAYCARE_IND        |  426231 |                |                 | N         | Y         |
| SOURCE_WATER_PROTECTION_CODE    |  426231 |                |                 | N         | Y         |
| OUTSTANDING_PERFORMER           |  426231 |                |                 | N         | Y         |
| TOTAL_POPULATION_SERVED_COUNT   |  426231 | 1041.57        | 27926.7         | 0         | 9000000   |
| TOTAL_SERVICE_CONNECTIONS_COUNT |  426231 |  423.78        | 32281.2         | 0         | 9999999   |




In [17]:
df1_drop.printSchema()

root
 |-- PWSID: string (nullable = true)
 |-- PWS_ACTIVITY_CODE: string (nullable = true)
 |-- IS_SCHOOL_OR_DAYCARE_IND: string (nullable = true)
 |-- SOURCE_WATER_PROTECTION_CODE: string (nullable = false)
 |-- OUTSTANDING_PERFORMER: string (nullable = false)
 |-- TOTAL_POPULATION_SERVED_COUNT: long (nullable = true)
 |-- TOTAL_SERVICE_CONNECTIONS_COUNT: long (nullable = true)



In [18]:
# Before Dropping Duplicates
print(f" Dataframe 1: rows: {df1_drop.count()}, columns: {len(df1_drop.columns)} ")

 Dataframe 1: rows: 426231, columns: 7 


In [19]:
# Keep a single row per PWSID.
# NOTE(review): when a PWSID has several rows, no ordering decides which one is kept.
df1_drop = df1_drop.dropDuplicates(["PWSID"])

In [20]:
# After Dropping Duplicates
print(f" Dataframe 1: rows: {df1_drop.count()}, columns: {len(df1_drop.columns)} ")

 Dataframe 1: rows: 426231, columns: 7 


In [21]:
get_missing_values(df1_drop, "DATAFRAME 1")

--- Missing Values in DATAFRAME 1 ---
| Column                          |   Missing_Count |
|:--------------------------------|----------------:|
| PWSID                           |               0 |
| PWS_ACTIVITY_CODE               |               0 |
| IS_SCHOOL_OR_DAYCARE_IND        |               0 |
| SOURCE_WATER_PROTECTION_CODE    |               0 |
| OUTSTANDING_PERFORMER           |               0 |
| TOTAL_POPULATION_SERVED_COUNT   |               0 |
| TOTAL_SERVICE_CONNECTIONS_COUNT |               0 |




### DATAFRAME 2

In [22]:
print(f" Dataframe 2: rows: {df2.count()}, columns: {len(df2.columns)} ")

 Dataframe 2: rows: 2359090, columns: 20 


In [23]:
df2.printSchema()

root
 |-- SUBMISSIONYEARQUARTER: string (nullable = true)
 |-- PWSID: string (nullable = true)
 |-- VISIT_ID: string (nullable = true)
 |-- VISIT_DATE: string (nullable = true)
 |-- AGENCY_TYPE_CODE: string (nullable = true)
 |-- VISIT_REASON_CODE: string (nullable = true)
 |-- MANAGEMENT_OPS_EVAL_CODE: string (nullable = true)
 |-- SOURCE_WATER_EVAL_CODE: string (nullable = true)
 |-- SECURITY_EVAL_CODE: string (nullable = true)
 |-- PUMPS_EVAL_CODE: string (nullable = true)
 |-- OTHER_EVAL_CODE: string (nullable = true)
 |-- COMPLIANCE_EVAL_CODE: string (nullable = true)
 |-- DATA_VERIFICATION_EVAL_CODE: string (nullable = true)
 |-- TREATMENT_EVAL_CODE: string (nullable = true)
 |-- FINISHED_WATER_STOR_EVAL_CODE: string (nullable = true)
 |-- DISTRIBUTION_EVAL_CODE: string (nullable = true)
 |-- FINANCIAL_EVAL_CODE: string (nullable = true)
 |-- VISIT_COMMENTS: string (nullable = true)
 |-- FIRST_REPORTED_DATE: string (nullable = true)
 |-- LAST_REPORTED_DATE: string (nullable = tru

In [24]:
# Keep only PWSID and the *_EVAL_CODE columns; the visit metadata is not used
visit_metadata_cols = [
    "SUBMISSIONYEARQUARTER", "AGENCY_TYPE_CODE", "VISIT_REASON_CODE",
    "VISIT_ID", "VISIT_DATE", "FIRST_REPORTED_DATE", "VISIT_COMMENTS", "LAST_REPORTED_DATE",
]
df2_drop = df2.drop(*visit_metadata_cols)

In [25]:
# Discard every visit row that has a null in any remaining column
df2_drop = df2_drop.na.drop()

In [26]:
df2_drop.printSchema()

root
 |-- PWSID: string (nullable = true)
 |-- MANAGEMENT_OPS_EVAL_CODE: string (nullable = true)
 |-- SOURCE_WATER_EVAL_CODE: string (nullable = true)
 |-- SECURITY_EVAL_CODE: string (nullable = true)
 |-- PUMPS_EVAL_CODE: string (nullable = true)
 |-- OTHER_EVAL_CODE: string (nullable = true)
 |-- COMPLIANCE_EVAL_CODE: string (nullable = true)
 |-- DATA_VERIFICATION_EVAL_CODE: string (nullable = true)
 |-- TREATMENT_EVAL_CODE: string (nullable = true)
 |-- FINISHED_WATER_STOR_EVAL_CODE: string (nullable = true)
 |-- DISTRIBUTION_EVAL_CODE: string (nullable = true)
 |-- FINANCIAL_EVAL_CODE: string (nullable = true)



In [27]:
# Dataframe before Dropping Duplicates
print(f" Dataframe 2: rows: {df2_drop.count()}, columns: {len(df2_drop.columns)} ")

 Dataframe 2: rows: 835088, columns: 12 


In [28]:
# Keep a single site-visit row per PWSID.
# NOTE(review): a PWSID can have many visits and no ordering decides which one is kept,
# so the retained *_EVAL_CODE values may differ between runs — confirm this is acceptable.
df2_drop = df2_drop.dropDuplicates(["PWSID"])

In [29]:
# Dataframe after Dropping Duplicates
print(f" Dataframe 2: rows: {df2_drop.count()}, columns: {len(df2_drop.columns)} ")

 Dataframe 2: rows: 127857, columns: 12 


In [30]:
get_missing_values(df2_drop, "DATAFRAME 2")

--- Missing Values in DATAFRAME 2 ---
| Column                        |   Missing_Count |
|:------------------------------|----------------:|
| PWSID                         |               0 |
| MANAGEMENT_OPS_EVAL_CODE      |               0 |
| SOURCE_WATER_EVAL_CODE        |               0 |
| SECURITY_EVAL_CODE            |               0 |
| PUMPS_EVAL_CODE               |               0 |
| OTHER_EVAL_CODE               |               0 |
| COMPLIANCE_EVAL_CODE          |               0 |
| DATA_VERIFICATION_EVAL_CODE   |               0 |
| TREATMENT_EVAL_CODE           |               0 |
| FINISHED_WATER_STOR_EVAL_CODE |               0 |
| DISTRIBUTION_EVAL_CODE        |               0 |
| FINANCIAL_EVAL_CODE           |               0 |




### DATAFRAME 3

In [31]:
# Display dataframe shape
print(f" Dataframe 3: rows: {df3.count()}, columns: {len(df3.columns)} ")

 Dataframe 3: rows: 14501860, columns: 38 


In [32]:
# Drop the columns not needed for the analysis: identifiers and codes, MCL limits,
# notification tiers, originator codes, and the reporting/compliance-period dates.
df3_drop = df3.drop("SUBMISSIONYEARQUARTER", "FACILITY_ID", "VIOLATION_CODE", "COMPL_PER_BEGIN_DATE", 
                    "CONTAMINANT_CODE", "COMPL_PER_END_DATE", "STATE_MCL", "RULE_GROUP_CODE", "SAMPLE_RESULT_ID", 
                    "CORRECTIVE_ACTION_ID", "ENFORCEMENT_ACTION_TYPE_CODE","CALCULATED_PUB_NOTIF_TIER", "PWS_DEACTIVATION_DATE",
                    "PUBLIC_NOTIFICATION_TIER", "SEVERITY_IND_CNT", "FEDERAL_MCL", "RULE_CODE", "RULE_FAMILY_CODE",
                      "VIOL_FIRST_REPORTED_DATE", "VIOL_LAST_REPORTED_DATE", "ENF_FIRST_REPORTED_DATE", "ENF_LAST_REPORTED_DATE",
                      "VIOL_ORIGINATOR_CODE", "ENF_ORIGINATOR_CODE", "FIRST_REPORTED_DATE", "LAST_REPORTED_DATE")

In [33]:
# Keep only rows that have a VIOL_MEASURE value
df3_drop = df3_drop.na.drop(subset=["VIOL_MEASURE"])

In [34]:
# Impute nulls. Missing dates get the 12/31/2024 sentinel, written as MM/dd/yyyy text
# to match the format the date columns are parsed with later. Each column is listed
# once (the original dict repeated the NON_COMPL_PER_END_DATE key).
df3_drop = df3_drop.fillna({
    "IS_MAJOR_VIOL_IND": "N",
    "CALCULATED_RTC_DATE": "12/31/2024",
    "NON_COMPL_PER_END_DATE": "12/31/2024",
    "ENFORCEMENT_DATE": "12/31/2024",
    "ENF_ACTION_CATEGORY": "Unknown",
    "UNIT_OF_MEASURE": "MG/L",
})

In [35]:
# Keep only rows whose unit of measure is MG/L
df3_drop = df3_drop.where(col("UNIT_OF_MEASURE") == "MG/L")

In [36]:
# Per-PWSID counts of rows carrying a non-null ENFORCEMENT_ID / VIOLATION_ID.
# NOTE(review): count() tallies rows, not distinct IDs — confirm repeated IDs should count more than once.
violations_by_pwsid = df3_drop.groupBy("PWSID")

enforcement_total = F.count("ENFORCEMENT_ID").alias("TOTAL_ENFORCEMENTS")
enforcement_agg = violations_by_pwsid.agg(enforcement_total)

violation_total = F.count("VIOLATION_ID").alias("TOTAL_VIOLATIONS")
violation_agg = violations_by_pwsid.agg(violation_total)

In [37]:
# Attach the per-PWSID enforcement and violation counts to every row
df3_drop = (
    df3_drop
    .join(enforcement_agg, on="PWSID", how="left")
    .join(violation_agg, on="PWSID", how="left")
)


In [38]:
# UNIT_OF_MEASURE is constant after the MG/L filter; the ID columns were only needed for the counts
df3_drop = df3_drop.drop("UNIT_OF_MEASURE", "VIOLATION_ID", "ENFORCEMENT_ID")

In [39]:
# Missing-value report for the violations/enforcement frame (label corrected from "DATAFRAME 2")
get_missing_values(df3_drop, "DATAFRAME 3")

--- Missing Values in DATAFRAME 2 ---
| Column                   |   Missing_Count |
|:-------------------------|----------------:|
| PWSID                    |               0 |
| NON_COMPL_PER_BEGIN_DATE |               0 |
| NON_COMPL_PER_END_DATE   |               0 |
| VIOLATION_CATEGORY_CODE  |               0 |
| IS_HEALTH_BASED_IND      |               0 |
| VIOL_MEASURE             |               0 |
| IS_MAJOR_VIOL_IND        |               0 |
| CALCULATED_RTC_DATE      |               0 |
| VIOLATION_STATUS         |               0 |
| ENFORCEMENT_DATE         |               0 |
| ENF_ACTION_CATEGORY      |               0 |
| TOTAL_ENFORCEMENTS       |               0 |
| TOTAL_VIOLATIONS         |               0 |




In [40]:
# Parse the MM/dd/yyyy text dates into DateType columns, replacing each column in place
for date_column in (
    "NON_COMPL_PER_BEGIN_DATE",
    "NON_COMPL_PER_END_DATE",
    "CALCULATED_RTC_DATE",
    "ENFORCEMENT_DATE",
):
    df3_drop = df3_drop.withColumn(date_column, to_date(col(date_column), "MM/dd/yyyy"))

In [41]:
# Normalise NON_COMPL_PER_END_DATE: missing or future end dates become the 2024-12-31 sentinel.
# The sentinel is built as a DATE rather than a string literal so the column keeps DateType;
# mixing a string literal with a date column in when()/otherwise() widens the result to string.
open_ended_sentinel = lit("2024-12-31").cast("date")
df3_drop = df3_drop.withColumn(
    "NON_COMPL_PER_END_DATE",
    when(col("NON_COMPL_PER_END_DATE").isNull(), open_ended_sentinel)
    .when(col("NON_COMPL_PER_END_DATE") > current_date(), open_ended_sentinel)
    .otherwise(col("NON_COMPL_PER_END_DATE"))  # keep the original value when neither condition applies
)

In [46]:
# Length of each non-compliance period in days (end date minus begin date)
non_compliance_days = datediff(col("NON_COMPL_PER_END_DATE"), col("NON_COMPL_PER_BEGIN_DATE"))
df3_drop = df3_drop.withColumn("NON_COMPLIANCE_DURATION_DAYS", non_compliance_days)

In [47]:
df3_drop.filter(df3_drop["NON_COMPLIANCE_DURATION_DAYS"] < 0 ).select("NON_COMPLIANCE_DURATION_DAYS", "NON_COMPL_PER_END_DATE", "NON_COMPL_PER_BEGIN_DATE").show()

+----------------------------+----------------------+------------------------+
|NON_COMPLIANCE_DURATION_DAYS|NON_COMPL_PER_END_DATE|NON_COMPL_PER_BEGIN_DATE|
+----------------------------+----------------------+------------------------+
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31|              1994-02-01|
|                          -1|            1994-01-31

In [48]:
# Where the begin date is later than the end date, swap the two; other rows are copied unchanged
begin_date = F.col("NON_COMPL_PER_BEGIN_DATE")
end_date = F.col("NON_COMPL_PER_END_DATE")
dates_reversed = begin_date > end_date

df_incorrect_dates_fixed = (
    df3_drop
    .withColumn("BEGIN_DATE_FIXED", F.when(dates_reversed, end_date).otherwise(begin_date))
    .withColumn("END_DATE_FIXED", F.when(dates_reversed, begin_date).otherwise(end_date))
    # Recompute the duration from the corrected dates
    .withColumn(
        "NON_COMPLIANCE_DURATION_DAYS",
        F.datediff(F.col("END_DATE_FIXED"), F.col("BEGIN_DATE_FIXED")),
    )
)

In [49]:
# The *_FIXED columns replace the original begin/end dates from here on
df3_drop = df_incorrect_dates_fixed.drop("NON_COMPL_PER_BEGIN_DATE", "NON_COMPL_PER_END_DATE")

In [50]:
# IS_VIOLATION_OPEN is 1 when the corrected end date equals the 2024-12-31 sentinel, else 0
ends_on_sentinel = col("END_DATE_FIXED") == lit("2024-12-31")
df3_drop = df3_drop.withColumn("IS_VIOLATION_OPEN", when(ends_on_sentinel, 1).otherwise(0))

In [51]:
# Flag for enforcement taken.
# Null ENFORCEMENT_DATE values were imputed with the 12/31/2024 sentinel, so the sentinel is
# excluded explicitly (the same way LATE_COMPLIANCE excludes it). Without this, every row that
# had no enforcement date is flagged 1 whenever the notebook runs after 2024-12-31.
df3_drop = df3_drop.withColumn(
    "ENFORCEMENT_ACTION_TAKEN",
    when(
        (col("ENFORCEMENT_DATE") < current_date()) &
        (col("ENFORCEMENT_DATE") != lit("2024-12-31")),
        1
    ).otherwise(0)
)

In [52]:
# LATE_COMPLIANCE is 1 when the return-to-compliance date is after the enforcement date
# and the enforcement date is not the 2024-12-31 sentinel; otherwise 0
rtc_after_enforcement = col("CALCULATED_RTC_DATE") > col("ENFORCEMENT_DATE")
enforcement_not_sentinel = col("ENFORCEMENT_DATE") != lit("2024-12-31")
df3_drop = df3_drop.withColumn(
    "LATE_COMPLIANCE",
    when(rtc_after_enforcement & enforcement_not_sentinel, 1).otherwise(0),
)

In [53]:
# Label each row with a compliance status; branches are tried top to bottom, first match wins
duration = col("NON_COMPLIANCE_DURATION_DAYS")
enforcement_flag = col("ENFORCEMENT_ACTION_TAKEN")
late_flag = col("LATE_COMPLIANCE")

df3_drop = df3_drop.withColumn(
    "COMPLIANCE_STATUS",
    when(duration.isNull(), "Compliant")
    .when(duration.isNotNull() & (enforcement_flag == 0), "Non-Compliant (No Enforcement)")
    .when(
        duration.isNotNull() & (enforcement_flag == 1) & (late_flag == 0),
        "Non-Compliant (With Enforcement)",
    )
    .when(late_flag == 1, "Late Compliance")
    .otherwise("Unknown"),
)

In [54]:
# One summary row per PWSID
per_pwsid_metrics = [
    # Mean non-compliance duration, rounded to whole days
    F.round(F.avg("NON_COMPLIANCE_DURATION_DAYS"), 0).alias("AVG_VIOLATION_DURATION_DAYS"),
    # Number of rows flagged as open violations
    F.sum("IS_VIOLATION_OPEN").alias("OPEN_VIOLATIONS_COUNT"),
    # Number of rows flagged as late compliance
    F.sum("LATE_COMPLIANCE").alias("TOTAL_LATE_COMPLIANT_ACTIONS"),
]
df3_aggregated = df3_drop.groupBy("PWSID").agg(*per_pwsid_metrics)

In [55]:
# Attach the per-PWSID violation summary to every row
df3_drop = df3_drop.join(df3_aggregated, "PWSID", "left")

In [56]:
# Remove the row-level date and flag columns now that the per-PWSID summary has been joined
row_level_cols = [
    "BEGIN_DATE_FIXED", "END_DATE_FIXED", "CALCULATED_RTC_DATE", "ENFORCEMENT_DATE",
    "NON_COMPLIANCE_DURATION_DAYS", "ENFORCEMENT_ACTION_TAKEN", "LATE_COMPLIANCE", "IS_VIOLATION_OPEN",
]
df3_drop = df3_drop.drop(*row_level_cols)

In [57]:
df3_drop.printSchema()

root
 |-- PWSID: string (nullable = true)
 |-- VIOLATION_CATEGORY_CODE: string (nullable = true)
 |-- IS_HEALTH_BASED_IND: string (nullable = true)
 |-- VIOL_MEASURE: double (nullable = true)
 |-- IS_MAJOR_VIOL_IND: string (nullable = false)
 |-- VIOLATION_STATUS: string (nullable = true)
 |-- ENF_ACTION_CATEGORY: string (nullable = false)
 |-- TOTAL_ENFORCEMENTS: long (nullable = true)
 |-- TOTAL_VIOLATIONS: long (nullable = true)
 |-- COMPLIANCE_STATUS: string (nullable = false)
 |-- AVG_VIOLATION_DURATION_DAYS: double (nullable = true)
 |-- OPEN_VIOLATIONS_COUNT: long (nullable = true)
 |-- TOTAL_LATE_COMPLIANT_ACTIONS: long (nullable = true)



In [58]:
get_summary_statistics(df3_drop, "DATAFRAME 3")

--- Summary Statistics for DATAFRAME 3 ---
| Column                       |   count |          mean |        stddev | min             | max                              |
|:-----------------------------|--------:|--------------:|--------------:|:----------------|:---------------------------------|
| PWSID                        | 1974819 |   8.98512e+07 |   2.05634e+07 | 010106001       | WY5680211                        |
| VIOLATION_CATEGORY_CODE      | 1974819 |               |               | MCL             | TT                               |
| IS_HEALTH_BASED_IND          | 1974819 |               |               | N               | Y                                |
| VIOL_MEASURE                 | 1974819 |   3.97418     | 318.332       | 0.0             | 99999.0                          |
| IS_MAJOR_VIOL_IND            | 1974819 |               |               | N               | Y                                |
| VIOLATION_STATUS             | 1974819 |               |   

In [59]:
# Before Dropping Duplicates
print(f" Dataframe 3: rows: {df3_drop.count()}, columns: {len(df3_drop.columns)} ")

 Dataframe 3: rows: 1974819, columns: 13 


In [60]:
# Keep a single row per PWSID.
# NOTE(review): the per-PWSID aggregates are identical on every row of a PWSID, but the
# row-level columns (e.g. VIOLATION_CATEGORY_CODE, VIOL_MEASURE, COMPLIANCE_STATUS) come from
# whichever row is kept, and no ordering decides that — confirm this is acceptable.
df3_drop = df3_drop.dropDuplicates(["PWSID"])

In [61]:
# After Dropping Duplicates
print(f" Dataframe 3: rows: {df3_drop.count()}, columns: {len(df3_drop.columns)} ")

 Dataframe 3: rows: 109381, columns: 13 


In [62]:
get_missing_values(df3_drop, "DATAFRAME 3")

--- Missing Values in DATAFRAME 3 ---
| Column                       |   Missing_Count |
|:-----------------------------|----------------:|
| PWSID                        |               0 |
| VIOLATION_CATEGORY_CODE      |               0 |
| IS_HEALTH_BASED_IND          |               0 |
| VIOL_MEASURE                 |               0 |
| IS_MAJOR_VIOL_IND            |               0 |
| VIOLATION_STATUS             |               0 |
| ENF_ACTION_CATEGORY          |               0 |
| TOTAL_ENFORCEMENTS           |               0 |
| TOTAL_VIOLATIONS             |               0 |
| COMPLIANCE_STATUS            |               0 |
| AVG_VIOLATION_DURATION_DAYS  |               0 |
| OPEN_VIOLATIONS_COUNT        |               0 |
| TOTAL_LATE_COMPLIANT_ACTIONS |               0 |




### DATAFRAME TRANSFORMATIONS

In [63]:
# Frames to combine, in join order
dfs = [df1_drop, df2_drop, df3_drop]

# Inner-join all frames on PWSID, left to right
def _join_on_pwsid(left, right):
    return left.join(right, "PWSID", how="inner")

df_combined = reduce(_join_on_pwsid, dfs)

In [64]:
df_combined = df_combined.dropDuplicates()

In [65]:
print(f" Dataframes combined: rows: {df_combined.count()}, columns: {len(df_combined.columns)} ")

 Dataframes combined: rows: 43073, columns: 30 


In [None]:
# converting the final dataframe into pandas


In [None]:
# Convert the combined Spark DataFrame to pandas (this collects every row to the driver),
# then save it as CSV. df_original is not assigned in any other visible cell, so without
# the conversion here this cell fails with NameError on a fresh kernel.
df_original = df_combined.toPandas()
df_original.to_csv('C:/Users/june3/OneDrive/Desktop/my_output_result/df_original.csv', index=False)

In [83]:
spark.stop()