# Getting Started with PyFlink with Soumil Shah

# Step 1: Install Library and Packages

In [1]:
! pip install Faker

Collecting Faker
  Using cached Faker-33.3.0-py3-none-any.whl.metadata (15 kB)
Using cached Faker-33.3.0-py3-none-any.whl (1.9 MB)
Installing collected packages: Faker
Successfully installed Faker-33.3.0


In [2]:
! pip show apache-flink

Name: apache-flink
Version: 1.20.0
Summary: Apache Flink Python API
Home-page: https://flink.apache.org
Author: Apache Software Foundation
Author-email: dev@flink.apache.org
License: https://www.apache.org/licenses/LICENSE-2.0
Location: /opt/anaconda3/envs/apache_flink_practice/lib/python3.10/site-packages
Requires: apache-beam, apache-flink-libraries, avro-python3, cloudpickle, fastavro, httplib2, numpy, pandas, pemja, protobuf, py4j, pyarrow, python-dateutil, pytz, requests, ruamel.yaml
Required-by: 


In [3]:
! java -version

openjdk version "21.0.1" 2023-10-17 LTS
OpenJDK Runtime Environment Corretto-21.0.1.12.1 (build 21.0.1+12-LTS)
OpenJDK 64-Bit Server VM Corretto-21.0.1.12.1 (build 21.0.1+12-LTS, mixed mode, sharing)


# Step 2: Basics

# PyFlink Offers Two APIs
* DataStream API
* Table API

# Table API 
* Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, batch data sets and produce the same results. The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications.
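
The "same semantics, same results" claim can be made concrete with a small plain-Python model (this is not PyFlink code, and the function names are hypothetical). On an unbounded stream a grouped count cannot wait for the input to end, so it emits a changelog: an insertion for a key's first row, then typically a retraction/update pair for each later row. Replaying that changelog yields the same table that a batch query computes over the bounded input. The `+I`, `-U`, `+U` labels follow Flink's row-kind short codes; whether a real job emits the `-U` rows depends on the sink.

```python
from collections import Counter


def streaming_count(keys):
    """Count keys incrementally, emitting a changelog the way a streaming
    GROUP BY ... COUNT does: +I for a key's first row, then a -U/+U pair
    (retract the old count, emit the new count) for every later row."""
    counts = Counter()
    changelog = []
    for key in keys:
        if counts[key] == 0:
            counts[key] = 1
            changelog.append(("+I", key, 1))
        else:
            changelog.append(("-U", key, counts[key]))
            counts[key] += 1
            changelog.append(("+U", key, counts[key]))
    return changelog


def materialize(changelog):
    """Replay a changelog into a result table (dict of key -> count)."""
    result = {}
    for kind, key, value in changelog:
        if kind in ("+I", "+U"):
            result[key] = value
        else:  # "-U" or "-D" retracts the previous row for this key
            result.pop(key, None)
    return result


def batch_count(keys):
    """Count keys in a single pass over a bounded input."""
    return dict(Counter(keys))


states = ["Michigan", "New Mexico", "Oregon", "New Mexico"]
changelog = streaming_count(states)
for entry in changelog:
    print(entry)

# Replaying the stream's changelog gives the batch result
print(materialize(changelog) == batch_count(states))  # True
```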

## Table Environment


-> Streaming TableEnvironment
```
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
```


-> Batch TableEnvironment
```
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
```


#### Creating a Table from a List of Tuples

In [8]:
from pyflink.table import EnvironmentSettings, TableEnvironment
from faker import Faker

# Create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

# Initialize Faker
fake = Faker()

# Generate 10 rows of fake (name, city, state) tuples
data = [(fake.name(), fake.city(), fake.state()) for _ in range(10)]

# Define column names
column_names = ["name", "city", "state"]

# Create a PyFlink table with column names
table = table_env.from_elements(data, schema=column_names)

# Print the table
table.execute().print()


+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                  Logan Johnson |                West Lauriefort |                       Michigan |
|                Zachary Jenkins |                   North Daniel |                         Oregon |
|                Courtney Wright |                    Port Joseph |                    Connecticut |
|                  Alicia Wagner |                      Leachbury |                     New Mexico |
|                Elizabeth Scott |              West Jeffreymouth |                       Oklahoma |
|                 Jessica Torres |                      Emilyfort |                        Indiana |
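|                   Shawn Martin |          North Jonathanborough |                       Arkansas |
|                 Samuel Hampton |                   Williamville |                     New Mexico |
|                  Sarah Kennedy |                    Thomasmouth |                    Mississippi |
|                      Emily May |                      Whitefort |                        Wyoming |
+--------------------------------+--------------------------------+--------------------------------+
10 rows in set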

# Creating a Temporary View


In [9]:
table_env.create_temporary_view('source_table', table)

table_env.execute_sql("SELECT * FROM source_table").print()

+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                  Logan Johnson |                West Lauriefort |                       Michigan |
|                Zachary Jenkins |                   North Daniel |                         Oregon |
|                Courtney Wright |                    Port Joseph |                    Connecticut |
|                  Alicia Wagner |                      Leachbury |                     New Mexico |
|                Elizabeth Scott |              West Jeffreymouth |                       Oklahoma |
|                 Jessica Torres |                      Emilyfort |                        Indiana |
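|                   Shawn Martin |          North Jonathanborough |                       Arkansas |
|                 Samuel Hampton |                   Williamville |                     New Mexico |
|                  Sarah Kennedy |                    Thomasmouth |                    Mississippi |
|                      Emily May |                      Whitefort |                        Wyoming |
+--------------------------------+--------------------------------+--------------------------------+
10 rows in set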

### Selecting Columns

In [13]:
from pyflink.table.expressions import col

table.select(col("name"), col("city")).execute().print()

+--------------------------------+--------------------------------+
|                           name |                           city |
+--------------------------------+--------------------------------+
|                  Logan Johnson |                West Lauriefort |
|                Zachary Jenkins |                   North Daniel |
|                Courtney Wright |                    Port Joseph |
|                  Alicia Wagner |                      Leachbury |
|                Elizabeth Scott |              West Jeffreymouth |
|                 Jessica Torres |                      Emilyfort |
|                   Shawn Martin |          North Jonathanborough |
|                 Samuel Hampton |                   Williamville |
|                  Sarah Kennedy |                    Thomasmouth |
|                      Emily May |                      Whitefort |
+--------------------------------+--------------------------------+
10 rows in set


### Filtering Data

In [15]:
from pyflink.table.expressions import col

table \
    .select(col("name"), col("city"), col("state")) \
    .where(col("state") == 'Connecticut') \
    .execute().print()

+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                Courtney Wright |                    Port Joseph |                    Connecticut |
+--------------------------------+--------------------------------+--------------------------------+
1 row in set


### Group By

In [16]:
table \
    .group_by(col("state")) \
    .select(col("state").alias("state"), col("name").count.alias("count")) \
    .execute().print()

+--------------------------------+----------------------+
|                          state |                count |
+--------------------------------+----------------------+
|                        Indiana |                    1 |
|                       Oklahoma |                    1 |
|                    Mississippi |                    1 |
|                    Connecticut |                    1 |
|                        Wyoming |                    1 |
|                         Oregon |                    1 |
|                       Arkansas |                    1 |
|                       Michigan |                    1 |
|                     New Mexico |                    2 |
+--------------------------------+----------------------+
9 rows in set
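
Because the source data is a plain Python list, the grouped count can be cross-checked on the client with `collections.Counter`. This is a verification sketch, not how Flink evaluates the query. The rows below are the ten rows Faker produced in this particular run of the notebook; no seed is set, so a rerun will produce different data. The row order of Flink's grouped result is not guaranteed, so the sketch sorts by state for readability.

```python
from collections import Counter

# The ten (name, city, state) rows Faker generated in this run of the notebook
rows = [
    ("Logan Johnson", "West Lauriefort", "Michigan"),
    ("Zachary Jenkins", "North Daniel", "Oregon"),
    ("Courtney Wright", "Port Joseph", "Connecticut"),
    ("Alicia Wagner", "Leachbury", "New Mexico"),
    ("Elizabeth Scott", "West Jeffreymouth", "Oklahoma"),
    ("Jessica Torres", "Emilyfort", "Indiana"),
    ("Shawn Martin", "North Jonathanborough", "Arkansas"),
    ("Samuel Hampton", "Williamville", "New Mexico"),
    ("Sarah Kennedy", "Thomasmouth", "Mississippi"),
    ("Emily May", "Whitefort", "Wyoming"),
]

# Client-side equivalent of: SELECT state, COUNT(name) ... GROUP BY state
# (COUNT(name) skips NULL names, so None names are filtered out here too)
state_counts = Counter(state for name, _city, state in rows if name is not None)

for state, count in sorted(state_counts.items()):
    print(f"{state:>12} | {count}")
```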


# Creating a Sink

In [17]:
table_env.execute_sql("""
    CREATE TABLE print_sink (
        name STRING, 
        city STRING,
        state STRING
    ) WITH (
        'connector' = 'print'
    )
""")

table_env.execute_sql("""
    INSERT INTO print_sink
        SELECT * FROM source_table
""").wait()



1> +I[Logan Johnson, West Lauriefort, Michigan]
1> +I[Zachary Jenkins, North Daniel, Oregon]
1> +I[Courtney Wright, Port Joseph, Connecticut]
1> +I[Alicia Wagner, Leachbury, New Mexico]
1> +I[Elizabeth Scott, West Jeffreymouth, Oklahoma]
1> +I[Jessica Torres, Emilyfort, Indiana]
1> +I[Shawn Martin, North Jonathanborough, Arkansas]
1> +I[Samuel Hampton, Williamville, New Mexico]
1> +I[Sarah Kennedy, Thomasmouth, Mississippi]
1> +I[Emily May, Whitefort, Wyoming]
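
In the print sink's output, `+I` is the row kind (INSERT), and the leading `1>` appears to be the 1-based index of the parallel sink subtask that wrote the row. If you want to scrape these lines in a test, a hypothetical helper (not part of PyFlink) can split a line back into its parts. It is deliberately naive: values containing `", "` will be split incorrectly.

```python
import re

# One print-sink line, e.g. "1> +I[Logan Johnson, West Lauriefort, Michigan]":
# an optional "<n>> " subtask prefix, a row-kind code, then the fields in brackets.
PRINT_LINE = re.compile(r"^(?:(\d+)> )?(\+I|-U|\+U|-D)\[(.*)\]$")

# Short codes used in the output and the row kinds they stand for
ROW_KINDS = {
    "+I": "INSERT",
    "-U": "UPDATE_BEFORE",
    "+U": "UPDATE_AFTER",
    "-D": "DELETE",
}


def parse_print_line(line):
    """Split one print-sink line into (subtask, row_kind, fields).

    Fields are split on ", ", so a value that itself contains ", "
    is split incorrectly.
    """
    match = PRINT_LINE.match(line.strip())
    if match is None:
        raise ValueError(f"not a print-sink line: {line!r}")
    subtask, kind, body = match.groups()
    fields = body.split(", ") if body else []
    return (int(subtask) if subtask else None, ROW_KINDS[kind], fields)


print(parse_print_line("1> +I[Logan Johnson, West Lauriefort, Michigan]"))
# (1, 'INSERT', ['Logan Johnson', 'West Lauriefort', 'Michigan'])
```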


# Collecting Results on the Client

In [18]:
table_result = table_env.execute_sql("SELECT * FROM source_table")

with table_result.collect() as results:
    for result in results:
        print(result)


<Row('Logan Johnson', 'West Lauriefort', 'Michigan')>
<Row('Zachary Jenkins', 'North Daniel', 'Oregon')>
<Row('Courtney Wright', 'Port Joseph', 'Connecticut')>
<Row('Alicia Wagner', 'Leachbury', 'New Mexico')>
<Row('Elizabeth Scott', 'West Jeffreymouth', 'Oklahoma')>
<Row('Jessica Torres', 'Emilyfort', 'Indiana')>
<Row('Shawn Martin', 'North Jonathanborough', 'Arkansas')>
<Row('Samuel Hampton', 'Williamville', 'New Mexico')>
<Row('Sarah Kennedy', 'Thomasmouth', 'Mississippi')>
<Row('Emily May', 'Whitefort', 'Wyoming')>


# Convert Pandas DataFrame to PyFlink Table and Vice Versa

In [20]:
pandas_df = table.to_pandas()
pandas_df

|   | name | city | state |
|---|------|------|-------|
| 0 | Logan Johnson | West Lauriefort | Michigan |
| 1 | Zachary Jenkins | North Daniel | Oregon |
| 2 | Courtney Wright | Port Joseph | Connecticut |
| 3 | Alicia Wagner | Leachbury | New Mexico |
| 4 | Elizabeth Scott | West Jeffreymouth | Oklahoma |
| 5 | Jessica Torres | Emilyfort | Indiana |
| 6 | Shawn Martin | North Jonathanborough | Arkansas |
| 7 | Samuel Hampton | Williamville | New Mexico |
| 8 | Sarah Kennedy | Thomasmouth | Mississippi |
| 9 | Emily May | Whitefort | Wyoming |


In [24]:
# Create a PyFlink Table from the Pandas DataFrame (column names and types are inferred from the DataFrame)
table_temp = table_env.from_pandas(pandas_df)
table_temp.execute().print()

+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                  Logan Johnson |                West Lauriefort |                       Michigan |
|                Zachary Jenkins |                   North Daniel |                         Oregon |
|                Courtney Wright |                    Port Joseph |                    Connecticut |
|                  Alicia Wagner |                      Leachbury |                     New Mexico |
|                Elizabeth Scott |              West Jeffreymouth |                       Oklahoma |
|                 Jessica Torres |                      Emilyfort |                        Indiana |
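|                   Shawn Martin |          North Jonathanborough |                       Arkansas |
|                 Samuel Hampton |                   Williamville |                     New Mexico |
|                  Sarah Kennedy |                    Thomasmouth |                    Mississippi |
|                      Emily May |                      Whitefort |                        Wyoming |
+--------------------------------+--------------------------------+--------------------------------+
10 rows in set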

# UDF

In [25]:
table.execute().print()

+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                  Logan Johnson |                West Lauriefort |                       Michigan |
|                Zachary Jenkins |                   North Daniel |                         Oregon |
|                Courtney Wright |                    Port Joseph |                    Connecticut |
|                  Alicia Wagner |                      Leachbury |                     New Mexico |
|                Elizabeth Scott |              West Jeffreymouth |                       Oklahoma |
|                 Jessica Torres |                      Emilyfort |                        Indiana |
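|                   Shawn Martin |          North Jonathanborough |                       Arkansas |
|                 Samuel Hampton |                   Williamville |                     New Mexico |
|                  Sarah Kennedy |                    Thomasmouth |                    Mississippi |
|                      Emily May |                      Whitefort |                        Wyoming |
+--------------------------------+--------------------------------+--------------------------------+
10 rows in set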


In [26]:
import uuid

from pyflink.table import DataTypes
from pyflink.table.udf import udf
from pyflink.table.expressions import col, call


def generate_guid():
    # Return a new random (version 4) UUID as a string
    return str(uuid.uuid4())


# Wrap the Python function as a scalar UDF that returns a STRING.
# deterministic=False marks it as non-deterministic, so the planner does not
# treat this zero-argument call as a constant and reuse one value for every row.
guid_udf = udf(generate_guid, result_type=DataTypes.STRING(), deterministic=False)

result_table = table.select(col("city"), col("name"), call(guid_udf).alias("guid"))

result_table.execute().print()
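
The function behind the UDF, `generate_guid`, is ordinary Python, so it can be checked outside Flink before it is wrapped with `udf`. A minimal stdlib-only sketch that verifies every value is a well-formed version-4 UUID and that no value repeats across many calls (uuid4 collisions are possible in principle but vanishingly unlikely at this scale):

```python
import uuid


def generate_guid():
    # Same function as in the UDF cell: a random version-4 UUID as a string
    return str(uuid.uuid4())


guids = [generate_guid() for _ in range(1000)]

# Every value parses back as a version-4 UUID ...
assert all(uuid.UUID(g).version == 4 for g in guids)
# ... and no value repeats, which is what a per-row GUID column needs
assert len(set(guids)) == len(guids)
print("ok")
```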

# References
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/intro_to_table_api/