<a href="https://colab.research.google.com/github/ugoGS/Py/blob/main/scenarios.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initial configuration in Google Colab

In [1]:
# Instalar PySpark y findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark
!pip install -q findspark



# Java and Spark configuration

In [2]:
import os
import findspark

# Configurar la ruta de Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Inicializar findspark
findspark.init()

# Mount drive

In [3]:
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


# Create Spark Session & Import Libraries

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, lit, concat_ws, when, length, asc, desc, monotonically_increasing_id
import json
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
from pyspark.sql import functions as F
from datetime import datetime


# Crear una sesión de Spark
spark = SparkSession.builder.appName("proof of concept").getOrCreate()
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)



# Functions related to Partitions

In [5]:
def show_partitions(path_file):

  # Leer el archivo parquet
  parquet_df = spark.read.parquet(path_file)

  # Añadir una columna con el nombre del archivo (que contiene la partición)
  partitions_df = parquet_df.withColumn("file_name", F.input_file_name())

  # Extraer la partición 'process_datetime' del nombre del archivo
  partitions_df = partitions_df.withColumn("process_datetime", F.regexp_extract(F.col("file_name"), "process_datetime=([^/]+)", 1))

  # Mostrar las particiones únicas
  partitions_df.select("process_datetime").distinct().show(truncate=False)

In [6]:
def get_df_last_partition(zone_path, table_name, partition_column_name):

  df = spark.read.parquet(f'{zone_path}/{table_name}')
  last_partition = df.agg({partition_column_name: "max"}).collect()[0][0]
  df_last_partition = df.filter(col(partition_column_name) == last_partition)

  return df_last_partition


In [7]:
def read_excel_file_and_generate_partition(excel_file_name, sheet_name_to_load, columns_to_select, column_mapping, table_schema, skip_rows, destination_path, destination_table, process_datetime, table_id):

  # Cargar la hoja del excel a un DataFrame de pandas
  df_pandas = pd.read_excel(excel_file_name, sheet_name=sheet_name_to_load, skiprows=skip_rows, dtype=str, na_values=[""], keep_default_na=False)

  # Reemplazar valores nulos por cadenas vacías
  #df_pandas.fillna("", inplace=True)

  # Procesar columnas de fecha para mantener el formato original
  #df_pandas['start_up_date'] = pd.to_datetime(df_pandas['start_up_date'], dayfirst=True, errors='coerce').dt.strftime('%d/%m/%Y')
  #df_pandas['survey_date'] = pd.to_datetime(df_pandas['survey_date'], dayfirst=True, errors='coerce').dt.strftime('%d/%m/%Y')

  # Seleccionar las columnas a ser utilizadas
  df_pandas = df_pandas[columns_to_select]

  # Renombrar las columnas en el DataFrame de Pandas usando el mapeo
  df_pandas = df_pandas.rename(columns=column_mapping)

  # Convertir el DataFrame de Pandas (con las columnas renombradas) a un DataFrame de PySpark con el esquema definido
  df_spark = spark.createDataFrame(df_pandas, schema=table_schema)

  # Agregar el campo 'process_datetime' con el valor actual
  df_spark = df_spark.withColumn("process_datetime", F.lit(process_datetime))

  if (len(table_id) != 0):
    # Agregar el campo 'unique_id' basada en el ID incremental y el valor de process_datetime
    df_spark = df_spark.withColumn("id", concat_ws("_", lit(table_id), lit(process_datetime), monotonically_increasing_id()))
  else:
    # Agregar el campo 'unique_id' basada en el ID incremental y el valor de process_datetime
    df_spark = df_spark.withColumn("id", concat_ws("_", lit(process_datetime), monotonically_increasing_id()))

  # Agregar la partición al archivo parquet
  df_spark.write.mode('append').partitionBy('process_datetime').parquet(f'{destination_path}/{destination_table}')




#Get files from trusted zone

In [8]:
source_path = "/content/drive/MyDrive/scenarios/trusted/"

#show_partitions(f'{source_path}/main_template')

df_main_template_trusted = get_df_last_partition(source_path, "main_template", "process_datetime")
df_sap_updates_trusted = get_df_last_partition(source_path, "sap_updates", "process_datetime")
df_gis_updates_trusted = get_df_last_partition(source_path, "gis_updates", "process_datetime")
df_sap_equipment_report_trusted = get_df_last_partition(source_path, "sap_equipment_report", "process_datetime")


#Schema Log table definition

In [9]:
from pyspark.sql.functions import lit, current_timestamp, first, row_number
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, TimestampType

zone_path = "/content/data_lake/log/"

