In [65]:
from pyspark.sql import SparkSession
import sys
import os

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("NotebookTests").getOrCreate()

### Reader testing

In [66]:
from input.Reader import Reader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define metadata for testing
sources_metadata =[
      {
        "name": "person_inputs",
        "paths": [
          "C:/Users/andres.langoyo/Documents/prueba_tecnica/dataflow/dt-technical-proyect/data/input/person"
      ],
        "format": "JSON",
        "schema":[
          {"field":"name", "type":"STRING"},
          {"field":"age", "type":"INTEGER"},
          {"field":"office", "type":"STRING"}
          ],
          
      }
    ]

# Initialize Reader and load DataFrame
reader = Reader(spark)
sources = reader.load_df(sources_metadata)

# Validate output
assert "person_inputs" in sources, "Source name not found in output"
df = sources["person_inputs"]
assert df is not None, "DataFrame should not be None"
assert df.columns == ["name", "age", "office"], "Unexpected columns in DataFrame"
assert [field.dataType for field in df.schema] == [StringType(), IntegerType(), StringType()], "Wrong schhema types"
df.show()


+--------+----+---------+
|    name| age|   office|
+--------+----+---------+
|  xavier|  32|barcelona|
|  miguel|  12|santander|
|  manuel|  56|   murcia|
|  miguel|  56|         |
|ricardio|NULL|   murcia|
|    juan|  45|   getafe|
| ricardo|  37| valencia|
|    fran|  29| alicante|
+--------+----+---------+



Wrong input path

In [67]:
from input.Reader import Reader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define metadata for testing
sources_metadata =[
      {
        "name": "person_inputs",
        "paths": [
          "./data/input/person/p_1.json",
      ],
        "format": "CSV",
        "schema":[
          {"field":"name", "type":"STRING"},
          {"field":"age", "type":"INTEGER"},
          {"field":"office", "type":"STRING"}
          ],
          
      }
    ]

try:
  reader = Reader(spark)
  sources = reader.load_df(sources_metadata)
  sources['person_inputs'].show()
except Exception as e:
  assert "Error reading sources" in str(e), "not existing path not caught"

Malformed input

In [68]:
from input.Reader import Reader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define metadata for testing
sources_metadata =[
      {
        "name": "person_inputs",
        "paths": [
          "./data/input/person/people_1.json",
      ],
        "format": "CSV",
        "schema":[
          {"field":"name"},
          {"field":"age", "type":"INTEGER"},
          {"field":"office", "type":"STRING"}
          ],
          
      }
    ]

try:
  reader = Reader(spark)
  sources = reader.load_df(sources_metadata)
  sources['person_inputs'].show()
except Exception as e:
    assert "Error reading sources" in str(e), "badly formed schema not caught"

Error in schema conversion: 'type'


corrupted json input

In [69]:
from input.Reader import Reader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define metadata for testing
sources_metadata =[
      {
        "name": "person_inputs",
        "paths": [
          "./data/input/corrupted_input.json",
      ],
        "format": "JSON",
        "schema":[
          {"field":"name", "type": "STRING"},
          {"field":"age", "type":"INTEGER"},
          {"field":"office", "type":"STRING"}
          ],
          
      }
    ]

reader = Reader(spark)
sources = reader.load_df(sources_metadata)
sources['person_inputs'].show()

RuntimeError: Error reading sources: [PATH_NOT_FOUND] Path does not exist: file:/c:/Users/andres.langoyo/Documents/prueba_tecnica/dataflow/dt-technical-proyect/data/input/person/corrupted_input.json.

options in csv

In [59]:
from input.Reader import Reader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define metadata for testing
sources_metadata =[
      {
        "name": "person_inputs",
        "paths": [
          "./data/input/input_csv.csv"
      ],
        "format": "CSV",
        "schema":[],
          "options":{
              "inferSchema": "true",
              "header":"true"
          }
          
      }
    ]

reader = Reader(spark)
sources = reader.load_df(sources_metadata)
df = sources['person_inputs']
assert df.columns == ["name", "age", "office"], "Unexpected columns in DataFrame"
assert [field.dataType for field in df.schema] == [StringType(), IntegerType(), StringType()], "Wrong schhema types"
df.show()

