In [1]:
%run "./ADP_Farmatic_Def"

In [2]:
%run "../Libraries/ADP_Spain_MDM_Def_Sellin"

In [3]:
#################################################################################
"""Processes Sell In file data and converts it into canonical format

"""
 #Who                 When           What
 #Ana Perez           05/12/2018     Initial version
 #Victor Salesa       25/01/2019     Commented out call to QA_UPDATE_CTL_PROCESS_FILE_DATA as no longer exists
 #Victor Salesa       25/01/2019     Removed  %run "../Libraries/ADP_QA"
################################################################################
def ProcessPharmaticFilesDataSellin (filepath,filename,errorpath,processedpath,debug=False):

  """Processes Sell In file data and converts it into canonical format

      Parameters:
      filepath                      -- path of file to be processed
      filename                      -- file name of file to be processed
      errorpath                     -- path where the file must be saved if it has number of fields incorrect or the process generates an exception
      processedpath                 -- path where the file must be saved after validation and enrichment
      debug                         -- Optional parameter. Default value=False. When this parameter is True, it prints some trace messages

      Return:
        fileProcessedStatus: OK, OK FMT     (processed with format field erroneous), 
                                 OK ENR     (processed with enrichment fields erroneous)
                                 OK FMT ENR (processed with format and enrichment fields erroneous)
                                 STR        (moved to errorpath because it has number of fields incorrect)
                                 EX         (the process generates an exception; the file continues in tobeprocessed path)
                                 EX: error saving Canonical File (the process cannot save the canonical file; the file continues in tobeprocessed path)

      Example 1:
        ProcessPharmaticFilesDataSellin(
          "dbfs:/mnt/ds/tobeprocessed/pharmatic/",
          "605522930120171106GPR.txt",
          "dbfs:/mnt/ds/errorprocess/pharmatic/",
          "dbfs:/mnt/ds/canonical/pharmatic/"
          )

  """
  #Who                 When           What
  try:
    
    
      processDate = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S') # Date and Time of the begining to this process
      fileProcessedStatus = "EX"                                    # Status of process To be returned
      path_fact = "PR/"                                             # Folder fo this business area
      business_area = "PR"                                          # Business area
      landingTimeStamp = filename[-18:-4]                           # Landing date extracted from File Name received as parameter
      fileNameExtract = filename[0:21]                              # Original file name extracted from File Name received as parameter
      pharmacyCodeEx = filename[0:4]                                      # parmacyCode extracted from File Name received as parameter
      origExtension = filename[len(filename)-4:len(filename)]       # Original extension extracted from File Name received as parameter 
      lenSpecRow = __SP_PR_TOTAL_LENGHTS__                             # Length of detail at Spec 

      
      #Initialize df_QA_PROCESS_file: General information about this file processing
      rdd = sc.parallelize([(fileNameExtract+origExtension
                       ,landingTimeStamp
                       ,__PMS_FARMATIC_CODE__
                       ,__PMS_FARMATIC_COUNTRY__
                       ,pharmacyCodeEx
                       ,business_area
                       ,processDate
                       ,processDate
                       ,0
                       ,""
                       ,"")])
      df_QA_PROCESS_file = rdd.toDF(["FILE_NAME","LANDING_DATE","PMS_CODE","COUNTRY_CODE","PHARMACY_CODE","BUSINESS_AREA","START_DATE","END_DATE","STATUS","MESSAGE_TEXT","ERROR_CODE"])  
      df_QA_PROCESS_file = (df_QA_PROCESS_file
                 .withColumn ("LANDING_DATE", unix_timestamp(lit(landingTimeStamp), "yyyyMMddHHmmss").cast(TimestampType()))
                 .withColumn ("START_DATE", to_timestamp(lit(processDate), 'yyyy-MM-dd HH:mm:ss')) 
                 .withColumn ("END_DATE",col("START_DATE")) 
                )
        
      #Read the File Content
      df = spark.read.text(filepath+filename)
      
      ################################################################################################################################################
      #  FILE CONTENT MANAGEMENT
      ################################################################################################################################################
      if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[1][Start Process and Read Header]")
      
      # DataFrame Filtering only the Header row
      df_header = (df
       .transform(SliceDFColumn("value",['RecordType'],[__RECORDTYPE_LEN__])).filter(col("RecordType") == "H").drop("RecordType") 
       .transform(SliceDFColumn("value",__SP_HEADER_COLUMN_NAMES__,__SP_HEADER_LENGHTS__))
       .drop("value")
      )
      
      df_QA_PROCESS_file = (df_QA_PROCESS_file.withColumn ("PHARMACY_CODE", lit(df_header.rdd.first().PharmacyID)))
      
      if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[2][Read Detail]")
     
      # DataFrame Detail: filtering the detail rows, adding some general file data
      df_detail = (df
       .transform(SliceDFColumn("value",['RecordType'],[__RECORDTYPE_LEN__])).filter(col("RecordType") == "D").drop("RecordType")
       .withColumn ("FILE_NAME", lit(fileNameExtract+".TXT"))
       .withColumn ("LANDING_DATE", lit(landingTimeStamp))
       .withColumn ("PharmacyID", lit(df_header.rdd.first().PharmacyID))
       .withColumn ("FileDate", lit(df_header.rdd.first().FileDate))
       .withColumn ("ZIPCode", lit(df_header.rdd.first().ZIPCode))
       .withColumn ("ExternalPharmacyID", lit(df_header.rdd.first().ExternalPharmacyID))
       .withColumn("FILE_LINE_NUM",monotonically_increasing_id()+1)
       .transform(SliceDFColumn("value",__SP_PR_DETAIL_COLUMN_NAMES__,__SP_PR_DETAIL_LENGHTS__))
       .withColumn ("lenRow_ok", (length(col("value"))) == lit(lenSpecRow))
       .drop("value")
      )
      
      if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[3][Check if lenrow errors]")
      
      #if some row has not the correct length, move to ErrorProcessed Folder
      if df_detail.filter(df_detail.lenRow_ok == False).count() > 0:
        ################################################################################################################################################
        #  VALIDATION NUMBER OF FIELDS
        ################################################################################################################################################
        processDateEnd = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
        # DataFrame Error df_QA_PROCESS_file
        df_QA_PROCESS_file = (df_QA_PROCESS_file
                             .withColumn ("END_DATE", to_timestamp(lit(processDateEnd), 'yyyy-MM-dd HH:mm:ss')) 
                             .withColumn ("STATUS", lit(-1))
                             .withColumn ("MESSAGE_TEXT", lit('{"INVALID_STRUCTURE":"File does not have a valid structure"}'))
                             .withColumn ("ERROR_CODE", lit('{"INVALID_STRUCTURE":"-1"}'))
                            )