# Lucio's schema
# new_error = {
#         "ID": next_id,
#         "ID_DATA": id_data,
#         "ERROR_TYPE": error_type,
#         "ERROR_DESC": error_desc,
#         "COLUMN_NAME": column_name,
#         "PROCESS_DATETIME": process_datetime,
#         "INVALID_VALUE": invalid_value,
#         "STATE": state,
#         "OPERATION": operation,
#         "NEW_VALUE": new_value,
#         "DATE_TIME": pd.Timestamp.now()
#     }

schema_log = StructType([
    StructField("id", LongType(), False),
    StructField("id_data", StringType(), True),
    StructField("error_type", StringType(), True),
    StructField("error_desc", StringType(), True),
    StructField("column_name", StringType(), True),
    StructField("invalid_value", StringType(), True),
    StructField("new_value", StringType(), True),
    StructField("state", StringType(), True),
    StructField("operation", StringType(), True),
    StructField("date_time", TimestampType(), True),
    StructField("process_datetime", LongType(), True)
])


#Dummy data (Add columns to DFs)

In [42]:
#Create a dummy row
from pyspark.sql import Row

schema_sap_updates = StructType([
    StructField('object_type', StringType(), True),
    StructField('type_of_sap_update', StringType(), True),
    StructField('sap_discrepancy', StringType(), True),
    StructField('additional_object_type_notes_and_observations', StringType(), True),
    StructField('floc', StringType(), True),
    StructField('floc_description', StringType(), True),
    StructField('description_of_object_type', StringType(), True),
    StructField('sap_equipment_number', LongType(), True),
    StructField('cu_id', StringType(), True),
    StructField('cyme_id', StringType(), True),
    StructField('circuit_number', IntegerType(), True),
    StructField('nyseg_line_number', StringType(), True),
    StructField('equipment_category', StringType(), True),
    StructField('mid', StringType(), True),
    StructField('sort_field_pole_number', StringType(), True),
    StructField('opco', StringType(), True),
    StructField('address_number', StringType(), True),
    StructField('street', StringType(), True),
    StructField('city', StringType(), True),
    StructField('state', StringType(), True),
    StructField('postal_code', StringType(), True),
    StructField('main_work_center', IntegerType(), True),
    StructField('service_center', StringType(), True),
    StructField('start_up_date', DateType(), True),
    StructField('system_status', StringType(), True),
    StructField('status_for_users', StringType(), True),
    StructField('changed_on', DateType(), True),
    StructField('construction_year', IntegerType(), True),
    StructField('construction_month', IntegerType(), True),
    StructField('technical_id_work_order', StringType(), True),
    StructField('survey_date', DateType(), True),
    StructField('proxy_vert', StringType(), True),
    StructField('phantom_location', StringType(), True),
    StructField('long_text', StringType(), True),
    StructField('manufacturer', StringType(), True),
    StructField('model_number', StringType(), True),
    StructField('manufacturer_serial_number', StringType(), True),
    StructField('nameplate_image_captured', StringType(), True),
    StructField('ue_capsg_capacitor_type', StringType(), True),
    StructField('ue_capsg_circuit_switch_number', StringType(), True),
    StructField('ue_capsg_kvar', StringType(), True),
    StructField('ue_capsg_nominal_voltage_rating', StringType(), True),
    StructField('ue_capsg_number_of_bushings', StringType(), True),
    StructField('ue_capsg_number_of_phases', StringType(), True),
    StructField('ue_capsg_public_or_private', StringType(), True),
    StructField('ue_capsg_scada_controlled', StringType(), True),
    StructField("ue_capsg_status", StringType(), True),
    StructField('ue_cbank_capacitor_type', StringType(), True),
    StructField('ue_cbank_circuit_switch_number', StringType(), True),
    StructField('ue_cbank_controlled', StringType(), True),
    StructField('ue_cbank_kvar_total', StringType(), True),
    StructField('ue_cbank_nominal_voltage_rating', StringType(), True),
    StructField("ue_cbank_number_of_capacitors", StringType(), True),
    StructField('ue_cbank_number_of_phases', StringType(), True),
    StructField('ue_cbank_public_or_private', StringType(), True),
    StructField('ue_cbank_scada_controlled', StringType(), True),
    StructField('ue_cbank_status', StringType(), True),
    StructField('ue_condr_conductor_length', StringType(), True),
    StructField('ue_condr_conductor_size', StringType(), True),
    StructField('ue_condr_conductor_type', StringType(), True),
    StructField('ue_condr_insulation_type', StringType(), True),
    StructField('ue_condr_neutral_material', StringType(), True),
    StructField('ue_condr_neutral_size', StringType(), True),
    StructField('ue_condr_nominal_voltage_rating', StringType(), True),
    StructField('ue_condr_location_oh_or_ug', StringType(), True),
    StructField('ue_condr_primary_or_secondary', StringType(), True),
    StructField('ue_condr_primary_conductor_material', StringType(), True),
    StructField('ue_condr_public_length', StringType(), True),
    StructField('ue_condr_public_or_private', StringType(), True),
    StructField('ue_condr_trailing_span_length', StringType(), True),
    StructField('ue_condr_trailing_span_location', StringType(), True),
    StructField('ue_condr_trans_or_dist', StringType(), True),
    StructField('ue_ctout_character_of_construction', StringType(), True),
    StructField('ue_ctout_cutout_rating', StringType(), True),
    StructField('ue_ctout_fuse_size', StringType(), True),
    StructField('ue_ctout_fuse_type', StringType(), True),
    StructField('ue_ctout_material', StringType(), True),
    StructField('ue_ctout_nominal_voltage_rating', StringType(), True),
    StructField('ue_ctout_phase', StringType(), True),
    StructField('ue_ctout_public_or_private', StringType(), True),
    StructField('ue_ctout_state', StringType(), True),
    StructField('ue_ctout_type', StringType(), True),
    StructField('ue_disc_amp_rating', StringType(), True),
    StructField('ue_disc_circuit_switch_number', StringType(), True),
    StructField('ue_disc_number_of_phases', StringType(), True),
    StructField('ue_disc_phase_designation', StringType(), True),
    StructField('ue_disc_public_or_private', StringType(), True),
    StructField('ue_disc_switch_style', StringType(), True),
    StructField('ue_disc_type', StringType(), True),
    StructField('ue_pole_circuit_phase_label', StringType(), True),
    StructField('ue_pole_class', StringType(), True),
    StructField('ue_pole_opco_owner_percent', StringType(), True),
    StructField('ue_pole_owner', StringType(), True),
    StructField('ue_pole_owner_maintained', StringType(), True),
    StructField('ue_pole_pole_length', StringType(), True),
    StructField('ue_pole_pole_material', StringType(), True),
    StructField('ue_pole_pole_number', StringType(), True),
    StructField('ue_pole_between_pole_number_since', StringType(), True),
    StructField('ue_pole_between_pole_number_to', StringType(), True),
    StructField('ue_pole_pole_type', StringType(), True),
    StructField('ue_pole_public_or_private', StringType(), True),
    StructField('ue_pole_trans_or_dist', StringType(), True),
    StructField('ue_pole_treatment', StringType(), True),
    StructField('ue_recl_circuit_switch_number', StringType(), True),
    StructField('ue_recl_number_of_phases', StringType(), True),
    StructField('ue_recl_public_or_private', StringType(), True),
    StructField('ue_recl_recloser_acts_as', StringType(), True),
    StructField('ue_recl_scada_controlled', StringType(), True),
    StructField('ue_recl_trans_or_dist', StringType(), True),
    StructField('ue_reg_circuit_switch_number', StringType(), True),
    StructField('ue_reg_kva', StringType(), True),
    StructField('ue_reg_phase_designation', StringType(), True),
    StructField('ue_reg_public_or_private', StringType(), True),
    StructField('ue_reg_scada_controlled', StringType(), True),
    StructField('ue_reg_status', StringType(), True),
    StructField('ue_sectz_circuit_switch_number', StringType(), True),
    StructField('ue_sectz_number_of_phases', StringType(), True),
    StructField('ue_sectz_public_or_private', StringType(), True),
    StructField('ue_sectz_scada_controlled', StringType(), True),
    StructField('ue_ratio_location_oh_or_ug', StringType(), True),
    StructField('ue_ratio_number_of_phases', StringType(), True),
    StructField('ue_ratio_operating_voltage', StringType(), True),
    StructField('ue_ratio_phase_designation', StringType(), True),
    StructField('ue_ratio_primary_voltage_text', StringType(), True),
    StructField('ue_ratio_public_or_private', StringType(), True),
    StructField('ue_ratio_secondary_voltage_text', StringType(), True),
    StructField('ue_ratio_size_kva', StringType(), True),
    StructField('ue_ratio_status', StringType(), True),
    StructField('ue_ratio_subtype_transformer_type', StringType(), True),
    StructField('ue_swtch_circuit_switch_number', StringType(), True),
    StructField('ue_swtch_load_break', StringType(), True),
    StructField('ue_swtch_normal_position', StringType(), True),
    StructField('ue_swtch_number_of_phases', StringType(), True),
    StructField('ue_swtch_operating_voltage', StringType(), True),
    StructField('ue_swtch_phase_designation', StringType(), True),
    StructField('ue_swtch_public_or_private', StringType(), True),
    StructField('ue_swtch_rated_kv', StringType(), True),
    StructField('ue_swtch_state', StringType(), True),
    StructField('ue_swtch_scada_controlled', StringType(), True),
    StructField('ue_swtch_switch_type', StringType(), True),
    StructField('ue_swtgr_amperage', StringType(), True),
    StructField('ue_swtgr_fuse_type', StringType(), True),
    StructField('ue_swtgr_circuit_switch_number', StringType(), True),
    StructField('ue_swtgr_installation_type', StringType(), True),
    StructField('ue_swtgr_loadbreak_capability', StringType(), True),
    StructField('ue_swtgr_nameplate', StringType(), True),
    StructField('ue_swtgr_nominal_voltage_rating', StringType(), True),
    StructField('ue_swtgr_public_or_private', StringType(), True),
    StructField('ue_swtgr_switchgear_distribution_type', StringType(), True),
    StructField('ue_swtgr_trans_or_dist', StringType(), True),
    StructField('ue_swtgr_year_installed', StringType(), True),
    StructField('ue_xfmer_date_retired_abandoned', StringType(), True),
    StructField('ue_xfmer_dual_voltage', StringType(), True),
    StructField('ue_xfmer_kva_rating', StringType(), True),
    StructField('ue_xfmer_location_oh_or_ug', StringType(), True),
    StructField('ue_xfmer_number_of_phases', StringType(), True),
    StructField('ue_xfmer_phase_designation', StringType(), True),
    StructField('ue_xfmer_primary_voltage_text', StringType(), True),
    StructField('ue_xfmer_public_or_private', StringType(), True),
    StructField('ue_xfmer_secondary_voltage', StringType(), True),
    StructField('ue_xfmer_secondary_voltage_text', StringType(), True),
    StructField('ue_xfmer_size_kva', StringType(), True),
    StructField('ue_xfmer_transformer_type', StringType(), True),
    StructField('ue_xfmer_trans_type', StringType(), True),
    StructField('ue_xfmer_year_installed', StringType(), True),
    StructField('ue_vaulp_foundation_material', StringType(), True),
    StructField('ue_vaulp_opco_owner_percent', StringType(), True),
    StructField('ue_vaulp_public_or_private', StringType(), True),
    StructField('ue_vaulp_tr_number', StringType(), True),
    StructField('ue_vaulp_trans_or_dist', StringType(), True),
    StructField('id', StringType(), True),
    StructField('process_datetime', LongType(), True)
  ])

