In [0]:
import pyspark.sql
from pyspark.sql.functions import col, concat_ws,collect_list,col, lit, to_json, struct,monotonically_increasing_id, row_number
import pyspark.sql.functions as func
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField, MapType, ArrayType
from pyspark.sql import Row
import pandas as pd
from pyspark.sql import Window

In [0]:
%sql
CREATE OR REPLACE TABLE group_db_innovation_and_insights.dfsource_full
USING DELTA
PARTITIONED BY (subsector_description, fp_site)
AS
with BOM_levels_raw  as (
     select *
     
     from (
         
     	 	
         ( 
          select 
     MAST.sap_box_name as MAST_sap_box_name, 
     MAST.matnr as MAST_matnr, --id of product material
     MAST.werks as MAST_werks, --Plant id
     MAST.stlan as MAST_stlan, -- bom usage (filter 1 = production)
     MAST.stlnr as MAST_stlnr, --bill of material
     MAST.stlal as MAST_stlal, --Alternative BOM 
     STKO.stkoz as STKO_stkoz, --Internal counter
     STKO.datuv as STKO_datuv, --Valid from date
     STKO.aennr as STKO_aennr, --Change number
     STKO.lkenz as STKO_lkenz, --Deletion indicator 1
     STKO.bmein as STKO_bmein, -- base unit of measure --task unidades
     STKO.bmeng as STKO_bmeng, --base quantity    ---  task unidades
     STKO.stktx as STKO_stktx, --Alternative bom text
     STAS.stlkn as STAS_stlkn, --Bom item bom number
     STAS.stasz as STAS_stasz, --internal counter
     STAS.lkenz as STAS_lkenz, --Deletion indicator
     STAS.datuv as STAS_datuv, --Valid from date
     STPO.stlty as STPO_stlty, --bom category
     STPO.stlnr as STPO_stlnr, --bill of material
     STPO.stlkn as STPO_stlkn, --Bom item bom number
     STPO.stpoz as STPO_stpoz, --internal counter
     STPO.datuv as STPO_datuv,  --Valid from date
     STPO.aennr as STPO_aennr, --Change number
     STPO.lkenz as STPO_lkenz, --Deletion indicator 1
     STPO.vgknt as STPO_vgknt, --Predeccesor node
     STPO.vgpzl as STPO_vgpzl, --Previous item counter
     STPO.andat as STPO_andat, --date record created on
     STPO.aedat as STPO_aedat, --Creation date
     STPO.posnr as STPO_posnr, --Billing item
     STPO.idnrk as STPO_idnrk, --Bom component
     STPO.meins as STPO_meins, -- base unit of measure
     STPO.menge as STPO_menge, --quantity
     STPO.ausch as STPO_ausch, --component scrap in percent
     STPO.avoau as STPO_avoau, --operation scrap
     STPO.netau as STPO_netau, --net scrap
     STPO.stvkn as STPO_stvkn--inherit bom number of bom 

     from
      (
       (
         dp_osi_la_ecc.mast as MAST
         Inner Join dp_osi_la_ecc.stko as STKO on (MAST.sap_box_name = STKO.sap_box_name and MAST.stlnr = STKO.stlnr and MAST.stlal = STKO.stlal)
       )
       Inner Join dp_osi_la_ecc.stas as STAS on (STKO.sap_box_name = STAS.sap_box_name and STKO.stlty = STAS.stlty and STKO.stlnr = STAS.stlnr and STKO.stlal = STAS.stlal)
       ) inner join dp_osi_la_ecc.stpo as STPO on (STAS.sap_box_name = STPO.sap_box_name and STAS.stlty = STPO.stlty and STAS.stlnr = STPO.stlnr and  STAS.stlkn = STPO.stlkn) 
     )
   ) as BOM_raw
   
   where
  MAST_stlan = '1'  
),


sub_BOM_levels as (

        SELECT       MAST_matnr, 
                     MAST_werks, 
                     STPO_idnrk 
        FROM BOM_levels_raw 
        WHERE STPO_idnrk IS NOT NULL
        GROUP BY MAST_matnr,
                 MAST_werks,
                 STPO_idnrk 
),


