<a href="https://colab.research.google.com/github/vazquezsampedro/Aircraft_Valuation_TFM/blob/main/DataGov_Framework.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Quality & Governance Framework

In [55]:
!pip install pyspark



In [56]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import to_date, col
import pandas as pd

In [57]:
spark = SparkSession.builder \
    .appName("FrameWorkDataGovernance") \
    .getOrCreate()

In [58]:
class CSVHandler:
    def __init__(self, spark_session):
        self.spark = spark_session
    def show_dataframe(self, dataframe):
        dataframe.show()
    def show_headers_from_file_path(self, file_path):
        file_path = os.path.abspath(file_path)
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"El archivo {file_path} no existe.")
        dataframe = self.spark.read.csv(file_path, header=True, sep=";", encoding="latin1") #encoding added
        print(dataframe.columns)
    def csv_reader(self, file_path):
      try:
        pandas_df = pd.read_csv(file_path, sep=';', encoding="latin1") #encoding added
        return pandas_df.head(10)
      except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
      except pd.errors.ParserError:
        print(f"Error: Could not parse the CSV file at {file_path}. Check the file format and separator.")
        return None
      except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None


In [59]:
class DataQualityRules:

    def __init__(self, spark_session):
        self.spark = spark_session

    # Schema = Ventas
    def is_first_column_dates(self, dataframe):
        try:
            id_column = dataframe.columns[0]
            transaction_date_column = dataframe.columns[4]

            dataframe_with_dates = dataframe.withColumn("is_date", to_date(col(transaction_date_column), "dd/MM/yyyy"))
            invalid_dates = dataframe_with_dates.filter(col("is_date").isNull() & col(transaction_date_column).isNotNull()).select(id_column, transaction_date_column).collect()
            for row in invalid_dates:
                print(f"Para {row[id_column]}, ha fallado el siguiente campo '{row[transaction_date_column]}'")
            if len(invalid_dates) == 0:
                return True
            return False
        except Exception as e:
            print(f"Error al verificar si la columna 'TransactionDate' contiene fechas: {e}")
            return False

    def validate_payment_method(self, dataframe):
        payment_column = dataframe.columns[3]
        id_column = dataframe.columns[0]
        valid_methods = ["Credit Card", "Debit Card", "Bank Transfer"]

        invalid_rows = dataframe.filter(~col(payment_column).isin(valid_methods))
        invalid_rows.select(id_column, payment_column).collect()

        for row in invalid_rows.collect():
            print(f"{row[id_column]} = {row[payment_column]}, Payment Method is incorrect")

    # Schema = Logistica
    def validate_stock_quantity(self, dataframe):
        try:
            id_column = dataframe.columns[0]
            stock_column = dataframe.columns[4]

            low_stock_rows = dataframe.filter(col(stock_column) < 100)
            for row in low_stock_rows.collect():
                print(f"Para {row[id_column]}, el número de productos de stock es bajo {row[stock_column]}")
        except Exception as e:
            print(f"Error al verificar la cantidad de stock: {e}")

    def validate_delivery_status(self, dataframe):
        try:
            id_column = dataframe.columns[0]
            status_column = dataframe.columns[11]

            not_delivered_rows = dataframe.filter(col(status_column) != "Delivered")
            for row in not_delivered_rows.collect():
                print(f"'{row[id_column]}' aún no se ha entregado - El estado es '{row[status_column]}'")
        except Exception as e:
            print(f"Error al verificar el estado de entrega: {e}")

In [60]:
csv_handler = CSVHandler(spark)

file_path = '/content/Marzo2025_Ventas.csv'
head_of_csv = csv_handler.csv_reader(file_path)

print(head_of_csv.head(10))

      TransactionID          OrderID   CustomerName PaymentMethod  \
T001           1001       Juan Pérez    Credit Card    01/03/2025   
T002           1002       Ana García         PayPal    02/03/2025   
T003           1003    Luis Martínez     Debit Card         error   
T004           1004      María López    Credit Card    04/03/2025   
T005           1005    Pedro Sánchez  Bank Transfer    05/03/2025   
T006           1006  Laura Fernández    Credit Card    06/03/2025   
T007           1007      Carlos Ruiz         PayPal    07/03/2025   
T008           1008      Elena Gómez     Debit Card    08/03/2025   
T009           1009    Javier Torres  Bank Transfer    09/03/2025   
T010           1010       Marta Díaz    Credit Card    10/03/2025   

      TransactionDate  Amount  Tax  Discount NetAmount        PaymentStatus  \
T001             2400     240  240      2160      Paid      123 Calle Falsa   
T002              800      80   40       760      Paid     456 Avenida Real   
T00

In [66]:
# Instance for Class DataQualityRules over FilePath
data_quality_rules = DataQualityRules(spark)
file_path = '/content/Marzo2025_Ventas.csv'

try:
  dataframe = spark.read.csv(file_path, header=True, sep=";", encoding="latin1")
  is_first_column_dates = data_quality_rules.is_first_column_dates(dataframe)
  print(""
  "Regla TransactionDate ejecutada")
except Exception as e:
  print(f"Error al leer el archivo CSV: {e}")

try:
  dataframe = spark.read.csv(file_path, header=True, sep=";", encoding="latin1")
  data_quality_rules.validate_payment_method(dataframe)
  print(""
  "Regla Payment Method ejecutada")

except Exception as e:
  print(f"Error al leer el archivo CSV: {e}")

Para T003, ha fallado el siguiente campo 'error'
Regla TransactionDate ejecutada
T002 = PayPal, Payment Method is incorrect
T007 = PayPal, Payment Method is incorrect
Regla Payment Method ejecutada