dummy_data = [
  Row(object_type="UE_CAPSG",
      type_of_sap_update="Add Equipment",
      sap_discrepancy="New equipment",
      additional_object_type_notes="Indicates abandoned in columnn FO",
      floc="9301-L0576-1510-0014-ED00047",
      floc_description="BOSTON T -1038 -42",
      description_of_object_type="TRANSFORMER,DIST,1PH,CSP 120/240 25KVA 4",
      sap_equipment_number=0,
      cu_id=None,
      cyme_id=None,
      circuit_number=3105301,
      nyseg_line_number=None,
      equipment_category=None,
      mid=None,
      sort_field_pole_number="PO=44",
      opco="NYSEG",
      address_number=None,
      street=None,
      city="BOSTON T",
      state=None,
      postal_code=None,
      main_work_center=51000,
      service_center="Lancaster Service Center",
      start_up_date=datetime.strptime("10/08/2018", "%d/%m/%Y").date(),
      system_status="INST",
      status_for_users="UNOP 4000",
      changed_on=datetime.strptime("10/08/2018", "%d/%m/%Y").date(),
      construction_year=2018,
      construction_month=None,
      technical_id_work_order="801000000000",
      survey_date=datetime.strptime("10/08/2018", "%d/%m/%Y").date(),
      proxy_vert=None,
      phantom_location=None,
      long_text=None,
      manufacturer="N/A",
      model_number="N/A",
      manufacturer_serial_number="N/A",
      nameplate_image_captured=None,
      ue_capsg_capacitor_type=None,
      ue_capsg_circuit_switch_number=None,
      ue_capsg_kvar=None,
      ue_capsg_nominal_voltage_rating=None,
      ue_capsg_number_of_bushings=None,
      ue_capsg_number_of_phases=None,
      ue_capsg_public_or_private=None,
      ue_capsg_scada_controlled=None,
      ue_capsg_status=None,
      ue_cbank_capacitor_type=None,
      ue_cbank_circuit_switch_number=None,
      ue_cbank_controlled=None,
      ue_cbank_kvar_total=None,
      ue_cbank_nominal_voltage_rating=None,
      ue_cbank_number_of_capacitors=None,
      ue_cbank_number_of_phases=None,
      ue_cbank_public_or_private=None,
      ue_cbank_scada_controlled=None,
      ue_cbank_status=None,
      ue_condr_conductor_length=None,
      ue_condr_conductor_size=None,
      ue_condr_conductor_type=None,
      ue_condr_insulation_type=None,
      ue_condr_neutral_material=None,
      ue_condr_neutral_size=None,
      ue_condr_nominal_voltage_rating=None,
      ue_condr_location_oh_or_ug=None,
      ue_condr_primary_or_secondary=None,
      ue_condr_primary_conductor_material=None,
      ue_condr_public_length=None,
      ue_condr_public_or_private=None,
      ue_condr_trailing_span_length=None,
      ue_condr_trailing_span_location=None,
      ue_condr_trans_or_dist=None,
      ue_ctout_character_of_construction=None,
      ue_ctout_cutout_rating=None,
      ue_ctout_fuse_size=None,
      ue_ctout_fuse_type=None,
      ue_ctout_material=None,
      ue_ctout_nominal_voltage_rating=None,
      ue_ctout_phase=None,
      ue_ctout_public_or_private=None,
      ue_ctout_state=None,
      ue_ctout_type=None,
      ue_disc_amp_rating=None,
      ue_disc_circuit_switch_number=None,
      ue_disc_number_of_phases=None,
      ue_disc_phase_designation=None,
      ue_disc_public_or_private=None,
      ue_disc_switch_style=None,
      ue_disc_type=None,
      ue_pole_circuit_phase_label=None,
      ue_pole_class=None,
      ue_pole_opco_owner_percent=None,
      ue_pole_owner=None,
      ue_pole_owner_maintained=None,
      ue_pole_pole_length=None,
      ue_pole_pole_material=None,
      ue_pole_pole_number=None,
      ue_pole_between_pole_number_since=None,
      ue_pole_between_pole_number_to=None,
      ue_pole_pole_type=None,
      ue_pole_public_or_private=None,
      ue_pole_trans_or_dist=None,
      ue_pole_treatment=None,
      ue_recl_circuit_switch_number=None,
      ue_recl_number_of_phases=None,
      ue_recl_public_or_private=None,
      ue_recl_recloser_acts_as=None,
      ue_recl_scada_controlled=None,
      ue_recl_trans_or_dist=None,
      ue_reg_circuit_switch_number=None,
      ue_reg_kva=None,
      ue_reg_phase_designation=None,
      ue_reg_public_or_private=None,
      ue_reg_scada_controlled=None,
      ue_reg_status=None,
      ue_sectz_circuit_switch_number=None,
      ue_sectz_number_of_phases=None,
      ue_sectz_public_or_private=None,
      ue_sectz_scada_controlled=None,
      ue_ratio_location_oh_or_ug=None,
      ue_ratio_number_of_phases=None,
      ue_ratio_operating_voltage=None,
      ue_ratio_phase_designation=None,
      ue_ratio_primary_voltage_text=None,
      ue_ratio_public_or_private=None,
      ue_ratio_secondary_voltage_text=None,
      ue_ratio_size_kva=None,
      ue_ratio_status=None,
      ue_ratio_subtype_transformer_type=None,
      ue_swtch_circuit_switch_number=None,
      ue_swtch_load_break=None,
      ue_swtch_normal_position=None,
      ue_swtch_number_of_phases=None,
      ue_swtch_operating_voltage=None,
      ue_swtch_phase_designation=None,
      ue_swtch_public_or_private=None,
      ue_swtch_rated_kv=None,
      ue_swtch_state=None,
      ue_swtch_scada_controlled=None,
      ue_swtch_switch_type=None,
      ue_swtgr_amperage=None,
      ue_swtgr_fuse_type=None,
      ue_swtgr_circuit_switch_number=None,
      ue_swtgr_installation_type=None,
      ue_swtgr_loadbreak_capability=None,
      ue_swtgr_nameplate=None,
      ue_swtgr_nominal_voltage_rating=None,
      ue_swtgr_public_or_private=None,
      ue_swtgr_switchgear_distribution_type=None,
      ue_swtgr_trans_or_dist=None,
      ue_swtgr_year_installed=None,
      ue_xfmer_date_retired_abandoned=None,
      ue_xfmer_dual_voltage=None,
      ue_xfmer_kva_rating=None,
      ue_xfmer_location_oh_or_ug=None,
      ue_xfmer_number_of_phases=None,
      ue_xfmer_phase_designation=None,
      ue_xfmer_primary_voltage_text=None,
      ue_xfmer_public_or_private=None,
      ue_xfmer_secondary_voltage=None,
      ue_xfmer_secondary_voltage_text=None,
      ue_xfmer_size_kva=None,
      ue_xfmer_transformer_type=None,
      ue_xfmer_trans_type=None,
      ue_xfmer_year_installed=None,
      ue_vaulp_foundation_material=None,
      ue_vaulp_opco_owner_percent=None,
      ue_vaulp_public_or_private=None,
      ue_vaulp_tr_number=None,
      ue_vaulp_trans_or_dist=None,
      id="02_20241129192612_8589934608",
      process_datetime=20241129192612
  )
]