MATERIALSFP AS (
            SELECT 
                    CAST(MAST_matnr AS INT) AS ProducedCode,
                    makt_halb.name_short_description_of_product AS ProducedCodeName,
                    MAST_werks as Fp_site,
                    CAST(STPO_idnrk AS INT ) AS RPI_Code,
                    makt_roh.name_short_description_of_product AS material_description
            FROM sub_BOM_levels
            -------------------------------------------------------------------------------------------------------------------------------
            LEFT JOIN (
                            SELECT id_of_product_material, 
                                   LAST(name_short_description_of_product) AS name_short_description_of_product
                            FROM dp_masterdata_g11.makt_material_descriptions
                            WHERE language_code = 'E'
                            GROUP BY id_of_product_material
                      ) makt_halb  ON CAST(MAST_matnr AS INT ) = CAST(makt_halb.id_of_product_material AS INT)
            -------------------------------------------------------------------------------------------------------------------------------
            LEFT JOIN (
                            SELECT id_of_product_material, 
                            LAST(name_short_description_of_product) AS name_short_description_of_product
                            FROM dp_masterdata_g11.makt_material_descriptions
                            WHERE language_code = 'E'
                            GROUP BY id_of_product_material
                      ) makt_roh ON CAST(STPO_idnrk AS INT ) = CAST(makt_roh.id_of_product_material AS INT)
            
),

materialsfpcomplete as(
  SELECT 
    cast(ProducedCode as string) as ProducedCode,
    ProducedCodeName,
    Fp_site,
    RPI_Code,
    material_description
  FROM MATERIALSFP
),

--Coquetscodigo----------------------------------------------------------------------------------------------------------------
otherelements AS (
    SELECT
        dispo AS MRP_Controller,
        dismm AS MRP_Type,
        werks AS Rpi_Site,
        matnr AS RPI_Code
    FROM dp_osi_la_ecc.marc
    WHERE TRIM(dismm) NOT IN ('ND', '') AND dismm IS NOT NULL
),
--------------------------------------------------------------------------------------------------------------------------------------

--FinalMaterialFull----------------------------------------------------------------------------------------------------------------

materialsfpcomplete_otherelements AS (
   SELECT 
         msfp.ProducedCode as PROD_ID,
         msfp.ProducedCodeName,
         msfp.Fp_site,
         msfp.RPI_Code,
         msfp.material_description,
         o.MRP_Controller,
         o.MRP_Type,
         o.Rpi_Site
   FROM materialsfpcomplete msfp
   LEFT JOIN otherelements o USING (RPI_Code) -- using O.RPI_Code = msfp.RPI_Code
),


materialsfpcomplete_otherelements_plus AS (
    SELECT 
       MSPFO.PROD_ID,
       MSPFO.ProducedCodeName,
       MSPFO.Fp_site,
       marc.dismm as MRP_Type_Fp,
       MSPFO.RPI_Code,
       MSPFO.material_description,
       MSPFO.MRP_Controller,
       MSPFO.MRP_Type as MRP_Type_Rp,
       MSPFO.Rpi_Site,
       dnorm.subsector_description,
       dnorm.segment_long_description,
       reg.SiteRegion,
       marag11.mtart as Element_Type
    FROM materialsfpcomplete_otherelements MSPFO
    LEFT JOIN dp_masterdata_g11.mara  marag11 on  SUBSTRING(marag11.matnr, 11) = MSPFO.RPI_Code
    LEFT JOIN ap_emdl_product_master_data.product_global_dimension_dnorm dnorm on substring(dnorm.material_id,11) = MSPFO.PROD_ID
    LEFT JOIN ad_global_supply.sites_dim reg on reg.SiteID = MSPFO.Fp_site
    LEFT JOIN dp_osi_la_ecc.marc marc ON  substring(marc.matnr,11) = MSPFO.PROD_ID -- dp_osi_la_ecc.marc.matnr = msfp.PROD_ID  para traernos MRP_Type_Fp
    LEFT JOIN ap_emdl_product_master_data.product_sales_area_representative_fpc MDA on substring(MDA.material_id,11) = MSPFO.RPI_Code
   -- WHERE  dnorm.subsector_description IS NOT NULL --AND dnorm.active_indicator <> 'inactive'
   WHERE marc.dismm IS NOT NULL  AND TRIM(marc.dismm) NOT IN ('ND', '') AND marc.dismm IS NOT NULL
 )
