## Data Quality Check Functions

In [1]:
from pyspark.sql import DataFrame, functions as F
from pyspark.sql.functions import *
from pyspark.sql.window import *

StatementMeta(, cc20a8bf-1a9d-430f-977f-180a4114a8bc, 3, Finished, Available, Finished)

In [2]:
# Function to detect outliers and replace them with the mode
def Outliers_Detection_Correction(df, columns):
    for column in columns:
        # Step 1: Calculate Q1 (25th percentile) and Q3 (75th percentile)
        q1 = df.approxQuantile(column, [0.25], 0.01)[0]
        q3 = df.approxQuantile(column, [0.75], 0.01)[0]

        # Step 2: Calculate IQR (Inter-Quartile Range)
        iqr = q3 - q1

        # Step 3: Define lower and upper bounds
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # Step 4: Add a column to flag outliers
        outlier_column = f"Outlier_{column}"
        df = df.withColumn(
            outlier_column,
            F.when(F.col(column).isNull(), "Unknown")
             .when((F.col(column) < lower_bound) | (F.col(column) > upper_bound), "Outlier")
             .otherwise("Regular")
        )

        # # Step 5: Calculate mode of the column excluding outliers
        # mode_value = (
        #     df.filter(F.col(outlier_column) == "Regular")
        #       .groupBy(column)
        #       .count()
        #       .orderBy(F.desc("count"))
        #       .first()
        # )
        # mode_value = mode_value[column] if mode_value else 0  # Handle case where mode doesn't exist

        # # Step 6: Add a column to replace outliers and nulls with the mode
        # corrected_column = f"Corrected_{column}"
        # df = df.withColumn(
        #     corrected_column,
        #     F.when(F.col(outlier_column) == "Outlier", mode_value)
        #      .when(F.col(column).isNull(), 0)
        #      .otherwise(F.col(column))
        # )

    return df

StatementMeta(, cc20a8bf-1a9d-430f-977f-180a4114a8bc, 4, Finished, Available, Finished)

In [3]:
def Solve_QualitativeDataIssues(dataframe: DataFrame, columns: list) -> DataFrame:
    transformed_df = dataframe

    for column in columns:
        # Step 1: Normalize the specified column
        transformed_df1 = transformed_df.withColumn(
            f"Normalized_{column}",
            lower(regexp_replace(col(column), "[^a-zA-Z0-9 ]", ""))
        )

        # Step 2: Categorization of Normalized Operators
        window_spec1 = Window.orderBy(f"Normalized_{column}")

        transformed_df = transformed_df1.withColumn(
            f"SpecialChar_Categorized_{column}",  
            concat(lit("Group-"),dense_rank().over(window_spec1)))  # As per Dec-30 discussion

        # Step 3: Get the first matching value in alphabetical order
        window_spec2 = Window.partitionBy(f"Normalized_{column}").orderBy(column)

        transformed_df = transformed_df.withColumn(
            f"Corrected_{column}",
            first(column).over(window_spec2)
        )

        # Step 4: Separate the first word from the Transformed column
        split_col = split(transformed_df[f"Normalized_{column}"], " ")

        transformed_df = transformed_df.withColumn(
            f"Normalized_{column}_FirstWord",
            split_col.getItem(0)
        )

        # Step 5: Make Correct Column into groups
        window_spec3 = Window.orderBy(f"Normalized_{column}_FirstWord")

        transformed_df = transformed_df.withColumn(
            f"Categorized_{column}",
            concat(lit("Group-"),dense_rank().over(window_spec3))
        )

    # Dynamically collect all columns
    all_columns = dataframe.columns  # Include all original columns

    corrected_columns =  [ f"SpecialChar_Categorized_{col}" for col in columns
                               ] + [f"Normalized_{col}_FirstWord" for col in columns
                          ] + [f"Corrected_{col}" for col in columns
                                    ] + [f"Categorized_{col}" for col in columns
    ]

    # Ensure resulting DataFrame includes all original and transformed columns
    final_columns = all_columns + corrected_columns
    return transformed_df.select(*final_columns)

StatementMeta(, cc20a8bf-1a9d-430f-977f-180a4114a8bc, 5, Finished, Available, Finished)

## Master Table

