
[Design] Presto-on-Spark: A Tale of Two Computation Engines #13856

Open
wenleix opened this issue Dec 12, 2019 · 46 comments

@wenleix
Contributor

wenleix commented Dec 12, 2019

Links/Resources:

Abstract

The architectural tradeoff between MapReduce and parallel databases has been an open discussion since the dawn of MapReduce systems over a decade ago. At Facebook, we have spent the past several years scaling Presto to Facebook-scale batch workloads.

Presto Unlimited aims to solve these scalability challenges. After revisiting the key architectural changes (e.g. disaggregated shuffle) required to further scale Presto, we decided on Presto-on-Spark as the path to further scale Presto. See the rest of the design doc for details.

We believe this is only a first step towards closer collaboration between the Spark and Presto communities, and a major step towards enabling a unified SQL experience across interactive and batch use cases.

Introduction

Presto was originally designed for interactive queries but has evolved into a unified engine for both interactive and batch use cases. Scaling an MPP architecture database to batch data processing over Internet-scale datasets is known to be an extremely difficult problem [1].

Presto Unlimited aims to solve these scalability challenges. To truly scale Presto Unlimited to Internet-scale batch workloads, we need the following (excluding coordinator scaling and spilling):

  1. Scale shuffle. This requires either implementing a MapReduce-style shuffle or integrating with a disaggregated shuffle service such as Cosco.
  2. Scale Presto worker execution. This includes resource isolation, straggler detection, speculative execution, etc.
  3. Scale Presto resource management. Fine-grained resource management is required when a single query can consume years of CPU. Such a concept is known as a mapper/reducer in MapReduce, an executor in Spark, and a lifespan in Presto, similar to what YARN/Mesos provide.

We realized this work lays down the foundation for a general-purpose parallel data processing system, such as Spark, FlumeJava, or Dryad. Note that such a data processing system has its own use cases and well-defined programming abstractions, and takes years to mature.

We concluded that Presto should leverage existing, well-developed systems to scale to large batch workloads, instead of “embedding” such a system inside Presto. We also believe such collaboration would help the whole Big Data community better understand the abstraction boundary between a SQL engine and a data processing system, as well as evolve and refine the execution primitives to provide near-optimal performance without sacrificing the abstractions.

We chose Spark as the parallel data processing system to further scale Presto Unlimited, as it is the most widely used open source system in this category. However, the design and architecture here should apply to any other parallel data processing system as well.

Architecture

[Screenshot: Presto-on-Spark architecture diagram]

  1. The Presto planner needs to know it is generating a plan for Spark execution, so it can prune unnecessary nodes (e.g. LocalExchange).

  2. On each Spark worker, execution involves:
    Constructing the operator factory chain (a.k.a. DriverFactory) through the LocalExecutionPlanner
    Instantiating a driver by binding the input split, and running the driver

  3. Sending the data to a SparkOutputBuffer, which emits it back to Spark (see the sketch below).
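
To make the worker-side steps above more concrete, here is a minimal sketch of how a plan fragment could be evaluated inside a Spark mapPartitions call. The Split, SerializedPage, PrestoDriver, and DriverFactory types below are hypothetical stand-ins, declared only to keep the sketch self-contained; the real implementation wires the actual LocalExecutionPlanner, DriverFactory, and SparkOutputBuffer classes and differs in detail.

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public final class FragmentExecutionSketch
{
    // Hypothetical stand-ins for the real Presto classes, declared only to keep the
    // sketch self-contained. In the real code the operator factory chain comes from
    // the LocalExecutionPlanner and pages are emitted through a SparkOutputBuffer.
    public interface Split extends Serializable {}
    public interface SerializedPage extends Serializable {}
    public interface PrestoDriver
    {
        void addSplit(Split split);     // bind an input split to the driver
        List<SerializedPage> run();     // run the operator pipeline to completion
    }
    public interface DriverFactory extends Serializable
    {
        PrestoDriver createDriver();    // operator factory chain -> driver instance
    }

    // Each Spark partition corresponds to a set of Presto splits: create a driver,
    // bind the splits, run the pipeline, and hand the produced pages back to Spark.
    public static JavaRDD<SerializedPage> executeFragment(
            JavaSparkContext context,
            DriverFactory driverFactory,
            List<Split> splits,
            int numPartitions)
    {
        FlatMapFunction<Iterator<Split>, SerializedPage> runFragment = partitionSplits -> {
            PrestoDriver driver = driverFactory.createDriver();
            partitionSplits.forEachRemaining(driver::addSplit);
            return driver.run().iterator();   // pages flow back to Spark (shuffle or collect)
        };
        return context.parallelize(splits, numPartitions).mapPartitions(runFragment);
    }

    private FragmentExecutionSketch() {}
}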

@mahengyang

mahengyang commented Dec 13, 2019

Excellent job! A unified entry point for batch data processing and ad-hoc queries is very important for users. Spark, Hive, Flink, MySQL, Elasticsearch, MongoDB, and so on: some are for computation and others are for storing data, but users can connect to them all through Presto!

@wenleix changed the title from [Design] Presto-on-Spark to [Design] Presto-on-Spark: A Tale of Two Computation Engines on Dec 16, 2019
@wenleix pinned this issue Dec 18, 2019
@wenleix added the Roadmap (A top level roadmap item) label Dec 18, 2019
@wenleix
Contributor Author

wenleix commented Dec 18, 2019

TODOs (for tracking purposes, kept updated):

@arhimondr
Member

presto-spark-classloader-interface

As per earlier discussion, we decided to go with this name explicitly to emphasize that this module is only needed for classloader isolation, and not for anything fundamental. Once Spark supports classloader isolation internally (or once it migrates to Java 9+, which supports Java modules), this artificial module should be removed.
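
To illustrate the kind of isolation this module exists for, here is a minimal sketch, not the actual presto-spark-classloader-interface code; the jar path and the entry-point class name are placeholders. The idea is that the Presto runtime and its dependencies are loaded through a separate URLClassLoader, so its dependency versions cannot clash with Spark's, and Spark only ever sees the small set of shared interface types.

import java.net.URL;
import java.net.URLClassLoader;

// A minimal sketch of classloader isolation, not the actual Presto-on-Spark code.
public final class IsolationSketch
{
    public static Object loadIsolatedEntryPoint() throws Exception
    {
        // Placeholder path to the jar(s) containing the Presto runtime and its dependencies.
        URL[] prestoRuntimeJars = {new URL("file:///tmp/presto-spark-package.jar")};

        // A null parent delegates only to the bootstrap classloader, so the Presto side
        // brings its own copies of libraries (e.g. Guava) that could conflict with Spark's.
        // The real module additionally delegates the classloader-interface package to the
        // application classloader so both sides can exchange those shared interface types.
        ClassLoader isolated = new URLClassLoader(prestoRuntimeJars, null);

        // Placeholder entry-point class name; only the shared interface types cross the boundary.
        Class<?> entryPoint = isolated.loadClass("com.example.PrestoTaskExecutorFactory");
        return entryPoint.getConstructor().newInstance();
    }

    private IsolationSketch() {}
}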

@wenleix
Contributor Author

wenleix commented Dec 24, 2019

@arhimondr :

I see. But I do think we might also want to put some common classes into the classloader-interface package; I think TaskProcessors is already there. See for example
#13760 (comment): always using serialized byte arrays makes the code more difficult to understand.

@wubiaoi

wubiaoi commented Dec 31, 2019

What's the difference between doing this and SparkSQL?

@wenleix
Contributor Author

wenleix commented Dec 31, 2019

@wubiaoi : From the user experience perspective, Presto-on-Spark will provide exactly the same language and semantics across interactive and batch. While both Presto and SparkSQL are ANSI SQL compatible, note there is no “ANSI SQL” as a language: ANSI SQL is a (in some ways loose) specification. Many SQL dialects claim to be ANSI SQL compatible (notably Oracle, SQL Server, and DB2), yet they are significantly incompatible with each other.

As explained in more detail in this Quora answer:

ANSI SQL is a specification, not a particular product. It's a document, describing the official features of the SQL language.

Every brand of SQL RDBMS implements a subset of ANSI SQL. Every brand of SQL RDBMS I'm aware of adds some features to the language that are not in the ANSI SQL specification (example: indexes). And each brand implements features in its own way, not necessarily compatible with the others.

Even when the language and semantics are exactly the same, Presto-on-Spark goes further and provides a unified SQL experience for interactive and batch use cases. A unified SQL experience means not only that the SQL language and semantics are the same, but that the overall experience is also similar. This is because, while SQL was originally designed as a declarative language, in almost all practice users depend on engine-specific implementation details and use it in a partly imperative way to get the best performance. The SQL experience includes, but is not limited to:

  • Semantics (e.g. NULL handling)
  • Subtle behaviors (e.g. the maximum array/map size that can be handled, emitting NULL vs. throwing an exception)
  • Language hints
  • UDF experience
  • How the plan will be optimized
  • How the SQL will be executed (e.g. the performance implications of different ways to write the same SQL, such as using UNNEST vs. a lambda)

I will explain the technical perspective in a separate comment :)

@wenleix
Contributor Author

wenleix commented Dec 31, 2019

@wubiaoi : From a technical perspective, SparkSQL's execution model is row-oriented + whole-stage codegen [1], while Presto's execution model is columnar processing + vectorization. So architecture-wise, Presto-on-Spark is more similar to the early research prototype Shark [2].

The design trade-offs between row-oriented + whole-stage codegen and columnar processing + vectorization deserve a very long discussion; I will let @oerling provide more insights :) . However, with modern Big Data, where denormalization is omnipresent, we do see ever-increasing value in columnar processing + vectorization [3].

[1] Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop: https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
[2] Shark: SQL and Rich Analytics at Scale: https://cs.stanford.edu/~matei/papers/2013/sigmod_shark.pdf
[3] Everything You Always Wanted To Do in Table Scan: https://prestodb.io/blog/2019/06/29/everything-you-always-wanted-to-do-in-a-table-scan

@wubiaoi

wubiaoi commented Jan 2, 2020

@wenleix 👍 Thank you very much for the explanation.
Is it better to run only the costly stages on Spark?

@wenleix
Contributor Author

wenleix commented Jan 8, 2020

@wubiaoi : While this is certainly possible, it complicates the execution a lot, as it requires coordination between two (heterogeneous) execution engines. Also, why not use Presto Unlimited in this case? :)

@KannarFr

As I'm looking for an HTTP service instead of spark-submit, I can work on it. But now you get what I want to do, right? WDYT about it?

@arhimondr
Member

Classic Presto acts like a service, with an HTTP endpoint to fetch the results. Are you hitting a scalability wall with classic Presto?

As I'm looking for an HTTP service instead of spark-submit, I can work on it. But now you get what I want to do, right? WDYT about it?

I'm not sure if Spark even supports gradual fetching of the results. You can investigate it. But currently we collect the results via the collect call, which returns all the results at once.

As a middle ground, you can change your workload slightly:

  1. Run INSERT INTO tmp_table .... in Presto on Spark, which will write the results into a temporary table
  2. Run SELECT * FROM tmp_table in classic Presto to fetch the results

Generally speaking, Presto on Spark is mostly designed to run insert queries; that's why we don't care much about returning the results.

@KannarFr

Presto on Spark allows changing the catalogs for each query by creating a Presto runner per query, correct? Classic Presto does not support loading/unloading catalogs: #12605.

My main goal is to provide context (a Presto catalog) to classic Presto for each query. But in fact, we need to support a very high scale, and I found this project, which seems to match my requirements.

@arhimondr
Member

Could you please describe your use case a little bit more? Maybe there's a better way to achieve this dynamic catalog behaviour?

@KannarFr

KannarFr commented Sep 24, 2020

Consider millions of catalogs of different types (MySQL, PostgreSQL, ...) and thousands of clients, so a lot of queries.

A client comes with its catalog and query to an HTTP service.

This service sends the catalog list and the query to run to Presto/Presto-on-Spark (let's call it the system).

Then the system should run the query and stream the results back to the client through HTTP chunks, to limit RAM usage if possible.

This is the use case; it seems simple, but its implementation is not.

@wenleix
Contributor Author

wenleix commented Sep 25, 2020

@KannarFr : From an operational/service perspective, Presto-on-Spark is more like Spark. Thus, in my opinion, we should leverage what Spark provides for such a service (instead of thinking about it in the Presto coordinator way).

@djiangc

djiangc commented Nov 22, 2020

@arhimondr @wenleix Is it possible to run multiple SQL queries in the query file?

@arhimondr
Member

@djiangc Unfortunately no. But that should be an easy feature to add.

@djiangc

djiangc commented Nov 26, 2020

@arhimondr @wenleix another question: it seems I can't use cluster deploy-mode with spark-submit for presto-spark-launcher; only client is supported. Is this true, or am I missing something?

@arhimondr
Member

@djiangc Yes, currently only the client mode is supported.

@djiangc

djiangc commented Dec 4, 2020

@arhimondr thanks for your response. I have another question: can I do an insert overwrite?
set session hive.insert_existing_partitions_behavior='OVERWRITE'; insert into test3 select *, 2 from test

@arhimondr
Member

@djiangc Currently the launcher doesn't support setting session properties. You must enable the OVERWRITE behaviour with a configuration property: https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java#L561

Also, it should be pretty easy to add a parameter to the launcher that accepts session properties.
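
For reference, a hedged sketch of what this could look like in the Hive catalog properties file that the launcher is pointed at. The property name below is my reading of HiveClientConfig and the file path is just an example, so please verify both against your Presto version:

# etc/catalog/hive.properties (example path)
hive.insert-existing-partitions-behavior=OVERWRITE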

@djiangc

djiangc commented Dec 4, 2020

Many thanks for your help and the pointer; I got the partition overwrite working with presto-spark-launcher. @arhimondr

@JituS

JituS commented Feb 4, 2021

@arhimondr I am not able to run an insert in overwrite mode by setting the above property. Is it not supported with S3?
@djiangc Are you using S3 or HDFS?
I'm getting the below exception:
java.lang.IllegalStateException: Overwriting existing partition doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode

@rguillome

Hi,

@arhimondr I understand the philosophy behind this sentence

Generally speaking Presto on Spark is mostly designed to run insert queries

but the insert needs a predefined destination table with a schema, format, and location, right?

As an AWS user, what I would find very useful is to write the result of a Presto-on-Spark SELECT to an S3 location and run a Glue crawler on that location to have the table and the inferred schema automatically created.

Maybe a CLI argument configuring the dataOutputLocation would do the trick?

@arhimondr
Member

@rguillome Hi! Thanks for reaching out.

In our case we know the output schema in advance, thus we always end up running INSERT INTO ... on an existing table. If the schema is unknown in your use case, did you consider running CREATE TABLE AS SELECT ... to create a temporary table with a well-defined schema?

@rguillome

rguillome commented Apr 15, 2021

Hi @arhimondr

I was trying to CREATE TABLE AS SELECT ... with an external location but encountered this line in the presto-hive HiveMetadata.beginCreateTable method:

if (getExternalLocation(tableMetadata.getProperties()) != null) {
    throw new PrestoException(NOT_SUPPORTED, "External tables cannot be created using CREATE TABLE AS");
}

So basically I will try to push an MR with the corresponding changes already made in trinodb.

I wonder if the ultimate solution shouldn't be an option to write each final split directly to an HDFS or S3 location, to avoid the gathering at the driver level. We could imagine having all the benefits of Hadoop FS organisation (partitioning, bucketing, sorting, and splits). But I'm not yet comfortable with all the details it would involve to dig into this for now.

@GithubZhitao

Is DynamicFilter (vs. DynamicPartitionPrune) applied to Presto-on-Spark's physical plan?

@huleilei

@wubiaoi : From a technical perspective, SparkSQL's execution model is row-oriented + whole-stage codegen [1], while Presto's execution model is columnar processing + vectorization. So architecture-wise, Presto-on-Spark is more similar to the early research prototype Shark [2].

The design trade-offs between row-oriented + whole-stage codegen and columnar processing + vectorization deserve a very long discussion; I will let @oerling provide more insights :) . However, with modern Big Data, where denormalization is omnipresent, we do see ever-increasing value in columnar processing + vectorization [3].

[1] Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop: https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html [2] Shark: SQL and Rich Analytics at Scale: https://cs.stanford.edu/~matei/papers/2013/sigmod_shark.pdf [3] Everything You Always Wanted To Do in Table Scan: https://prestodb.io/blog/2019/06/29/everything-you-always-wanted-to-do-in-a-table-scan

SparkSQL 3.0+'s execution model is also columnar processing + vectorization.

@GithubZhitao

Is DynamicFilter (vs. DynamicPartitionPrune) applied to Presto-on-Spark's physical plan?

It is!

@whutpencil

@wenleix Hello, I have a question. Although compatibility is increased, for queries over small amounts of data, isn't the query speed slowed down by adding a materialized shuffle? At the same time, I would like to ask how the improved Presto and SparkSQL compare on large amounts of data.

@rongrong
Contributor

@wenleix Hello, I have a question. Although compatibility is increased, for queries over small amounts of data, isn't the query speed slowed down by adding a materialized shuffle? At the same time, I would like to ask how the improved Presto and SparkSQL compare on large amounts of data.

The idea is to run small queries on classic Presto, and run large queries (that won't fit within the memory limit) and long-running queries (which are more likely to be affected by cluster stability issues) using Presto-on-Spark.

@GithubZhitao

@rongrong
One more question: how does Presto-on-Spark deal with the large amount of data transfer when executing large queries?
As far as I know, data is transported by a broadcast mechanism. Will all this moved data go through the Spark driver, which is a single point coordinating all global data streams? Any bottleneck?

@whutpencil

@rongrong Does this mean that if the user fails to execute through Presto and finds that the SQL is a large query, they then submit it through Presto on Spark? Does the user have a process for switching the submission method?
At first, I mistakenly thought that all Presto queries were submitted through Presto on Spark.

@GithubZhitao

GithubZhitao commented Mar 4, 2022

@rongrong Does this mean that if the user fails to execute through Presto and finds that the SQL is a large query, they then submit it through Presto on Spark? Does the user have a process for switching the submission method?
At first, I mistakenly thought that all Presto queries were submitted through Presto on Spark.

As far as I know, these are two totally separate processes; you must develop the logic to decide whether it is a large query yourself.
Presto-on-Spark is essentially a Spark process if you ignore the Presto code logic. Neither has anything to do with the other. @whutpencil

@476474988

For the same SQL, does Presto-on-Spark use less memory but take more time?

@tdcmeehan unpinned this issue Apr 19, 2022