--  materialsfpcomplete_otherelements_plus_clean as (
--  SELECT DISTINCT 
--         PROD_ID,
--         ProducedCodeName,
--         Fp_site,
--         MRP_Type_Fp,
--         RPI_Code,
--         Rpi_Site,
--         material_description,
--         segment_long_description,
--         subsector_description,
--         MRP_Type,
--         SiteRegion,
--         Element_Type
-- FROM  materialsfpcomplete_otherelements_plus
-- WHERE NOT MRP_Type IN ('ND','X1') AND MRP_Type_Fp NOT IN ('ND','X1')
-- )



 SELECT DISTINCT 
        PROD_ID,
        ProducedCodeName,
        MRP_Type_Fp,
        Fp_site,
        RPI_Code,
        MRP_Type_Rp,
        Rpi_Site,
        material_description,
        segment_long_description,
        subsector_description,
        SiteRegion,
        Element_Type
FROM materialsfpcomplete_otherelements_plus

-- FROM  materialsfpcomplete_otherelements_plus_clean
--WHERE LOWER(ProducedCodeName) LIKE '%Ariel%'
--WHERE PROD_ID in ('80748933','80748937','80799995','80750056','80769192')
--WHERE PROD_ID = '90296803'
--WHERE RPI_Code = '20229042'
-- WHERE subsector_description = 'BabyCare'
-- AND Fp_site IN ('2532')


-- SELECT  
--         plus.Fp_site,
--         reg.SiteRegion,
--         COUNT(DISTINCT(plus.PROD_ID)) as FPxsite
-- FROM  materialsfpcomplete_otherelements_plus plus
-- LEFT JOIN ad_global_supply.sites_dim reg on reg.SiteID = plus.Fp_site
-- GROUP BY plus.Fp_site,reg.SiteRegion


In [0]:
%sql
SELECT 
    Fp_site,
    COUNT(DISTINCT PROD_ID) AS Prod_ids
FROM 
    group_db_innovation_and_insights.dfsource_full
WHERE 
    subsector_description = 'BabyCare'
GROUP BY 
    Fp_site

In [0]:
%sql
OPTIMIZE group_db_innovation_and_insights.dfsource_full
ZORDER BY (PROD_ID)

In [0]:


# dfsourcefind = spark.table('group_db_innovation_and_insights.dfsource_find').select("PROD_ID", "Fp_site", "RPI_Code", "Rpi_Site", "Element_Type")

In [0]:
# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description = 'ShaveCare'
# and Fp_site in ("C089","0371","3662","9455","B082","3327","6897","2167","B168","3003","3128")
# '''
# LISTO YA SE HIZO LA CARGA ✓


# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description = 'ShaveCare'
# and Fp_site in    ("9451"
#                   ,"A992"
#                   ,"5029"
#                   ,"6444"
#                   ,"4752")
# '''
# LISTO YA SE HIZO LA CARGA ✓


# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description = 'ShaveCare'
# and Fp_site in    
#                   ("3460"
#                   ,"C096"
#                   ,"9912"
#                   ,"9942"
#                   ,"9244"
#                   ,"3004"
#                   ,"3099"
#                   ,"C092")
# '''


# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description = 'ShaveCare'
# and Fp_site in    
#                     ("3009"
#                     ,"B622"
#                     ,"6674"
#                     ,"7763"
#                     ,"9973"
#                     ,"A695"
#                     ,"3486"
#                     ,"8076"
#                     ,"9366")
#  '''


# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description = 'ShaveCare'
# and Fp_site in    ("C106")
# '''

# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description = 'ShaveCare'
# and Fp_site in    (
#                       "3100"
#                      ,"2332"
#                      ,"B794"
#                      ,"A385"
#                      ,"B532"
#                      ,"6963"
#                      ,"8921"
#                      ,"8779")
# '''




# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description = 'ShaveCare'
# and Fp_site in    (
#                      "8823"
#                     ,"1694"
#                     ,"9453"
#                     ,"C106"
#                     ,"2518"
#                     ,"0042"
#                     ,"2472"
#                     ,"8423")
# '''




#***********************shavecare********************************************

# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description ='FamilyCare'
# and Fp_site in  
# ("7018"
# ,"3955"
# ,"4406"
# ,"4568"
# ,"4298"
# ,"4722"
# ,"4320")
# '''

# queryfind3 = '''
# select *
# from group_db_innovation_and_insights.dfsource_full
# where subsector_description ='FemCare'
# and Fp_site in  
# ("C089"
# ,"0371"
# ,"C107"
# ,"C702"
# ,"4269"
# ,"9455"
# ,"2532"
# ,"7018"
# ,"3003"
# ,"5619"
# ,"1193"
# ,"9451"
# ,"5029"
# ,"4330"
# ,"6444"
# ,"5030"
# ,"3955"
# ,"5593"
# ,"5032"
# ,"3008"
# ,"4331"
# ,"9912"
# ,"4568"
# ,"C092"
# ,"6674"
# ,"9458"
# ,"4298"
# ,"B393"
# ,"2299"
# ,"8076"
# ,"6960"
# ,"2332"
# ,"B794"
# ,"6503"
# ,"5715"
# ,"8823"
# ,"9461"
# ,"B785"
# ,"1694"
# ,"4840"
# ,"9462"
# ,"A850")
# '''

