Add Presto Pinot Connector #13413

Closed
wants to merge 2 commits into from

@haibow haibow commented Sep 18, 2019

== RELEASE NOTES ==

General Changes
* Add Pinot Connector

Pinot Connector Changes
* Add first version of Pinot Connector

Adding Pinot connector that allows using Presto to query Apache Pinot™ (Incubating), a realtime distributed OLAP database.

This is a snapshot of the version that ran at Uber until 04/2019, minus internal features that touch the core Presto engine (e.g. limit pushdown and simple aggregation pushdown for COUNT/SUM/MAX/MIN), plus a few touch-ups for open source. It was serving ~40k queries per day.

Note that many more features and improvements have been built on top of this version, including advanced aggregation pushdown (GROUP BY, etc.). There are plans to open source the next version in the foreseeable future, possibly as incremental contributions after decoupling some components.

Getting Started (will prepare a .rst file for the connector documentation)

Credits to

Prerequisite

  • Mac OS X or Linux
  • Java 8 Update 151 or higher (8u151+), 64-bit. Both Oracle JDK and OpenJDK are supported.
  • Maven 3.3.9+ (for building)

Setting up Pinot

Download and build Pinot

git clone https://github.com/apache/incubator-pinot.git
cd incubator-pinot/
git checkout tags/release-0.1.0
mvn install package -DskipTests -Pbin-dist

Start Pinot

cd pinot-distribution/target/apache-pinot-incubating-0.1.0-bin/apache-pinot-incubating-0.1.0-bin
mkdir -p ~/tmp
bin/start-controller.sh > ~/tmp/pinot_controller_cmd.log &
bin/start-broker.sh > ~/tmp/pinot_broker_cmd.log &
bin/start-server.sh > ~/tmp/pinot_server_cmd.log &

Prepare Pinot Data

Create some Pinot segments:

mkdir airlineStatsData
cp sample_data/airlineStats_data.avro airlineStatsData/.

bin/pinot-admin.sh CreateSegment -schemaFile sample_data/airlineStats_schema.json -dataDir airlineStatsData -tableName airlineStats -segmentName airlineStatsSeg_0 -outDir airlineStatsData/airlineStatsSeg_0
bin/pinot-admin.sh CreateSegment -schemaFile sample_data/airlineStats_schema.json -dataDir airlineStatsData -tableName airlineStats -segmentName airlineStatsSeg_1 -outDir airlineStatsData/airlineStatsSeg_1
bin/pinot-admin.sh CreateSegment -schemaFile sample_data/airlineStats_schema.json -dataDir airlineStatsData -tableName airlineStats -segmentName airlineStatsSeg_2 -outDir airlineStatsData/airlineStatsSeg_2

Create Table:

bin/pinot-admin.sh AddSchema  -schemaFile sample_data/airlineStats_schema.json -exec
bin/pinot-admin.sh AddTable -filePath sample_data/airlineStats_offline_table_config.json -controllerPort 9000 -exec

Upload Segments:

bin/pinot-admin.sh UploadSegment -controllerPort 9000 -segmentDir airlineStatsData/airlineStatsSeg_0
bin/pinot-admin.sh UploadSegment -controllerPort 9000 -segmentDir airlineStatsData/airlineStatsSeg_1
bin/pinot-admin.sh UploadSegment -controllerPort 9000 -segmentDir airlineStatsData/airlineStatsSeg_2

If your network environment changed, you may need to redo the steps above to start Zookeeper and Pinot.

Verify Pinot is running

Open the following URL in a browser and verify that Pinot is running and serving data:

http://localhost:9000/query#

Setting up Presto

Build Presto

mvn clean install -DskipTests -T 4

Running Presto in IDE:

Use the following options to create a run configuration:

Main Class: com.facebook.presto.server.PrestoServer
VM Options: -ea -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -Xmx2G -Dconfig=etc/config.properties -Dlog.levels-file=etc/log.properties
Working directory: $MODULE_DIR$
Use classpath of module: presto-main

The working directory should be the presto-main subdirectory. In IntelliJ, using $MODULE_DIR$ accomplishes this automatically.

Running Presto CLI

presto-cli/target/presto-cli-*-executable.jar --catalog pinot
presto> use airlinestats;

Query Pinot table through Presto

presto:airlinestats> select count(AirlineID) from airlinestats;
 _col0
-------
 29238
(1 row)

Query 20190828_224004_00003_3wqm2, FINISHED, 1 node
Splits: 20 total, 20 done (100.00%)
0:04 [29.2K rows, 114KB] [8.29K rows/s, 32.4KB/s]

@facebook-github-bot
Collaborator

Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. In order for us to review and merge your code, please sign up at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need the corporate CLA signed.

If you have received this in error or have any questions, please contact us at cla@fb.com. Thanks!

@zhenxiao zhenxiao self-requested a review September 18, 2019 05:35
@zhenxiao zhenxiao self-assigned this Sep 18, 2019
@zhenxiao
Collaborator

Thank you for the work, @haibow
could you please sign the CLA, and merge the 2 commits into 1?

@highker highker self-requested a review September 18, 2019 18:34
Contributor

@highker highker left a comment

Overall looks good. I did a very quick skim for coding style only and will get into the logic shortly.

@@ -0,0 +1,4 @@
connector.name=pinot
Contributor

This file may not be necessary.

Author

Added it here to help with testing locally. Is it ok to keep it here?


<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.httpcore.version>4.4.6</dep.httpcore.version>
Contributor

  • We usually put the dependency declaration + their version numbers in the root pom.xml.
  • There are too many dependencies. I will need to dig deeper into that later on. One quick comment: netty is already included by presto. Can we reuse it?

@facebook-github-bot
Collaborator

Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Facebook open source project. Thanks!

Contributor

@highker highker left a comment

First thorough pass. More than 95% are minor comments. There are a few essential comments buried among all these nits. Make sure those are addressed.

@highker highker self-requested a review September 24, 2019 00:00
Contributor

@highker highker left a comment

There are too many unnecessary dependencies. Let's strip them out slowly. Let's focus on http client and json for this iteration. We can come back to netty in the next round.

*/
package com.facebook.presto.pinot;

import com.alibaba.fastjson.JSONArray;
Contributor

Let's use the jackson (e.g., in JsonFunctions) or smile (e.g., in io.airlift.jaxrs.SmileMapper) library instead of introducing a new dependency. (Also, you have already used jackson in this class.)


String responseBody = sendHttpGet(url);
JSONObject resp = JSONObject.parseObject(responseBody);
JSONArray routingTableSnapshots = resp.getJSONArray("routingTableSnapshot");
Contributor

make "routingTableSnapshot" constant

log.debug("Ignoring routingTable for %s", tableNameWithType);
continue;
}
JSONArray routingTableEntriesArray = snapshot.getJSONArray("routingTableEntries");
Contributor

make "routingTableEntries" constant

String url = String.format(ROUTING_TABLE_API_TEMPLATE, getBrokerHost(tableName), tableName);

String responseBody = sendHttpGet(url);
JSONObject resp = JSONObject.parseObject(responseBody);
Contributor

nit: rename resp to response

{
final String url = String.format(GET_ALL_TABLES_API_TEMPLATE, getControllerUrl());
String responseBody = sendHttpGet(url);
Map<String, List<String>> responseMap = objectMapper.readValue(responseBody, Map.class);
Contributor

nit: use new TypeReference<Map<String, List<String>>>() {}

PINOT_FAILURE_GETTING_TABLE(1, EXTERNAL),
PINOT_FAILURE_GETTING_SCHEMA(2, EXTERNAL),
PINOT_FAILURE_QUERYING_DATA(3, EXTERNAL),
PINOT_FAILURE_INITIATING_RESOURCES(4, INTERNAL_ERROR);
Contributor

not used

requestIdGenerator = new AtomicLong(0);
prestoHostId = getDefaultPrestoId();

final MetricsRegistry registry = new MetricsRegistry();
Contributor

remove final; please examine all other local variables that have final

<dep.httpclient.version>4.5.3</dep.httpclient.version>
<dep.yammer.version>2.2.0</dep.yammer.version>
<dep.fastjson.version>1.1.24</dep.fastjson.version>
<dep.netty.version>3.10.6.Final</dep.netty.version>
Contributor

There are too many dependencies. @wenleix @hellium01, could you help take a look at whether there is anything we could remove? For example, is it possible to use the com.facebook.drift.transport.netty lib? We have explicitly excluded the netty-all dep in this file.

Member

Instead of trying to force the specific version module / project wise - the pinot client has to be shaded (see https://github.com/prestodb/presto-cassandra-server, https://github.com/prestodb/presto-cassandra-driver)

return zookeeperUrl;
}

@Config("zookeeper-uri")
Contributor

put pinot. at the beginning of all configs. Examples can be found in CassandraClientConfig

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>${dep.validation-api.version}</version>
Contributor

this line is redundant

Contributor

@wenleix wenleix left a comment

Thanks for the contribution!

I agree with @highker that many dependencies are being added (e.g. httpclient, yammer, netty) that might conflict with other modules.

Instead of adding them directly, I am wondering if it's possible to create a project like presto-pinot-client that includes such libraries in a shaded way? For example, take a look at https://github.com/prestodb/presto-cassandra-server, https://github.com/prestodb/presto-cassandra-driver.

@arhimondr What do you think?

@highker
Contributor

highker commented Sep 25, 2019

As @wenleix suggested, there are many shaded libraries under the prestodb account. We will need to create another repo for the shaded pinot. Then in prestodb/presto, we depend on the shaded version.

Member

@arhimondr arhimondr left a comment

Did a quick pass. Please address and let me have another look

<groupId>org.apache.pinot</groupId>
<artifactId>pinot-api</artifactId>
<version>${dep.pinot.version}</version>
<exclusions>
Member

As @wenleix mentioned, instead of having all the exclusions here, it would be nice to have a separate project that does the exclusions / shading for the conflicting libraries (similarly to https://github.com/prestodb/presto-cassandra-server, https://github.com/prestodb/presto-cassandra-driver)

<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
Member

We don't exclude this library. The other library that conflicts with this must be excluded.

<artifactId>javax.servlet-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Member

These dependencies are no longer test dependencies. Should they be moved to the top?

private static final String TABLE_SCHEMA_API_TEMPLATE = "http://%s/tables/%s/schema";
private static final String ROUTING_TABLE_API_TEMPLATE = "http://%s/debug/routingTable/%s";
private static final String TIME_BOUNDARY_API_TEMPLATE = "http://%s/debug/timeBoundary/%s";
private static final CloseableHttpClient HTTP_CLIENT = HttpClients.createDefault();
Member

Don't initialize it as a static thing. Please use injection
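
A minimal sketch of what the injection suggestion could look like, written with plain constructor injection so it needs no extra dependencies. The HttpGetter interface and the ClusterInfoFetcherSketch class are hypothetical stand-ins (not names from the PR) for the Apache CloseableHttpClient and the fetcher class; only TABLE_SCHEMA_API_TEMPLATE is taken from the diff above. In Presto the client would presumably be bound in a Guice module and the constructor annotated with @Inject.

```java
import java.util.Objects;

// Hypothetical abstraction over an HTTP client; stands in for CloseableHttpClient.
interface HttpGetter
{
    String get(String url);
}

// Illustrative fetcher that receives its client through the constructor
// instead of holding it in a static field.
class ClusterInfoFetcherSketch
{
    private static final String TABLE_SCHEMA_API_TEMPLATE = "http://%s/tables/%s/schema";

    private final HttpGetter httpGetter;
    private final String controllerHost;

    ClusterInfoFetcherSketch(HttpGetter httpGetter, String controllerHost)
    {
        this.httpGetter = Objects.requireNonNull(httpGetter, "httpGetter is null");
        this.controllerHost = Objects.requireNonNull(controllerHost, "controllerHost is null");
    }

    // Builds the schema URL for the table and delegates the request to the injected client.
    String fetchSchemaJson(String tableName)
    {
        String url = String.format(TABLE_SCHEMA_API_TEMPLATE, controllerHost, tableName);
        return httpGetter.get(url);
    }
}
```

With this shape a test can pass in a fake getter (for example a lambda that returns canned JSON) rather than depending on a shared static client.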


public class TestPinotQueryBuilder
{
private static List<PinotColumnHandle> columnHandles = Arrays.asList(
Member

ImmutableList.of()

public void testGetPinotQuerySelectAll()
{
String expectedQuery = "SELECT varchar, int, secondsSinceEpoch, boolean, double FROM table LIMIT 10";
Assert.assertEquals(expectedQuery, getPinotQuery(new PinotConfig(), columnHandles, "", "", "table", 10));
Member

static import
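
For readers without the PR source at hand, here is a rough sketch of the kind of string assembly the test above exercises. PinotQuerySketch and buildQuery are hypothetical names, the signature is simplified compared with getPinotQuery in the diff, and the WHERE handling is an assumption; only the expected query text comes from the test.

```java
import java.util.List;

// Illustrative only: assembles a PQL-style SELECT from column names, an optional filter, and a limit.
class PinotQuerySketch
{
    static String buildQuery(List<String> columns, String filter, String table, long limit)
    {
        StringBuilder query = new StringBuilder("SELECT ")
                .append(String.join(", ", columns))
                .append(" FROM ")
                .append(table);
        // Assumption: an empty filter means no WHERE clause is emitted.
        if (!filter.isEmpty()) {
            query.append(" WHERE ").append(filter);
        }
        return query.append(" LIMIT ").append(limit).toString();
    }
}
```

Called with the five column names from the test, an empty filter, "table", and 10, this yields the expectedQuery string shown above.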


public class TestPinotSplitManager
{
private static final PinotSplitManager pinotSplitManager = new PinotSplitManager(new PinotConnection(new PinotClusterInfoFetcher(new PinotConfig())));
Member

constants should be spelled like PINOT_SPLIT_MANAGER, etc ..


public class TestPinotSplitManager
{
private static final PinotSplitManager pinotSplitManager = new PinotSplitManager(new PinotConnection(new PinotClusterInfoFetcher(new PinotConfig())));
Member

However, please avoid having static fields. This is a memory leak. Please use an instance field + @BeforeClass + @AfterClass

TupleDomain<ColumnHandle> constraintSummary = TupleDomain.withColumnDomains(domainMap);

String expectedFilter = "((city_id < 10)) AND (country_name IN (\"cn\",\"us\"))";
Assert.assertEquals(expectedFilter, pinotSplitManager.getPinotPredicate(constraintSummary));
Member

static imports everywhere

@haibow
Author

haibow commented Sep 26, 2019

Thanks for the comments @highker @wenleix @arhimondr . I will get back on this a bit later.

@highker
Contributor

highker commented Oct 19, 2019

superseded by #13504

@highker highker closed this Oct 19, 2019
agrawaldevesh pushed a commit to agrawaldevesh/presto that referenced this pull request Oct 24, 2019
This PR introduces the presto-pinot connector. It subsumes the earlier effort in PR prestodb#13413.

This presto-pinot connector works as follows:
* It builds upon the new plan connector optimizer framework to push the maximal query to the pinot system
* By default, aggregation and limit queries are sent to the pinot broker, while filter-only queries are sent to the servers in parallel. This is useful when joining against another (say hive) table wherein the join needs to be parallelized and segments from pinot (fetched directly from the pinot servers) need to be joined with hive splits. If you wish to not send any queries to the broker, please set the catalog session property prefer_broker_queries to false.
* You can disable parallel queries (useful for low-latency deployments like the ones we run at Uber) by setting the catalog property forbid_segment_queries=false.
* The connector needs a way to talk to the controller (to fetch the brokers and the table schema). You can either provide the controller URI or the URI of the Muttley proxy (an RPC proxy mechanism we use at Uber). It is also possible to use the pinot-rest-proxy instead of going to the pinot-broker (again something we use for our test pinot clusters at Uber).
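
The broker-versus-server choice described in the list above can be pictured with a small sketch. This is a simplification under stated assumptions, not the connector's actual code: QueryRoutingSketch, Target, and chooseTarget are hypothetical names, and the only input modeled besides the query shape is the prefer_broker_queries session property.

```java
// Illustrative only: decides where a pushed-down query is sent, following the description above.
class QueryRoutingSketch
{
    enum Target { BROKER, SERVERS }

    // hasAggregation / hasLimit describe the pushed-down query;
    // preferBrokerQueries mirrors the prefer_broker_queries session property.
    static Target chooseTarget(boolean hasAggregation, boolean hasLimit, boolean preferBrokerQueries)
    {
        if (preferBrokerQueries && (hasAggregation || hasLimit)) {
            return Target.BROKER;
        }
        // Filter-only queries, or any query when the broker is not preferred, go to the servers.
        return Target.SERVERS;
    }
}
```

Under this reading, a filter-only scan that feeds a join with a hive table lands on the servers and can be split per segment, while a plain aggregation goes to the broker in one request.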

The connector is organized into two submodules: presto-pinot-lib and presto-pinot-plugin. We chose this multi module organization because at Uber we have another internal connector that federates over presto-pinot-lib and presto-aresdb-lib (another low latency database like pinot). This is needed because each plugin is in its own classpath otherwise. This implementation detail is hidden from users of the pinot plugin.

In addition, the pinot connector splits the filter expressions: expressions that can be pushed down to pinot are pushed down, and those that cannot are evaluated in presto.
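
A minimal sketch of that split, assuming the filter is a conjunction (AND) of independent expressions and that some check exists for what pinot supports. FilterSplitSketch, Split, and the supportedByPinot predicate are hypothetical; the real connector works on Presto's expression tree rather than strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: partitions AND-ed filter conjuncts into a pushed-down part and a remainder.
class FilterSplitSketch
{
    // Result of the split: one list for pinot, one left for presto to evaluate.
    static final class Split
    {
        final List<String> pushedDown = new ArrayList<>();
        final List<String> remaining = new ArrayList<>();
    }

    static Split split(List<String> conjuncts, Predicate<String> supportedByPinot)
    {
        Split result = new Split();
        for (String conjunct : conjuncts) {
            if (supportedByPinot.test(conjunct)) {
                result.pushedDown.add(conjunct);
            }
            else {
                result.remaining.add(conjunct);
            }
        }
        return result;
    }
}
```

Splitting this way is only safe for conjunctions: every row that satisfies the whole filter also satisfies the pushed-down part, so presto can apply the remainder to what pinot returns.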

It handles the full range of pinot PQL and respects pinot's weirdnesses to guarantee that all results are still in the expected SQL semantics.

Usage:
Place a catalog file like the ones below in the etc/catalog directory. The schema name is immaterial and can be anything, since pinot does not have a notion of schema. Example catalog files:

```
deveshagrawal-C02VP027J9J5:presto deveshagrawal$ cat local_run/etc/catalog/pinotnotparallel.properties
connector.name=pinot
pinot.controller-url=controller_host:9000
pinot.scan-parallelism-enabled=false

deveshagrawal-C02VP027J9J5:presto deveshagrawal$ cat local_run/etc/catalog/pinotparallel.properties
connector.name=pinot
pinot.controller-url=controller_host:9000
pinot.scan-parallelism-enabled=true
```

Now given a file like this, you can query like so:
```
select date_trunc('hour', from_unixtime(time_column)), count(1) from pinotnotparallel.whatever.pinot_table_name where col = 5 group by 1 order by 2 desc
```
or
```
select * from pinotnotparallel.whatever.pinot_table_name where col = 5 limit 10
```
or a join with a hive dimension table:
```
select hive_table.city_name, count(1) from hive.warehouse.dim_table hive_table join pinotparallel.whatever.pinot_table_name pinot on pinot.city_id = hive_table.city_id and pinot.ts between cast(to_unixtime(now() - interval '1' hour) as bigint) and cast(to_unixtime(now()) as bigint) and pinot.col = 5 group by 1 order by 2 desc limit 10
```

To keep the presto-side dependencies clean, the pinot dependencies were shaded and packaged into a separate driver artifact released from https://github.com/prestodb/presto-pinot-driver.