## Insert Data Access Token

To securely access data from your Lake at Nekt, you need to generate an access token. When generating the token, you can associate it with one or more tables.

Follow the steps on this link to get your token:
https://app.nekt.ai/transformations/resources

In [1]:
import requests

resp = requests.get(
    "https://api.nekt.ai/api/v1/jupyter-credentials/",
    headers={"X-Jupyter-Token": "<<INSERT_DATA_ACCESS_TOKEN_HERE>>"},
)
resp.raise_for_status()  # stop here if the request failed (HTTP 4xx/5xx)
credentials = resp.json()
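
Before moving on, it can help to confirm that the response actually contains the fields the Spark configuration reads. The sketch below is only an illustration: the helper name `missing_credential_keys` is hypothetical, the key names are the ones used in the Spark session cell of this notebook, and the example payload contains placeholders rather than real credentials.

```python
# Keys read by the Spark session cell; the session token is treated as optional there.
REQUIRED_KEYS = ("aws_access_key_id", "aws_secret_access_key")


def missing_credential_keys(credentials: dict) -> list:
    """Return the required keys that are absent or empty in the payload."""
    return [key for key in REQUIRED_KEYS if not credentials.get(key)]


# Placeholder payload for illustration only (hypothetical values).
example_credentials = {"aws_access_key_id": "EXAMPLE_KEY_ID", "aws_secret_access_key": ""}
print(missing_credential_keys(example_credentials))  # ['aws_secret_access_key']
```

Calling `missing_credential_keys(credentials)` on the real response should return an empty list; anything else suggests the token was not accepted or the response has a different shape than assumed here.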

## Create Spark session

This code block starts a Spark session for your data transformation. No need to change it, unless you know what you're doing.

In [None]:
# Create a Spark session with your AWS Credentials - DO NOT CHANGE THIS!

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from delta.tables import DeltaTable
from pyspark.sql import DataFrame

conf = (
    SparkConf()
    .setAppName("Nekt-Transformation")  # replace with your desired name
    .set("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0,org.apache.hadoop:hadoop-aws:3.3.4")
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.hadoop.fs.s3a.access.key", credentials["aws_access_key_id"])
    .set("spark.hadoop.fs.s3a.secret.key", credentials["aws_secret_access_key"])
    .set("spark.hadoop.fs.s3a.session.token", credentials["aws_session_token"])  # optional
    .set("spark.sql.shuffle.partitions", "4")  # the default is 200 partitions, which is too many for a local session
    .setMaster("local[*]")  # replace * with the number of cores to use; * uses all available cores
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Load tables from your Nekt catalog

Replace the `INPUT_TABLES` variable with the list of input tables associated with your token. 

You can access the list by clicking on 'Token tables' on the previously created token (https://app.nekt.ai/transformations/resources).
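
The loading cell reads three fields from each entry: `layer`, `name`, and `path`. As a rough illustration of that shape, the sketch below uses made-up entries (the bucket, paths, and table names are hypothetical placeholders, and `tables_by_layer` is a helper invented for this example). Use the list copied from 'Token tables' instead of these values.

```python
# Hypothetical entries: every value here is a placeholder, not a real table.
EXAMPLE_INPUT_TABLES = [
    {"layer": "trusted", "name": "customers", "path": "s3a://example-bucket/trusted/customers"},
    {"layer": "service", "name": "orders", "path": "s3a://example-bucket/service/orders"},
]


def tables_by_layer(input_tables):
    """Group table names by layer, mirroring how the loading loop keys delta_dict."""
    grouped = {}
    for table in input_tables:
        grouped.setdefault(table.get("layer"), []).append(table.get("name"))
    return grouped


print(tables_by_layer(EXAMPLE_INPUT_TABLES))
# {'trusted': ['customers'], 'service': ['orders']}
```

Running the helper on your real `INPUT_TABLES` is a quick way to see which layer and table names will be available in `delta_dict` before any table is loaded.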

In [None]:
INPUT_TABLES = [] # REPLACE THIS WITH THE LIST OF INPUT_TABLES 

### Load input tables - DO NOT CHANGE THIS!
delta_dict = {}
for table in INPUT_TABLES:
    table_layer = table.get("layer")
    table_name = table.get("name")
    if not delta_dict.get(table_layer):
        delta_dict[table_layer] = {}

    try:
        delta_dict[table_layer][table_name] = DeltaTable.forPath(spark, table.get("path"))
    except Exception as exc:  # keep loading the remaining tables if one fails
        delta_dict[table_layer][table_name] = None
        print(f'Failed to load delta table "{table_name}" from layer "{table_layer}": {exc}')
    else:
        print(f'Delta table "{table_name}" loaded from layer "{table_layer}".')

## User transformation

### Create dataframes

For each data frame you want to access in your notebook, run the following code:

```
df: DataFrame = delta_dict.get("layer_name").get("table_name").toDF()
```

- `layer_name` is a string with the name of the layer where your table is located, for example: 'service' or 'trusted'
- `table_name` is a string with the name of the table you want to access

The table names and layers are all listed in the `INPUT_TABLES` variable from the **Load tables from your Nekt catalog** section above.
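
Because a table that failed to load is stored as `None`, and an unknown layer makes `.get()` return `None` as well, the chained lookup can fail with an unhelpful `AttributeError`. One possible way to get a clearer message is a small wrapper like the one sketched below. `get_table` is a hypothetical helper, not part of Nekt or Delta Lake, and the demo uses a plain dictionary with string stand-ins instead of real Delta tables.

```python
def get_table(delta_dict, layer_name, table_name):
    """Return the loaded table, or raise a descriptive error if it is missing."""
    layer = delta_dict.get(layer_name)
    if layer is None:
        raise KeyError(f'Layer "{layer_name}" not found. Available layers: {sorted(delta_dict)}')
    table = layer.get(table_name)
    if table is None:
        raise KeyError(f'Table "{table_name}" is not loaded in layer "{layer_name}".')
    return table


# Stand-in dictionary for illustration: strings replace the real DeltaTable objects.
demo_dict = {"trusted": {"customers": "<DeltaTable customers>", "broken": None}}
print(get_table(demo_dict, "trusted", "customers"))  # <DeltaTable customers>
```

With the real `delta_dict`, the same helper would be used as `get_table(delta_dict, "layer_name", "table_name").toDF()`.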

In [None]:
df: DataFrame = delta_dict.get("INSERT_LAYER_NAME_HERE").get("INSERT_TABLE_NAME_HERE").toDF() # rename your data frame as you wish!

### Transformation script

Add your transformation code here. Feel free to create multiple sections for exploration, testing, and validation.

In [None]:
# your code here

### Final transformation function

Once your script is validated, put it all together in a `user_transformation` function. This function receives `delta_dict` (the loaded tables, keyed by layer and table name) as input and returns a single output dataframe.

Avoid commands that print, count, or otherwise inspect the data, such as `.show()` and `.count()`. They trigger extra computation, which slows down your data transformation and consumes more resources.

In [None]:
def user_transformation(delta_dict):
    
    # Load dataframes
    df: DataFrame = delta_dict.get("INSERT_LAYER_NAME_HERE").get("INSERT_TABLE_NAME_HERE").toDF()
    
    # Transformation script
    # ...
    # ...
    final_df = df  # replace with the result of your transformation

    return final_df

### Transformation test

Run this block of code to ensure you haven't missed any piece of code and that your final function returns the desired dataframe.

You DON'T need to copy it to the platform; it's just a verification step.

In [None]:
final_df = user_transformation(delta_dict)
final_df.show(5)

## Add transformation to Nekt

Now that you have tested your final function, go to [Add transformation](https://app.nekt.ai/transformations/add-transformation) at Nekt, select your input tables, give your new table a name, and paste the `user_transformation(delta_dict)` function into the code section.