# Experiment PySpark scaffold

Our engine **must** be coded in this notebook. The platform provides automations that take care of the engine life cycle, which is why we ask you to use this notebook. Don’t panic! The sections below contain some useful guidelines. If you want to use this example, you must **copy** it to the **notebooks folder**.

In [None]:
from exampleenginepythonqiyhbwvw.business.business_logic import BusinessLogic

# Environment Variables #

In [None]:
#ENVIRONMENT_VARS_PM
INPUTCLIENTFILE = "/data/sandboxes/test/data/clients.csv"
INPUTCONTRACTFILE = "/data/sandboxes/test/data/contracts.csv"
INPUTPRODUCTFILE = "/data/sandboxes/test/data/products.csv"
OUTPUTFILE = "/data/sandboxes/test/data/output"
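
When the notebook is exported, the variables in this cell become `application.conf` (see the Export step further down). The snippet below is a hypothetical sketch of that extraction, not enma's implementation: it collects top-level `NAME = "string"` assignments from the cell source with Python's `ast` module and renders them as `KEY = "value"` lines. The helper name `extract_env_vars` and the output layout are assumptions; the real file layout is not documented here.

```python
import ast


def extract_env_vars(cell_source: str) -> dict:
    """Hypothetical helper: collect top-level NAME = "string" assignments."""
    env = {}
    for node in ast.parse(cell_source).body:
        # Keep only single-target assignments of a string constant to a plain name
        if (isinstance(node, ast.Assign)
                and len(node.targets) == 1
                and isinstance(node.targets[0], ast.Name)
                and isinstance(node.value, ast.Constant)
                and isinstance(node.value.value, str)):
            env[node.targets[0].id] = node.value.value
    return env


cell = '''#ENVIRONMENT_VARS_PM
INPUTCLIENTFILE = "/data/sandboxes/test/data/clients.csv"
OUTPUTFILE = "/data/sandboxes/test/data/output"
'''
env = extract_env_vars(cell)
# Render as simple KEY = "value" lines (assumed layout, not enma's format)
conf_text = "\n".join(f'{key} = "{value}"' for key, value in env.items())
print(conf_text)
```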

# Read Data #
Read the input paths and create the DataFrames.

In [None]:
clients_df = spark.read.csv(INPUTCLIENTFILE, header=True)
contracts_df = spark.read.csv(INPUTCONTRACTFILE, header=True)
products_df = spark.read.csv(INPUTPRODUCTFILE, header=True)
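
With `header=True` and no schema, `spark.read.csv` takes the column names from the first line and, because `inferSchema` defaults to `False`, reads every column as a string. That matters for later steps such as the age filter. The standard-library `csv` module behaves the same way, which makes it a quick way to preview a file; the sample rows and column names below are made up.

```python
import csv
import io

# Made-up sample; the real clients.csv columns may differ.
sample = "cod_client,age,vip\nC1,34,true\nC2,61,false\n"

rows = list(csv.DictReader(io.StringIO(sample)))
print(dict(rows[0]))
# Every value is a str, like a Spark CSV read without inferSchema.
print(type(rows[0]["age"]).__name__)  # str
```

In Spark, passing `inferSchema=True` or an explicit `schema=` to `spark.read.csv` gives typed columns instead.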

# Business Logic #
Filter the clients DataFrame by age and by whether the client is VIP.

In [None]:
business_logic = BusinessLogic()
clients_df = business_logic.filter_example(clients_df)
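
The code of `filter_example` is not shown here. As a hypothetical illustration only, a filter on age and VIP status could look like the plain-Python predicate below; the column names `age` and `vip` and the 30 to 50 age range are assumptions, not the rules the engine applies. Note the explicit `int()` cast, since the CSV columns are read as strings.

```python
# Hypothetical predicate: column names and thresholds are assumptions,
# not the rules implemented in BusinessLogic.filter_example.
# Assumed PySpark analogue:
#   df.filter(F.col("age").between(30, 50) & (F.col("vip") == "true"))
def is_target_client(row: dict) -> bool:
    age = int(row["age"])  # CSV values arrive as strings
    is_vip = row["vip"].strip().lower() == "true"
    return 30 <= age <= 50 and is_vip


clients = [
    {"cod_client": "C1", "age": "34", "vip": "true"},
    {"cod_client": "C2", "age": "61", "vip": "true"},
    {"cod_client": "C3", "age": "45", "vip": "false"},
]
filtered = [row for row in clients if is_target_client(row)]
print([row["cod_client"] for row in filtered])  # ['C1']
```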

Join the filtered clients DataFrame with the contracts DataFrame on `cod_client` = `cod_titular`.
Then broadcast-join the result with the products DataFrame on `cod_product`.

In [None]:
clients_contracts_df = business_logic.join_example(clients_df, contracts_df, products_df)
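
Conceptually, a broadcast join ships the small table (here, products) to every executor and joins against an in-memory hash map, so the large side does not need to be shuffled. The plain-Python sketch below mirrors the two steps on toy rows: an inner hash join of clients and contracts on `cod_client` = `cod_titular`, then a dictionary lookup into products by `cod_product`. The row contents, the inner-join semantics, and unique product codes are assumptions; in PySpark the broadcast hint is `pyspark.sql.functions.broadcast`.

```python
from collections import defaultdict

# Toy rows; only the join keys come from the notebook, the rest is made up.
clients = [{"cod_client": "C1", "age": "34"}, {"cod_client": "C2", "age": "40"}]
contracts = [
    {"cod_contract": "K1", "cod_titular": "C1", "cod_product": "P1"},
    {"cod_contract": "K2", "cod_titular": "C1", "cod_product": "P2"},
    {"cod_contract": "K3", "cod_titular": "C9", "cod_product": "P1"},
]
products = [{"cod_product": "P1", "desc_product": "Loan"},
            {"cod_product": "P2", "desc_product": "Card"}]


def inner_join(left, right, left_key, right_key):
    """Hash join: index the right side by key, then probe it with each left row."""
    index = defaultdict(list)
    for r in right:
        index[r[right_key]].append(r)
    return [{**l, **r} for l in left for r in index.get(l[left_key], [])]


# Step 1: clients x contracts on cod_client = cod_titular
clients_contracts = inner_join(clients, contracts, "cod_client", "cod_titular")

# Step 2: the small products table becomes a lookup dict (assumes unique codes).
# Assumed PySpark analogue: joined.join(F.broadcast(products_df), "cod_product")
products_by_code = {p["cod_product"]: p for p in products}
result = [{**row, **products_by_code[row["cod_product"]]}
          for row in clients_contracts if row["cod_product"] in products_by_code]

print([(r["cod_client"], r["cod_contract"], r["desc_product"]) for r in result])
# [('C1', 'K1', 'Loan'), ('C1', 'K2', 'Card')]
```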

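Use a Spark SQL query to keep only the clients with more than 3 contracts. The joined DataFrame is first registered as the temporary view `clients_contracts_df` so that the query can reference it.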

In [None]:
clients_contracts_df.createOrReplaceTempView("clients_contracts_df")
clients_target_df = business_logic.filter_sql_example(spark)
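
The query inside `filter_sql_example` is not shown. A query with the described behavior could use `GROUP BY ... HAVING COUNT(*) > 3` over the `clients_contracts_df` view; the sketch below runs such a query against an in-memory SQLite table of the same name so it can be tried without Spark. The query text, the column names, and the one-row-per-contract assumption are ours; Spark SQL accepts the same statement through `spark.sql(...)`.

```python
import sqlite3

# Assumed query: keep every row of clients that have more than 3 contracts.
QUERY = """
SELECT *
FROM clients_contracts_df
WHERE cod_client IN (
    SELECT cod_client
    FROM clients_contracts_df
    GROUP BY cod_client
    HAVING COUNT(*) > 3
)
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE clients_contracts_df (cod_client TEXT, cod_contract TEXT)")
# Toy data: C1 has 4 contracts, C2 has 3
rows = [("C1", f"K{i}") for i in range(4)] + [("C2", f"M{i}") for i in range(3)]
conn.executemany("INSERT INTO clients_contracts_df VALUES (?, ?)", rows)

target = conn.execute(QUERY).fetchall()
print(sorted({client for client, _ in target}))  # ['C1']
conn.close()
```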

# Add new hash column

In [None]:
clients_target_df = business_logic.add_hash(clients_target_df)
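
The code of `add_hash` is not shown either. A common way to add a row hash is to concatenate selected columns with a separator and hash the result with SHA-256; in PySpark that is `F.sha2(F.concat_ws("||", *cols), 256)` from `pyspark.sql.functions`. The plain-Python sketch below computes the same kind of digest; the column list, the `||` separator, and the choice of SHA-256 are assumptions, not necessarily what `add_hash` uses.

```python
import hashlib


def row_hash(row: dict, cols: list, sep: str = "||") -> str:
    """SHA-256 hex digest of the selected columns, skipping None like concat_ws."""
    joined = sep.join(str(row[c]) for c in cols if row[c] is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


row = {"cod_client": "C1", "cod_contract": "K1", "cod_product": "P1"}
digest = row_hash(row, ["cod_client", "cod_contract", "cod_product"])
print(len(digest))  # 64 hex characters
```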

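# Cache the DataFrame for later reuse
Caching is lazy: the data is stored the first time an action, such as the Parquet write below, runs on the DataFrame.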

In [None]:
clients_target_df_cache = clients_target_df.cache()

# Write Data #
Write the final DataFrame to Parquet, overwriting any existing output.

In [None]:
if OUTPUTFILE != "":
    clients_target_df_cache.write.parquet(OUTPUTFILE, mode="overwrite")

# Send notification

In [None]:
business_logic.send_notification(clients_target_df_cache)

# Enma convert #
The enma-sdk library provides the convert functionality. To use it, the library must be installed:

In [None]:
!pip install enma31==2.0.1 --user

Once installed, it can be used from any notebook. For example, this notebook ends with the following statements:

In [None]:
import enma

The `enma.convert` function generates an `experiment.py` file from the notebook passed as the first parameter, in our case `engine.ipynb`. By default the file is written to the same folder as that notebook; to write it somewhere else, pass the target path as the second parameter.
When you run the `enma.convert` cell, a panel with two buttons appears:
* **Validate**: runs a PEP 8 autoformatter that automatically fixes every error it can and lists the ones that need manual correction. It also shows warnings from other checks, such as avoiding `print` and `.show()` calls.
* **Export**: once the code is error-free (warnings are allowed), generates two files:
  * `application.conf`: built from the variables defined in the cell marked `#ENVIRONMENT_VARS_PM`.
  * `experiment.py`: a script with the contents of the notebook.
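
The exact Validate rules belong to enma. As a hypothetical sketch of how warnings such as "avoid prints" and "avoid `.show()`" can be detected, the snippet below walks a script's syntax tree with Python's `ast` module and reports the line of every `print(...)` call and every `.show()` method call. The function name `find_debug_calls` is made up for this example.

```python
import ast


def find_debug_calls(source: str) -> list:
    """Return sorted (line, message) pairs for print(...) and .show() calls."""
    warnings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        if isinstance(func, ast.Name) and func.id == "print":
            warnings.append((node.lineno, "avoid print()"))
        elif isinstance(func, ast.Attribute) and func.attr == "show":
            warnings.append((node.lineno, "avoid .show()"))
    return sorted(warnings)


script = "df = spark.read.csv('x.csv')\ndf.show()\nprint(df.count())\n"
for line, message in find_debug_calls(script):
    print(f"line {line}: {message}")
# line 2: avoid .show()
# line 3: avoid print()
```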


In [None]:
enma.convert("engine.ipynb", ".")

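Once the notebook is validated, move `experiment.py` into the repository module folder `exampleenginepythonqiyhbwvw/` and `application.conf` into the `resources/` folder. The next cell does this automatically when it is run from the `notebooks` folder; otherwise it asks you to move the files manually.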

In [None]:
![ "`basename $PWD`" = "notebooks" ] && mv experiment.py ../exampleenginepythonqiyhbwvw && mv application.conf ../resources/ && echo "Files moved successfully" || echo "Failed. Please move the files manually."
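
Where a shell escape is inconvenient, the same move can be done from a Python cell. The sketch below is a plain-Python equivalent of the one-liner, assuming the notebook runs inside the `notebooks/` folder next to `exampleenginepythonqiyhbwvw/` and `resources/`; the function name `move_artifacts` is hypothetical.

```python
import shutil
from pathlib import Path


def move_artifacts(cwd) -> bool:
    """Move experiment.py and application.conf out of notebooks/; True on success."""
    cwd = Path(cwd)
    if cwd.name != "notebooks":
        print("Failed. Please move the files manually.")
        return False
    try:
        # Destination folders must already exist; a missing one raises OSError.
        shutil.move(str(cwd / "experiment.py"),
                    str(cwd.parent / "exampleenginepythonqiyhbwvw" / "experiment.py"))
        shutil.move(str(cwd / "application.conf"),
                    str(cwd.parent / "resources" / "application.conf"))
    except OSError:
        print("Failed. Please move the files manually.")
        return False
    print("Files moved successfully")
    return True

# In the notebook: move_artifacts(Path.cwd())
```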