In [0]:
from pyspark.sql.functions import col, current_timestamp, lit, to_timestamp, regexp_extract
from pyspark.sql.types import StructField,StructType, StringType,FloatType,LongType,TimestampType

# Fully qualified name of the bronze table that the curation step reads from.
bronze_source_table = "kpn_test.network.bronze_cablemodem_dt"

# ############################## TABLE CREATION ################################


def _recreate_empty_table(table_name, schema):
    """Overwrite ``table_name`` with an empty table that has ``schema``.

    Returns the empty DataFrame that was written.
    """
    empty_frame = spark.createDataFrame([], schema)
    empty_frame.write.mode("overwrite").saveAsTable(table_name)
    return empty_frame


# Bronze (raw) layout: METRICS_DATE is still a string at this stage.
cablemodem_bronze_schema = (
    StructType()
    .add("WM_FLAG", StringType(), True)
    .add("CCER_DN", LongType(), True)
    .add("CER_DN", LongType(), True)
    .add("DOWNSTREAM_NAME", StringType(), True)
    .add("HUB", StringType(), True)
    .add("MAC_ADDR", StringType(), True)
    .add("METRICS_DATE", StringType(), True)
    .add("RX_POWER_DN", FloatType(), True)
    .add("SNR_DN", FloatType(), True)
)
df = _recreate_empty_table(bronze_source_table, cablemodem_bronze_schema)

# Curated layout: METRICS_DATE is a timestamp and audit columns are appended.
cablemodem_dt_schema = (
    StructType()
    .add("HUB", StringType())
    .add("MAC_ADDR", StringType())
    .add("METRICS_DATE", TimestampType())
    .add("DOWNSTREAM_NAME", StringType())
    .add("CCER_DN", LongType())
    .add("CER_DN", LongType())
    .add("SNR_DN", FloatType())
    .add("RX_POWER_DN", FloatType())
    .add("CRTD_DTTM", TimestampType())
    .add("CRTD_BY", StringType())
)
df = _recreate_empty_table("kpn_test.network.cablemodem_dt", cablemodem_dt_schema)

# Rejection layout: same as the curated layout plus a DQ_STATUS column.
rejection_cablemodem_dt_schema = (
    StructType()
    .add("HUB", StringType())
    .add("MAC_ADDR", StringType())
    .add("METRICS_DATE", TimestampType())
    .add("DOWNSTREAM_NAME", StringType())
    .add("CCER_DN", LongType())
    .add("CER_DN", LongType())
    .add("SNR_DN", FloatType())
    .add("RX_POWER_DN", FloatType())
    .add("DQ_STATUS", StringType())
    .add("CRTD_DTTM", TimestampType())
    .add("CRTD_BY", StringType())
)
df = _recreate_empty_table(
    "kpn_test.network.rejection_cablemodem_dt", rejection_cablemodem_dt_schema
)

In [0]:
# Flag Records for Curation (Step 1)
# Read the bronze rows whose wm_flag is "N" and relabel them "NT" on the
# in-memory DataFrame only; nothing is written back to the bronze table here.
df = (
    spark.sql(f"SELECT * FROM {bronze_source_table}")
    .where(col("wm_flag") == "N")
    .withColumn("wm_flag", lit("NT"))
)

########## GOOD RECORDS ##########
# Define Data Quality Filters (Reusable for both good and bad tables)

# METRICS_DATE must be exactly "yyyy-MM-dd HH:mm:ss".
METRICS_DATE_PATTERN = r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"

_mac_addr = col("MAC_ADDR")
_metrics_date = col("METRICS_DATE")

# The isNotNull() guards come first so each predicate evaluates to a definite
# true/false (FALSE AND NULL is FALSE) instead of NULL for missing values.
_mac_addr_is_valid = _mac_addr.isNotNull() & (_mac_addr != "")

# regexp_extract returns "" when the pattern does not match, so an empty
# METRICS_DATE would compare equal to its own extraction; the explicit != ""
# check closes that hole.
_metrics_date_is_valid = (
    _metrics_date.isNotNull()
    & (_metrics_date != "")
    & (regexp_extract(_metrics_date, METRICS_DATE_PATTERN, 0) == _metrics_date)
)

good_condition = _mac_addr_is_valid & _metrics_date_is_valid

# Exact complement of good_condition: every flagged row is either good or bad,
# never both and never neither.
bad_condition = ~good_condition

def dq_filters(df,col):
    """Return the rows of ``df`` that satisfy the given data-quality condition.

    Args:
        df: DataFrame to filter.
        col: Boolean Column expression to filter on (for example the good or
            the bad record condition). Inside this function the name shadows
            the imported ``col`` helper, which is not needed here.

    Returns:
        The result of ``df.where(col)``.
    """
    # Filter on the condition that was passed in rather than on a global, so
    # the same helper serves both the good and the bad record sets.
    return df.where(col)

