# Pismo Data Engineering Test

# Architecture

This architecture was created to demonstrate a cluster and to simulate a cloud provider. 

An initial standalone application was built first; after further thought and encouragement, it was expanded into the current format.

The test is implemented entirely in this notebook, and the whole application runs on Docker.

## Requirements

* Docker environment
* Ports 4040, 8080, 8088, 8888 and 9870 available 
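
Before starting the stack, it can help to confirm that the required ports are free. The snippet below is a minimal sketch using only the Python standard library; the helper names `port_is_free` and `check_ports` are hypothetical and not part of this project.

```python
import socket

# Ports listed in the requirements above
REQUIRED_PORTS = [4040, 8080, 8088, 8888, 9870]


def port_is_free(port, host="127.0.0.1"):
    """Return True when nothing is listening on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        # connect_ex returns 0 only when a connection was established,
        # meaning another process already uses the port
        return sock.connect_ex((host, port)) != 0


def check_ports(ports=REQUIRED_PORTS):
    """Map each port to True (free) or False (in use)."""
    return {port: port_is_free(port) for port in ports}


for port, free in check_ports().items():
    print(f"port {port}: {'free' if free else 'IN USE'}")
```

Any port reported as in use would need to be released before the containers can bind to it.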

## Docker Stack

* Hadoop Cluster, with:
  - [namenode](http://localhost:9870)
  - [resource manager](http://localhost:8088)
  - one datanode
  - one node manager
* Spark Cluster, with:
  - [Spark Master](http://localhost:8080)
  - Two Workers
* Jupyter Notebook, with:
  - [Spark Job status](http://localhost:4040) - Spark standalone clusters do not support cluster deploy mode for Python applications, so the driver and its status UI run on the caller
  - [Jupyter Environment](http://localhost:8888) - This notebook

## Images

The project runs on official images from Apache, Jupyter and Bitnami.

The Bitnami Spark image was chosen for its ease of use and its pre-configuration for clustering.

## Considerations

* I used a local Hadoop cluster to give a full demonstration of Spark and of the solution. In a production environment, the data would probably be hosted on a cloud storage service (S3, GCS, OCS etc.)
* The Jupyter Notebook is only there to make usage and documentation easier. All scripts can be executed as DAGs or as standalone applications.
* This solution could be ported to a Helm structure, to be executed on Kubernetes


# Execution steps

## Generate Data

The data is not persisted on the host. It is generated by a script adapted from [Eder's Github](https://github.com/eder1985/pismo_recruiting_technical_case/tree/main), with a few changes:

* Added execution batches
* Added the total number of records to create in each batch
* Added a duplication rate
* Files are saved to HDFS, to match this architecture
* Filenames are unique
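
To illustrate what a duplication rate and unique filenames mean in practice, here is a hypothetical plain-Python sketch. It is not the code of `local_fake_data_generator`; the function names, the record shape and the `seed` parameter are assumptions made only for this example.

```python
import random
import uuid


def make_batch(total, duplication_rate, seed=None):
    """Build `total` records where a fraction `duplication_rate` are copies."""
    if not 0 <= duplication_rate < 1:
        raise ValueError("duplication_rate must be in [0, 1)")
    rng = random.Random(seed)
    n_dupes = round(total * duplication_rate)
    # Hypothetical record shape: only the fields needed for the illustration
    unique = [
        {"event_id": str(uuid.UUID(int=rng.getrandbits(128))), "data": {"id": i}}
        for i in range(total - n_dupes)
    ]
    # Duplicates are copies of randomly chosen unique records
    dupes = [dict(rng.choice(unique)) for _ in range(n_dupes)]
    return unique + dupes


def unique_filename(prefix="events"):
    """Return a filename that will not collide between batches."""
    return f"{prefix}-{uuid.uuid4().hex}.json"


batch = make_batch(total=10, duplication_rate=0.2, seed=1)
print(len(batch), "records ->", unique_filename())
```

With `total=10` and `duplication_rate=0.2`, the batch holds 8 distinct records plus 2 copies.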

The next cell will generate the fake data. To check the progress, visit [Hadoop Namenode](http://localhost:9870/explorer.html#/pismo-data/source) 


In [None]:
# Generate data

from local_fake_data_generator import main as generate_fake_data

# Configure here the parameters to generate the data
batches = 5
total_per_file = 10_000
duplication_rate = 0.1

generate_fake_data(batches, total_per_file, duplication_rate)


## Start processing

The next cell is responsible for creating the connection to [Apache Spark](http://localhost:8080). 

Job status can be monitored from [Job Page](http://localhost:4040)

Basic validation is applied to the source data:
* Assert that it is not empty
* Assert that the required columns are present


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from os import getenv

# Configurations
BASE_DATA = getenv('HDFS_NAMENODE') + '/pismo-data/'
SOURCE_DATA = BASE_DATA + 'source/'
OUTPUT_DATA = BASE_DATA + 'output/'

# Initialize Spark Session
spark = SparkSession.builder.master(getenv('SPARK_MASTER')).getOrCreate()

# Read data from /pismo-data/
source = spark.read.option('multiline', True).json( SOURCE_DATA + '*.json')

# Basic data validations over datasource
assert isinstance(source,  DataFrame), type(source)
assert not source.isEmpty()
for column in [ 'domain', 'event_id', 'event_type', 'timestamp', 'data']: 
    assert column in source.columns

source.summary().show()

## Data processing

This step processes the data.

To implement the rules, I chose not to use UDFs, since Python UDFs are typically much slower than Spark's native functions. All processing is done with native functions.

1. Add the columns `unique_event` (based on `domain` + `event_type`) and `domain_id` (`data.id`) to the source
2. De-duplicate data:
   * Create a windowed partition over `unique_event` and `domain_id`
   * Order by the most recent first
   * Filter by `row_number == 1`
3. Add columns based on the data timestamp, so the output can be partitioned afterwards

At the end, the cell shows how many duplicated records were removed (grouped by `domain`).
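
The de-duplication rule above can be sketched in plain Python to make the intent explicit: for each (`domain`, `event_type`, `data.id`) key, keep only the most recent event. This is an illustration on small in-memory lists, not the Spark implementation; the function name `deduplicate` is hypothetical, and it assumes ISO-8601 timestamps, which sort correctly as strings.

```python
def deduplicate(events):
    """Keep the most recent event per (domain, event_type, data.id) key."""
    latest = {}
    for event in events:
        key = (event["domain"], event["event_type"], event["data"]["id"])
        current = latest.get(key)
        # On equal timestamps the first event seen is kept; Spark's
        # row_number() makes no ordering promise for such ties
        if current is None or event["timestamp"] > current["timestamp"]:
            latest[key] = event
    return list(latest.values())


events = [
    {"domain": "account", "event_type": "creation",
     "timestamp": "2024-09-10T10:00:00", "data": {"id": 1}},
    {"domain": "account", "event_type": "creation",
     "timestamp": "2024-09-10T11:00:00", "data": {"id": 1}},
]
print(deduplicate(events))  # only the 11:00 event remains
```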

In [None]:
from pyspark.sql import Window

import pyspark.sql.functions as F
from pyspark.sql.functions import col

# Adding identifiers column
print('Adding unique event and domain ids')
domain_data = source\
    .withColumn('unique_event', F.concat(col('domain'), col('event_type')))\
    .withColumn('domain_id', col('data.id'))

# Dropping Duplicates over unique_event and domain_id
print('Deduplicating data...')
pismo_window = Window.partitionBy(['unique_event', 'domain_id']).orderBy(col('timestamp').desc())
windowed_data =  domain_data\
    .withColumn('row_number', F.row_number().over(pismo_window))

duplicated = windowed_data.where(col('row_number') > 1)

pismo_data = windowed_data\
    .where(col('row_number') == 1)\
    .drop('row_number')

# Time partitioning
print('Time partitioning... ')
date_column = 'timestamp'
final = pismo_data\
    .withColumn('year', F.year(date_column))\
    .withColumn('month', F.month(date_column))\
    .withColumn('day', F.dayofmonth(date_column))\
    .drop('unique_event')\
    .drop('domain_id')

print('Duplicated data: ')
duplicated.groupBy(col('domain')).count().show()

## Saving

This step saves the output to the HDFS cluster in Parquet format, partitioned by `domain` and by date (split into `year`, `month` and `day`). Example:
```
domain=transaction/
  year=2024/
    month=9/
      day=10/
        file0001.parquet
        file0002.parquet
```

After processing, the full output can be seen on [HDFS Namenode](http://localhost:9870/explorer.html#/pismo-data/output/)
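
As a rough illustration of how a record maps to a directory in the layout above, the following plain-Python sketch builds the relative partition path from a domain and an ISO-8601 timestamp. The helper name `partition_path` is hypothetical; Spark derives these directories itself from the `partitionBy` columns.

```python
from datetime import datetime


def partition_path(domain, timestamp):
    """Return the relative output directory for one record."""
    ts = datetime.fromisoformat(timestamp)
    # Month and day are written without zero padding, matching the
    # integer values produced by F.month and F.dayofmonth
    return f"domain={domain}/year={ts.year}/month={ts.month}/day={ts.day}"


print(partition_path("transaction", "2024-09-10T12:30:00"))
# domain=transaction/year=2024/month=9/day=10
```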

### TODO

* The one thing I could not add here is a progress bar (or similar) to monitor the Spark application from the notebook. For now, progress can be followed on the [Status Job](http://localhost:4040) page.
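
One possible direction for this item, sketched under assumptions: PySpark exposes `spark.sparkContext.statusTracker()`, whose `getActiveStageIds()` and `getStageInfo()` report task counts per stage. The helpers `stage_progress` and `format_progress` below are hypothetical names, and the sketch has not been run against this cluster. It would need to be polled from a separate thread while the job runs.

```python
def stage_progress(tracker):
    """Return {stage_id: (completed_tasks, total_tasks)} for active stages.

    `tracker` is expected to behave like the object returned by
    spark.sparkContext.statusTracker().
    """
    progress = {}
    for stage_id in tracker.getActiveStageIds():
        info = tracker.getStageInfo(stage_id)
        if info is not None:  # the stage may have finished in the meantime
            progress[stage_id] = (info.numCompletedTasks, info.numTasks)
    return progress


def format_progress(progress, width=20):
    """Render one text progress bar per stage."""
    lines = []
    for stage_id, (done, total) in sorted(progress.items()):
        filled = int(width * done / total) if total else 0
        bar = "#" * filled + "." * (width - filled)
        lines.append(f"stage {stage_id}: [{bar}] {done}/{total}")
    return "\n".join(lines)
```

A usage sketch would be `print(format_progress(stage_progress(spark.sparkContext.statusTracker())))`, called periodically from a background thread while the write in the next cell is running.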

In [None]:
print(f'Saving to {OUTPUT_DATA}')
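# Repartition by the same columns used in partitionBy, so rows for one
# output directory land in the same task and small files are avoided
final\
    .repartition('domain', 'year', 'month', 'day')\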
    .write.mode('overwrite')\
    .partitionBy(['domain', 'year', 'month', 'day'])\
    .parquet(OUTPUT_DATA)

print('Saved!')

## Validation over saved data 

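This step reads the data that was just written and validates the output.

In addition to the basic validations above, it checks that the data was actually de-duplicated (by comparing counts only).

Note: `recursiveFileLookup` is set to `False` (its default), so Spark performs partition discovery and exposes `domain`, `year`, `month` and `day` as columns. Partition filters are not used in the next cell, but they are very useful because they let Spark read only the matching directories.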


In [None]:
# Validation over Parquet data

validation = spark.read.option('recursiveFileLookup', False).parquet(OUTPUT_DATA)

# Basic data validations over the saved output
assert isinstance(validation,  DataFrame), type(validation)
assert not validation.isEmpty()
for column in [ 'domain', 'event_id', 'event_type', 'timestamp', 'data']: 
    assert column in validation.columns

source_count = source.count()
validation_count = validation.count()

assert source_count > validation_count, 'Expected to have duplicated data!'
assert source_count - validation_count == duplicated.count(), 'Unexpected counts over duplicated data!'

validation.summary().show()

### Validation over transactions

To validate the transaction data, we read it back and check that the schema contains the expected columns:



In [None]:
valid_transactions = spark.read\
    .option('recursiveFileLookup', False)\
    .parquet(OUTPUT_DATA)\
    .where(col('domain') == 'transaction')\
    .select('domain', 'event_id', 'event_type', 'timestamp', 'data.*')

assert isinstance(valid_transactions, DataFrame)
assert not valid_transactions.isEmpty()

for _col in [ 'id', 'account_orig_id', 'account_dest_id', 'amount', 'currency' ]:
    assert _col in valid_transactions.columns

valid_transactions.groupBy('domain').count().show()

# Data is ready to be consumed!

The sample below computes the total transaction amount per currency.

> (based on fake data that was generated)

In [None]:
df_columns = [ col(_col) for _col in validation.columns if _col != 'data' ]
df_columns.append('data.*')

transactions = validation.where(col('domain') == 'transaction').select(*df_columns) 

# Total `amount` by `currency`
transactions.groupBy('currency').sum('amount').show()