Interactive Data Exploration with PyFlink and Zeppelin Notebooks

⚠️ Update: This repository will no longer be actively maintained. Please check the Ververica fork.

See the slides for more context.

Prerequisites

Getting Zeppelin up and running

You can find instructions on how to run Zeppelin with the Flink interpreter in this blog post. In the future, this repo will include a docker-compose setup that wraps everything up for you.

Exploring the Movie Lovers on MUBI dataset (Kaggle)

Once Zeppelin is up and running, import the ApacheCon@Home 2020_2FH7SDKQ8.zpln notebook. You'll be working with three different files:

  • mubi_movie_data.csv - Data from all movies registered on Mubi.

  • mubi_ratings_data.csv - Ratings data from Mubi users who did not set their profiles to private (~15 million rows).

  • mubi_ratings_user_data.csv - Aggregated data from users for a specific day.

Interpreters

Zeppelin supports a wide range of interpreters, including Flink, R, Python, Markdown, and Bash, and it lets you combine multiple interpreters in a single notebook! Here, you'll use a mix of the %flink.pyflink (PyFlink) and %flink.bsql (Flink Batch SQL) interpreters. For %flink.pyflink, Zeppelin creates the following predefined variables for you:

  • s_env: StreamExecutionEnvironment

  • b_env: ExecutionEnvironment

  • st_env: StreamTableEnvironment

  • bt_env: BatchTableEnvironment

For simplicity, you'll batch-read files stored locally. One cool thing about PyFlink is that you could replace the existing source tables with Kafka-backed ones that continuously consume data without changing any of the code (other than swapping bt_env for st_env).

Creating the Source Tables

The recommended (and quickest) way to define a source table is with SQL DDL. A table definition has two parts: the logical schema, laying out the table's columns and data types; and the connector configuration, defining the physical system "backing" the table. You can use either interpreter for this!

[screenshot: create_source_tables]
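As a rough sketch of such a DDL statement (the real one lives in the notebook; the column names and file path below are assumptions, not copied from it), a filesystem-backed CSV table for the movies file could look like this:

```sql
%flink.bsql

-- Hypothetical sketch: schema and path are assumptions.
-- The filesystem connector with the CSV format batch-reads a local file.
CREATE TABLE mubi_movies (
  movie_id BIGINT,
  movie_title STRING,
  movie_release_year INT,
  movie_popularity INT
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///data/mubi_movie_data.csv',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'true'
);
```

The same statement could also be submitted from %flink.pyflink via bt_env.execute_sql(...).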

Running Basic Analytics

As an example, you can use PyFlink to query the mubi_movies table for the average movie popularity per release year. What does this tell us?

  • There are two clear outlier years when it comes to popularity (1878 and 1902). If you dig further, there is only one movie on Mubi released in each of these years ("Sallie Gardner at a Gallop" and "A Trip to the Moon"); they're just really popular!

  • The 1920s were a busy period for silent movie releases, which seem pretty popular with Mubi users.

  • The 1920s-1960s are considered the golden era of Hollywood, so that can explain the increased popularity of movies released in this period.

[screenshot: avg_popularity]
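The aggregation described above could be sketched as the following query (column names are assumptions); it can run directly under %flink.bsql, or from %flink.pyflink via bt_env.sql_query(...):

```sql
%flink.bsql

-- Sketch: average popularity per release year, assumed column names.
SELECT movie_release_year,
       AVG(movie_popularity) AS avg_popularity
FROM mubi_movies
GROUP BY movie_release_year
ORDER BY movie_release_year;
```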

Using Dynamic Input Forms

[screenshot: top10]
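Zeppelin's dynamic input forms let readers tweak query parameters without touching the code: any ${name=default} placeholder in a paragraph becomes a textbox whose value is substituted in before the query runs. A sketch against the same (assumed) mubi_movies table, where topN is a hypothetical form name:

```sql
%flink.bsql

-- ${topN=10} renders as an input form above the paragraph;
-- its value is substituted into the query text before execution.
SELECT movie_title, movie_popularity
FROM mubi_movies
ORDER BY movie_popularity DESC
LIMIT ${topN=10};
```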

Using Pandas (and other Python libraries)

One way to combine PyFlink with Pandas is to first use PyFlink to reduce the amount of data you want to act on (often a considerably smaller subset of the original dataset), taking advantage of how performant PyFlink is even on very large datasets, and then convert the resulting table into a Pandas DataFrame using .to_pandas().
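From that point on it's ordinary Pandas. A minimal sketch of the hand-off, where a small literal DataFrame stands in for the table returned by the conversion (column names are assumptions):

```python
import pandas as pd

# In the notebook you'd write something like:
#   pdf = bt_env.sql_query("SELECT movie_release_year, movie_popularity "
#                          "FROM mubi_movies").to_pandas()
# Here a literal DataFrame stands in for that result.
pdf = pd.DataFrame({
    "movie_release_year": [1902, 1920, 1950, 1950, 1999],
    "movie_popularity":   [580, 120, 300, 200, 90],
})

# Plain Pandas from here on: average popularity per release year.
avg_by_year = (pdf.groupby("movie_release_year")["movie_popularity"]
                  .mean()
                  .sort_index())
print(avg_by_year)
```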

Plotting a Histogram

[screenshot: histogram]
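As a hedged sketch of the histogram step, assuming an array of 1-5 star ratings obtained from a to_pandas() conversion of the (filtered) ratings table; the literal values below are stand-ins:

```python
import numpy as np

# Hypothetical stand-in for ratings pulled out of Flink via to_pandas().
ratings = np.array([1, 2, 2, 3, 3, 3, 4, 4, 5, 5])

# Bucket the 1-5 star ratings into five bins, one per score.
counts, edges = np.histogram(ratings, bins=5, range=(0.5, 5.5))
print(dict(zip(range(1, 6), counts.tolist())))
# → {1: 1, 2: 2, 3: 3, 4: 2, 5: 2}

# With matplotlib available in the notebook you could plot instead:
#   import matplotlib.pyplot as plt
#   plt.hist(ratings, bins=5, range=(0.5, 5.5)); plt.show()
```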


And that's it!

For the latest updates, follow Apache Flink and Apache Zeppelin on Twitter.