dummy_df = spark.createDataFrame(dummy_data, schema_sap_updates)

#UE_RATIO Scenario 2: Step Transformer Assigned Wrong Object Type

In [11]:
#Pattern to extract "object_type" columns
pattern = r"Object type from (\w+) to (\w+)"

#df_scenario = df_scenario.filter(col("type_of_sap_update") == "Modify Equipment")

df_scenario = df_sap_updates_trusted.filter(col("sap_discrepancy").rlike(pattern))

df_scenario = df_scenario.withColumn("object_type", regexp_extract(col("sap_discrepancy"), pattern, 1)) \
       .withColumn("object_type_updated", regexp_extract(col("sap_discrepancy"), pattern, 2))

#Log table

df_scenario_log_data = df_scenario.select(F.monotonically_increasing_id().alias("id"), F.col("id").alias("id_data"), F.col("type_of_sap_update").alias("error_type"), F.col("sap_discrepancy").alias("error_desc"), F.lit("object_type").alias("column_name"),
                                     F.col("object_type").alias("invalid_value"), F.col("object_type_updated").alias("new_value"), F.lit("TRUSTED").alias("state"), F.lit("UPDATE").alias("operation"),
                                     F.current_timestamp().cast(TimestampType()).alias("date_time"), "process_datetime").collect()