In [6]:
table1_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/masterdata_xlsx"
table2_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/masterdata_csv"
table3_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/masterdata_pdf"

# Load each table into a DataFrame
df1 = spark.read.format("delta").load(table1_path)
df2 = spark.read.format("delta").load(table2_path)
df3 = spark.read.format("delta").load(table3_path)

# Combine the DataFrames using union
combined_df_master = df1.union(df2).union(df3)

# Solving Data Quality Issues  ["Well_Name", "County", "Oil_Field","Operator"]
clean_combined_df_master = Solve_QualitativeDataIssues(combined_df_master, ["Operator"])

clean_combined_df_master = clean_combined_df_master.dropDuplicates(subset=['API_WellNo'])
# display(clean_combined_df_master.limit(3))
clean_combined_df_master.write.format("delta").option('mergeSchema','true').mode("overwrite").saveAsTable("MasterData")

# spark.sql("select Operator, SpecialChar_Categorized_Operator,Categorized_Operator from MasterData").show(truncate=False)
spark.sql("select count(*) from MasterData").show(truncate=False)  ## Just showing the required columns - 24303

StatementMeta(, cc20a8bf-1a9d-430f-977f-180a4114a8bc, 8, Finished, Available, Finished)

+-----------------------------+-----------------------------+--------------------------------+--------------------+
|Operator                     |Corrected_Operator           |SpecialChar_Categorized_Operator|Categorized_Operator|
+-----------------------------+-----------------------------+--------------------------------+--------------------+
|Unknown                      |Unknown                      |Group-1389                      |Group-1128          |
|Unknown                      |Unknown                      |Group-1389                      |Group-1128          |
|Unknown                      |Unknown                      |Group-1389                      |Group-1128          |
|Unknown                      |Unknown                      |Group-1389                      |Group-1128          |
|Unknown                      |Unknown                      |Group-1389                      |Group-1128          |
|Beecroft, James              |Beecroft, James              |Group-84   

In [10]:
# from pyspark.sql.functions import col, when

# # Replace blank values with nulls
# clean_combined_df_master = clean_combined_df_master.select([
#     when(col(c) == "", None).otherwise(col(c)).alias(c) for c in clean_combined_df_master.columns
# ])

# # Fill nulls with 0
# filled_df = clean_combined_df_master.fillna(0)

# # Show the results
# display(filled_df.limit(6))

StatementMeta(, , , Waiting, , Waiting)

## Transactions Table

In [9]:
table11_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/transactiondata_xlsx"
table12_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/transactiondata_csv"
table13_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/transactiondata_pdf"

# Load each table into a DataFrame
df11 = spark.read.format("delta").load(table11_path)
df12 = spark.read.format("delta").load(table12_path)
df13 = spark.read.format("delta").load(table13_path)

# Combine the DataFrames using union
combined_df_transactions = df11.union(df12).union(df13)
# combined_df_transactions = df12.union(df13)

combined_df_transactions = combined_df_transactions.withColumn(
    "Months",
    when(col("Month_Prod") == 1, "Jan")
    .when(col("Month_Prod") == 2, "Feb")
    .when(col("Month_Prod") == 3, "Mar")
    .when(col("Month_Prod") == 4, "Apr")
    .when(col("Month_Prod") == 5, "May")
    .when(col("Month_Prod") == 6, "Jun")
    .when(col("Month_Prod") == 7, "Jul")
    .when(col("Month_Prod") == 8, "Aug")
    .when(col("Month_Prod") == 9, "Sep")
    .when(col("Month_Prod") == 10, "Oct")
    .when(col("Month_Prod") == 11, "Nov")
    .when(col("Month_Prod") == 12, "Dec")
    .otherwise("Invalid Month")
)

columns_with_dollar = ["Oil_Prod", "Gas_Prod", "Water_Prod"]  # Replace with your column names

# Dynamically remove '$' sign and retain all other columns
for column in columns_with_dollar:
    if column in combined_df_transactions.columns:
        combined_df_transactions = combined_df_transactions.withColumn(
            column, 
            regexp_replace(col(column), "\\$", "").cast('int')
        )


combined_df_transactions1 = Outliers_Detection_Correction(combined_df_transactions,["Oil_Prod", "Gas_Prod", "Water_Prod"])

