## Introduction to DataFrames

In [1]:
import pandas as pd
import numpy as np
import os
import warnings
import datetime

pd.set_option('display.float_format', lambda x : '{:,.2f}'.format(x))
warnings.filterwarnings("ignore")
pd.set_option('display.max_columns', None)

In [2]:
if not ('sc' in locals() or 'sc' in globals()):
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    
    conf = SparkConf()
    # conf.setMaster('spark://localhost:7077')
    conf.set('spark.executor.memory', '512m')
    conf.set('spark.sql.jsonGenerator.ignoreNullFields', 'False') # To dump nules in json
    conf.set('spark.app.name', 'basics')

    sc = SparkContext.getOrCreate(SparkContext(conf=conf))
    
    spark = SparkSession \
        .builder \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/27 11:19:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/27 11:19:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit, when
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

def validate_fields_cols(df: DataFrame, field_validations: list) -> tuple:
    """
    Validate fields in DataFrame based on specified validations.

    Args:
        df (DataFrame): Input DataFrame.
        field_validations (list): List of dictionaries containing field validations.

    Returns:
        tuple: A tuple containing the validated DataFrame and a list of error columns.
    """
    error_columns = []
    for validation in field_validations:
        field = validation['field']
        validations = validation['validations']
        for v in validations:
            error_col_name = f"{field}_{v}"  # New column name for error
            error_columns.append(error_col_name)
            if 'notEmpty' in v:
                df = df.withColumn(error_col_name, when(col(field) == "", lit("KO")).otherwise(lit("OK")))
            if 'notNull' in v:
                df = df.withColumn(error_col_name, when(col(field).isNull(), lit("KO")).otherwise(lit("OK")))
    return df, error_columns

def filter_by_column_values(df: DataFrame, columns: list) -> tuple:
    """
    Filter DataFrame based on column values.

    Args:
        df (DataFrame): Input DataFrame.
        columns (list): List of column names to filter by.

    Returns:
        tuple: A tuple containing the filtered DataFrame and a DataFrame containing rows not matching the filter.
    """
    condition = " AND ".join(f"{col} = 'OK'" for col in columns)
    filtered_df = df.filter(condition)
    not_ok = df.subtract(filtered_df)
    return filtered_df.drop(*columns), not_ok

def struct_columns_to_single_column(df: DataFrame, column_names: list) -> DataFrame:
    """
    Combine struct columns into a single column.

    Args:
        df (DataFrame): Input DataFrame.
        column_names (list): List of column names to combine.

    Returns:
        DataFrame: DataFrame with combined struct columns.
    """
    struct_col = F.struct(*[F.col(col_name) for col_name in column_names])
    return df.withColumn("validations", struct_col).drop(*column_names)

def load_input(spark: SparkSession, **kwargs) -> DataFrame:
    """
    Load input data into DataFrame.

    Args:
        spark (SparkSession): Spark session object.
        kwargs: Additional arguments for loading data.

    Returns:
        DataFrame: Loaded DataFrame.
    """
    df = spark.read.load(**kwargs)
    return df

def save_output(df: DataFrame, **kwargs) -> None:
    """
    Save DataFrame to output location.

    Args:
        df (DataFrame): DataFrame to save.
        kwargs: Additional arguments for saving data.
    """
    df.write.save(**kwargs)

def transform(input_df: DataFrame, transformation_type: str, steps: list) -> DataFrame:
    """
    Perform transformation on DataFrame.

    Args:
        input_df (DataFrame): Input DataFrame.
        transformation_type (str): Type of transformation to perform.
        steps (list): List of transformation steps.

    Returns:
        DataFrame: Transformed DataFrame.
    """
    return transform_funcs_dict[transformation_type](input_df, steps)

def validate_fields(input_df: DataFrame, steps: list) -> tuple:
    """
    Validate fields in DataFrame.

    Args:
        input_df (DataFrame): Input DataFrame.
        steps (list): List of validation steps.

    Returns:
        tuple: A tuple containing the DataFrame with valid rows and DataFrame with invalid rows.
    """
    validated_df, val_cols = validate_fields_cols(input_df, steps)
    validated_df_OK, validated_df_NOTOK = filter_by_column_values(validated_df, val_cols)
    validated_df_NOTOK = struct_columns_to_single_column(validated_df_NOTOK, val_cols)
    validated_df_NOTOK = add_current_timestamp(validated_df_NOTOK)
    return validated_df_OK, validated_df_NOTOK

def add_fields(input_df: DataFrame, params: list) -> list:
    """
    Add new fields to DataFrame based on specified parameters.

    Args:
        input_df (DataFrame): Input DataFrame.
        params (list): List of dictionaries containing parameters for adding fields.

    Returns:
        list: List containing the modified DataFrame.
    """
    for item in params:
        column_name = item['name']
        function_name = item['function']
        
        # Get the function object dynamically
        function = globals()[function_name]
        
        # Apply function and add column to DataFrame
        input_df = input_df.withColumn(column_name, function())
    return [input_df]