df_scenario_log = spark.createDataFrame(df_scenario_log_data, schema=schema_log)

#Script ZIUSACHOBJTYPE - Change Object (CSV)

lsmw_ziusachobjtype = df_scenario.select(col("sap_equipment_number").alias("equnr"),
                              col("object_type_updated").alias("equart"),
                              "process_datetime")


df_scenario_log.show(truncate=False)
lsmw_ziusachobjtype.show()

df_scenario_log.write.mode('overwrite').partitionBy('process_datetime').parquet(f'{zone_path}/data_quality_log')
lsmw_ziusachobjtype.write.csv(f"{zone_path}/lsmw_ziusachobjtype", header=True, mode="overwrite")

'''
ZIUSACRATTCHEQ: Create Attach Equipment ​

  Reference equipment​

  Equipment Description​

  Start-Up Date Format MM/DD/YYYY​

  Construction Year​

  Construction Month​

  Functional Location​

  Sort Field​
'''

+----------+----------------------------+----------------+-------------------------------------+-----------+-------------+---------+-------+---------+-----------------------+----------------+
|id        |id_data                     |error_type      |error_desc                           |column_name|invalid_value|new_value|state  |operation|date_time              |process_datetime|
+----------+----------------------------+----------------+-------------------------------------+-----------+-------------+---------+-------+---------+-----------------------+----------------+
|0         |02_20241129192612_2         |Modify Equipment|Object type from UE_SWTCH to UE_CTOTS|object_type|UE_SWTCH     |UE_CTOTS |TRUSTED|UPDATE   |2024-12-10 00:31:14.264|20241129192612  |
|1         |02_20241129192612_3         |Modify Equipment|Object type from UE_SWTCH to UE_CTOTS|object_type|UE_SWTCH     |UE_CTOTS |TRUSTED|UPDATE   |2024-12-10 00:31:14.264|20241129192612  |
|2         |02_20241129192612_4         

