Sample PyFlink application reading from and writing to Kinesis Data Stream.
- Flink version: 1.20
- Flink API: Table API & SQL
- Flink Connectors: Kinesis Connector
- Language: Python
This example provides the basic skeleton for a PyFlink application.
The application is written in Python, but operators are defined using SQL. This is a popular way of defining applications in PyFlink, but not the only one. You could achieve the same results using the Table API or the DataStream API in Python.
The job can run both on Amazon Managed Service for Apache Flink and locally for development.
- Python 3.11
- PyFlink library: apache-flink==1.20.0
- Java JDK 11 and Maven
⚠️ As of 2024-06-27, the Flink Python library 1.20.x may fail to install on Python 3.12. We recommend using Python 3.11 for development, the same Python version used by the Amazon Managed Service for Apache Flink runtime 1.20.
JDK and Maven are used to download and package any required Flink dependencies, e.g. connectors, and to package the application as a .zip file for deployment to Amazon Managed Service for Apache Flink.
The application expects two Kinesis Data Streams, one for input and one for output. Single-shard Streams in Provisioned mode will be sufficient.
The stream ARN and regions are passed to the application as runtime configuration parameters (see below). The application defines no default stream ARN and region.
The application must have sufficient permissions to read from the input Kinesis Stream, and write to the output Kinesis Stream.
When running locally, you need valid AWS credentials or a profile that allows reading from and writing to the streams.
When running on Amazon Managed Service for Apache Flink the runtime configuration comes from Runtime Properties.
When running locally, the configuration is read from the application_properties.json file located in the project folder.
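The two configuration sources can be handled with a small loader. This is a minimal sketch, not the example's actual main.py. It assumes two things: that Managed Flink exposes the Runtime Properties as a JSON file at /etc/flink/application_properties.json, and that both files contain a list of {"PropertyGroupId": ..., "PropertyMap": {...}} objects. The helper names are hypothetical.

```python
import json
import os

# Assumption: Managed Flink exposes the Runtime Properties as this JSON file;
# locally (IS_LOCAL=true) the file in the project folder is used instead.
MANAGED_PROPERTIES_PATH = "/etc/flink/application_properties.json"
LOCAL_PROPERTIES_PATH = "application_properties.json"


def properties_file_path() -> str:
    """Choose the properties file depending on the IS_LOCAL environment variable."""
    if os.environ.get("IS_LOCAL", "").lower() == "true":
        return LOCAL_PROPERTIES_PATH
    return MANAGED_PROPERTIES_PATH


def parse_property_groups(text: str) -> dict:
    """Turn the JSON list of {"PropertyGroupId", "PropertyMap"} objects
    into a dict keyed by Group ID (assumed file shape)."""
    return {g["PropertyGroupId"]: g.get("PropertyMap", {}) for g in json.loads(text)}


def load_property_groups(path=None) -> dict:
    """Read and parse the properties file for the current environment."""
    with open(path or properties_file_path(), encoding="utf-8") as f:
        return parse_property_groups(f.read())


if __name__ == "__main__":
    print(load_property_groups().get("InputStream0", {}))
```

Keeping the path selection in one function means the rest of the job does not need to know where it is running.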
Runtime parameters:
Group ID | Key | Mandatory | Example Value | Notes |
---|---|---|---|---|
InputStream0 | stream.arn | Y | arn:aws:kinesis:<region>:<accountId>:stream/ExampleInputStream | Input stream ARN. |
InputStream0 | aws.region | Y | us-east-1 | Region for the input stream. |
InputStream0 | flink.source.init.position | N | LATEST | Stream initial position in the input stream. LATEST by default. |
InputStream0 | flink.source.init.timestamp | N | 2025-02-17T21:45:42.123 | Initial position timestamp. Ignored unless initial position is AT_TIMESTAMP. |
OutputStream0 | stream.arn | Y | arn:aws:kinesis:<region>:<accountId>:stream/ExampleOutputStream | Output stream ARN. |
OutputStream0 | aws.region | Y | us-east-1 | Region for the output stream. |
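The Mandatory and default columns of the runtime parameters table can be turned into a startup check, so the job fails with a clear message instead of an obscure runtime error. This is a minimal sketch that assumes the groups are already loaded into a dict keyed by Group ID. Rejecting AT_TIMESTAMP without a timestamp is a design choice of this sketch and is not stated in the table.

```python
# Mandatory keys per group and defaults for optional keys, taken from the table.
REQUIRED_KEYS = {
    "InputStream0": ("stream.arn", "aws.region"),
    "OutputStream0": ("stream.arn", "aws.region"),
}
DEFAULTS = {"InputStream0": {"flink.source.init.position": "LATEST"}}


def validate_groups(groups: dict) -> dict:
    """Fail fast on missing mandatory keys and fill in defaults.

    `groups` maps Group ID -> {key: value} (an assumed in-memory shape).
    """
    resolved, missing = {}, []
    for group_id, keys in REQUIRED_KEYS.items():
        # Explicit values override defaults.
        props = {**DEFAULTS.get(group_id, {}), **groups.get(group_id, {})}
        missing += [f"{group_id}.{key}" for key in keys if not props.get(key)]
        resolved[group_id] = props
    if missing:
        raise ValueError("Missing mandatory runtime properties: " + ", ".join(missing))
    source = resolved["InputStream0"]
    # Design choice of this sketch: AT_TIMESTAMP without a timestamp is rejected.
    if source["flink.source.init.position"] == "AT_TIMESTAMP" and not source.get(
        "flink.source.init.timestamp"
    ):
        raise ValueError("flink.source.init.timestamp is required with AT_TIMESTAMP")
    return resolved
```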
In addition to these configuration properties, a PyFlink application running on Managed Flink needs two additional Runtime Properties. They tell Managed Flink which Python script to run and where to find the fat-jar containing all dependencies. Add them as part of the application configuration:
Group ID | Key | Mandatory | Value | Notes |
---|---|---|---|---|
kinesis.analytics.flink.run.options | python | Y | main.py | The Python script containing the main() method to start the job. |
kinesis.analytics.flink.run.options | jarfile | Y | lib/pyflink-dependencies.jar | Location (inside the zip) of the fat-jar containing all jar dependencies. |
⚠️ If you forget to add these parameters to the Runtime Properties, the application will not start.
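If you script the application setup instead of using the console, the run options belong next to the stream groups. This sketch builds the full list of property groups. It assumes that the {"PropertyGroupId", "PropertyMap"} list shape matches the EnvironmentProperties.PropertyGroups parameter of the kinesisanalyticsv2 CreateApplication API. For brevity it also uses a single region for both streams. The function name is hypothetical.

```python
import json


def build_property_groups(input_arn: str, output_arn: str, region: str,
                          python_script: str = "main.py",
                          jarfile: str = "lib/pyflink-dependencies.jar") -> list:
    """Build the Runtime Properties, including the two PyFlink run options.

    Assumed shape: the same list used by application_properties.json and,
    we assume, by CreateApplication's EnvironmentProperties.PropertyGroups.
    """
    return [
        {"PropertyGroupId": "InputStream0",
         "PropertyMap": {"stream.arn": input_arn, "aws.region": region}},
        {"PropertyGroupId": "OutputStream0",
         "PropertyMap": {"stream.arn": output_arn, "aws.region": region}},
        # Without this group the application does not start on Managed Flink.
        {"PropertyGroupId": "kinesis.analytics.flink.run.options",
         "PropertyMap": {"python": python_script, "jarfile": jarfile}},
    ]


if __name__ == "__main__":
    groups = build_property_groups(
        "arn:aws:kinesis:us-east-1:111122223333:stream/ExampleInputStream",
        "arn:aws:kinesis:us-east-1:111122223333:stream/ExampleOutputStream",
        "us-east-1",
    )
    print(json.dumps(groups, indent=2))
```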
- Make sure you have created the Kinesis Streams and have a valid AWS session that allows you to publish to the Streams (how you do this depends on your setup)
- Run mvn package once, from this directory. This step is required to download the jar dependencies, the Kinesis connector in this case
- Edit the local configuration file application_properties.json to match the streams you have created
- Set the environment variable IS_LOCAL=true. You can do this from the prompt or in the run profile of the IDE
- Run main.py
You can also run the Python script directly from the command line, e.g. python main.py. This still requires running mvn package first.
If you are using Virtual Environments, make sure to select the venv as the runtime in your IDE.
If you forget to set the environment variable IS_LOCAL=true, or forget to run mvn package, the application fails on start.
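Both local prerequisites can be checked before PyFlink is even involved. This sketch resolves the fat-jar produced by mvn package and fails with an explicit message when it is missing. The helper names are hypothetical. The commented PyFlink call at the end is an assumption about how main.py might register the jar through the pipeline.jars option.

```python
import os
from pathlib import Path

JAR_NAME = "pyflink-dependencies.jar"


def is_local() -> bool:
    """True when IS_LOCAL=true is set (in the shell or in the IDE run profile)."""
    return os.environ.get("IS_LOCAL", "").lower() == "true"


def local_jar_uri(project_dir: str) -> str:
    """Return the file:// URI of the fat-jar built by `mvn package` in ./target.

    Raises a clear error instead of letting the Flink job fail on start.
    """
    jar = Path(project_dir).resolve() / "target" / JAR_NAME
    if not jar.is_file():
        raise FileNotFoundError(f"{jar} not found: run 'mvn package' first")
    return jar.as_uri()


# Hypothetical use in main.py (CURRENT_DIR = directory of main.py):
#   if is_local():
#       table_env.get_config().set("pipeline.jars", local_jar_uri(CURRENT_DIR))
```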
⚠️ The application does not log or print anything. If you do not see any output in the console, it does not mean the application is not running. The output is sent to the Kinesis streams. You can inspect the content of the streams using the Data Viewer in the Kinesis console.
Note: if you modify the Python code, you do not need to re-run mvn package before running the application locally.
By default, the PyFlink application running locally does not send logs to the console. Any exception thrown by the Flink runtime (i.e. not caused by a Python error) does not appear in the console. The application may appear to be running while it is actually failing and restarting continuously.
To see any error messages, you need to inspect the Flink logs. By default, PyFlink will send logs to the directory where the PyFlink module is installed (Flink home). Use this command to find the directory:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
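After locating the log directory with the command above, a small helper can extract the interesting lines. This sketch reads the most recently modified *.log file in that directory and keeps lines containing "ERROR" or "Exception". The file pattern and the markers are assumptions, so adjust them to what your log files contain.

```python
import os
import sys
from pathlib import Path


def pyflink_log_dir() -> Path:
    """Same location as the one-liner above: <pyflink package dir>/log."""
    import pyflink  # lazy import so the helpers below work without PyFlink

    return Path(os.path.dirname(os.path.abspath(pyflink.__file__))) / "log"


def filter_errors(lines, markers=("ERROR", "Exception")) -> list:
    """Keep only the lines that contain at least one marker."""
    return [line.rstrip("\n") for line in lines if any(m in line for m in markers)]


def latest_log_errors(log_dir: Path) -> list:
    """Apply filter_errors to the most recently modified *.log file."""
    logs = sorted(log_dir.glob("*.log"), key=lambda p: p.stat().st_mtime)
    if not logs:
        return []
    with logs[-1].open(encoding="utf-8", errors="replace") as f:
        return filter_errors(f)


if __name__ == "__main__":
    directory = Path(sys.argv[1]) if len(sys.argv) > 1 else pyflink_log_dir()
    print("\n".join(latest_log_errors(directory)))
```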
- Make sure you have the required input and output Kinesis Streams
- Create a Managed Flink application
- Modify the application IAM role to allow reading from the input and writing to the output Kinesis Stream
- Package the application: run mvn clean package from this directory
- Upload the zip file that the previous step creates in the ./target subdirectory to an S3 bucket
- Configure the Managed Flink application: set Application Code Location to the bucket and zip file you just uploaded
- Configure the Runtime Properties of the application, creating the Group IDs, Keys and Values as defined in application_properties.json
- Do not forget to add the two additional Runtime Properties for PyFlink (python and jarfile, in the kinesis.analytics.flink.run.options group)
- Start the data generator script
- Start the application
- When the application transitions to "Running", you can open the Flink Dashboard to verify the job is running, and you can inspect the data published to the output Kinesis Stream using the Data Viewer in the Kinesis console.
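Before uploading, you can check that the zip layout matches the python and jarfile Runtime Properties. This is a minimal sketch that assumes main.py and lib/pyflink-dependencies.jar sit at the root of the zip, as the run options table describes. Pass the path of the zip file built in ./target on the command line.

```python
import sys
import zipfile

# Entries the kinesis.analytics.flink.run.options properties point at.
EXPECTED_ENTRIES = ("main.py", "lib/pyflink-dependencies.jar")


def missing_entries(zip_file, expected=EXPECTED_ENTRIES) -> list:
    """Return the expected entries that are absent from the zip (path or file object)."""
    with zipfile.ZipFile(zip_file) as zf:
        names = set(zf.namelist())
    return [entry for entry in expected if entry not in names]


if __name__ == "__main__":
    missing = missing_entries(sys.argv[1])
    if missing:
        sys.exit("Missing from zip: " + ", ".join(missing))
    print("Zip layout matches the run options")
```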
Amazon Managed Service for Apache Flink sends all logs to CloudWatch Logs. You can find the name of the Log Group and Log Stream in the configuration of the application, in the console.
Errors caused by the Flink engine are usually logged as ERROR and easy to find. However, errors reported by the Python runtime are not logged as ERROR.
Apache Flink logs any entry reported by the Python runtime using a logger named org.apache.flink.client.python.PythonDriver.
The easiest way to find errors reported by Python is to use CloudWatch Logs Insights and run the following query:
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
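If you have exported the log events (for example, downloaded them from CloudWatch Logs), you can apply the same PythonDriver filter offline. This sketch assumes that each event is a JSON object with "logger" and "message" fields, which are the fields the query above uses. Lines that are not JSON objects are skipped.

```python
import json


def python_driver_messages(raw_events):
    """Yield the messages logged by PythonDriver from raw JSON log events.

    Assumes each event is a JSON object with "logger" and "message" fields;
    anything else is ignored.
    """
    for raw in raw_events:
        try:
            event = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if isinstance(event, dict) and "PythonDriver" in event.get("logger", ""):
            yield event.get("message", "")
```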
⚠️ If the Flink job fails to start due to an error reported by Python, for example a missing expected configuration parameter, Amazon Managed Service for Apache Flink may report the application as Running even though the job failed to start. You can check whether the job is actually running using the Apache Flink Dashboard. If the job is not listed in the Running Job List, it probably failed to start due to an error. In CloudWatch Logs you may find an ERROR entry with the not very explanatory message "Run python process failed". To find the actual cause of the problem, run the CloudWatch Logs Insights query above to see the error reported by the Python runtime.
Follow this process to make changes to the Python code:
- Modify the code locally (test/run locally, as required)
- Re-run mvn clean package. If you skip this step, the zip file is not updated and still contains the old Python script
- Upload the new zip file to the same location on S3 (overwriting the previous zip file)
- In the Managed Flink application console, enter Configure, scroll down and press Save Changes
- If your application was running when you published the change, Managed Flink stops the application and restarts it with the new code
- If the application was not running (in Ready state) you need to click Run to restart it with the new code
⚠️ By design, Managed Flink does not detect the new zip file automatically. You control when to restart the application with the code changes. You do this by saving a new configuration from the console or by using the UpdateApplication API.
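The UpdateApplication route can be scripted with boto3. This is a sketch only. It assumes the kinesisanalyticsv2 request shape shown below (ApplicationCodeConfigurationUpdate, then CodeContentUpdate, then S3ContentLocationUpdate). It also assumes that resubmitting the same bucket and key after overwriting the object is accepted as a code update. Verify both assumptions against the API reference before relying on them. The function names are hypothetical.

```python
def code_update_request(app_name: str, version_id: int, bucket_arn: str, file_key: str) -> dict:
    """Build UpdateApplication arguments pointing the application at the uploaded zip."""
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": version_id,
        "ApplicationConfigurationUpdate": {
            "ApplicationCodeConfigurationUpdate": {
                "CodeContentUpdate": {
                    "S3ContentLocationUpdate": {
                        "BucketARNUpdate": bucket_arn,
                        "FileKeyUpdate": file_key,
                    }
                }
            }
        },
    }


def redeploy(app_name: str, bucket_arn: str, file_key: str) -> dict:
    """Ask Managed Flink to pick up the new code (needs boto3 and AWS credentials)."""
    import boto3  # lazy import: only needed when actually calling AWS

    client = boto3.client("kinesisanalyticsv2")
    # UpdateApplication requires the current version id of the application.
    detail = client.describe_application(ApplicationName=app_name)["ApplicationDetail"]
    return client.update_application(
        **code_update_request(app_name, detail["ApplicationVersionId"], bucket_arn, file_key)
    )
```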
Use the provided Python script to generate sample stock data and publish it to the input Kinesis Data Stream.
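The script shipped with the example is not reproduced here. The following is a hypothetical generator in the same spirit. The ticker list and the event_time/ticker/price field names are assumptions. The timestamp uses the same format as the flink.source.init.timestamp example. Sending uses boto3's Kinesis put_record with the StreamARN parameter.

```python
import datetime
import json
import random

TICKERS = ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]  # hypothetical sample tickers


def make_record(rng: random.Random) -> dict:
    """One hypothetical stock-price record; field names are assumptions."""
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return {
        "event_time": now.isoformat(timespec="milliseconds"),  # e.g. 2025-02-17T21:45:42.123
        "ticker": rng.choice(TICKERS),
        "price": round(rng.random() * 100, 2),
    }


def send_records(stream_arn: str, region: str, count: int = 10, seed: int = 0) -> None:
    """Publish `count` records to the input stream (needs boto3 and AWS credentials)."""
    import boto3  # lazy import so make_record works without boto3

    kinesis = boto3.client("kinesis", region_name=region)
    rng = random.Random(seed)
    for _ in range(count):
        record = make_record(rng)
        kinesis.put_record(
            StreamARN=stream_arn,
            Data=json.dumps(record).encode("utf-8"),
            PartitionKey=record["ticker"],  # spreads records by ticker
        )
```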
The application consumes data from a Kinesis source and publishes without any modification to a Kinesis sink.
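Because operators are defined in SQL, the pass-through boils down to two CREATE TABLE statements and one INSERT. This sketch only builds the SQL strings. The column list and the Kinesis connector option names ('stream.arn', 'aws.region', 'source.init.position') are assumptions, so check the flink-connector-aws-kinesis-streams documentation for your version. The 'json.timestamp-format.standard' option lets the JSON format parse ISO-8601 timestamps.

```python
def sql_literal(value) -> str:
    """Quote a value as a SQL string literal, doubling any single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def kinesis_table_ddl(name: str, stream_arn: str, region: str, extra_options=None) -> str:
    """Build a CREATE TABLE statement for a Kinesis-backed table.

    Columns and connector option names are illustrative assumptions.
    """
    options = {
        "connector": "kinesis",
        "stream.arn": stream_arn,
        "aws.region": region,
        "format": "json",
        "json.timestamp-format.standard": "ISO-8601",
        **(extra_options or {}),
    }
    with_clause = ",\n".join(f"  {sql_literal(k)} = {sql_literal(v)}" for k, v in options.items())
    return (
        f"CREATE TABLE {name} (\n"
        "  ticker VARCHAR(6),\n"
        "  price DOUBLE,\n"
        "  event_time TIMESTAMP(3)\n"
        f") WITH (\n{with_clause}\n)"
    )


source_ddl = kinesis_table_ddl(
    "prices_in",
    "arn:aws:kinesis:us-east-1:111122223333:stream/ExampleInputStream",
    "us-east-1",
    {"source.init.position": "LATEST"},  # assumed name of the initial-position option
)
sink_ddl = kinesis_table_ddl(
    "prices_out",
    "arn:aws:kinesis:us-east-1:111122223333:stream/ExampleOutputStream",
    "us-east-1",
)
# Pass-through: every input row is written unchanged to the output stream.
pass_through = "INSERT INTO prices_out SELECT * FROM prices_in"

# With PyFlink, each statement would be submitted through the TableEnvironment:
#   table_env.execute_sql(source_ddl)
#   table_env.execute_sql(sink_ddl)
#   table_env.execute_sql(pass_through)
```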
This example also demonstrates how to include jar dependencies, e.g. connectors, in a PyFlink application, and how to package it for deployment on Amazon Managed Service for Apache Flink.
Any jar dependencies must be added to the <dependencies> block in the pom.xml file.
In this case, we have included flink-connector-aws-kinesis-streams.
Executing mvn package takes care of downloading any defined dependencies and creating a single "fat-jar" containing all of them.
This file is generated in the ./target subdirectory and is called pyflink-dependencies.jar.
The ./target directory and any generated files are not supposed to be committed to git.
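A jar is a zip file, so you can verify that the connector actually ended up in pyflink-dependencies.jar. This sketch counts the .class entries under a package prefix. The prefix org/apache/flink/connector/kinesis/ is an assumption about where the Kinesis connector classes live.

```python
import sys
import zipfile

# Assumed package prefix of the Kinesis connector classes.
KINESIS_PREFIX = "org/apache/flink/connector/kinesis/"


def packaged_classes(jar, prefix: str = KINESIS_PREFIX) -> list:
    """List .class entries under `prefix` inside the fat-jar (path or file object)."""
    with zipfile.ZipFile(jar) as zf:
        return sorted(
            n for n in zf.namelist() if n.startswith(prefix) and n.endswith(".class")
        )


if __name__ == "__main__":
    jar_path = sys.argv[1] if len(sys.argv) > 1 else "target/pyflink-dependencies.jar"
    classes = packaged_classes(jar_path)
    print(f"{len(classes)} classes under {KINESIS_PREFIX} in {jar_path}")
```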
Note: because we are using Maven to build a "fat-jar", we should not use the "SQL" dependencies, whose artifact names contain "-sql-". This is why we include flink-connector-aws-kinesis-streams instead of flink-sql-connector-aws-kinesis-streams. Using "SQL" dependencies may increase the size of the jar.
When running locally, for example in your IDE, PyFlink will look for this jar file in ./target.
When you are happy with your Python code and ready to deploy the application to Amazon Managed Service for Apache Flink, run mvn package again. The zip file you find in ./target is the artifact to upload to S3. It contains both the jar dependencies and your Python code.