# Advanced features

This notebook will demonstrate a few more advanced features of the Jupyter Ververica Platform integration.

First we again load the module and connect to a Ververica Platform instance. Make sure you have installed `jupyter-vvp` in the kernel you are using.

In [3]:
%reload_ext jupytervvp
%connect_vvp -n default localhost -p 8080 -s mySession --force

Registering jupytervvp for vvp.


<jupytervvp.vvpsession.VvpSession at 0x298531de790>

Next we create a table. This time we use the Kafka connector, which requires several options to be set.

Refer to the [Ververica Platform documentation](https://docs.ververica.com/sql-eap/sql_development/table_view.html#supported-connectors) for further information.

In [4]:
%%flink_sql
CREATE TABLE `kafka_table` (id int)
COMMENT 'SomeComment'
WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

{'result': 'RESULT_SUCCESS'}

Of course, tables can also be altered. Here we use the `ALTER TABLE` statement to change the topic of the `kafka_table`.

In [5]:
%%flink_sql
ALTER TABLE `kafka_table` SET (
    'connector' = 'kafka',
    'topic' = 'other_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

{'result': 'RESULT_SUCCESS'}
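
The `ALTER TABLE` cell above restates every connector option by hand. As a minimal sketch, the option list could instead be rendered from a Python dictionary. The helper `render_with_options` is hypothetical and not part of `jupytervvp`; it only builds a string.

```python
def render_with_options(options):
    """Render a dict as the body of a WITH (...) or SET (...) clause (hypothetical helper)."""
    def quote(text):
        # SQL string literals escape a single quote by doubling it.
        return "'" + str(text).replace("'", "''") + "'"

    lines = ["    {} = {}".format(quote(key), quote(value)) for key, value in options.items()]
    return ",\n".join(lines)


kafka_options = {
    "connector": "kafka",
    "topic": "other_topic",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "orderGroup",
    "format": "csv",
}
alter_statement = "ALTER TABLE `kafka_table` SET (\n" + render_with_options(kafka_options) + "\n)"
print(alter_statement)
```

Keeping the options in one dictionary means a change such as a new topic name is made in a single place.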

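Python variables defined in the notebook can also be used in SQL cells. A placeholder of the form `{variable_name}` is replaced with the variable's value, as with `{topic_name}` below: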

In [6]:
topic_name = "myTopic"
parallelism = 2
deployment_name = "testName"

In [7]:
%%flink_sql
CREATE TABLE `var_table` (id int)
COMMENT 'SomeComment'
WITH (
    'connector' = 'kafka',
    'topic' = '{topic_name}',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

{'result': 'RESULT_SUCCESS'}
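
To make the placeholder substitution concrete, the following sketch fills `{name}` placeholders from a dictionary of variables using Python's `str.format_map`. It illustrates the general idea only and is an assumption, not the actual `jupytervvp` implementation; the helper `interpolate_sql` is hypothetical.

```python
def interpolate_sql(sql_template, namespace):
    """Fill {name} placeholders in sql_template from namespace (hypothetical helper)."""
    # str.format_map raises KeyError if a placeholder has no matching variable.
    return sql_template.format_map(namespace)


template = "CREATE TABLE `var_table` (id int) WITH ('topic' = '{topic_name}')"
statement = interpolate_sql(template, {"topic_name": "myTopic"})
print(statement)
```

In this sketch, a literal brace in the SQL text would have to be doubled (`{{` or `}}`), because `str.format_map` treats single braces as placeholders.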

Deployment parameters (for example, for `INSERT INTO` statements) can be set as a Python dictionary.
Note that this is a regular Python cell, so variables are not interpolated and the usual Python syntax applies:

In [8]:
my_deployment_parameters = {
        "metadata.name": "{}".format(deployment_name),
        "spec.template.spec.parallelism": parallelism,
        "spec.upgradeStrategy.kind": "STATEFUL",
        "spec.restoreStrategy.kind": "LATEST_STATE",
        "spec.template.spec.flinkConfiguration.state.backend": "filesystem",
        "spec.template.spec.flinkConfiguration.taskmanager.memory.managed.fraction": "0.1",
        "spec.template.spec.flinkConfiguration.high-availability": "vvp-kubernetes"
}
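
The dotted keys above address fields in a nested deployment specification. The sketch below shows one way such a flat dictionary could be expanded into a nested one. It rests on two assumptions: the helper `expand_parameters` is hypothetical rather than part of `jupytervvp`, and the `flinkConfiguration` section is treated as a flat map whose keys keep their dots (for example `state.backend`).

```python
FLAT_SECTION = "flinkConfiguration"  # assumed to hold flat, dotted Flink option keys


def expand_parameters(flat_parameters):
    """Expand dotted keys into nested dictionaries (hypothetical helper)."""
    nested = {}
    for dotted_key, value in flat_parameters.items():
        parts = dotted_key.split(".")
        if FLAT_SECTION in parts[:-1]:
            # Keep everything after the flat section as one key, dots included.
            cut = parts.index(FLAT_SECTION) + 1
            parts = parts[:cut] + [".".join(parts[cut:])]
        node = nested
        for part in parts[:-1]:
            # Create intermediate dictionaries on demand.
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return nested


example = expand_parameters({
    "metadata.name": "testName",
    "spec.template.spec.parallelism": 2,
    "spec.template.spec.flinkConfiguration.state.backend": "filesystem",
})
print(example)
```

The flat form is convenient in a notebook because each setting stays on one line, while the nested form mirrors the structure of the specification.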

The following cell succeeds only if a default deployment target has been set up in the platform,
or if a target has been set up and is specified via a settings object.

In [9]:
%%flink_sql -p my_deployment_parameters -d
INSERT INTO var_table SELECT * FROM kafka_table

That concludes this example. Finally, we clean up the tables we created.

In [10]:
%%flink_sql
DROP TABLE `kafka_table`

{'result': 'RESULT_SUCCESS'}

In [11]:
%%flink_sql
DROP TABLE `var_table`

{'result': 'RESULT_SUCCESS'}

In [13]:
%%flink_sql
SHOW TABLES

Unnamed: 0,table name
