In [3]:
from ace.utils import prep_general_material_data, read_file

In [2]:
df = read_file("../data/system_1/PRE_MARA.csv", "CSV", {"header": "true"})

In [5]:
# Pyspark libraries
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame

from ace.utils import enforce_schema, read_file
from ace.schemas import MARA_SCHEMA


def prep_general_material_data(
    df: DataFrame,
    col_mara_global_material_number:str,
    check_old_material_number_is_valid: bool = True,
    check_material_is_not_deleted: bool = True,
):
    """
    Filters materials based on validity of the old material number (BISMT) and deletion flag (LVORM)
    and renames the global material number column and selects required columns.

    Parameters:
    -----------
    df : DataFrame
        Input PySpark DataFrame containing material data.
    col_mara_global_material_number : str
        Column name for the global material number for the system.
    check_old_material_number_is_valid : bool, optional (default=True)
        If True, filters out rows where the old material number is invalid.
        Valid old material numbers are not in ["ARCHIVE", "DUPLICATE", "RENUMBERED"] or null.
    check_material_is_not_deleted : bool, optional (default=True)
        If True, excludes rows where the deletion flag is not null or not empty.
    rename_global_material_number : str, optional (default=None)
        If specified, renames the global material number column to this consistent name.

    Returns:
    --------
    DataFrame
        A PySpark DataFrame after applying the filters and renaming.
    """
    # Apply old material number validity filter
    if check_old_material_number_is_valid:
        df = df.filter(
            (F.col("BISMT").isNull()) | (~F.col("BISMT").isin("ARCHIVE", "DUPLICATE", "RENUMBERED"))
        )

    # Apply material not deleted filter
    if check_material_is_not_deleted:
        df = df.filter(
            (F.col("LVORM").isNull()) | (F.col("LVORM") == "")
        )

    # Rename global material number column
    df = df.withColumnRenamed(col_mara_global_material_number, "global_material_number")


    return enforce_schema(df, MARA_SCHEMA)

In [7]:
prep_general_material_data(df, "ZZMDGM").show(truncate=False)

+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+
|MANDT                                                           |MATNR                                                           |MEINS                                                           |global_material_number                                          |
+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+
|ad57366865126e55649ecb23ae1d48887544976efea46a48eb5d85a6eeb4d306|73247d2a426212859ed5573281c4fb0f1ac040983509226591035355f4d0fa68|72dfcfb0c470ac255cde83fb8fe38de8a128188e03ea5ba5b2a93adbea1062fa|73247d2a426212859e

In [8]:
import os

In [4]:
for file_name in os.listdir("../data/system_1/"):
    file_path = os.path.join("../data/system_1/", file_name)

    # Check if it is a file (not a subfolder)
    if os.path.isfile(file_path):
        # Extract the file name without extension
        base_name = os.path.splitext(file_name)[0]
        print(base_name)

        # Read the file based on its extension and create DataFrame
        if file_name.endswith('.csv'):
            df = read_file(file_path, "csv", {"header": "true", "inferSchema": "true"})

        # Dynamically assign the DataFrame to a variable with the same name as the file (without extension)
        globals()[base_name] = df
        print(f"Data loaded into variable: {base_name}")

PRE_AFKO
Data loaded into variable: PRE_AFKO
PRE_AFPO
Data loaded into variable: PRE_AFPO
PRE_AUFK
Data loaded into variable: PRE_AUFK
PRE_MARA
Data loaded into variable: PRE_MARA
PRE_MARC
Data loaded into variable: PRE_MARC
PRE_MBEW
Data loaded into variable: PRE_MBEW
PRE_T001
Data loaded into variable: PRE_T001
PRE_T001K
Data loaded into variable: PRE_T001K
PRE_T001W
Data loaded into variable: PRE_T001W


In [7]:
PRE_AFPO.select("LTRMI")

DataFrame[LTRMI: timestamp]

In [16]:
from ace.utils import enforce_schema

In [2]:
import os

In [14]:
PRE_AFPO.select("KDAUF").show()

+--------------------+
|               KDAUF|
+--------------------+
|                null|
|146dc792be66dba12...|
|                null|
|3bc2e0055747de78b...|
|                null|
|ed13758110de363aa...|
|                null|
|                null|
|8557a40f9fbe2c90f...|
|f0f0cf3f9b941b8cb...|
|f0f0cf3f9b941b8cb...|
|                null|
|                null|
|af850ba2ecb3ea24e...|
|79fa43ff27a96d27b...|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