'\nZIUSACRATTCHEQ: Create Attach Equipment \u200b\n\n  Reference equipment\u200b\n\n  Equipment Description\u200b\n\n  Start-Up Date Format MM/DD/YYYY\u200b\n\n  Construction Year\u200b\n\n  Construction Month\u200b\n\n  Functional Location\u200b\n\n  Sort Field\u200b\n'

#UE_CAPSG Scenario 1: Serialized Equipment in Field – Not in SAP​

In [56]:
df1 = df_sap_updates_trusted.union(dummy_df) #added columns
df2 = df_sap_equipment_report_trusted

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

#Select columns to script ZIUSACRKTYPEQ: Create Equipment Category K from Reference
#Find objects not in SAP

missing_records = spark.sql("""
    SELECT *
    FROM df1
    LEFT ANTI JOIN df2
    ON df1.floc = df2.floc
    AND df1.sap_equipment_number = df2.sap_equipment_number
    WHERE df1.object_type = 'UE_CAPSG'
""")

lsmw_ziusacrktypeq = spark.sql("""
    SELECT df1.floc, df1.sap_equipment_number, df1.object_type, df1.equipment_category, df1.start_up_date, df1.construction_month, df1.construction_year, df1.mid, df1.manufacturer_serial_number, df1.id
    FROM df1
    LEFT ANTI JOIN df2
    ON df1.floc = df2.floc AND df1.sap_equipment_number = df2.sap_equipment_number
    WHERE df1.object_type = 'UE_CAPSG'
""")