#**************************************ORAL CARE*******************************

# queryfind3 = '''
#     select *
#     from group_db_innovation_and_insights.dfsource_full
#      where subsector_description ='Oralcare'
#      and Fp_site in  
#               ("C089"
#               ,"0371"
#               ,"C104"
#               ,"3662"
#               ,"3325"
#               ,"4269"
#               ,"9455"
#               ,"9972"
#               ,"5062"
#               ,"6897"
#               ,"2167"
#               ,"2709"
#               ,"4267"
#               ,"3003"
#               ,"1193"
#               ,"9451"
#               ,"5029"
#               ,"6444"
#               ,"4752"
#               ,"2603"
#               ,"8420"
#               ,"0453"
#               ,"5593"
#               ,"5032"
#               ,"3460")
# '''

#**********************************Appliances***************************************
# queryfind3 = '''
#     select *
#     from group_db_innovation_and_insights.dfsource_full
#      where subsector_description ='Appliances'
#      and Fp_site in 
# ("C089"
# ,"7018"
# ,"3128"
# ,"9942"
# ,"7648"
# ,"2332"
# ,"8921"
# ,"8823"
# ,"9276"
# ,"C106"
# ,"2518"
# ,"8423")
# '''
#*******************************************************************************
#*******************************PHC*********************************************

# queryfind3 = '''
#     select *
#     from group_db_innovation_and_insights.dfsource_full
#      where subsector_description ='PHC'
#      and Fp_site in 
# ("D301"
# ,"9942"
# ,"9976"
# ,"3018"
# ,"C830"
# ,"6990"
# ,"C820"
# ,"4568"
# ,"C092"
# ,"6674"
# ,"4298"
# ,"A695"
# ,"3486"
# ,"4328"
# ,"C828"
# ,"C868"
# ,"2332"
# ,"B794"
# ,"6963"
# ,"4876"
# ,"8823"
# ,"2598"
# ,"2517"
# ,"6020"
# ,"9276"
# ,"2518"
# ,"2472"
# ,"3013"
# ,"8423"
# ,"9921")
# '''
#******************************************************************************
#*********************************FabricCare_A***************************************

# queryfind3 = '''
#     select *
#     from group_db_innovation_and_insights.dfsource_full
#      where subsector_description ='FabricCare'
#      and Fp_site in 
# ("9030"
# ,"C089"
# ,"9975"
# ,"B777"
# ,"D657"
# ,"C787"
# ,"C104"
# ,"2587"
# ,"9455"
# ,"4147"
# ,"4951"
# ,"6021"
# ,"6897"
# ,"2709"
# ,"4267"
# ,"3003"
# ,"1193"
# ,"5029"
# ,"2603"
# ,"E220"
# ,"3460"
# ,"9030"
# ,"C089"
# ,"0369"
# ,"7249"
# ,"B777"
# ,"9361"
# ,"A351"
# ,"0371"
# ,"9047"
# ,"3325"
# ,"4269"
# ,"9455"
# ,"5722"
# ,"4951"
# ,"2708"
# ,"2167"
# ,"6929"
# ,"1193"
# ,"5029"
# ,"6444"
# ,"2603"
# ,"3955")
# '''
#*********************************FabricCare_B***************************************

# queryfind3 = '''
#     select *
#     from group_db_innovation_and_insights.dfsource_full
#      where subsector_description ='FabricCare'
#      and Fp_site in 
# ("2711"
# ,"5738"
# ,"5593"
# ,"3487"
# ,"C105"
# ,"5276"
# ,"9942"
# ,"9464"
# ,"4568"
# ,"C092"
# ,"B622"
# ,"6674"
# ,"4298"
# ,"7759"
# ,"2299"
# ,"5774"
# ,"8076"
# ,"6960"
# ,"4328"
# ,"9366"
# ,"2332"
# ,"B794"
# ,"4876"
# ,"8779"
# ,"8823"
# ,"B785"
# ,"2517"
# ,"5026"
# ,"2518"
# ,"2472")
# '''
#*******************************************************************************
#************************************SkinPersCr****************************