In [17]:
PRE_AUFK.select("KDAUF").show()

+--------------------+
|               KDAUF|
+--------------------+
|                null|
|7884871dce402f358...|
|                null|
|                null|
|                null|
|                null|
|cd14e89eccc6b123f...|
|                null|
|                null|
|                null|
|146dc792be66dba12...|
|bc719afaabc72d67c...|
|e7fc5abf8bd4ad253...|
|d67dddd89a02148df...|
|b53841a0fa8769d2f...|
|                null|
|                null|
|                null|
|                null|
|2b375afa928b8e4fb...|
+--------------------+
only showing top 20 rows



In [18]:
['MATNR', 'AUFNR', 'SOURCE_SYSTEM_ERP', 'MANDT', 'start_date', 'GLTRP', 'GSTRI', 'POSNR', 'DWERK', 'KDAUF', 'LTRMI', 'OBJNR', 'ERDAT', 'ERNAM', 'AUART', 'ZZGLTRP_ORIG', 'ZZPRO_TEXT', 'MANDT', 'MEINS', 'global_material_number', 'primary_key_intra', 'primary_key_inter', 'on_time_flag', 'actual_on_time_deviation', 'late_delivery_bucket', 'mto_vs_mts_flag', 'order_finish_timestamp', 'order_start_timestamp']

['MATNR',
 'AUFNR',
 'SOURCE_SYSTEM_ERP',
 'MANDT',
 'start_date',
 'GLTRP',
 'GSTRI',
 'POSNR',
 'DWERK',
 'KDAUF',
 'LTRMI',
 'OBJNR',
 'ERDAT',
 'ERNAM',
 'AUART',
 'ZZGLTRP_ORIG',
 'ZZPRO_TEXT',
 'MANDT',
 'MEINS',
 'global_material_number',
 'primary_key_intra',
 'primary_key_inter',
 'on_time_flag',
 'actual_on_time_deviation',
 'late_delivery_bucket',
 'mto_vs_mts_flag',
 'order_finish_timestamp',
 'order_start_timestamp']

In [19]:
process_order_schema = ['MATNR', 'AUFNR', 'SOURCE_SYSTEM_ERP', 'MANDT', 'start_date', 'GLTRP', 'GSTRI', 'POSNR', 'DWERK', 'KDAUF', 'LTRMI', 'OBJNR', 'ERDAT', 'ERNAM', 'AUART', 'ZZGLTRP_ORIG', 'ZZPRO_TEXT', 'MTART', 'NTGEW', 'global_material_number', 'primary_key_intra', 'primary_key_inter', 'on_time_flag', 'actual_on_time_deviation', 'late_delivery_bucket', 'mto_vs_mts_flag', 'order_finish_timestamp', 'order_start_timestamp']

In [20]:
process_order_schema

['MATNR',
 'AUFNR',
 'SOURCE_SYSTEM_ERP',
 'MANDT',
 'start_date',
 'GLTRP',
 'GSTRI',
 'POSNR',
 'DWERK',
 'KDAUF',
 'LTRMI',
 'OBJNR',
 'ERDAT',
 'ERNAM',
 'AUART',
 'ZZGLTRP_ORIG',
 'ZZPRO_TEXT',
 'MTART',
 'NTGEW',
 'global_material_number',
 'primary_key_intra',
 'primary_key_inter',
 'on_time_flag',
 'actual_on_time_deviation',
 'late_delivery_bucket',
 'mto_vs_mts_flag',
 'order_finish_timestamp',
 'order_start_timestamp']

In [21]:
local_meterial_schema = ['MANDT', 'BUKRS', 'BWKEY', 'MATNR', 'WERKS', 'SOURCE_SYSTEM_ERP', 'PLIFZ', 'DZEIT', 'DISLS', 'MEINS', 'global_material_number', 'NAME1', 'VPRSV', 'VERPR', 'STPRS', 'PEINH', 'BKLAS', 'WAERS', 'mtl_plant_emd', 'global_mtl_id', 'primary_key_intra', 'primary_key_inter', 'no_of_duplicates']

In [22]:
local_meterial_schema