combined_df_transactions1 = combined_df_transactions1.withColumn(
    "Date",
    make_date(col("Year"), col("Month_Prod"), col("Day"))
)
# display(combined_df_transactions1.limit(7))

combined_df_transactions1.write.format("delta").option('mergeSchema','true').mode("overwrite").saveAsTable("TransactionData")
display(spark.sql("select * from TransactionData limit 10"))
spark.sql("select count(*) from TransactionData").show()

StatementMeta(, 5187c1fd-c531-492c-89ac-a6eae73ff77c, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d5386336-f216-49bc-ac39-6774140d0c71)

+--------+
|count(1)|
+--------+
|   25026|
+--------+



## Unauthorized Table

In [10]:
table21_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/unauthorizeddata_xlsx"
table22_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/unauthorizeddata_csv"
table23_path = "abfss://petronaspocfabric@onelake.dfs.fabric.microsoft.com/WellData2.Lakehouse/Tables/dbo/unauthorizeddata_pdf"

# Load each table into a DataFrame
df21 = spark.read.format("delta").load(table21_path)
df22 = spark.read.format("delta").load(table22_path)
df23 = spark.read.format("delta").load(table23_path)
# Combine the DataFrames using union
combined_df_unauthorized = df21.union(df22).union(df23)

# Display the combined DataFrame
# combined_df_unauthorized.show()
# combined_df_unauthorized.count()
combined_df_unauthorized.write.format("delta").option('mergeSchema','true').mode("overwrite").saveAsTable("UnauthorizedData")
spark.sql("select count(*) from UnauthorizedData").show()

StatementMeta(, 5187c1fd-c531-492c-89ac-a6eae73ff77c, 12, Finished, Available, Finished)

+--------+
|count(1)|
+--------+
|       0|
+--------+



## Capturing the incorrect data

In [25]:
finaldf = clean_combined_df_master.join(combined_df_transactions1,on='API_WellNo',how='inner')
finaldf.count()

StatementMeta(, 5187c1fd-c531-492c-89ac-a6eae73ff77c, 32, Finished, Available, Finished)

25026

In [32]:
outlierData = finaldf.filter(
    (col('Outlier_Oil_Prod')=='Outlier') |
    (col('Outlier_Gas_Prod')=='Outlier') |
    (col('Outlier_Water_Prod')=='Outlier')
)
print(outlierData.count())

nullData = finaldf.filter(
    (col('Outlier_Oil_Prod')=='Unknown') |
    (col('Outlier_Gas_Prod')=='Unknown') |
    (col('Outlier_Water_Prod')=='Unknown')
)
print(nullData.count())

incorrect = nullData.union(outlierData)
incorrect.count()

incorrect.coalesce(1) \
    .write \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .mode('overwrite') \
    .save('Files/Incorrect_Files/incorrectfile.csv')

incorrect.write.mode('overwrite').saveAsTable('incorrectfile_csv')

StatementMeta(, 5187c1fd-c531-492c-89ac-a6eae73ff77c, 39, Finished, Available, Finished)

7998
14


# Move the processed files to Archive folder

In [6]:
# import shutil
# import os

# source_dir = "/lakehouse/default/Files"
# archive_dir = "/lakehouse/default/Files/Archive"

# def move_to_archive():
#     for file in os.listdir(source_dir):
#         file_path = os.path.join(source_dir, file)
#         if os.path.isfile(file_path):
#             shutil.move(file_path, archive_dir)

# move_to_archive()

# print("Files moved to archive.")

StatementMeta(, 937e3f1d-4927-4e7d-8751-a9f51557bbda, 8, Finished, Available, Finished)

Files moved to archive.


In [1]:
# import shutil
# import os

# source_dir = "/lakehouse/default/Files"
# archive_dir = "/lakehouse/default/Files/Archive"

# def move_from_archive():
#     for file in os.listdir(archive_dir):
#         file_path = os.path.join(archive_dir, file)
#         if os.path.isfile(file_path):
#             shutil.move(file_path, source_dir)

# # Move files from archive to source
# move_from_archive()

# print("Files have been moved back from archive to source.")


StatementMeta(, 0ce2bde9-4373-4ab0-b336-d50fdf1494ca, 3, Finished, Available, Finished)

Files have been moved back from archive to source.
