- Scan, Select and Filter - Kamal Reddy
- Aggregation operations - Mohan Pratapa
- Column Operations - Bhaskar Reddy
- Set Operations - Chandrakanth Polisetty
- Join operations on pyflink tables - Rohan Bhandari
- Convert pandas dataframes to pyflink table - suma soma
Apache Flink is an open-source, unified stream-processing and batch-processing framework. It provides a high-throughput, low-latency streaming engine as well as support for event-time processing and state management. Its applications are fault-tolerant in the event of machine failure and support exactly-once semantics. It does not provide its own data-storage system, but provides data-source and sink connectors to systems such as Amazon Kinesis, Apache Kafka, HDFS, Apache Cassandra, and Elasticsearch.
In this project, I took a dataset that contains countries and their respective counts and performed operations such as scan, select, and filter.
- The dataset containing the names of different countries was taken from www.kaggle.com.
- Aggregation operations such as group by have been performed on the dataset; a minimal sketch of these operations follows this list.
- A Python script has been developed that sets up the source and sink tables.
- The environment paths need to be set up in the Python script.
- To execute the script, run the command python aggregations.py
- The result is written to the output/aggregation folder across several partition files.
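A minimal sketch of the scan, select, filter, and group-by steps described above, assuming the dataset's columns are DEST_COUNTRY, ORIGIN_COUNTRY, and Cnt (the names used in the column-operations section below) and that source and sink tables named Orders and Result have already been registered via DDL:

from pyflink.table import BatchTableEnvironment, EnvironmentSettings

t_env = BatchTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())

# scan the registered source table (assumed name: Orders)
orders = t_env.from_path("Orders")

# filter + select: destination countries with a count above 10
filtered = orders.filter(orders.Cnt > 10).select(orders.DEST_COUNTRY, orders.Cnt)

# group-by aggregation: total count per destination country
result = orders.group_by(orders.DEST_COUNTRY).select(orders.DEST_COUNTRY, orders.Cnt.sum.alias('Cnt'))
result.execute_insert("Result").wait()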
- Python 3.5+
- Install apache-flink using pip:
python -m pip install apache-flink
- Two datasets to perform join operations
We need to configure the Flink table environment using BatchTableEnvironment, since we are working with batch data, and create an instance of the environment.
from pyflink.table import BatchTableEnvironment, EnvironmentSettings

t_env = BatchTableEnvironment.create(environment_settings=EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
After creating the table environment, we need to configure the source and sink tables and define the schema for each table. We need to configure two sources, as we are performing join operations on two different datasets, and we also need to configure the sink table with a schema that matches the expected results.
- Creating the source table

source_ddl1 = f"""
    create table `Order`(
        CustomerID BIGINT,
        OrderID BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{source_data_path1}'
    )
"""
- Creating the sink table

sink_ddl = f"""
    create table `Result`(
        OrderID BIGINT,
        CustomerName VARCHAR,
        Country VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{result_data_path}'
    )
"""
After creating the source and sink tables, we perform the join operations as below:
# inner join on the customer ID
left.join(right).where(left.CustomerID == right.CustID).select(left.OrderID, right.CustomerName, right.Country).execute_insert("Result").wait()

# left outer join: keep every order, even without a matching customer
left_outer_result = left.left_outer_join(right, left.CustomerID == right.CustID).select(left.OrderID, right.CustomerName, right.Country)

# right outer join: keep every customer, even without a matching order
right_outer_result = left.right_outer_join(right, left.CustomerID == right.CustID).select(left.OrderID, right.CustomerName, right.Country)

left_outer_result.execute_insert("Result").wait()
right_outer_result.execute_insert("Result").wait()
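The Table API also provides a full outer join in the same style; a one-line sketch:

# full outer join: keep unmatched rows from both sides
full_outer_result = left.full_outer_join(right, left.CustomerID == right.CustID).select(left.OrderID, right.CustomerName, right.Country)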
To execute, open PowerShell as administrator from the root folder of the project.
Then run the following command:
python joins.py
The output is written to the configured output folder.
- Two datasets to perform set operations
- Apache Flink
- Two datasets containing information about cricket players have been taken from www.kaggle.com
- Create the table environment using the following commands:
t_env = BatchTableEnvironment.create(
environment_settings=EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
- Developed the Python script with the source and sink tables in it.
- These were the commands used to perform the set operations on the datasets (a sketch of writing the results out follows after these commands):

left = t_env.from_path("IPLDC")
right = t_env.from_path("IPLMI")
result = left.union(right)           # union: duplicates removed
result1 = left.union_all(right)      # union all: duplicates kept
result2 = left.intersect(right)      # intersect: common rows, duplicates removed
result3 = left.intersect_all(right)  # intersect all: common rows, duplicates kept
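The results above are only defined; to materialize one, it has to be written to a registered sink whose schema matches the player tables. The sink name ResultS here is an assumption:

# hypothetical sink table "ResultS" with the same schema as IPLDC/IPLMI
result.execute_insert("ResultS").wait()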
- Run the script using "python filename.py"
- The result will be displayed in output/sets in your project folder.
- I took the dataset from an external source, https://www.kaggle.com/
- Performed operations like adding a column to the table, dropping a specified column from the table, and replacing columns.
- A Python script has been developed that sets up the source and sink tables.
- Used the PowerShell terminal to execute the Python script: python column.py
# environment configuration
t_env = BatchTableEnvironment.create(
environment_settings=EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
- Developed the Python script with the source and sink tables in it.
- These were the commands used to perform the column operations on the dataset:
source_ddl = f"""
    create table Orders(
        DEST_COUNTRY VARCHAR,
        ORIGIN_COUNTRY VARCHAR,
        Cnt BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{source_data_path}'
    )
"""
t_env.execute_sql(source_ddl)
sink_ddl = f"""
    create table `ResultC`(
        ORIGIN_COUNTRY VARCHAR,
        Cnt BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{result_data_path}'
    )
"""
t_env.execute_sql(sink_ddl)
# specify table program
orders = t_env.from_path("Orders")  # schema (DEST_COUNTRY, ORIGIN_COUNTRY, Cnt)

# add a derived column (concat comes from pyflink.table.expressions)
# result = orders.add_columns(concat(orders.DEST_COUNTRY, ' code 1'))
# result.execute_insert("ResultC").wait()

# add or replace a column under the alias 'desc'
# result2 = orders.add_or_replace_columns(concat(orders.DEST_COUNTRY, ' code 1').alias('desc'))
# result2.execute_insert("ResultC").wait()

# drop DEST_COUNTRY so the result matches the ResultC schema
result3 = orders.drop_columns(orders.DEST_COUNTRY)
result3.execute_insert("ResultC").wait()

# group-by aggregation (the sink schema would need to match)
# orders.group_by("DEST_COUNTRY").select(orders.DEST_COUNTRY, orders.DEST_COUNTRY.count.alias('cnt')).execute_insert("Result").wait()
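Columns can also be renamed in place with the Table API; a small sketch on the same table (the new name DESTINATION is arbitrary, and a sink's schema would have to match before writing the result out):

# rename DEST_COUNTRY to DESTINATION
renamed = orders.rename_columns(orders.DEST_COUNTRY.alias('DESTINATION'))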
To convert a pandas DataFrame to a PyFlink table, I am using Colaboratory, a product from Google Research.
Pandas DataFrames can be converted into a PyFlink Table. Internally, PyFlink serializes the pandas DataFrame using the Arrow columnar format; the serialized data is processed and deserialized by the Arrow source during execution. The Arrow source can also be used in streaming jobs.
[VidGrid Link](https://app.vidgrid.com/view/7rJtDa7oJWfv)
- Apache Flink
- Python
- A dataset (I took the Netflix dataset from Kaggle)
- Web browser
- Colaboratory
Colaboratory, or "Colab" for short, allows you to write and execute Python in your browser, with zero configuration required, free access to GPUs, and easy sharing.
Google provides a Jupyter Notebook-like interface to run Python code on online virtual machines.
With Colab you can harness the full power of popular Python libraries to analyze and visualize data.
After opening Colab, select New Python 3 Notebook.
Start by importing all necessary libraries.
Add the Python script to a cell and run the code by pressing Ctrl + Enter.
Upload your dataset to Colab.
Install Apache Flink using the command:
- !pip install apache-flink
Add the following statements to import the libraries.
- from pyflink.table import StreamTableEnvironment, DataTypes, table_config
- from pyflink.datastream import StreamExecutionEnvironment
- import pandas as pd
Table API applications begin by declaring a table environment: either a BatchTableEnvironment for batch applications or a StreamTableEnvironment for streaming applications. This serves as the main entry point for interacting with the Flink runtime. It can be used to set execution parameters such as the restart strategy and default parallelism. The table config allows setting Table API specific configurations.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_config = table_config.TableConfig()
t_env = StreamTableEnvironment.create(env, t_config)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
Create a pandas DataFrame:
- df = pd.read_csv("/content/netflix_titles.csv")
Select the columns to use from the CSV file:
- col = ['show_id', 'type', 'title']
Create a Flink table from the DataFrame:
- table = t_env.from_pandas(df, col)
Command to display the values:
- table.select("show_id, type").to_pandas()
Script:
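A consolidated sketch of the notebook script, combining the steps above (the CSV path assumes the file was uploaded to the Colab session; the DataFrame is restricted to the selected columns so the schema passed to from_pandas matches its width):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, table_config
import pandas as pd

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_config = table_config.TableConfig()
t_env = StreamTableEnvironment.create(env, t_config)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

# read the Netflix dataset uploaded to the Colab session
df = pd.read_csv("/content/netflix_titles.csv")
col = ['show_id', 'type', 'title']

# convert the pandas DataFrame (restricted to the chosen columns) to a PyFlink Table
table = t_env.from_pandas(df[col], col)

# convert back to pandas to display the values
print(table.select("show_id, type").to_pandas())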