queryfind3 = '''
    select *
    from group_db_innovation_and_insights.dfsource_full
     where subsector_description ='SkinPersCr'
     and Fp_site in 
("C089"
",0371"
",3325"
",4269"
",9455"
",5722"
",4951"
",5621"
",B082"
",6897"
",5662"
",4267"
",6929"
",7018"
",1193"
",5660"
",9451"
",4877"
",5029"
",6444"
",2603"
",4752"
",3955"
",2835"
",0093"
",5593"
",5032"
",3460"
",C108"
",4331"
",9942"
",3004"
",4568"
",3099"
",C092"
",3009"
",6496"
",B622"
",6674"
",4875"
",9458"
",4298"
",6902"
",9973"
",A695"
",4283"
",3486"
",8076"
",4328"
",9366"
",C091"
",3100"
",2332"
",C095"
",3488"
",B649"
",B794"
",6963"
",6503"
",4876"
",8779"
",0302"
",8823"
",B785"
",5026"
",9462"
",2518"
",2472"
",C100"
",8423")
'''

#*******************************************************************************

find_df3 = spark.sql(queryfind3)
dfsourcefull = spark.table('group_db_innovation_and_insights.dfsource_full').select("PROD_ID", "Fp_site", "RPI_Code", "Rpi_Site", "Element_Type")

algoritmo con vectores


In [0]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, lit, when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


# esquema basado en tu estructura
schema = StructType([
    StructField("Level_0_PROD_ID", IntegerType(), True),
    StructField("Level_0_Fp_site", StringType(), True),
    StructField("Level_1_RPI_Code", IntegerType(), True),
    StructField("Level_1_RP_Site", StringType(), True),
    StructField("Level_1_Element_Type", StringType(), True),
    StructField("Level_2_RPI_Code", IntegerType(), True),
    StructField("Level_2_RP_Site", StringType(), True),
    StructField("Level_2_Element_Type", StringType(), True),
    StructField("Level_3_RPI_Code", IntegerType(), True),
    StructField("Level_3_RP_Site", StringType(), True),
    StructField("Level_3_Element_Type", StringType(), True),
    StructField("Level_4_RPI_Code", IntegerType(), True),
    StructField("Level_4_RP_Site", StringType(), True),
    StructField("Level_4_Element_Type", StringType(), True),
    StructField("Level_5_RPI_Code", IntegerType(), True),
    StructField("Level_5_RP_Site", StringType(), True),
    StructField("Level_5_Element_Type", StringType(), True),
])

# Preparar el DataFrame inicial
def prepare_initial_df(dfsourcefind):
    return dfsourcefind.select(
        col('PROD_ID').cast(IntegerType()).alias('Level_0_PROD_ID'), 
        col('Fp_site').alias('Level_0_Fp_site'),
        col('RPI_Code').cast(IntegerType()).alias('Level_1_RPI_Code'), 
        col('Rpi_Site').alias('Level_1_RP_Site'),
        col('Element_Type').alias('Level_1_Element_Type')
    ).withColumn('Level_2_RPI_Code', lit(None).cast(IntegerType())) \
     .withColumn('Level_2_RP_Site', lit(None)) \
     .withColumn('Level_2_Element_Type', lit(None)) \
     .withColumn('Level_3_RPI_Code', lit(None).cast(IntegerType())) \
     .withColumn('Level_3_RP_Site', lit(None)) \
     .withColumn('Level_3_Element_Type', lit(None)) \
     .withColumn('Level_4_RPI_Code', lit(None).cast(IntegerType())) \
     .withColumn('Level_4_RP_Site', lit(None)) \
     .withColumn('Level_4_Element_Type', lit(None)) \
     .withColumn('Level_5_RPI_Code', lit(None).cast(IntegerType())) \
     .withColumn('Level_5_RP_Site', lit(None)) \
     .withColumn('Level_5_Element_Type', lit(None))



