<h1 style="text-align: center;">DPDQ HW</h1>
<h2 style="text-align: center;">Name : Siddhesh Maheshwari</h2>
<h2 style="text-align: center;">MDS202347</h2>

# Data Quality Analysis on Adult Dataset
#### Using pydeequ

In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
!pip install -q pyspark==3.2.0
!pip install pydeequ



In [3]:
from pyspark.sql import SparkSession, Row
!pyspark --version
%env SPARK_VERSION=3.3

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.26
Branch HEAD
Compiled by user ubuntu on 2021-10-06T12:46:30Z
Revision 5d45a415f3a29898d92380380cfd82bfc7f579ea
Url https://github.com/apache/spark
Type --help for more information.
env: SPARK_VERSION=3.3


# Step 1 : Import libraries

In [51]:
import pydeequ
from pydeequ.profiles import *
import os
import kagglehub
import pandas as pd
import numpy as np
import json
from pyspark.sql import SparkSession
from pydeequ.profiles import ColumnProfilerRunner
from pydeequ.suggestions import ConstraintSuggestionRunner, DEFAULT
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
from pydeequ.analyzers import AnalyzerContext
from pydeequ.analyzers import *
from pydeequ.suggestions import *

# Step 2 : Loading the dataset

In [5]:
# Download latest version from Kaggle
dataset_path = kagglehub.dataset_download("siddheshmaheshwari/adult")
print("Path to dataset files:", dataset_path)

# Load data (adjust file names based on what's available in the dataset)
train_file = os.path.join(dataset_path, "adult.data")
test_file = os.path.join(dataset_path, "adult.test")

Path to dataset files: /kaggle/input/adult


In [10]:
column_names = [
    'age', 'workclass', "fnlwgt",'education', 'education-num', 'marital-status',
    'occupation', 'relationship', 'race', 'sex', 'capital-gain',
    'capital-loss', 'hours-per-week', 'native-country', 'income'
]

adult_data = pd.read_csv(train_file, names=column_names)
adult_test = pd.read_csv(test_file, names=column_names)
adult_test.drop(index=0,inplace=True)

adult_data_full = pd.concat([adult_data, adult_test], axis=0)

adult_data_full.index = adult_data_full.index.astype(str)

int_columns = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
for col in int_columns:
    adult_data_full[col] = adult_data_full[col].astype(int)

# Taking some use of Y data profile analysis
# 1. Normalize income labels (fix variations like '>50K.', '<=50K ', etc.)
adult_data_full['income'] = adult_data_full['income'].str.strip().replace({'>50K.': '>50K', '<=50K.': '<=50K'})

# 2. Replace '?' with NaN
adult_data_full.replace('?', np.nan, inplace=True)

# 3. Remove duplicate rows
adult_data_full = adult_data_full.drop_duplicates()

columns_to_convert = [
    'hours-per-week', 'capital-gain', 'education-num',
    'age', 'fnlwgt', 'capital-loss'
]

# Convert those columns
adult_data_full[columns_to_convert] = adult_data_full[columns_to_convert].astype(int)

In [11]:
# Save the DataFrame to a CSV file in Google Drive
from google.colab import drive
drive.mount('/content/drive')

csv_file_path = '/content/drive/MyDrive/adult_data_full.csv'
adult_data_full.to_csv(csv_file_path, index=False)

