In [39]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

In [32]:
# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [33]:
# Input fixture: one tuple per access-log entry, in the column order below.
logs_columns = ["log_id", "ip", "status_code"]

logs_data = [
    (1, "192.168.1.1", 200),    # well-formed address
    (2, "256.1.2.3", 404),      # first octet is above 255
    (3, "192.168.001.1", 200),  # third octet has leading zeros
    (4, "192.168.1.1", 200),    # well-formed address (repeat of log 1)
    (5, "192.168.1", 500),      # only three octets
    (6, "256.1.2.3", 404),      # repeat of log 2
    (7, "192.168.001.1", 200),  # repeat of log 3
]

logs_df = spark.createDataFrame(data=logs_data, schema=logs_columns)
logs_df.show(truncate=False)

+------+-------------+-----------+
|log_id|ip           |status_code|
+------+-------------+-----------+
|1     |192.168.1.1  |200        |
|2     |256.1.2.3    |404        |
|3     |192.168.001.1|200        |
|4     |192.168.1.1  |200        |
|5     |192.168.1    |500        |
|6     |256.1.2.3    |404        |
|7     |192.168.001.1|200        |
+------+-------------+-----------+



In [34]:
# Expected result: each invalid address with the number of log entries using it.
output_columns = ["ip", "invalid_count"]

output_data = [
    ("256.1.2.3", 2),      # logs 2 and 6
    ("192.168.001.1", 2),  # logs 3 and 7
    ("192.168.1", 1),      # log 5
]

output_df = spark.createDataFrame(data=output_data, schema=output_columns)
output_df.show(truncate=False)

+-------------+-------------+
|ip           |invalid_count|
+-------------+-------------+
|256.1.2.3    |2            |
|192.168.001.1|2            |
|192.168.1    |1            |
+-------------+-------------+



In [35]:
# Split every address on the literal dot, keep the resulting array for
# reference, and emit one row per array element so that each octet can be
# inspected on its own.
logs_ip_octets_df = (
    logs_df
    .select("*", F.split(F.col("ip"), r"\.").alias("ip_octets_arr"))
    .select("*", F.explode(F.col("ip_octets_arr")).alias("ip_octets_expl"))
)

logs_ip_octets_df.show(truncate=False)

+------+-------------+-----------+------------------+--------------+
|log_id|ip           |status_code|ip_octets_arr     |ip_octets_expl|
+------+-------------+-----------+------------------+--------------+
|1     |192.168.1.1  |200        |[192, 168, 1, 1]  |192           |
|1     |192.168.1.1  |200        |[192, 168, 1, 1]  |168           |
|1     |192.168.1.1  |200        |[192, 168, 1, 1]  |1             |
|1     |192.168.1.1  |200        |[192, 168, 1, 1]  |1             |
|2     |256.1.2.3    |404        |[256, 1, 2, 3]    |256           |
|2     |256.1.2.3    |404        |[256, 1, 2, 3]    |1             |
|2     |256.1.2.3    |404        |[256, 1, 2, 3]    |2             |
|2     |256.1.2.3    |404        |[256, 1, 2, 3]    |3             |
|3     |192.168.001.1|200        |[192, 168, 001, 1]|192           |
|3     |192.168.001.1|200        |[192, 168, 001, 1]|168           |
|3     |192.168.001.1|200        |[192, 168, 001, 1]|001           |
|3     |192.168.001.1|200        |

In [36]:
# Keep only the exploded rows that prove an address is invalid:
#   * every row of an address that is not exactly four dot-separated digit
#     groups, or
#   * the row of any octet that is not a canonical decimal number in 0-255
#     (rejects leading zeros such as "001" and out-of-range values such as "256").
# rlike matches substrings, so the octet pattern is anchored at both ends: an
# unanchored leading-zero test would also flag legal octets such as "10" or
# "200". Validating the octet as text also avoids a string-to-int cast, which
# cannot represent arbitrarily long digit strings.
logs_ip_octets_df = (
    logs_ip_octets_df
    .filter(
        ( ~F.col("ip").rlike(r"^\d+\.\d+\.\d+\.\d+$") ) |
        ( ~F.col("ip_octets_expl").rlike(r"^(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$") )
    )
)
logs_ip_octets_df.show(truncate=False)

+------+-------------+-----------+------------------+--------------+
|log_id|ip           |status_code|ip_octets_arr     |ip_octets_expl|
+------+-------------+-----------+------------------+--------------+
|2     |256.1.2.3    |404        |[256, 1, 2, 3]    |256           |
|3     |192.168.001.1|200        |[192, 168, 001, 1]|001           |
|5     |192.168.1    |500        |[192, 168, 1]     |192           |
|5     |192.168.1    |500        |[192, 168, 1]     |168           |
|5     |192.168.1    |500        |[192, 168, 1]     |1             |
|6     |256.1.2.3    |404        |[256, 1, 2, 3]    |256           |
|7     |192.168.001.1|200        |[192, 168, 001, 1]|001           |
+------+-------------+-----------+------------------+--------------+



In [37]:
# One row per invalid address. Log ids are counted distinctly because a single
# log entry can contribute several exploded rows. Ordered by count, then by
# address, both descending.
res_df = (
    logs_ip_octets_df
    .groupBy(F.col("ip"))
    .agg(F.count_distinct("log_id").alias("invalid_count"))
    .orderBy(F.desc("invalid_count"), F.desc("ip"))
)

res_df.show(truncate=False)

+-------------+-------------+
|ip           |invalid_count|
+-------------+-------------+
|256.1.2.3    |2            |
|192.168.001.1|2            |
|192.168.1    |1            |
+-------------+-------------+



In [42]:
def validate_result(calculated_df: DataFrame, expected_df: DataFrame):
    """Assert that two DataFrames have the same columns and the same rows.

    Row order is ignored: each DataFrame is sorted by all of its own columns
    before its rows are collected and compared.

    Args:
        calculated_df: DataFrame produced by the solution.
        expected_df: DataFrame holding the expected result.

    Raises:
        AssertionError: if the column names (including their order) or the
            row data differ.
    """
    # Collected rows compare by value only (pyspark Row is a tuple subclass),
    # so a wrong or missing column alias would go unnoticed without this check.
    assert calculated_df.columns == expected_df.columns, "❌ Columns Mismatched"

    calculated_df_data = calculated_df.sort(calculated_df.columns).collect()
    expected_df_data = expected_df.sort(expected_df.columns).collect()

    assert calculated_df_data == expected_df_data, "❌ Data Mismatched"
    print("✅ Data Matched")

    print("🎉 Test Case Passed !!!")

In [43]:
# Compare the computed aggregation against the expected fixture.
validate_result(
    expected_df=output_df,
    calculated_df=res_df,
)

✅ Data Matched
🎉 Test Case Passed !!!
