<a href="https://colab.research.google.com/github/simerjit/pydeequ/blob/main/pydeequ_aircrash_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [29]:
# Provision the Java/Spark runtime for this session:
# 1. Install a headless OpenJDK 11, the Java version Spark will run on
#    (-qq and the redirect to /dev/null silence apt's output).
# 2. Download the Apache Spark 3.2.0 tarball that is pre-built for Hadoop 2.7
#    from the Apache archive (-q keeps wget quiet).
# 3. Extract the downloaded tarball into the current working directory.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz
!tar xf spark-3.2.0-bin-hadoop2.7.tgz

In [28]:
# Disabled step, kept for reference only: it would uninstall pyspark 3.2.0.
# NOTE(review): presumably used once to clear a conflicting pyspark install —
# confirm whether this cell is still needed or can be removed.
#!pip uninstall pyspark==3.2.0

[0m

In [30]:
# Tell PySpark where the Java and Spark installations live.
# NOTE(review): SPARK_VERSION is presumably read by pydeequ to pick a matching
# Deequ build — confirm against the pydeequ documentation.
import os

os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.2.0-bin-hadoop2.7",
        "SPARK_VERSION": "3.2",
    }
)

In [31]:
# Install pydeequ
!pip install pydeequ



In [32]:
# Create (or reuse) a Spark session that has the Deequ jar on its classpath,
# which is what PyDeequ needs for its data quality checks.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws
import pydeequ

# spark.jars.packages pulls in the Deequ Maven artifact advertised by pydeequ;
# spark.jars.excludes leaves out the f2j artifact pydeequ names for exclusion.
spark = SparkSession.builder \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .getOrCreate()


In [33]:
# Load the air crashes CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/aircrashes.csv')
)

In [34]:
# Cast Year to an integer, then add a Date column that joins Day, Month and
# Year with "-" (for example "17-September-1908").
df = (
    df
    .withColumn("Year", col("Year").cast("int"))
    .withColumn("Date", concat_ws("-", "Day", "Month", "Year"))
)

In [35]:
# Normalise the column names: spaces and "/" become "_", and the characters
# "(" and ")" are removed.
# The loop variable is deliberately not named `col`: that name is the
# pyspark.sql.functions.col helper imported earlier, and rebinding it to a
# plain string would break every later (or re-run) cell that calls col(...).
for old_name in df.columns:
    new_name = (
        old_name
        .replace(" ", "_")
        .replace("(", "")
        .replace(")", "")
        .replace("/", "_")
    )
    df = df.withColumnRenamed(old_name, new_name)

In [36]:
# Preview the cleaned DataFrame (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+----+-------+---------+---+--------------+---------------------+--------------------+--------------------+--------------------+-------------+---------------------+-------------+-----------------+
|Year|Quarter|    Month|Day|Country_Region|Aircraft_Manufacturer|            Aircraft|            Location|            Operator|Sum_of_Ground|Sum_of_Fatalities_air|Sum_of_Aboard|             Date|
+----+-------+---------+---+--------------+---------------------+--------------------+--------------------+--------------------+-------------+---------------------+-------------+-----------------+
|1908|  Qtr 3|September| 17|      Virginia|         Wright Flyer|   Wright Flyer III?|  Fort Myer Virginia|Army U.S. - Military|            0|                    1|            2|17-September-1908|
|1909|  Qtr 3|September|  7|       France?|               Wright|   Wright ByplaneSC1|Juvisy-sur-Orge F...|                 N/A|            0|                    1|            1| 7-September-1909|
|1912|  Qtr 3| 

**Metrics Computation**

In [37]:
from pydeequ.analyzers import (
    AnalysisRunner,
    AnalyzerContext,
    ApproxCountDistinct,
    Completeness,
    Compliance,
    Mean,
    Size,
    Correlation,
)

# Compute metrics centred on the 'Year' column. The aim is to see whether
# the number of fatalities has gone down over the years, possibly thanks to
# advances in aviation technology and safety measures.
analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())                                          # dataset size
    .addAnalyzer(Completeness("Year"))                            # share of non-null years
    .addAnalyzer(ApproxCountDistinct("Year"))                     # approx. distinct years
    .addAnalyzer(Mean("Year"))
    .addAnalyzer(Compliance("Year", "Year > 1900"))               # share of rows meeting the predicate
    .addAnalyzer(Correlation("Year", "Sum_of_Fatalities_air"))
    .run()
)

# Turn the successfully computed metrics into a DataFrame and print it in full.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show(truncate=False)

+-----------+--------------------------+-------------------+-------------------+
|entity     |instance                  |name               |value              |
+-----------+--------------------------+-------------------+-------------------+
|Column     |Year                      |Compliance         |0.9956452889944576 |
|Column     |Year                      |Completeness       |0.9956452889944576 |
|Column     |Year                      |ApproxCountDistinct|113.0              |
|Column     |Year                      |Mean               |1970.938369781312  |
|Dataset    |*                         |Size               |5052.0             |
|Mutlicolumn|Year,Sum_of_Fatalities_air|Correlation        |0.16161932596456652|
+-----------+--------------------------+-------------------+-------------------+



**Run a profile of the dataset**

In [38]:
from pydeequ.profiles import ColumnProfilerRunner

# Profile every column of the DataFrame.
result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

# Print a short summary per column. The loop variable is named `column_name`
# rather than `col` so that the pyspark.sql.functions.col helper imported
# earlier is not overwritten with a string.
for column_name, profile in result.profiles.items():
    print(f"Column '{column_name}'")
    print('\t', f'completeness: {profile.completeness}')
    print('\t', f'approximate number of distinct values: {profile.approximateNumDistinctValues}')
    print('\t', f'datatype: {profile.dataType}')