print(f"Data saved to: {csv_file_path}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Data saved to: /content/drive/MyDrive/adult_data_full.csv


# Step 3: Create Spark session

In [12]:
spark = (SparkSession
    .builder
    .appName("PyDeequ Analysis")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

# Step 4: Load dataset

In [16]:
adult_data = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/content/drive/MyDrive/adult_data_full.csv")


In [17]:
adult_data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [18]:
adult_data.show(5)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|fnlwgt| education|education-num|     marital-status|        occupation|  relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516| Bachelors|           13|      Never-married|      Adm-clerical| Not-in-family| White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311| Bachelors|           13| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|           0|           0|            13| United-States| <=50K|
| 38|          Private|215646|   HS-grad|            9|     

# Step 5: Column Profiling

In [19]:
print("=== Running Column Profiling ===")
profile_result = ColumnProfilerRunner(spark).onData(adult_data).run()

# Print profiling results
for col, profile in profile_result.profiles.items():
    print(f"Column: {col}")
    print(f"  Completeness: {profile.completeness}")
    print(f"  Distinct values: {profile.approximateNumDistinctValues}")
    print(f"  Data type: {profile.dataType}")
    print()

=== Running Column Profiling ===
Column: hours-per-week
  Completeness: 1.0
  Distinct values: 95
  Data type: Integral

Column: capital-gain
  Completeness: 1.0
  Distinct values: 124
  Data type: Integral

Column: education-num
  Completeness: 1.0
  Distinct values: 16
  Data type: Integral

Column: marital-status
  Completeness: 1.0
  Distinct values: 7
  Data type: String

Column: age
  Completeness: 1.0
  Distinct values: 74
  Data type: Integral

Column: sex
  Completeness: 1.0
  Distinct values: 2
  Data type: String

Column: relationship
  Completeness: 1.0
  Distinct values: 6
  Data type: String

Column: education
  Completeness: 1.0
  Distinct values: 16
  Data type: String

Column: income
  Completeness: 1.0
  Distinct values: 2
  Data type: String

Column: race
  Completeness: 1.0
  Distinct values: 5
  Data type: String

Column: native-country
  Completeness: 1.0
  Distinct values: 39
  Data type: String

Column: fnlwgt
  Completeness: 1.0
  Distinct values: 28972
  Data 

# Step 6: Generate Constraint Suggestions

In [34]:
print("=== Generating Constraint Suggestions ===")
suggestion_result = (ConstraintSuggestionRunner(spark)
    .onData(adult_data)
    .addConstraintRule(DEFAULT())
    .run())

# Print constraint suggestions
for i in range(len(suggestion_result["constraint_suggestions"])):
    print(suggestion_result["constraint_suggestions"][i]["constraint_name"][20:])

=== Generating Constraint Suggestions ===
(Compliance('hours-per-week' has value range '40', '50', '45', '60', '35', '20', '30', '55', '25', '48', '38', '15', '70', '32', '10', '65', '24', '42', '36', '44', '16', '12', '37', '43', '8', '80', '52', '56', '28', '99', '18', '46', '72', '75', '5', '6', '4', '47', '84', '39', '54', '22', '33', '3', '41', '14', '2', '34', '21', '7', '27', '17', '90', '26', '23', '53', '49', '58', '13', '1', '9', '62', '66', '64', '11', '51', '57', '19', '85', '68', '63', '29', '98', '78', '31', '96', '77', '59', '67', '86', '76', '61', '88', '73', '91', '74', '92', '81', '89', '97', '95', '69', '82', '87', '79', '94',`hours-per-week` IN ('40', '50', '45', '60', '35', '20', '30', '55', '25', '48', '38', '15', '70', '32', '10', '65', '24', '42', '36', '44', '16', '12', '37', '43', '8', '80', '52', '56', '28', '99', '18', '46', '72', '75', '5', '6', '4', '47', '84', '39', '54', '22', '33', '3', '41', '14', '2', '34', '21', '7', '27', '17', '90', '26', '23', '53

# Step 7: Verify Data Quality

In [45]:
print("=== Verifying Data Quality ===")

# Define expected values for categorical columns
MARITAL_STATUS_VALUES = [
    ' Married-civ-spouse', ' Never-married', ' Divorced',
    ' Separated', ' Widowed', ' Married-spouse-absent', ' Married-AF-spouse'
]

RELATIONSHIP_VALUES = [
    ' Husband', ' Not-in-family', ' Own-child', ' Unmarried', ' Wife', ' Other-relative'
]

# Create the data quality check
check = (
    Check(spark, CheckLevel.Warning, "Adult Dataset Quality Check")
    # Dataset size check
    .hasSize(lambda x: x >= 30000)

    # Age checks
    .isComplete("age")
    .isContainedIn("age", list(map(str, range(17, 91))))

    # Income checks
    .isComplete("income")
    .isContainedIn("income", [">50K", "<=50K"])

    # Hours per week checks
    .isComplete("hours_per_week")
    .isNonNegative("hours_per_week")
    .isContainedIn("hours_per_week", list(map(str, range(1, 100))))

    # Education checks
    .isComplete("education_num")
    .isContainedIn("education_num", list(map(str, range(1, 17))))

    # Marital status checks
    .isComplete("marital_status")
    .isContainedIn("marital_status", MARITAL_STATUS_VALUES)

    # Gender checks
    .isComplete("sex")
    .isContainedIn("sex", [' Male', ' Female'])

    # Relationship checks
    .isComplete("relationship")
    .isContainedIn("relationship", RELATIONSHIP_VALUES)

    # Financial checks
    .isComplete("capital_gain")
    .isNonNegative("capital_gain")
    .isComplete("capital_loss")
    .isNonNegative("capital_loss")

    # Additional weight check
    .isComplete("fnlwgt")
)

verification_result = (
    VerificationSuite(spark)
    .onData(adult_data)
    .addCheck(check)
    .run()
)

# Show results
verification_result_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
verification_result_df.show(truncate=False, n=100)  # Show all rows without truncation


=== Verifying Data Quality ===
+---------------------------+-----------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Step 8: Analyze Data Metrics

In [50]:
print("=== Analyzing Data Metrics ===")

analysis_result = AnalysisRunner(spark) \
    .onData(adult_data) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("income")) \
    .addAnalyzer(Completeness("workclass"))  \
    .addAnalyzer(Mean("age")) \
    .addAnalyzer(ApproxCountDistinct("education")) \
    .addAnalyzer(CountDistinct("income"))  \
    .addAnalyzer(Completeness("fnlwgt"))  \
    .addAnalyzer(CountDistinct("fnlwgt"))  \
    .addAnalyzer(Minimum("age")) \
    .addAnalyzer(Maximum("age"))  \
    .addAnalyzer(Mean("hours_per_week"))  \
    .addAnalyzer(Mean("capital_gain"))  \
    .addAnalyzer(ApproxCountDistinct("sex"))  \
    .addAnalyzer(ApproxCountDistinct("race"))  \
    .addAnalyzer(ApproxCountDistinct("marital_status")) \
    .addAnalyzer(ApproxCountDistinct("workclass"))  \
    .addAnalyzer(ApproxCountDistinct("native_country"))  \
    .addAnalyzer(ApproxCountDistinct("occupation"))  \
    .run()

# Convert the result into a DataFrame for easy viewing
analysis_result_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysis_result)

# Show the analysis results
analysis_result_df.show(truncate=False)

=== Analyzing Data Metrics ===
+-------+----------+-------------------+------------------+
|entity |instance  |name               |value             |
+-------+----------+-------------------+------------------+
|Column |workclass |ApproxCountDistinct|9.0               |
|Column |education |ApproxCountDistinct|16.0              |
|Column |race      |ApproxCountDistinct|5.0               |
|Column |sex       |ApproxCountDistinct|2.0               |
|Column |age       |Minimum            |17.0              |
|Column |age       |Maximum            |90.0              |
|Column |income    |CountDistinct      |2.0               |
|Column |fnlwgt    |CountDistinct      |28523.0           |
|Dataset|*         |Size               |48790.0           |
|Column |occupation|ApproxCountDistinct|15.0              |
|Column |workclass |Completeness       |1.0               |
|Column |income    |Completeness       |1.0               |
|Column |fnlwgt    |Completeness       |1.0               |
|Column |

# Step 9 : Some Custom Analysis

In [54]:
# Distribution of Age Groups
print("=== Age Distribution Analysis ===")
adult_data.groupBy("age").count().orderBy("age").show(10)

# Custom Analysis - Income Distribution by Education
print("=== Income Distribution by Education ===")
adult_data.groupBy("education", "income").count().orderBy("education", "income").show(10)


=== Age Distribution Analysis ===
+---+-----+
|age|count|
+---+-----+
| 17|  594|
| 18|  861|
| 19| 1045|
| 20| 1111|
| 21| 1092|
| 22| 1176|
| 23| 1325|
| 24| 1205|
| 25| 1190|
| 26| 1153|
+---+-----+
only showing top 10 rows

=== Income Distribution by Education ===
+---------+------+-----+
|education|income|count|
+---------+------+-----+
|     10th| <=50K| 1302|
|     10th|  >50K|   87|
|     11th| <=50K| 1720|
|     11th|  >50K|   92|
|     12th| <=50K|  607|
|     12th|  >50K|   48|
|  1st-4th| <=50K|  237|
|  1st-4th|  >50K|    8|
|  5th-6th| <=50K|  480|
|  5th-6th|  >50K|   27|
+---------+------+-----+
only showing top 10 rows



In [57]:
# Occupation Distribution by Race
print("=== Occupation Distribution by Race ===")
adult_data.groupBy("race", "occupation").count().orderBy("race", "occupation").show(20)

=== Occupation Distribution by Race ===
+-------------------+------------------+-----+
|               race|        occupation|count|
+-------------------+------------------+-----+
| Amer-Indian-Eskimo|                 ?|   35|
| Amer-Indian-Eskimo|      Adm-clerical|   54|
| Amer-Indian-Eskimo|      Armed-Forces|    1|
| Amer-Indian-Eskimo|      Craft-repair|   61|
| Amer-Indian-Eskimo|   Exec-managerial|   48|
| Amer-Indian-Eskimo|   Farming-fishing|   14|
| Amer-Indian-Eskimo| Handlers-cleaners|   34|
| Amer-Indian-Eskimo| Machine-op-inspct|   28|
| Amer-Indian-Eskimo|     Other-service|   61|
| Amer-Indian-Eskimo|   Priv-house-serv|    1|
| Amer-Indian-Eskimo|    Prof-specialty|   48|
| Amer-Indian-Eskimo|   Protective-serv|   13|
| Amer-Indian-Eskimo|             Sales|   36|
| Amer-Indian-Eskimo|      Tech-support|    8|
| Amer-Indian-Eskimo|  Transport-moving|   28|
| Asian-Pac-Islander|                 ?|   96|
| Asian-Pac-Islander|      Adm-clerical|  198|
| Asian-Pac-Islander

In [65]:
# Workclass and Age Distribution
print("=== Workclass and Age Distribution ===")
workclass_age_distribution = adult_data.groupBy("workclass", "age").count().orderBy("workclass", "age")

random_sample = workclass_age_distribution.sample(withReplacement=False, fraction=0.1)
random_sample.show(20, truncate=False)

=== Workclass and Age Distribution ===
+------------+---+-----+
|workclass   |age|count|
+------------+---+-----+
| ?          |17 |97   |
| ?          |26 |38   |
| ?          |29 |47   |
| ?          |68 |40   |
| ?          |70 |36   |
| ?          |73 |26   |
| Federal-gov|20 |13   |
| Federal-gov|51 |35   |
| Local-gov  |19 |18   |
| Local-gov  |24 |56   |
| Local-gov  |37 |91   |
| Local-gov  |40 |78   |
| Local-gov  |44 |92   |
| Local-gov  |46 |111  |
| Local-gov  |50 |82   |
| Local-gov  |53 |70   |
| Local-gov  |63 |20   |
| Local-gov  |75 |2    |
| Local-gov  |80 |2    |
| Private    |26 |922  |
+------------+---+-----+
only showing top 20 rows