def expand_to_level_2(df, dfsourcefull):
    joined_df = df.alias('parent').join(
        dfsourcefull.alias('child'),
        (col('parent.Level_1_RPI_Code') == col('child.PROD_ID').cast(IntegerType())) &
        (col('parent.Level_1_Element_Type') == 'HALB'), 
        'left_outer'
    )

    selected_columns = [
        col('parent.Level_0_PROD_ID'),
        col('parent.Level_0_Fp_site'),
        col('parent.Level_1_RPI_Code'),
        col('parent.Level_1_RP_Site'),
        col('parent.Level_1_Element_Type'),
        col('child.RPI_Code').alias('Level_2_RPI_Code'),  # Asegura que esta columna viene del DataFrame 'child'
        col('child.Rpi_Site').alias('Level_2_RP_Site'),   # Similarmente, asegura que estas columnas vienen de 'child'
        col('child.Element_Type').alias('Level_2_Element_Type')
    ]

    # Creamos el DataFrame final seleccionando las columnas deseadas y manejando valores nulos
    final_df = joined_df.select(*selected_columns).fillna({
        'Level_2_RPI_Code': -1,
        'Level_2_RP_Site': 'NA',
        'Level_2_Element_Type': 'NA'
    })

    return final_df



def expand_to_level_3(df, dfsourcefull):
    joined_df = df.alias('parent').join(
        dfsourcefull.alias('child'),
        (col('parent.Level_2_RPI_Code') == col('child.PROD_ID')) & (col('parent.Level_2_Element_Type') == 'HALB'),
        'left_outer'
    )
    selected_columns = [
        col('parent.' + colName) for colName in df.columns
    ] + [
        col('child.RPI_Code').alias('Level_3_RPI_Code'),
        col('child.Rpi_Site').alias('Level_3_RP_Site'),
        col('child.Element_Type').alias('Level_3_Element_Type')
    ]
    final_df = joined_df.select(*selected_columns).fillna({
        'Level_3_RPI_Code': -1,
        'Level_3_RP_Site': 'NA',
        'Level_3_Element_Type': 'NA'
    })
    return final_df


def expand_to_level_4(df, dfsourcefull):
    joined_df = df.alias('parent').join(
        dfsourcefull.alias('child'),
        (col('parent.Level_3_RPI_Code') == col('child.PROD_ID')) & (col('parent.Level_3_Element_Type') == 'HALB'),
        'left_outer'
    )
    selected_columns = [
        col('parent.' + colName) for colName in df.columns
    ] + [
        col('child.RPI_Code').alias('Level_4_RPI_Code'),
        col('child.Rpi_Site').alias('Level_4_RP_Site'),
        col('child.Element_Type').alias('Level_4_Element_Type')
    ]
    final_df = joined_df.select(*selected_columns).fillna({
        'Level_4_RPI_Code': -1,
        'Level_4_RP_Site': 'NA',
        'Level_4_Element_Type': 'NA'
    })
    return final_df

def expand_to_level_5(df, dfsourcefull):
    joined_df = df.alias('parent').join(
        dfsourcefull.alias('child'),
        (col('parent.Level_4_RPI_Code') == col('child.PROD_ID')) & (col('parent.Level_4_Element_Type') == 'HALB'),
        'left_outer'
    )
    selected_columns = [
        col('parent.' + colName) for colName in df.columns
    ] + [
        col('child.RPI_Code').alias('Level_5_RPI_Code'),
        col('child.Rpi_Site').alias('Level_5_RP_Site'),
        col('child.Element_Type').alias('Level_5_Element_Type')
    ]
    final_df = joined_df.select(*selected_columns).fillna({
        'Level_5_RPI_Code': -1,
        'Level_5_RP_Site': 'NA',
        'Level_5_Element_Type': 'NA'
    })
    return final_df


initial_df = prepare_initial_df(find_df)
expanded_df_level_1 = expand_to_level_2(initial_df, dfsourcefull)
expanded_df_level_2 = expand_to_level_3(expanded_df_level_1, dfsourcefull)
expanded_df_level_3 = expand_to_level_4(expanded_df_level_2, dfsourcefull)
expanded_df_level_4 = expand_to_level_5(expanded_df_level_3, dfsourcefull)

# #check de cargas en niveles
# display(expanded_df_level_1)
# #resultados finales
final_df = expanded_df_level_4
display(final_df)

In [0]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, lit, when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


# esquema basado en tu estructura
schema = StructType([
    StructField("Level_0_PROD_ID", IntegerType(), True),
    StructField("Level_0_Fp_site", StringType(), True),
    StructField("Level_1_RPI_Code", IntegerType(), True),
    StructField("Level_1_RP_Site", StringType(), True),
    StructField("Level_1_Element_Type", StringType(), True),
    StructField("Level_2_RPI_Code", IntegerType(), True),
    StructField("Level_2_RP_Site", StringType(), True),
    StructField("Level_2_Element_Type", StringType(), True),
    StructField("Level_3_RPI_Code", IntegerType(), True),
    StructField("Level_3_RP_Site", StringType(), True),
    StructField("Level_3_Element_Type", StringType(), True),
    StructField("Level_4_RPI_Code", IntegerType(), True),
    StructField("Level_4_RP_Site", StringType(), True),
    StructField("Level_4_Element_Type", StringType(), True),
    StructField("Level_5_RPI_Code", IntegerType(), True),
    StructField("Level_5_RP_Site", StringType(), True),
    StructField("Level_5_Element_Type", StringType(), True),
])