Column 'Aircraft_Manufacturer'
	 completeness: 0.996437054631829
	 approximate number of distinct values: 634
	 datatype: String
Column 'Location'
	 completeness: 0.9928741092636579
	 approximate number of distinct values: 4092
	 datatype: String
Column 'Sum_of_Aboard'
	 completeness: 0.9926761678543151
	 approximate number of distinct values: 252
	 datatype: Integral
Column 'Operator'
	 completeness: 0.9926761678543151
	 approximate number of distinct values: 2655
	 datatype: String
Column 'Month'
	 completeness: 0.9996041171813144
	 approximate number of distinct values: 25
	 datatype: String
Column 'Sum_of_Fatalities_air'
	 completeness: 0.9926761678543151
	 approximate number of distinct values: 192
	 datatype: Integral
Column 'Quarter'
	 completeness: 1.0
	 approximate number of distinct values: 26
	 datatype: String
Column 'Year'
	 completeness: 0.9956452889944576
	 approximate number of distinct values: 113
	 datatype: Integral
Column 'Day'
	 completeness: 0.9994061757719715
	 a

**Constraint Verification**

In [39]:
# Explicit imports instead of `import *`: only the names used below enter the
# notebook namespace, and it stays clear where each one comes from.
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite

# Define the check; a failed constraint is reported at Warning level.
check = Check(spark, CheckLevel.Warning, "Air Crash Data Check")

# Run the verification with these constraints:
#   - the dataset has at least 5000 rows
#   - 'Year' and 'Aircraft_Manufacturer' contain no missing values
#   - 'Sum_of_Fatalities_air' and 'Year' contain no negative values
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 5000) \
        .isComplete("Year") \
        .isComplete("Aircraft_Manufacturer") \
        .isNonNegative("Sum_of_Fatalities_air") \
        .isNonNegative("Year")) \
    .run()

# Convert the per-constraint results to a DataFrame and show them untruncated.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

Python Callback server started!
+--------------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check               |check_level|check_status|constraint                                                                                                                                    |constraint_status|constraint_message                                                 |
+--------------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
+--------------------+-----------+------------+------------------------------------------------------------------------------------------

**Automated constraint suggestion**

In [40]:
# Explicit imports instead of `import *`, so only the names used here are
# added to the notebook namespace.
from pydeequ.suggestions import ConstraintSuggestionRunner, DEFAULT

# Let PyDeequ propose constraints for the DataFrame using its default rule set.
suggestionResult = ConstraintSuggestionRunner(spark) \
    .onData(df) \
    .addConstraintRule(DEFAULT()) \
    .run()

# Print each suggestion together with the Python snippet that implements it.
for sugg in suggestionResult['constraint_suggestions']:
    print(f"Constraint suggestion for \'{sugg['column_name']}\': {sugg['description']}")
    print(f"The corresponding Python code is: {sugg['code_for_constraint']}\n")

Constraint suggestion for 'Aircraft_Manufacturer': 'Aircraft_Manufacturer' has less than 1% missing values
The corresponding Python code is: .hasCompleteness("Aircraft_Manufacturer", lambda x: x >= 0.99, "It should be above 0.99!")

Constraint suggestion for 'Location': 'Location' has less than 1% missing values
The corresponding Python code is: .hasCompleteness("Location", lambda x: x >= 0.99, "It should be above 0.99!")

Constraint suggestion for 'Sum_of_Aboard': 'Sum_of_Aboard' has no negative values
The corresponding Python code is: .isNonNegative("Sum_of_Aboard")

Constraint suggestion for 'Sum_of_Aboard': 'Sum_of_Aboard' has less than 1% missing values
The corresponding Python code is: .hasCompleteness("Sum_of_Aboard", lambda x: x >= 0.99, "It should be above 0.99!")

Constraint suggestion for 'Operator': 'Operator' has less than 1% missing values
The corresponding Python code is: .hasCompleteness("Operator", lambda x: x >= 0.99, "It should be above 0.99!")

Constraint suggestion

**Metrics Repository**

In [43]:
from pydeequ.repository import FileSystemMetricsRepository, ResultKey
from pydeequ.analyzers import AnalysisRunner, ApproxCountDistinct

# Set up a file-system-backed metrics repository so that analysis results can
# be saved and loaded again later.
# NOTE(review): helper_metrics_file presumably resolves 'metrics.json' to a
# writable location — confirm where the file actually ends up.
metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
repository = FileSystemMetricsRepository(spark, metrics_file)

In [44]:
# Key under which this run is stored: the current time in milliseconds plus
# a descriptive tag.
key_tags = {'tag': 'AirCrashAnalysis'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

# Run the analysis and save (or append) its result to the repository under
# the key defined above.
analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(ApproxCountDistinct('Year'))
    .useRepository(repository)
    .saveOrAppendResult(resultKey)
    .run()
)

In [45]:
# Load every metric stored in the repository before the current time and
# display the result as a DataFrame.
analysisResult_metrics_repository = (
    repository.load()
    .before(ResultKey.current_milli_time())
    .getSuccessMetricsAsDataFrame()
)
analysisResult_metrics_repository.show()

+------+--------+-------------------+-----+-------------+----------------+
|entity|instance|               name|value| dataset_date|             tag|
+------+--------+-------------------+-----+-------------+----------------+
|Column|    Year|ApproxCountDistinct|113.0|1695001177325|AirCrashAnalysis|
+------+--------+-------------------+-----+-------------+----------------+