def _select_curated_columns(records, dq_status=None):
    """Project ``records`` onto the curated column layout.

    Args:
        records: DataFrame with the bronze columns.
        dq_status: When given, a literal DQ_STATUS column with this value is
            inserted before the audit columns (used for rejected records).

    Returns:
        A DataFrame with METRICS_DATE parsed as a timestamp, the numeric
        metrics cast to bigint/float, and CRTD_DTTM / CRTD_BY audit columns.
    """
    # NOTE(review): for rejected rows METRICS_DATE may not match the format,
    # in which case to_timestamp presumably yields NULL (or raises under ANSI
    # mode) - confirm this is the intended content of the rejection table.
    selected = [
        col("HUB"),
        col("MAC_ADDR"),
        to_timestamp(col("METRICS_DATE"), "yyyy-MM-dd HH:mm:ss").alias("METRICS_DATE"),
        col("DOWNSTREAM_NAME"),
        col("CER_DN").cast("bigint"),
        col("CCER_DN").cast("bigint"),
        col("SNR_DN").cast("float"),
        col("RX_POWER_DN").cast("float"),
    ]
    if dq_status is not None:
        selected.append(lit(dq_status).alias("DQ_STATUS"))
    selected.append(current_timestamp().alias("CRTD_DTTM"))
    selected.append(lit("data_engineer_group").alias("CRTD_BY"))
    return records.select(*selected)


# Insert into Curated Table (Good & Bad)
df_cablemode_good = _select_curated_columns(dq_filters(df, good_condition))
# Write is currently disabled:
# df_cablemode_good.write.mode("append").saveAsTable("kpn_test.network.cablemodem_dt")

# show() prints the preview itself and returns None, so it is called on its
# own and only the row count is passed to print().
df_cablemode_good.show(2)
print(df_cablemode_good.count())

########## BAD RECORDS ##########
df_cablemode_bad = _select_curated_columns(dq_filters(df, bad_condition), dq_status="O")
# Write is currently disabled:
# df_cablemode_bad.write.mode("append").saveAsTable("kpn_test.network.rejection_cablemodem_dt")
df_cablemode_bad.show(2)
print(df_cablemode_bad.count())

+---+--------+------------+---------------+------+-------+------+-----------+---------+-------+
|HUB|MAC_ADDR|METRICS_DATE|DOWNSTREAM_NAME|CER_DN|CCER_DN|SNR_DN|RX_POWER_DN|CRTD_DTTM|CRTD_BY|
+---+--------+------------+---------------+------+-------+------+-----------+---------+-------+
+---+--------+------------+---------------+------+-------+------+-----------+---------+-------+

None 0
+---+--------+------------+---------------+------+-------+------+-----------+---------+---------+-------+
|HUB|MAC_ADDR|METRICS_DATE|DOWNSTREAM_NAME|CER_DN|CCER_DN|SNR_DN|RX_POWER_DN|DQ_STATUS|CRTD_DTTM|CRTD_BY|
+---+--------+------------+---------------+------+-------+------+-----------+---------+---------+-------+
+---+--------+------------+---------------+------+-------+------+-----------+---------+---------+-------+



(None, 0)

In [0]:
%sql
-- truncate table kpn_test.network.cablemodem_dt;
-- insert into kpn_test.network.bronze_cablemodem_dt
-- select * from kpn_bronze.network.cablemodem_dt;
-- select * from kpn_test.network.bronze_cablemodem_dt limit 100;

In [0]:
# Ad-hoc inspection: display the Python type of the good-record filter expression.
type(good_condition)

pyspark.sql.column.Column

In [0]:
def _sql_string_literal(value):
    """Render ``value`` as a single-quoted SQL string literal.

    Backslashes and single quotes are backslash-escaped so the value cannot
    terminate the literal early.
    NOTE(review): assumes Spark's default backslash-escape parsing of string
    literals - confirm spark.sql.parser.escapedStringLiterals is not enabled.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def update_flag(table_name,column_name,new_value,old_vale):
    """Set ``column_name`` to ``new_value`` on rows where it equals ``old_vale``.

    Args:
        table_name: Name of the table to update; interpolated as an
            identifier, so it must come from trusted code, not user input.
        column_name: Column to compare and overwrite; also interpolated as an
            identifier.
        new_value: String value to write.
        old_vale: String value that selects the rows to update.

    Returns:
        None. The UPDATE statement is executed through ``spark.sql``.
    """
    # The statement must be an f-string, otherwise the literal text
    # "{table_name}" is sent to Spark; the values are quoted so they are
    # parsed as string literals rather than as column references.
    spark.sql(
        f"UPDATE {table_name} "
        f"SET {column_name} = {_sql_string_literal(new_value)} "
        f"WHERE {column_name} = {_sql_string_literal(old_vale)}"
    )

In [0]:
# update_flag('kpn_test.network.bronze_cablemodem_dt','wm_flag','NT','N')
# update_flag('kpn_test.network.rejection_cablemodem_dt','DQ_STATUS','R','O')