# Preparar el DataFrame inicial
def prepare_initial_df(dfsourcefind):
    return dfsourcefind.select(
        col('PROD_ID').cast(IntegerType()).alias('Level_0_PROD_ID'), 
        col('Fp_site').alias('Level_0_Fp_site'),
        col('RPI_Code').cast(IntegerType()).alias('Level_1_RPI_Code'), 
        col('Rpi_Site').alias('Level_1_RP_Site'),
        col('Element_Type').alias('Level_1_Element_Type')
    ).withColumn('Level_2_RPI_Code', lit(None).cast(IntegerType())) \
     .withColumn('Level_2_RP_Site', lit(None)) \
     .withColumn('Level_2_Element_Type', lit(None)) \
     .withColumn('Level_3_RPI_Code', lit(None).cast(IntegerType())) \
     .withColumn('Level_3_RP_Site', lit(None)) \
     .withColumn('Level_3_Element_Type', lit(None)) \
     .withColumn('Level_4_RPI_Code', lit(None).cast(IntegerType())) \
     .withColumn('Level_4_RP_Site', lit(None)) \
     .withColumn('Level_4_Element_Type', lit(None)) \
     .withColumn('Level_5_RPI_Code', lit(None).cast(IntegerType())) \
     .withColumn('Level_5_RP_Site', lit(None)) \
     .withColumn('Level_5_Element_Type', lit(None))



def expand_to_level_2(df, dfsourcefull):
    joined_df = df.alias('parent').join(
        dfsourcefull.alias('child'),
        (col('parent.Level_1_RPI_Code') == col('child.PROD_ID').cast(IntegerType())) &
        (col('parent.Level_1_Element_Type') == 'HALB'), 
        'left_outer'
    )

    selected_columns = [
        col('parent.Level_0_PROD_ID'),
        col('parent.Level_0_Fp_site'),
        col('parent.Level_1_RPI_Code'),
        col('parent.Level_1_RP_Site'),
        col('parent.Level_1_Element_Type'),
        col('child.RPI_Code').alias('Level_2_RPI_Code'),  # Asegura que esta columna viene del DataFrame 'child'
        col('child.Rpi_Site').alias('Level_2_RP_Site'),   # Similarmente, asegura que estas columnas vienen de 'child'
        col('child.Element_Type').alias('Level_2_Element_Type')
    ]

    # Creamos el DataFrame final seleccionando las columnas deseadas y manejando valores nulos
    final_df = joined_df.select(*selected_columns).fillna({
        'Level_2_RPI_Code': -1,
        'Level_2_RP_Site': 'NA',
        'Level_2_Element_Type': 'NA'
    })

    return final_df



def expand_to_level_3(df, dfsourcefull):
    joined_df = df.alias('parent').join(
        dfsourcefull.alias('child'),
        (col('parent.Level_2_RPI_Code') == col('child.PROD_ID')) & (col('parent.Level_2_Element_Type') == 'HALB'),
        'left_outer'
    )
    selected_columns = [
        col('parent.' + colName) for colName in df.columns
    ] + [
        col('child.RPI_Code').alias('Level_3_RPI_Code'),
        col('child.Rpi_Site').alias('Level_3_RP_Site'),
        col('child.Element_Type').alias('Level_3_Element_Type')
    ]
    final_df = joined_df.select(*selected_columns).fillna({
        'Level_3_RPI_Code': -1,
        'Level_3_RP_Site': 'NA',
        'Level_3_Element_Type': 'NA'
    })
    return final_df


def expand_to_level_4(df, dfsourcefull):
    joined_df = df.alias('parent').join(
        dfsourcefull.alias('child'),
        (col('parent.Level_3_RPI_Code') == col('child.PROD_ID')) & (col('parent.Level_3_Element_Type') == 'HALB'),
        'left_outer'
    )
    selected_columns = [
        col('parent.' + colName) for colName in df.columns
    ] + [
        col('child.RPI_Code').alias('Level_4_RPI_Code'),
        col('child.Rpi_Site').alias('Level_4_RP_Site'),
        col('child.Element_Type').alias('Level_4_Element_Type')
    ]
    final_df = joined_df.select(*selected_columns).fillna({
        'Level_4_RPI_Code': -1,
        'Level_4_RP_Site': 'NA',
        'Level_4_Element_Type': 'NA'
    })
    return final_df