#Log table

df_scenario_log_data = missing_records.select(F.monotonically_increasing_id().alias("id"), F.col("id").alias("id_data"), F.col("type_of_sap_update").alias("error_type"), F.col("sap_discrepancy").alias("error_desc"), F.lit("sap_equipment_number").alias("column_name"),
                                     F.col("sap_equipment_number").alias("invalid_value"), F.lit("NEW_VALUE").alias("new_value"), F.lit("TRUSTED").alias("state"), F.lit("CREATE").alias("operation"),
                                     F.current_timestamp().cast(TimestampType()).alias("date_time"), "process_datetime").collect()

df_scenario_log = spark.createDataFrame(df_scenario_log_data, schema=schema_log)


#Script ZIUSACRKTYPEQ

df_scenario_log.show(truncate=False)
lsmw_ziusacrktypeq.show(truncate=False)

df_scenario_log.write.mode('overwrite').partitionBy('process_datetime').parquet(f'{zone_path}/data_quality_log')
lsmw_ziusacrktypeq.write.csv(f"{zone_path}/lsmw_ziusacrktypeq", header=True, mode="overwrite")

#nyseg_line_number, sort_field_pole_number, description_of_object_type, long_text

+-----------+----------------------------+-------------+-------------+--------------------+-------------+---------+-------+---------+-----------------------+----------------+
|id         |id_data                     |error_type   |error_desc   |column_name         |invalid_value|new_value|state  |operation|date_time              |process_datetime|
+-----------+----------------------------+-------------+-------------+--------------------+-------------+---------+-------+---------+-----------------------+----------------+
|25769803776|02_20241129192612_8589934608|Add Equipment|New equipment|sap_equipment_number|0            |NEW_VALUE|TRUSTED|CREATE   |2024-12-10 01:48:44.862|20241129192612  |
+-----------+----------------------------+-------------+-------------+--------------------+-------------+---------+-------+---------+-----------------------+----------------+

+----------------------------+--------------------+-----------+------------------+-------------+------------------+---------

#Non-Load Break Switch (UE_DISC) - Scenario 2: Disconnect Switch in Field, Not n Records: Non-Serialized Equipment - FLOC Exists

In [60]:
df1 = df_sap_updates_trusted.union(dummy_df) #added columns
df2 = df_sap_equipment_report_trusted

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

#Select columns to script ZIUSACRKTYPEQ: Create Equipment Category K from Reference
#Find objects not in SAP

missing_records = spark.sql("""
    SELECT *
    FROM df1
    LEFT ANTI JOIN df2
    ON df1.floc = df2.floc
    AND df1.sap_equipment_number = df2.sap_equipment_number
    WHERE df1.object_type = 'UE_DISC'
""")

#Script ZIUSACRKTYPEQ
lsmw_ziusacrktypeq = spark.sql("""
    SELECT df1.floc, df1.sap_equipment_number, df1.object_type, df1.equipment_category, df1.start_up_date, df1.construction_month, df1.construction_year, df1.mid, df1.manufacturer_serial_number, df1.id
    FROM df1
    LEFT ANTI JOIN df2
    ON df1.floc = df2.floc AND df1.sap_equipment_number = df2.sap_equipment_number
    WHERE df1.object_type = 'UE_DISC'
""")

#Script ZIUSAINACEQUI
lsmw_ziusainecequi = spark.sql("""
    SELECT df1.sap_equipment_number, df1.description_of_object_type, df1.long_text
    FROM df1
    LEFT ANTI JOIN df2
    ON df1.floc = df2.floc AND df1.sap_equipment_number = df2.sap_equipment_number
    WHERE df1.object_type = 'UE_DISC'
""")