+-------+---+---------+
|   name|age|   office|
+-------+---+---------+
| xavier| 32|barcelona|
| miguel| 12|santander|
| manuel| 56|   murcia|
|   juan| 45|   getafe|
|ricardo| 37| valencia|
|   fran| 29| alicante|
+-------+---+---------+



### Transformations testing

Correct functionality

In [60]:
from transformations.TransformationManager import TransformationManager
from pyspark.sql.functions import col
# Sample data and schema
data = [("John", 30, "new york"), ("Jane", None, "new orleans"), ("", 25, "berlin"), ("", None, "munich")]
schema = ["name", "age", "office"]
df = spark.createDataFrame(data, schema)

# Initialize TransformationManager and apply transformations
transformator = TransformationManager()
transformations = [
    {
        "name": "validate_fields",
        "type": "validate_fields",
        "input": "test_input",
        "params": {
            "validations": [
                {"field": "name", "validations": ["notEmpty"]},
                {"field": "age", "validations": ["notNull"]}
            ]
        }
    },
    {
        "name": "ok_with_date",
        "type": "add_fields",
        "input": "test_input",
        "params": {
          "addFields": [
            {
              "name": "dt",
              "function": "current_timestamp"
            }
          ]
        }
      }
]

transformed_df = transformator.apply_transformations(df, transformations, "test_input")
# Validate transformation
assert "validation_errors" in transformed_df.columns, "Validation errors column missing"
assert transformed_df.filter((col("office") == "new york") & (col("validation_errors") == "[]")).count() == 1, "validation error should be empty in correct row"
assert transformed_df.filter((col("office") == "new orleans") & (col("validation_errors").contains("notNull"))).count() == 1, "validation error should contain a not null error"
assert transformed_df.filter((col("office") == "berlin") & (col("validation_errors").contains("notEmpty"))).count() == 1, "validation error should contain a not empty error"
assert transformed_df.filter((col("office") == "munich") & (col("validation_errors").contains("notEmpty")) & (col("validation_errors").contains("notNull"))).count() == 1, "validation error should contain a not null and not empty error"
assert "dt" in transformed_df.columns, "Timestamp column missing"

transformed_df.show()