def expand_to_level_5(df, dfsourcefull):
    joined_df = df.alias('parent').join(
        dfsourcefull.alias('child'),
        (col('parent.Level_4_RPI_Code') == col('child.PROD_ID')) & (col('parent.Level_4_Element_Type') == 'HALB'),
        'left_outer'
    )
    selected_columns = [
        col('parent.' + colName) for colName in df.columns
    ] + [
        col('child.RPI_Code').alias('Level_5_RPI_Code'),
        col('child.Rpi_Site').alias('Level_5_RP_Site'),
        col('child.Element_Type').alias('Level_5_Element_Type')
    ]
    final_df = joined_df.select(*selected_columns).fillna({
        'Level_5_RPI_Code': -1,
        'Level_5_RP_Site': 'NA',
        'Level_5_Element_Type': 'NA'
    })
    return final_df


initial_df = prepare_initial_df(find_df3)
expanded_df_level_1 = expand_to_level_2(initial_df, dfsourcefull)
expanded_df_level_2 = expand_to_level_3(expanded_df_level_1, dfsourcefull)
expanded_df_level_3 = expand_to_level_4(expanded_df_level_2, dfsourcefull)
expanded_df_level_4 = expand_to_level_5(expanded_df_level_3, dfsourcefull)

# #check de cargas en niveles
# display(expanded_df_level_1)
# #resultados finales
final_df = expanded_df_level_4
#display(final_df)
final_df.write.format("delta").mode("overwrite").saveAsTable("group_db_innovation_and_insights.Fpboom_SkinPersCr")

In [0]:
%sql
SELECT *
FROM group_db_innovation_and_insights.Fpboom_SkinPersCr
WHERE Level_1_Element_Type = 'HALB'

Level_0_PROD_ID,Level_0_Fp_site,Level_1_RPI_Code,Level_1_RP_Site,Level_1_Element_Type,Level_2_RPI_Code,Level_2_RP_Site,Level_2_Element_Type,Level_3_RPI_Code,Level_3_RP_Site,Level_3_Element_Type,Level_4_RPI_Code,Level_4_RP_Site,Level_4_Element_Type,Level_5_RPI_Code,Level_5_RP_Site,Level_5_Element_Type


In [0]:
%sql
REFRESH TABLE group_db_innovation_and_insights.ShaveCare

In [0]:
%sql
SELECT *
FROM
group_db_innovation_and_insights.ShaveCare limit 10

In [0]:
%sql
SELECT *
FROM group_db_innovation_and_insights.Fpboom_ShaveCare
WHERE Level_2_RPI_Code IS NOT NULL 
AND Level_1_Element_Type = 'HALB'
OR Level_2_RPI_Code <> -1 --AND Level_0_Fp_site = '5593'


In [0]:
%sql
CREATE OR REPLACE TABLE group_db_innovation_and_insights.Fpboom_ShaveCare
USING DELTA
PARTITIONED BY (Level_0_PROD_ID, Level_0_Fp_site)
AS
SELECT * FROM group_db_innovation_and_insights.Fpboom_ShaveCare_A
UNION ALL
SELECT * FROM group_db_innovation_and_insights.Fpboom_ShaveCare_B
UNION ALL
SELECT * FROM group_db_innovation_and_insights.Fpboom_ShaveCare_C
UNION ALL
SELECT * FROM group_db_innovation_and_insights.Fpboom_ShaveCare_D
UNION ALL
SELECT * FROM group_db_innovation_and_insights.Fpboom_ShaveCare_E
UNION ALL
SELECT * FROM group_db_innovation_and_insights.Fpboom_ShaveCare_F
UNION ALL
SELECT * FROM group_db_innovation_and_insights.Fpboom_ShaveCare_G

In [0]:
%sql
CREATE OR REPLACE TABLE group_db_innovation_and_insights.Fpboom_FabricCare
USING DELTA
PARTITIONED BY (Level_0_PROD_ID, Level_0_Fp_site)
AS
SELECT * FROM group_db_innovation_and_insights.Fpboom_FabricCare_A
UNION ALL
SELECT * FROM group_db_innovation_and_insights.Fpboom_FabricCare_B