#Log table

df_scenario_log_data = missing_records.select(F.monotonically_increasing_id().alias("id"), F.col("id").alias("id_data"), F.col("type_of_sap_update").alias("error_type"), F.col("sap_discrepancy").alias("error_desc"), F.lit("sap_equipment_number").alias("column_name"),
                                     F.col("sap_equipment_number").alias("invalid_value"), F.lit("NEW_VALUE").alias("new_value"), F.lit("TRUSTED").alias("state"), F.lit("CREATE").alias("operation"),
                                     F.current_timestamp().cast(TimestampType()).alias("date_time"), "process_datetime").collect()

df_scenario_log = spark.createDataFrame(df_scenario_log_data, schema=schema_log)

df_scenario_log.show(truncate=False)
lsmw_ziusacrktypeq.show(truncate=False)
lsmw_ziusainecequi.show(truncate=False)

df_scenario_log.write.mode('overwrite').partitionBy('process_datetime').parquet(f'{zone_path}/data_quality_log')
lsmw_ziusacrktypeq.write.csv(f"{zone_path}/lsmw_ziusacrktypeq", header=True, mode="overwrite")
lsmw_ziusacrktypeq.write.csv(f"{zone_path}/lsmw_ziusainecequi", header=True, mode="overwrite")

#nyseg_line_number, sort_field_pole_number, description_of_object_type, long_text

+----------+----------------------------+-------------+---------------+--------------------+-------------+---------+-------+---------+-----------------------+----------------+
|id        |id_data                     |error_type   |error_desc     |column_name         |invalid_value|new_value|state  |operation|date_time              |process_datetime|
+----------+----------------------------+-------------+---------------+--------------------+-------------+---------+-------+---------+-----------------------+----------------+
|8589934592|02_20241129192612_8589934605|Add Equipment|Provide details|sap_equipment_number|0            |NEW_VALUE|TRUSTED|CREATE   |2024-12-10 01:56:35.667|20241129192612  |
|8589934593|02_20241129192612_8589934606|Add Equipment|Provide details|sap_equipment_number|0            |NEW_VALUE|TRUSTED|CREATE   |2024-12-10 01:56:35.667|20241129192612  |
|8589934594|02_20241129192612_8589934607|Add Equipment|Provide details|sap_equipment_number|0            |NEW_VALUE|TRUS

#Cutouts (UE_CTOUT) - Scenario 1: Non-Serialized Equipment (Non Pre-Cap) on Record – Not in Field​

In [51]:
df1 = df_main_template_trusted
df2 = df_sap_equipment_report_trusted

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

#Compare Main Template vs SAP Equipment Report

missing_records = spark.sql("""
    SELECT *
    FROM df2
    LEFT ANTI JOIN df1
    ON df1.floc = df2.floc AND df1.sap_equipment_number = df2.sap_equipment_number
""")


#Log table

df_scenario_log_data = df_scenario.select(F.monotonically_increasing_id().alias("id"), F.col("id").alias("id_data"),
                                     F.lit("Remove Equipment").alias("error_type"), F.lit("Equipment not found in field during GMEP Field Survey. Equipment record removed from FLOC").alias("error_desc"),
                                     F.lit("N/A").alias("column_name"),
                                     F.col("sap_equipment_number").alias("invalid_value"), F.lit("DISMANTELED").alias("new_value"), F.lit("DISMANTELED").alias("state"), F.lit("DELETE").alias("operation"),
                                     F.current_timestamp().cast(TimestampType()).alias("date_time"), "process_datetime").collect()

df_scenario_log = spark.createDataFrame(df_scenario_log_data, schema=schema_log)

df_scenario_log.show(truncate=False)


+----------+----------------------------+----------------+-----------------------------------------------------------------------------------------+-----------+-------------+-----------+-----------+---------+-----------------------+----------------+
|id        |id_data                     |error_type      |error_desc                                                                               |column_name|invalid_value|new_value  |state      |operation|date_time              |process_datetime|
+----------+----------------------------+----------------+-----------------------------------------------------------------------------------------+-----------+-------------+-----------+-----------+---------+-----------------------+----------------+
|0         |02_20241129192612_2         |Remove Equipment|Equipment not found in field during GMEP Field Survey. Equipment record removed from FLOC|N/A        |100017077123 |DISMANTELED|DISMANTELED|DELETE   |2024-12-10 01:30:08.215|20241129192612  |
