In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count, max, min

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Applying Rules to PySpark DataFrame") \
    .getOrCreate()

# Assuming you have already loaded your data into the DataFrame 'df'
# Let's define your DataFrame 'df'

# List of dictionaries representing the rules and their corresponding columns
rules = [
    {
        "name": "Total Row Count",
        "description": "Calculate the total count of rows in the DataFrame.",
        "function": "count",
        "column": None,
        "length": None
    },
    {
        "name": "Average Length of Column1",
        "description": "Calculate the average length of values in Column1.",
        "function": "avg",
        "column": "Column1",
        "length": None
    },
    {
        "name": "Percentage of Null Values in Column2",
        "description": "Calculate the percentage of null values in Column2.",
        "function": "sum",
        "column": "Column2",
        "length": 0
    },
    {
        "name": "Percentage of Values with Specific Length in Column3",
        "description": "Calculate the percentage of values with length 3 in Column3.",
        "function": "count",
        "column": "Column3",
        "length": 3
    },
    {
        "name": "Maximum Length in Column4",
        "description": "Find the maximum length of values in Column4.",
        "function": "max",
        "column": "Column4",
        "length": None
    },
    {
        "name": "Minimum Length in Column5",
        "description": "Find the minimum length of values in Column5.",
        "function": "min",
        "column": "Column5",
        "length": None
    }
]

# Function to apply a rule to the DataFrame and return the result
def apply_rule(df, rule, column_name):
    func = rule["function"]
    col_name = rule["column"]
    length = rule["length"]

    if func == "count":
        if col_name is None:
            result = df.count()
        elif length is not None:
            result = df.filter((col(col_name).isNotNull()) & (length(col(col_name)) == length)).count()
        else:
            result = df.filter(col(col_name).isNotNull()).count()

    elif func == "avg":
        result = df.select(avg(length(col(col_name)))).first()[0]

    elif func == "sum":
        result = df.filter(length(col(col_name)) == length).count()
        total_count = df.filter(col(col_name).isNotNull()).count()
        result = (result * 100.0) / total_count if total_count > 0 else 0.0

    elif func == "max":
        result = df.select(max(length(col(col_name)))).first()[0]

    elif func == "min":
        result = df.select(min(length(col(col_name)))).first()[0]

    return result

# Calculate the percentage of rows that meet each rule and append them to a list
results_list = []
for rule in rules:
    if rule["column"] is not None:
        result = apply_rule(df, rule, column_name=rule["column"])
    else:
        result = apply_rule(df, rule, column_name="Entire DataFrame")
    results_list.append((rule["name"], result))

# Print the results list
print(results_list)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/26 14:19:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


NameError: name 'df' is not defined

23/07/26 14:19:47 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