#         if (QA_UPDATE_CTL_PROCESS_FILE_DATA(df_QA_PROCESS_file, debug=debug) == True):
#           distributeFile_Def(filepath+filename, "", errorpath+path_fact+filename, False) #Copy File to error folder
#           fileProcessedStatus = "EX:STR"
#         else:
#           fileProcessedStatus = "EX:STR FAILED"
        #end if (QA_UPDATE_CTL_PROCESS_FILE_DATA
        
        ################################################################################################################################################
        #  END - VALIDATION NUMBER OF FIELDS
        ################################################################################################################################################
      else:
        ################################################################################################################################################
        #  VALIDATION DATA FIELDS AND CREATE CANONICAL DATA FILE
        ################################################################################################################################################
        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[4][Content field process]")
        
        spark.conf.set("spark.sql.crossJoin.enabled", "true")
        
        ################################################################################################################################################
        #  VALIDATION FORMAT DATA FIELDS
        ################################################################################################################################################
        
        df_detail = (df_detail
                      .drop("lenRow_ok")
                       # Validate OperationLine
                      .withColumn("OperationLine_Err",   when(((udf_isDigit_sql("OperationLine")==True) & (length("OperationLine")==__PR_OPERATIONLINE_LEN__)) == True , 0).otherwise(-1))
#                       .withColumn("OperationLine_Err2",  when((isDigit("OperationLine")==False),  ('"' + 'DATA_TYPE' + '":' + '"1"')).otherwise("")) 
#                       .withColumn("OperationLine_Err2",   
#                                   when(
#                                       ((length("OperationLine")==__PR_OPERATIONLINE_LEN__)==False), 
#                                         (when(((length("OperationLine_Err2")==0)==True),('"' + 'DATA_LENGTH' + '":' + '"1"')).otherwise(concat(col("OperationLine_Err2"), lit(',"' + 'DATA_LENGTH' + '":' + '"1"'))))
#                                       ).otherwise(col("OperationLine_Err2"))                       
#                                  )
#                       .withColumn("OperationLine_Err2",  when(((length("OperationLine_Err2")>0)==True),concat(lit("{"),col("OperationLine_Err2"),lit("}"))).otherwise(""))

                        # Validate OperationDate 
                      .withColumn("OperationDate_Err",   when( (udf_isDate_sql("OperationDate",lit(__YYYYMMDDhhmmss__)))==True, 0).otherwise(-1)) 
#                       .withColumn("OperationDate_Err2",  when( (udf_isDate_sql("OperationDate",lit(__YYYYMMDDhhmmss__)))==False, ('"' + 'DATA_TYPE' + '":' + '"1"')).otherwise("")) 
#                       .withColumn("OperationDate_Err2",  when(((length("OperationDate_Err2")>0)==True),concat(lit("{"),col("OperationDate_Err2"),lit("}"))).otherwise(""))

#                        # Validate CostPricewithoutTaxes_Err           
                      .withColumn("CostPricewithoutTaxes_Err",    when(((udf_isDigit_sql("CostPricewithoutTaxes")==True) & (length("CostPricewithoutTaxes")==__PR_COSTPRICEWITHOUTTAXES_LEN__)) == True , 0).otherwise(-1))
#                       .withColumn("CostPricewithoutTaxes_Err2",   when((isDigit("CostPricewithoutTaxes")==False),  ('"' + 'DATA_TYPE' + '":' + '"1"')).otherwise("")) 
#                       .withColumn("CostPricewithoutTaxes_Err2",   
#                                   when(
#                                       ((length("CostPricewithoutTaxes")==__PR_COSTPRICEWITHOUTTAXES_LEN__)==False),  
#                                         (when(((length("CostPricewithoutTaxes_Err2")==0)==True),('"' + 'DATA_LENGTH' + '":' + '"1"')).otherwise(
#                                                   concat(col("CostPricewithoutTaxes_Err2"), lit(',"' + 'DATA_LENGTH' + '":' + '"1"')))
#                                         )
#                                       ).otherwise(col("CostPricewithoutTaxes_Err2"))                       
#                                  )
#                       .withColumn("CostPricewithoutTaxes_Err2",   when(((length("CostPricewithoutTaxes_Err2")>0)==True),concat(lit("{"),col("CostPricewithoutTaxes_Err2"),lit("}"))).otherwise(""))  

                       # Validate Percentage             
                      .withColumn("Percentage_Err",    when(((udf_isDigit_sql("Percentage")==True) & (length("Percentage")==__PR_PERCENTAGE_LEN__)) == True , 0).otherwise(-1)       )
#                       .withColumn("Percentage_Err2",   when((isDigit("Percentage")==False),  ('"' + 'DATA_TYPE' + '":' + '"1"')).otherwise(""))
#                       .withColumn("Percentage_Err2",   
#                                   when(
#                                       ((length("Percentage")==__PR_PERCENTAGE_LEN__)==False),   
#                                         (when(((length("Percentage_Err2")==0)==True),('"' + 'DATA_LENGTH' + '":' + '"1"')).otherwise(concat(col("Percentage_Err2"), lit(',"' + 'DATA_LENGTH' + '":' + '"1"'))))
#                                       ).otherwise(col("Percentage_Err2"))                       
#                                  )
#                       .withColumn("Percentage_Err2",   when(((length("Percentage_Err2")>0)==True),concat(lit("{"),col("Percentage_Err2"),lit("}"))).otherwise("")) 

                       # Validate TaxesPercentage          
                      .withColumn("TaxesPercentage_Err",  when(((udf_isDigit_sql("TaxesPercentage")==True) & (length("TaxesPercentage")==__PR_TAXESPERCENTAGE_LEN__)) == True , 0).otherwise(-1) )
#                       .withColumn("TaxesPercentage_Err2", when((isDigit("TaxesPercentage")==False),  ('"' + 'DATA_TYPE' + '":' + '"1"')).otherwise(""))  
#                       .withColumn("TaxesPercentage_Err2",   
#                                   when(
#                                       ((length("TaxesPercentage")==__PR_TAXESPERCENTAGE_LEN__)==False), 
#                                         (when(((length("TaxesPercentage_Err2")==0)==True),('"' + 'DATA_LENGTH' + '":' + '"1"')).otherwise(concat(col("TaxesPercentage_Err2"), lit(',"' + 'DATA_LENGTH' + '":' + '"1"'))))
#                                       ).otherwise(col("TaxesPercentage_Err2"))                       
#                                  )
#                       .withColumn("TaxesPercentage_Err2", when(((length("TaxesPercentage_Err2")>0)==True),concat(lit("{"),col("TaxesPercentage_Err2"),lit("}"))).otherwise(""))                        

                       # Validate Quantity  
                      .withColumn("Quantity_Err",  when(((udf_isDigit_sql("Quantity")==True) & (length("Quantity")==__PR_QUANTITY_LEN__)) == True , 0).otherwise(-1) )
#                       .withColumn("Quantity_Err2", when((isDigit("Quantity")==False),  ('"' + 'DATA_TYPE' + '":' + '"1"')).otherwise(""))
#                       .withColumn("Quantity_Err2",   
#                                   when(
#                                       ((length("Quantity")==__PR_QUANTITY_LEN__)==False), 
#                                         (when(((length("Quantity_Err2")==0)==True),('"' + 'DATA_LENGTH' + '":' + '"1"')).otherwise(concat(col("Quantity_Err2"), lit(',"' + 'DATA_LENGTH' + '":' + '"1"'))))
#                                       ).otherwise(col("Quantity_Err2"))                       
#                                  )
#                       .withColumn("Quantity_Err2",   when(((length("Quantity_Err2")>0)==True),concat(lit("{"),col("Quantity_Err2"),lit("}"))).otherwise(""))  

                       # Validate QuantityOffered        
                      .withColumn("QuantityOffered_Err",  when(((udf_isDigit_sql("QuantityOffered")==True) & (length("QuantityOffered")==__PR_QUANTITYOFFERED_LEN__)) == True , 0).otherwise(-1))
#                       .withColumn("QuantityOffered_Err2", when((isDigit("QuantityOffered")==False),  ('"' + 'DATA_TYPE' + '":' + '"1"')).otherwise("")) 
#                       .withColumn("QuantityOffered_Err2",   
#                                   when(
#                                       ((length("QuantityOffered")==__PR_QUANTITYOFFERED_LEN__)==False),    
#                                         (when(((length("QuantityOffered_Err2")==0)==True),('"' + 'DATA_LENGTH' + '":' + '"1"')).otherwise(concat(col("QuantityOffered_Err2"), lit(',"' + 'DATA_LENGTH' + '":' + '"1"'))))
#                                       ).otherwise(col("QuantityOffered_Err2"))                       
#                                  )
#                       .withColumn("QuantityOffered_Err2",  when(((length("QuantityOffered_Err2")>0)==True),concat(lit("{"),col("QuantityOffered_Err2"),lit("}"))).otherwise(""))              
                    )
        
        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[5 validations end]")
        ################################################################################################################################################
        #  END VALIDATION FORMAT DATA FIELDS
        ################################################################################################################################################
        
        
        ################################################################################################################################################
        #  TRANSFORMATION DATA FIELDS
        ################################################################################################################################################
        df_detail = (df_detail
             .drop("RecordType")
#              .withColumn("OPERATION_LINE_RAW", col("OperationLine"))
#              .withColumn("OPERATION_DATE_RAW", col("OperationDate"))
#              .withColumn("NATIONAL_CODE_RAW", col("ProductCode")) 
#              .withColumn("CATEGORY_RAW", col("ProductCode")) 
#              .withColumn("MANUFACTURER_CODE_RAW", col("ProductCode")) 
#              .withColumn("PRODUCT_QTY_RAW", col("ProductUnits"))  
#              .withColumn("PACK_SIZE_RAW", col("ProductPackSize")) 
#              .withColumn("PRODUCT_PRICE_CATALOG_RAW", col("ProductPriceCatalog"))   
#              .withColumn("PRODUCT_PRICE_RAW", col("ProductPrice"))
#              .withColumn("DISCOUNT_VALUE_RAW", col("DiscountValue"))      
#              .withColumn("PRODUCT_NET_RAW", col("ProductTotalPrice")) 
#              .withColumn("REIMBURSEMENT_VALUE_RAW", col("ReimbursementValue"))  
#              .withColumn("CONSUMER_VALUE_RAW", col("ConsumerValue"))    
#              .withColumn("PAYMENT_MODE_RAW", col("PaymentMode"))       
#              .withColumn("CONSUMER_GENDER_RAW", col("ConsumerGender"))  
#              .withColumn("PHARMACY_CODE_RAW", col("PharmacyID"))          
             .withColumn("OperationLine", col("OperationLine").cast(IntegerType()))
             .withColumn("Quantity", col("Quantity").cast(IntegerType()))
             .withColumn("QuantityOffered", col("QuantityOffered").cast(IntegerType()))
             .withColumn("CostPricewithoutTaxes", col("CostPricewithoutTaxes").cast(DoubleType())/100)                 
             .withColumn("Percentage", col("Percentage").cast(DoubleType())/100)
             .withColumn("TaxesPercentage", col("TaxesPercentage").cast(DoubleType())/100)
             .withColumn("formatErrorRow",   
                      col("OperationLine_Err").cast(IntegerType()) + 
                      col("OperationDate_Err").cast(IntegerType()) +
                      col("CostPricewithoutTaxes_Err").cast(IntegerType()) + 
                      col("Percentage_Err").cast(IntegerType()) + 
                      col("TaxesPercentage_Err").cast(IntegerType()) + 
                      col("Quantity_Err").cast(IntegerType()) + 
                      col("QuantityOffered_Err").cast(IntegerType()))
        )

        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[Number data type processed]")
                
        #Trim leading and trailing spaces from every column
        df_detail = (reduce(
                        lambda df_detail, col_name: df_detail.withColumn(col_name, trim(col(col_name))),
                        df_detail.columns,
                        df_detail)
                    )

        df_detail = df_detail.alias('df_detail')
        ################################################################################################################################################
        #  END - TRANSFORMATION DATA FIELDS
        ################################################################################################################################################
        

        ################################################################################################################################################
        #  ENRICHMENT DATA FIELDS
        ################################################################################################################################################
        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[[Begin Enrichment PRODUCT]")

        #Enrichment with PRODUCT MASTER DATA
        df_product_cat = (spark.read.format('csv')
                          .options(header='true',charset='UTF-8')
                          .option("sep","|")
                          .load(__PHARMATIC_MASTER_DATA_CANONICAL_CATEGORIES_PATH__,schema=__SP_CATEGORIES_CANONICAL_DATA_SCHEMA__)
                         )

        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[9][Filter if National and EAN Not null]")
        #Products with EAN and NATIONAL CODE  
        df_product = df_product_cat.filter((df_product_cat.EAN13.isNull() == False)
                                         & (df_product_cat.NATIONAL_CODE.isNull() == False)).distinct().alias('df_product')
                
        df_detail = (df_detail.repartition(200)
                       .join(df_product.repartition(200), ((col('df_detail.EANCode')==col('df_product.EAN13'))
                                                         & (col('df_detail.ProductCode')==col('df_product.NATIONAL_CODE')) ), how='left')
                       .select('df_detail.*'
                              , 'df_product.NATIONAL_CODE'
                              , 'df_product.EAN13'
                              , 'df_product.CLASS'
                              , 'df_product.CATEGORY'
                              , 'df_product.FAMILY'
                              , 'df_product.SUBFAMILY'
                              , 'df_product.BRAND'
                              , 'df_product.LABORATORY'
                              , 'df_product.PRODUCT_NAME'
                              )
                    ).alias('df_detail')

        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[10][Filter > 149999]")
        
        #Product with NATIONAL_CODE - pharmaceutical and parapharmacy products
        df_product = (df_product_cat
                      .filter(df_product_cat.NATIONAL_CODE> "149999")
                      .select( col('NATIONAL_CODE')
                              , col('CLASS')
                              , col('CATEGORY')
                              , col('FAMILY')
                              , col('SUBFAMILY')
                              , col('BRAND')
                              , col('LABORATORY')
                              , col('PRODUCT_NAME')
                              )
                      .distinct()
                      .alias('df_product'))

        df_detail = (df_detail.repartition(200).join(df_product.repartition(200), col('df_detail.ProductCode')==col('df_product.NATIONAL_CODE'), how='left')
                      .select('df_detail.*'
                              , col('df_product.NATIONAL_CODE').alias('dp_NATIONAL_CODE')
                              , col('df_product.CLASS').alias('dp_CLASS')
                              , col('df_product.CATEGORY').alias('dp_CATEGORY')
                              , col('df_product.FAMILY').alias('dp_FAMILY')
                              , col('df_product.SUBFAMILY').alias('dp_SUBFAMILY')
                              , col('df_product.BRAND').alias('dp_BRAND')
                              , col('df_product.LABORATORY').alias('dp_LABORATORY')
                              , col('df_product.PRODUCT_NAME').alias('dp_PRODUCT_NAME')
                              )
                      .withColumn("NATIONAL_CODE",
                                  when((col('df_detail.NATIONAL_CODE').isNull()==True)  & (col('dp_NATIONAL_CODE').isNull()==False), col('dp_NATIONAL_CODE')).otherwise(col('df_detail.NATIONAL_CODE')))              
                      .withColumn("CLASS",
                                  when((col('df_detail.CLASS').isNull()==True) & (col('dp_CLASS').isNull()==False) , col('dp_CLASS')).otherwise(col('df_detail.CLASS')))      
                      .withColumn("CATEGORY",       
                             when((col('df_detail.CATEGORY').isNull()==True) & (col('dp_CATEGORY').isNull()==False) , col('dp_CATEGORY')).otherwise(col('df_detail.CATEGORY')))               
                      .withColumn("FAMILY",    
                           when((col('df_detail.FAMILY').isNull()==True) & (col('dp_FAMILY').isNull()==False) , col('dp_FAMILY')).otherwise(col('df_detail.FAMILY'))) 
                      .withColumn("SUBFAMILY",    
                           when((col('df_detail.SUBFAMILY').isNull()==True) & (col('dp_SUBFAMILY').isNull()==False) , col('dp_SUBFAMILY')).otherwise(col('df_detail.SUBFAMILY')))   
                      .withColumn("BRAND",    
                           when((col('df_detail.BRAND').isNull()==True) & (col('dp_BRAND').isNull()==False) , col('dp_BRAND')).otherwise(col('df_detail.BRAND'))) 
                      .withColumn("LABORATORY",    
                           when((col('df_detail.LABORATORY').isNull()==True) & (col('dp_LABORATORY').isNull()==False) , col('dp_LABORATORY')).otherwise(col('df_detail.LABORATORY')))         
                      .withColumn("PRODUCT_NAME",    
                           when((col('df_detail.PRODUCT_NAME').isNull()==True)  & (col('dp_PRODUCT_NAME').isNull()==False) , col('dp_PRODUCT_NAME')).otherwise(col('df_detail.PRODUCT_NAME')))   
                      .drop(col('dp_NATIONAL_CODE'))
                      .drop(col('dp_EAN13'))
                      .drop(col('dp_CLASS'))
                      .drop(col('dp_CATEGORY'))
                      .drop(col('dp_FAMILY'))
                      .drop(col('dp_SUBFAMILY'))
                      .drop(col('dp_BRAND'))
                      .drop(col('dp_LABORATORY'))
                      .drop(col('dp_PRODUCT_NAME'))  
                  ).alias('df_detail')

        #Assign Free products (NATIONAL_CODE -9), NATIONAL_CODE -1, EnrichNationalCodeResult and EnrichCategoryResult
        df_detail = (df_detail
                       .withColumn("NATIONAL_CODE", 
                               when((col('ProductCode')< "150000") & (col('NATIONAL_CODE').isNull()==True),-9).otherwise( col('NATIONAL_CODE')))              
                       .withColumn("NATIONAL_CODE",
                               when((col('NATIONAL_CODE').isNull()==False), col('NATIONAL_CODE')).otherwise(-1))
                     ).alias('df_detail')
        
        
        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[11][Begin Enrichment PHARMACIES]")
        ########################### ENRICH PHARMACIES
        df_pharmacy = (spark.read.format('csv')
                          .options(header='true',charset='UTF-8')
                          .option("sep","|")
                          .load(__PHARMATIC_MASTER_DATA_CANONICAL_PHARMACIES_PATH__,schema=__SP_PHARMACIES_MASTER_DATA_SCHEMA__)
                         )

        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[12][Filter PHARMACIES]")
        
        #Pharmacies with AH_PHARMACY_CODE  
        df_pharmacy = (df_pharmacy
                        .filter((df_pharmacy.AH_PHARMACY_CODE.isNull() == False))
                        .select(col('AH_PHARMACY_CODE').alias('PH_AH_PHARMACY_CODE')
                               ,col('PHARMACY_CODE'))
                        .distinct()
                      ).alias('df_pharmacy')

        df_detail = (df_detail
                     .join(broadcast(df_pharmacy), (col('df_detail.PharmacyID')==col('df_pharmacy.PH_AH_PHARMACY_CODE')), how='left')
                     . select ('df_detail.*'
                              , col('df_pharmacy.PHARMACY_CODE').alias('PHARMACY_CODE'))
                     . withColumn("PHARMACY_CODE",
                               when((col('PHARMACY_CODE').isNull()==False), col('PHARMACY_CODE')).otherwise(-1))
                     ).alias('df_detail')
        
        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[13][Enrich MANUFACTURERS]")
        ########################### ENRICH PRODUCT-MANUFACTURER
        df_product_new = (spark.read.format('csv')
                          .options(header='true',charset='UTF-8')
                          .option("sep","|")
                          .load("/mnt/ds/canonical/masterdata/product_master_new_CDM.csv")
                         )
        #Products with MANUFACTURER_CODE
        df_product_new = (df_product_new
                            .filter((df_product_new.MANUFACTURER_CODE.isNull() == False))
                            .select(col('PRODUCT_CODE')
                                   ,col('MANUFACTURER_CODE'))
                            .distinct()
                          ).alias('df_product_new')

        df_detail = (df_detail
                     .join(broadcast(df_product_new), (col('df_detail.NATIONAL_CODE')== df_product_new.PRODUCT_CODE.substr(0,6)), how='left')
                     . select ('df_detail.*'
                              , col('df_product_new.MANUFACTURER_CODE'))
                      . withColumn("MANUFACTURER_CODE",
                               when((col('MANUFACTURER_CODE').isNull()==False), col('MANUFACTURER_CODE')).otherwise(-1))
                     ).alias('df_detail')

        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[14][Calculate EnrichResult fields ]")
                
        #Assign EnrichNationalCodeResult, EnrichCategoryResult, EnrichManufacturerResult AND EnrichErrorRow
        df_detail = (df_detail
                       .withColumn("EnrichNationalCodeResult",
                               when((col('NATIONAL_CODE')==lit("-9")) | (col('NATIONAL_CODE')!=lit("-1")), lit("OK")))
                       .withColumn("EnrichNationalCodeResult",
                               when(
                                 (col('NATIONAL_CODE')==lit("-1")) & 
                                 (col('ProductCode')> "599999"),lit("NF1")).otherwise(col('EnrichNationalCodeResult')))
                       .withColumn("EnrichNationalCodeResult",
                               when(
                                 (col('NATIONAL_CODE')==lit("-1")) & 
                                 (col('ProductCode')> "149999") &
                                 (col('ProductCode')< "600000")
                                 ,lit("NF2")).otherwise(col('EnrichNationalCodeResult')))
                       .withColumn("EnrichCategoryResult",
                                 when((col('CATEGORY').isNull()==False), lit("OK")))
                       .withColumn("EnrichCategoryResult",
                                 when(
                                   ((col('CATEGORY').isNull()==True)) & 
                                   (col('ProductCode')> "599999")
                                   ,lit("NF1")).otherwise(col('EnrichCategoryResult')))
                        .withColumn("EnrichCategoryResult",
                                 when(
                                   ((col('CATEGORY').isNull()==True)) & 
                                   (col('ProductCode')> "149999") &
                                   (col('ProductCode')< "600000")
                                   ,lit("NF2")).otherwise(col('EnrichCategoryResult')))
                        .withColumn("EnrichCategoryResult",
                                 when(
                                   ((col('CATEGORY').isNull()==True)) & 
                                   (col('ProductCode')< "150000")
                                   ,lit("NF3")).otherwise(col('EnrichCategoryResult')))
                        .withColumn("EnrichManufacturerResult",
                                 when((col('MANUFACTURER_CODE')!="-1"), lit("OK")).otherwise("NF"))
                        .withColumn("EnrichPharmacyResult",
                                 when((col('PHARMACY_CODE')!="-1"), lit("OK")).otherwise("NF"))
                        .withColumn("EnrichErrorRow", lit("0"))
                        .withColumn("EnrichErrorRow",
                                    (when((col('EnrichNationalCodeResult')=="OK"), 0).otherwise(-1)) +
                                    (when((col('EnrichCategoryResult')=="OK"), 0).otherwise(-1)) +
                                    (when((col('EnrichManufacturerResult')=="OK"), 0).otherwise(-1)) +
                                    (when((col('EnrichPharmacyResult')=="OK"), 0).otherwise(-1))
                                   )
                         # Validate NATIONAL_CODE
                        .withColumn("NATIONAL_CODE_ERR",  when((col('EnrichNationalCodeResult')=="OK"), "").otherwise('{"' + 'ENRICHMENT' + '":' + '"1"}') )
                         # Validate CATEGORY
                        .withColumn("CATEGORY_ERR",  when((col('EnrichCategoryResult')=="OK"), "").otherwise('{"' + 'ENRICHMENT' + '":' + '"1"}') )
                         # Validate MANUFACTURER
                        .withColumn("MANUFACTURER_CODE_ERR",  when((col('EnrichManufacturerResult')=="OK"), "").otherwise('{"' + 'ENRICHMENT' + '":' + '"1"}') )
                         # Validate PHARMACY_CODE
                        .withColumn("PHARMACY_CODE_ERR",  when((col('EnrichPharmacyResult')=="OK"), "").otherwise('{"' + 'ENRICHMENT' + '":' + '"1"}') )
                     ).alias('df_detail')

        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[15][Count Errors]")
        
        #Status to be returned
        fileProcessedStatus = "OK:"
        if df_detail.filter(df_detail.formatErrorRow <0).count() > 0:
          fileProcessedStatus = fileProcessedStatus + " FMT" #some Format Error had been detected
        #end if formatErrorRow
        
        if df_detail.filter(df_detail.EnrichErrorRow <0).count() > 0:
            fileProcessedStatus = fileProcessedStatus + " ENR" #some Enrichment Error had been detected
        #end if enrichErrorRow
        ################################################################################################################################################
        #  END - ENRICHMENT DATA FIELDS
        ################################################################################################################################################
        
        
#         ################################################################################################################################################
#         #  QA MANAGEMENT ERRORS
#         ################################################################################################################################################
#         df_errors = (df_detail
#                       .withColumn("PROCESS_DATE", to_timestamp(lit(processDate), 'yyyy-MM-dd HH:mm:ss') )
#                       .select(col('PROCESS_DATE')
#                             , col('FILE_NAME')
#                             , col('LANDING_DATE')
#                             , col('FILE_LINE_NUM')
#                             , col('OperationLine_Err2').alias("OPERATION_LINE_ERR")
#                             , col('OPERATION_LINE_RAW')
#                             , col('OperationDate_Err2').alias("OPERATION_DATE_ERR")
#                             , col('OPERATION_DATE_RAW')
#                             , col('ProductUnits_Err2').alias("PRODUCT_QTY_ERR")
#                             , col('PRODUCT_QTY_RAW')
#                             , col('ProductPackSize_Err2').alias("PACK_SIZE_ERR")
#                             , col('PACK_SIZE_RAW')
#                             , col('ProductPriceCatalog_Err2').alias("PRODUCT_PRICE_CATALOG_ERR")
#                             , col('PRODUCT_PRICE_CATALOG_RAW')
#                             , col('ProductPrice_Err2').alias("PRODUCT_PRICE_ERR")
#                             , col('PRODUCT_PRICE_RAW')
#                             , col('DiscountValue_Err2').alias("DISCOUNT_VALUE_ERR")
#                             , col('DISCOUNT_VALUE_RAW')  
#                             , col('ProductTotalPrice_Err2').alias("PRODUCT_NET_ERR")
#                             , col('PRODUCT_NET_RAW')  
#                             , col('ReimbursementValue_Err2').alias("REIMBURSEMENT_VALUE_ERR")
#                             , col('REIMBURSEMENT_VALUE_RAW')
#                             , col('ConsumerValue_Err2').alias("CONSUMER_VALUE_ERR")
#                             , col('CONSUMER_VALUE_RAW')
#                             , col('PaymentMode_Err2').alias("PAYMENT_MODE_ERR")
#                             , col('PAYMENT_MODE_RAW')  
#                             , col('ConsumerGender_Err2').alias("CONSUMER_GENDER_ERR")
#                             , col('CONSUMER_GENDER_RAW') 
#                             , col('NATIONAL_CODE_ERR')
#                             , col('NATIONAL_CODE_RAW')
#                             , col('CATEGORY_ERR')
#                             , col('CATEGORY_RAW') 
#                             , col('MANUFACTURER_CODE_ERR')
#                             , col('MANUFACTURER_CODE_RAW') 
#                             , col('PHARMACY_CODE_ERR')
#                             , col('PHARMACY_CODE_RAW') 
#                             )
#                       .withColumn('PROCESS_DATE', to_timestamp(col("PROCESS_DATE"), 'yyyy-MM-dd HH:mm:ss'))
#                       .withColumn('LANDING_DATE', unix_timestamp("LANDING_DATE", "yyyyMMddHHmmss").cast(TimestampType()))
#                       .withColumn("formatErrorRow",   
#                                 (when ((length("OPERATION_LINE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("OPERATION_DATE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("PRODUCT_QTY_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("PACK_SIZE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("PRODUCT_PRICE_CATALOG_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("PRODUCT_PRICE_ERR")==0)==True, 0).otherwise(1)) +    
#                                 (when ((length("DISCOUNT_VALUE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("PRODUCT_NET_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("REIMBURSEMENT_VALUE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("CONSUMER_VALUE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("PAYMENT_MODE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("CONSUMER_GENDER_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("NATIONAL_CODE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("CATEGORY_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("MANUFACTURER_CODE_ERR")==0)==True, 0).otherwise(1)) +
#                                 (when ((length("PHARMACY_CODE_ERR")==0)==True, 0).otherwise(1))  
#                                  )
#                       .filter(col("formatErrorRow")> 0)
#                       .drop("formatErrorRow")
#                     )

#         if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[16][QA_GENERATE_DATA]")
#         rows_count = df_detail.count()
#         OK_rows_count = rows_count-df_errors.count()
        
#         qa_result = QA_GENERATE_DATA(df_errors, rows_count, OK_rows_count,debug=debug)

#         if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[17][ END QA_GENERATE_DATA]")

#         df_detail = (df_detail
#                       .drop('OperationLine_Err2')
#                       .drop('OperationDate_Err2')
#                       .drop('PRODUCT_QTY_ERR')
#                       .drop('ProductPackSize_Err2')
#                       .drop('ProductPriceCatalog_Err2')
#                       .drop('ProductPrice_Err2')
#                       .drop('DiscountValue_Err2')
#                       .drop('ProductTotalPrice_Err2')
#                       .drop('ReimbursementValue_Err2')
#                       .drop('ConsumerValue_Err2')
#                       .drop('PaymentMode_Err2')
#                       .drop('ConsumerGender_Err2')
#                     )
#         ################################################################################################################################################
#         #  END - QA MANAGEMENT ERRORS
#         ################################################################################################################################################

        
        ################################################################################################################################################
        #  RENAME FIELDS FOR CANONICAL DATA FILE
        ################################################################################################################################################
        df_detail = (df_detail
                     .withColumnRenamed('PharmacyID', 'PHARMACY_PMS_CODE')
                     .withColumnRenamed('FileDate', 'FILE_DATE')
                     .withColumnRenamed('ZIPCode', 'ZIP_CODE')
                     .withColumnRenamed('ExternalPharmacyID', 'EXTERNAL_PHARMACY_CODE')
                     .withColumnRenamed('OperationIdentification', 'OPERATION_TYPE')
                     .withColumnRenamed('OperationID', 'OPERATION_CODE')
                     .withColumnRenamed('OperationLine', 'OPERATION_LINE')
                     .withColumnRenamed('OperationDate', 'OPERATION_DATE')
                     .withColumnRenamed('SupplierType', 'SUPPLIER_TYPE')
                     .withColumnRenamed('SupplierIdentification', 'SUPPLIER_CODE')
                     .withColumnRenamed('ProductCode', 'PRODUCT_LINE_CODE')
                     .withColumnRenamed('ProductName', 'PRODUCT_LINE_NAME')
                     .withColumnRenamed('EANCode', 'EAN_CODE')
                     .withColumnRenamed('AlternativeProductCode', 'ALTERNATIVE_PRODUCT_CODE')
                     .withColumnRenamed('CostPricewithoutTaxes', 'PRODUCT_NET_PRICE')
                     .withColumnRenamed('PercentageSignal', 'PERCENTAGE_TYPE')
                     .withColumnRenamed('Percentage', 'PERCENTAGE_VALUE')
                     .withColumnRenamed('TaxesPercentage', 'TAXES_PERCENTAGE_VALUE')
                     .withColumnRenamed('Quantity', 'PRODUCT_QTY')
                     .withColumnRenamed('QuantityOffered', 'PRODUCT_OFFERED_QTY')
                    )
        
        ################################################################################################################################################
        #  END - RENAME FIELDS FOR CANONICAL DATA FILE
        ################################################################################################################################################
        
        ################################################################################################################################################
        #  SAVE CANONICAL DATA FILE
        ################################################################################################################################################
        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[18][ BEGIN Save CMD]")
        
        #Save the dataframe as a file
        result = saveAsCanonical(df_detail,processedpath+path_fact+fileNameExtract+ "_CMD.csv")
        if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[19][ END Save CMD]")
          
        processDateEnd = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
        if result == False:
            distributeFile_Def(filepath+filename, "", errorpath+path_fact+filename, False) #Copy File to error folder
                        
            if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[20][MOVE TO errorpath  CMD ]")
#             # Return information for CTL.PROCESS_FILE entity
#             df_QA_PROCESS_file = (df_QA_PROCESS_file
#                                  .withColumn ("END_DATE", to_timestamp(lit(processDateEnd), 'yyyy-MM-dd HH:mm:ss')) 
#                                  .withColumn ("STATUS", lit(-1))
#                                  .withColumn ("MESSAGE_TEXT", lit('{"CDM_NOT_SAVED":"CMD File saved function unsuccessfully"}'))
#                                  .withColumn ("ERROR_CODE", lit('{"CDM_NOT_SAVED":"-1"}'))
#                                 )
#             if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[20][DDBB QA saved CMD KO]")
#             if (QA_UPDATE_CTL_PROCESS_FILE_DATA(df_QA_PROCESS_file, debug=debug) == True):
#               fileProcessedStatus = "EX: CMD File saved function unsuccessfully" 
#             else:
#               fileProcessedStatus = "EX QA FAILED: CMD File saved function unsuccessfully"
#             #end if (QA_UPDATE_CTL_PROCESS_FILE_DATA 
        else:
            (deleteFile_Def(filepath,filename))   #Delete the tobreprocessed file
#             # Return information for CTL.PROCESS_FILE entity
#             df_QA_PROCESS_file = (df_QA_PROCESS_file
#                                  .withColumn ("END_DATE", to_timestamp(lit(processDateEnd), 'yyyy-MM-dd HH:mm:ss')) 
#                                  .withColumn ("STATUS", lit(1))
#                                 )
#              if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[20][DDBB QA saved CMD OK]")
#             if (QA_UPDATE_CTL_PROCESS_FILE_DATA(df_QA_PROCESS_file, debug=debug) == False):
#               fileProcessedStatus = "QA FAILED " + fileProcessedStatus
#             #end if (QA_UPDATE_CTL_PROCESS_FILE_DATA
        #end if result == False
        ##############################################################################################################################################
        #  END - SAVE CANONICAL DATA FILE
        ##############################################################################################################################################
        
      ################################################################################################################################################
      #  END VALIDATION DATA FIELDS AND CREATE CANONICAL DATA FILE
      ################################################################################################################################################        
      #end if df_detail.filter
          
      ################################################################################################################################################
      #  END FILE CONTENT MANAGEMENT
      ################################################################################################################################################
      
      return fileProcessedStatus
    
  except Exception as e:
      processDateEnd = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
      errorException= str(e).replace("'","")
      
      if debug==True: print("["+datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')+"]"+"["+filename+"]"+"[ EXCEPTION]")
        
#       # DataFrame Error df_QA_PROCESS_file
#       df_QA_PROCESS_file = (df_QA_PROCESS_file
#            .withColumn ("END_DATE", to_timestamp(lit(processDateEnd), 'yyyy-MM-dd HH:mm:ss')) 
#            .withColumn ("STATUS", lit(-2))
#            .withColumn ("MESSAGE_TEXT", lit('{"EXCEPTION_ERROR":"File does not have a valid structure"}'))
#            .withColumn ("ERROR_CODE", lit('{"EXCEPTION_ERROR":"-2"}'))
#           )
#       if (QA_UPDATE_CTL_PROCESS_FILE_DATA(df_QA_PROCESS_file, debug=debug) == True):
#         fileProcessedStatus= "EX:" + errorException
#       else:
#         fileProcessedStatus= "EX: FAILED " + errorException
      
      fileProcessedStatus= "EX:" + errorException # ELIMINAR AL QUITAR COMENTARIOS DE QA
      
      
      #end if (QA_UPDATE_CTL_PROCESS_FILE_DATA
      
      return fileProcessedStatus  