+----+----+-----------+--------------------+--------------------+
|name| age|     office|   validation_errors|                  dt|
+----+----+-----------+--------------------+--------------------+
|John|  30|   new york|                  []|2024-12-20 13:54:...|
|Jane|NULL|new orleans|[notNull: age mus...|2024-12-20 13:54:...|
|    |  25|     berlin|[notEmpty: name m...|2024-12-20 13:54:...|
|    |NULL|     munich|[notEmpty: name m...|2024-12-20 13:54:...|
+----+----+-----------+--------------------+--------------------+



Non existing validation

In [61]:
from transformations.TransformationManager import TransformationManager
from pyspark.sql.functions import col
# Sample data and schema
data = [("John", 30, "new york"), ("Jane", None, "new orleans"), ("", 25, "berlin"), ("", None, "munich")]
schema = ["name", "age", "office"]
df = spark.createDataFrame(data, schema)

# Initialize TransformationManager and apply transformations
transformator = TransformationManager()
transformations = [
    {
        "name": "validate_fields",
        "type": "validate_fields",
        "input": "test_input",
        "params": {
            "validations": [
                {"field": "name", "validations": ["notEmpty"]},
                {"field": "age", "validations": ["weirdVal"]}
            ]
        }
    },
    {
        "name": "ok_with_date",
        "type": "add_fields",
        "input": "test_input",
        "params": {
          "addFields": [
            {
              "name": "dt",
              "function": "current_timestamp"
            }
          ]
        }
      }
]
try:
    transformed_df = transformator.apply_transformations(df, transformations, "test_input")
except Exception as e:
    assert "Unknown validation rule" in str(e), "unknown validation error not caught"

Unknown validation rule: weirdVal
Error during validation: Unknown validation rule: weirdVal
Error applying transformation validate_fields: Error during validation: Unknown validation rule: weirdVal


Unkown transformation

In [62]:
from transformations.TransformationManager import TransformationManager
from pyspark.sql.functions import col
# Sample data and schema
data = [("John", 30, "new york"), ("Jane", None, "new orleans"), ("", 25, "berlin"), ("", None, "munich")]
schema = ["name", "age", "office"]
df = spark.createDataFrame(data, schema)

# Initialize TransformationManager and apply transformations
transformator = TransformationManager()
transformations = [
    {
        "name": "validate_fields",
        "type": "validate_fields",
        "input": "test_input",
        "params": {
            "validations": [
                {"field": "name", "validations": ["notEmpty"]},
                {"field": "age", "validations": ["notNull"]}
            ]
        }
    },
    {
        "name": "ok_with_date",
        "type": "delete_column",
        "input": "test_input",
        "params": {
          "delete_column": ['age']
        }
      }
]
try:
    transformed_df = transformator.apply_transformations(df, transformations, "test_input")
except Exception as e:
    assert "Unknown transformation rule" in str(e), "unknown validation error not caught"

Unknown transformation rule: delete_column
Error applying transformation delete_column: Unknown transformation rule: delete_column


### Writer testing

Testing it writes to the three possible paths ok, ko, default

In [63]:
import json
import shutil
from input.Reader import Reader
from transformations.TransformationManager import TransformationManager
from output.Writer import Writer

shutil.rmtree("./data/output")

# Load metadata for the workflow
metadata_path = "./metadata/test_writer.json"
with open(metadata_path, "r") as file:
    metadata = json.load(file)

# Initialize components
reader = Reader(spark)
transformator = TransformationManager()
writer = Writer(spark)

# Execute the workflow
sources = reader.load_df(metadata["sources"])

# Apply transformations
for source_name, source in sources.items():
    sources[source_name] = transformator.apply_transformations(
        df=source, 
        transformations=metadata["transformations"], 
        input_name=source_name
    )

# Write data to sinks
writer.write_dataframes(sources, metadata["sinks"])

# Validate outputs
output_ok = "./data/output/ok/person"
output_ko = "./data/output/ko/person"
all = "./data/output/all/person"
assert os.path.exists(output_ok), "Output for valid data (ok) was not created"
assert os.path.exists(output_ko), "Output for invalid data (ko) was not created"
assert os.path.exists(all), "Output for default data (wrong path) was not created"


DataFrame[name: string, age: int, office: string, dt: timestamp] csv ['./data/output/ok/person']
DataFrame[name: string, age: int, office: string, validation_errors: string, dt: timestamp] json ['./data/output/ko/person']
DataFrame[name: string, age: int, office: string] parquet ['./data/output/all/person']


### End to end test

In [64]:
import subprocess
import os
metadata_path = "./metadata/conf.json"
logs_path = "./logs/tests_logs.txt"

# Run the main script
result = subprocess.run(
    [sys.executable, "./dataflow.py", "-m", metadata_path, "-l", logs_path],
    capture_output=True,
    cwd=os.path.dirname(os.path.abspath('./dt-technical-proyect'))
)
print(result)
# Check the logs for success
assert result.returncode == 0, "error occurred while running the subprocess"
with open(logs_path, "r") as log_file:
    logs = log_file.read()
    assert "Successfully processed sink" in logs, "process didn't finish successfully"

CompletedProcess(args=['c:\\Users\\andres.langoyo\\Documents\\prueba_tecnica\\dataflow\\dt-technical-proyect\\.venv\\Scripts\\python.exe', './dataflow.py', '-m', './metadata/conf.json', '-l', './logs/tests_logs.txt'], returncode=0, stdout=b"DataFrame[name: string, age: int, office: string, dt: timestamp] csv ['./data/output/ok/person']\r\nDataFrame[name: string, age: int, office: string, validation_errors: string, dt: timestamp] json ['./data/output/ko/person']\r\nCORRECTO: el proceso con PID 30984 (proceso secundario de PID 20212)\r\nha sido terminado.\r\nCORRECTO: el proceso con PID 20212 (proceso secundario de PID 5884)\r\nha sido terminado.\r\nCORRECTO: el proceso con PID 5884 (proceso secundario de PID 35808)\r\nha sido terminado.\r\n", stderr=b'Setting default log level to "WARN".\nTo adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\r\n24/12/20 13:54:58 WARN Utils: Service \'SparkUI\' could not bind on port 4040. Attempting port 4041.\r\n2