['MANDT',
 'BUKRS',
 'BWKEY',
 'MATNR',
 'WERKS',
 'SOURCE_SYSTEM_ERP',
 'PLIFZ',
 'DZEIT',
 'DISLS',
 'MEINS',
 'global_material_number',
 'NAME1',
 'VPRSV',
 'VERPR',
 'STPRS',
 'PEINH',
 'BKLAS',
 'WAERS',
 'mtl_plant_emd',
 'global_mtl_id',
 'primary_key_intra',
 'primary_key_inter',
 'no_of_duplicates']

In [24]:
import pandas as pd

In [26]:
pd.DataFrame({"process_order" : process_order_schema})

Unnamed: 0,process_order
0,MATNR
1,AUFNR
2,SOURCE_SYSTEM_ERP
3,MANDT
4,start_date
5,GLTRP
6,GSTRI
7,POSNR
8,DWERK
9,KDAUF


In [27]:
pd.DataFrame({"local_material": local_meterial_schema})

Unnamed: 0,local_material
0,MANDT
1,BUKRS
2,BWKEY
3,MATNR
4,WERKS
5,SOURCE_SYSTEM_ERP
6,PLIFZ
7,DZEIT
8,DISLS
9,MEINS


In [28]:
['global_material_number', 'primary_key_intra', 'primary_key_inter', 'mtl_plant_emd', 'global_mtl_id', 'no_of_duplicates', 'global_material_number', 'primary_key_intra', 'primary_key_inter', 'material_number', 'source_system_erp', 'client', 'company_code', 'valuation_area', 'planned_delivery_time', 'decoupling_time', 'discontinuation_indicator', 'unit_of_measure', 'name_of_plant', 'price_control_indicator', 'moving_average_price', 'standard_price', 'unit_price', 'valuation_class', 'currency_key', 'order_number', 'start_date_source', 'start_date', 'order_start_timestamp_source', 'order_item_number', 'plant', 'sales_order_number', 'order_finish_timestamp_source', 'object_number', 'creation_date', 'created_by', 'order_type', 'original_basic_finish_date', 'project_text', 'material_type', 'net_weight', 'on_time_flag', 'actual_on_time_deviation', 'late_delivery_bucket', 'mto_vs_mts_flag', 'order_finish_timestamp', 'order_start_timestamp']

['global_material_number',
 'primary_key_intra',
 'primary_key_inter',
 'mtl_plant_emd',
 'global_mtl_id',
 'no_of_duplicates',
 'global_material_number',
 'primary_key_intra',
 'primary_key_inter',
 'material_number',
 'source_system_erp',
 'client',
 'company_code',
 'valuation_area',
 'planned_delivery_time',
 'decoupling_time',
 'discontinuation_indicator',
 'unit_of_measure',
 'name_of_plant',
 'price_control_indicator',
 'moving_average_price',
 'standard_price',
 'unit_price',
 'valuation_class',
 'currency_key',
 'order_number',
 'start_date_source',
 'start_date',
 'order_start_timestamp_source',
 'order_item_number',
 'plant',
 'sales_order_number',
 'order_finish_timestamp_source',
 'object_number',
 'creation_date',
 'created_by',
 'order_type',
 'original_basic_finish_date',
 'project_text',
 'material_type',
 'net_weight',
 'on_time_flag',
 'actual_on_time_deviation',
 'late_delivery_bucket',
 'mto_vs_mts_flag',
 'order_finish_timestamp',
 'order_start_timestamp']

In [1]:
import os

In [2]:
from pyspark.sql import SparkSession

In [5]:
ss = SparkSession.builder.appName("test").getOrCreate()

In [6]:
ss

In [8]:
import pyspark.sql.types as T

In [9]:
df = ss.createDataFrame(
        data=[
            ("pk_val_1", "a", 1, "A", 10, "aa", 100),
            ("pk_val_2", "b", 2, "B", 20, "bb", 200),
        ],
        schema=T.StructType(
            [
                T.StructField("pk_col", T.StringType()),
                T.StructField("col_to_keep_1", T.StringType()),
                T.StructField("col_to_keep_2", T.IntegerType()),
                T.StructField("col_to_rename_1_old_name", T.StringType()),
                T.StructField("col_to_rename_2_old_name", T.IntegerType()),
                T.StructField("col_to_delete_1", T.StringType()),
                T.StructField("col_to_delete_2", T.IntegerType()),
            ]
        ),
    )

In [13]:
df.toPandas().head()

ModuleNotFoundError: No module named 'distutils'