def add_current_timestamp(df: DataFrame) -> DataFrame:
    """
    Add current timestamp column to DataFrame.

    Args:
        df (DataFrame): Input DataFrame.

    Returns:
        DataFrame: DataFrame with current timestamp column added.
    """
    return df.withColumn("dt", current_timestamp())

# Dictionary to map transformation types to corresponding functions
transform_funcs_dict = dict()
transform_funcs_dict["validate_fields"] = validate_fields
transform_funcs_dict["add_fields"] = add_fields

In [4]:
metadata = {
    "sources": [{
        "name": "person_inputs",
        "params": {
            "path": "/Users/allianz/Desktop/workspace/ingestion-metadata-engine/notebooks/data/input/events/person/*.json",
            "format": "JSON"
        }
    }],
    "transformations": [{
            "name": "validation",
            "type": "validate_fields",
            "params": {
                "input": "person_inputs",
                "steps": [
                    {"field": "office", "validations": ["notEmpty"]},
                    {"field": "age", "validations": ["notNull"]}
                ],
                "outputs": {
                    "output_ok": "validation_ok",
                    "output_notok": "validation_notok"
                }
            }
        },
        {
            "name": "ok_with_date",
            "type": "add_fields",
            "params": {
                "input": "validation_ok",
                "steps": [
                        {"name": "dt", "function": "current_timestamp"},
                        {"name": "dt2", "function": "current_timestamp"}
                    ],
                "outputs": {
                    "output": "final_ok"
                }
            }
        }]
    ,
    "outputs": [
        {
            "input": "final_ok",
            "name": "raw-ok",
            "params": {
                "path": "/data/output/events/person/raw-ok.json",
                "format": "JSON",
                "mode": "overwrite"
            }
        },
        {
            "input": "validation_notok",
            "name": "raw-notok",
            "params": {
                "path": "/Users/allianz/Desktop/workspace/ingestion-metadata-engine/notebooks/data/output/discards/person/raw-notok.json",
                "format": "JSON",
                "mode": "overwrite"
            }
        },
        {
            "input": "validation_ok",
            "name": "raw-ok",
            "params": {
                "path": "/Users/allianz/Desktop/workspace/ingestion-metadata-engine/notebooks/data/output/discards/person/raw-ok.parquet",
                "format": "parquet",
                "mode": "overwrite"
            }
        }
    ]
}

In [5]:
def execute_tasks(spark: SparkSession, sc, metadata_input: dict) -> None:
    """
    Execute tasks specified in the metadata input.

    Args:
        spark (SparkSession): Spark session object.
        sc: Spark context.
        metadata_input (dict): Metadata input containing sources, transformations, and outputs.
    """
    outputs_dict = {}

    # Load input data from sources
    for source in metadata_input["sources"]:
        outputs_dict[source["name"]] = load_input(spark, **source["params"])

    # Perform transformations
    for transformation in metadata_input["transformations"]:
        transformation_name = transformation["name"]
        transformation_type = transformation["type"]
        params = transformation["params"]
        input_df = params["input"]
        steps = params["steps"]
        outputs = params["outputs"]
    
        # Transform data
        outs = transform(outputs_dict[input_df], transformation_type, steps)
        temp = {value: outs[idx] for idx, value in enumerate(outputs.values())}
        outputs_dict = {**temp, **outputs_dict}

    # Save output data
    for output in metadata_input["outputs"]:
        save_output(outputs_dict[output["input"]], **output["params"])
        

In [6]:
execute_tasks(spark, sc, metadata)

24/04/27 11:19:26 ERROR FileOutputCommitter: Mkdirs failed to create file:/data/output/events/person/raw-ok.json/_temporary/0
24/04/27 11:19:26 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.IOException: Mkdirs failed to create file:/data/output/events/person/raw-ok.json/_temporary/0/_temporary/attempt_202404271119266297887620395963654_0001_m_000000_1 (exists=false, cwd=file:/Users/allianz/Desktop/workspace/ingestion-metadata-engine/notebooks)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputS

Py4JJavaError: An error occurred while calling o69.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (192.168.1.36 executor driver): java.io.IOException: Mkdirs failed to create file:/data/output/events/person/raw-ok.json/_temporary/0/_temporary/attempt_202404271119266297887620395963654_0001_m_000000_1 (exists=false, cwd=file:/Users/allianz/Desktop/workspace/ingestion-metadata-engine/notebooks)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.<init>(JsonOutputWriter.scala:47)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anon$1.newInstance(JsonFileFormat.scala:81)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:389)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Mkdirs failed to create file:/data/output/events/person/raw-ok.json/_temporary/0/_temporary/attempt_202404271119266297887620395963654_0001_m_000000_1 (exists=false, cwd=file:/Users/allianz/Desktop/workspace/ingestion-metadata-engine/notebooks)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.<init>(JsonOutputWriter.scala:47)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anon$1.newInstance(JsonFileFormat.scala:81)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:389)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
