High Level Architecture
The goal of Pinot is to provide analytics on any given data set. The input data may exist either in Hadoop or in Kafka. At LinkedIn, most tracking data is published into Kafka and eventually moves to Hadoop via an ETL process. To provide fast analytics, Pinot organizes the data into a columnar format and makes use of various indexing technologies such as bitmap and inverted indexes. Data on Hadoop is converted into Index Segments via MapReduce jobs. Index segments are then pushed to the Pinot cluster. Pinot servers load these index segments and serve user queries. The freshness of the data depends on the frequency at which the MapReduce jobs run on Hadoop; this typically takes a few hours. Most applications are not sensitive to data freshness and can live with a lag of a few hours to a day. However, some use cases such as Who Viewed My Profile (WVMP) require analytics in near real time. To support such use cases, Pinot can consume directly from Kafka and generate index segments on the fly.
Input data on HDFS can be in any format such as AVRO, CSV, or JSON. Pinot supports queries within a single table schema. If the fields to be queried are distributed across multiple data sets, clients have to join the data sets into one before creating Pinot Index Segments; the segments are then created on the joined dataset. After the indexes are generated on Hadoop, the data is pushed to the online serving cluster. Pinot Server nodes load the indexes and are ready to serve queries. Index segments are automatically deleted after a pre-configured retention period.
The real-time flow is slightly different from the Hadoop flow. Real-time nodes listen directly to the Kafka stream, generate index segments in memory, and flush them to disk periodically. Queries go over the in-flight data in memory as well as the segments persisted to disk. Real-time nodes are configured with a much shorter retention period, such as 3 days. The idea is that data from Kafka is pushed to Hadoop, where Index Segments are generated and pushed to Historical nodes before the retention period expires.
From a user's point of view, all queries are sent to the Pinot Broker; the user does not have to worry about real-time versus historical nodes. The Broker is smart enough to query the real-time and historical nodes separately and merge the results before sending back the response. For example, suppose a user issues `select count(*) from table where time > T`. Pinot understands that time is a special dimension and splits the query appropriately between real-time and historical nodes. If the historical nodes have data up to the previous day (T1), the broker generates two separate queries: #1 `select count(*) from table where time > T and time < T1` and #2 `select count(*) from table where time > T1`. The broker sends query #1 to the Historical nodes and query #2 to the Real-time nodes. Preferring the historical cluster for most of the time range yields better query latency: data in Hadoop is generally aggregated on a daily basis, whereas in the real-time cluster it is aggregated only at minute/hour granularity, and Hadoop allows sophisticated optimization techniques during segment generation which improve query performance.
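The time-based query split described above can be sketched as follows. This is a minimal illustration of the idea, not Pinot's broker code; the function name and the hard-coded query template are assumptions.

```python
# Hypothetical sketch of the broker's time-based split: everything up to the
# historical boundary T1 goes to Historical nodes, the rest to Realtime nodes.
def split_query(query_start, historical_boundary):
    historical = (f"select count(*) from table "
                  f"where time > {query_start} and time < {historical_boundary}")
    realtime = f"select count(*) from table where time > {historical_boundary}"
    return historical, realtime

hist, rt = split_query(100, 200)
# hist is sent to Historical nodes, rt to Realtime nodes; results are merged.
```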
Pinot Components Architecture
Pinot index segments are created on Hadoop. Clients provide the input data to the Pinot segment creation job. Pinot expects the input data to be pre-joined as needed; typically users run join/decoration jobs as part of this step. The data can be in any format such as AVRO, CSV, or JSON.
Segment creation on Hadoop
The Pinot team provides the library for generating segments. Pinot expects the input data to be stored in HDFS. Data on HDFS is divided into splits of 256/512 MB. The Pinot Index Creation Job launches one mapper for each split and generates the new segments. In some cases, irrespective of the number of splits and data size, clients require a fixed number of segments generated on a daily basis. This can be achieved by partitioning the data on a leading key with a fixed number of partitions. For additional information on how to generate Pinot Index Segments, see [Offline Index Creation](Offline-Index-Creation).
Segment move from HDFS to NFS
After Pinot segments are generated on Hadoop, they need to be transferred over to the online serving cluster. This is done via the segment push job provided by Pinot. This job is a non-MapReduce Java job that runs on the Hadoop gateway machine via Azkaban. It reads the data files from HDFS and performs a sendfile HTTP POST to one of the endpoints exposed by the Pinot Controllers. The files are then saved to NFS directories mounted on the controller nodes.
After saving the segment to NFS, the controller assigns the segment to one of the Pinot servers. The assignment of segments to Pinot servers is maintained and managed by Helix.
Segment move from NFS to Historical Node
The segment-to-server assignment is stored in the Helix IdealState. Helix monitors the liveness of each server; when a server starts up, Helix notifies it of the segments assigned to it. The metadata for a segment, including the URI from which to fetch it, is stored in the Helix Property Store, which is backed by Zookeeper. The Pinot server downloads the segment file from the controller and extracts its contents to local disk.
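The fetch-and-load step might look roughly like this. All names here (`offline_to_online`, the `download.uri` and `load.mode` keys) are illustrative assumptions, not Pinot's actual API.

```python
# Minimal, hypothetical sketch of the segment-load flow triggered by the
# OFFLINE -> ONLINE Helix transition.
def offline_to_online(segment, property_store, fetch, load):
    """Fetch a segment using the URI kept in the property store (Zookeeper),
    then load it according to the configured load mode (memory or mmap)."""
    meta = property_store[segment]             # segment metadata from ZK
    local_path = fetch(meta["download.uri"])   # download tar from controller
    return load(local_path, meta.get("load.mode", "mmap"))

# Toy usage with stand-in fetch/load callables:
store = {"seg_0": {"download.uri": "http://controller/segments/seg_0"}}
result = offline_to_online(
    "seg_0", store,
    fetch=lambda uri: "/tmp/seg_0",
    load=lambda path, mode: (path, mode),
)
# result == ("/tmp/seg_0", "mmap")
```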
The untarred segment consists of metadata along with a forward index and an inverted index (if enabled) for each column. Based on the load mode (memory, mmap), the index segment is either loaded into memory or memory-mapped by the server. This happens as part of the OFFLINE-ONLINE transition triggered through Helix. Once the transition completes successfully, the broker is notified via Helix that the segment is ready to be served, and the Pinot Broker starts routing queries to the server hosting the new segment.
Real time Node
Tracking data is emitted into Kafka. Data is divided into partitions, which are in turn distributed among multiple Kafka brokers. When a resource is created in Pinot, we assign a set of Pinot instances to start consuming data from a specific Kafka topic. In the initial version, we use the high-level Kafka consumer to distribute consumption across the Pinot instances. If a Pinot server fails, consumption is redistributed among the remaining nodes; this rebalancing is handled within the high-level Kafka consumer group. For reliability, Pinot can run more than one consumer group per Kafka topic.
Once a Pinot server has consumed a pre-configured number of events (e.g. 5 million), the in-memory data is converted into an offline segment (similar to the Hadoop job). Once the segment is successfully generated, we commit the offset in Kafka. If there is a failure, we restart consumption from the last checkpoint. The format of the generated segment is the same as the format produced by the Hadoop jobs. The reason for this is that a newly generated segment can be re-assigned to an offline node. This will come in handy once we have the indexing service and reduce the day-to-day dependency on the Hadoop pipeline.
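The consume/flush/commit cycle above could be sketched as below. This is a toy model with illustrative names; in particular, committing the offset only *after* a successful flush is what makes restart-from-last-checkpoint safe.

```python
# Hedged sketch of the realtime consume loop: buffer events, convert to a
# segment every `flush_threshold` events, and only then checkpoint Kafka.
def consume_loop(events, flush_threshold, build_segment, commit_offset):
    buffer, flushed = [], []
    for offset, event in enumerate(events):
        buffer.append(event)
        if len(buffer) >= flush_threshold:
            flushed.append(build_segment(buffer))  # same format as Hadoop segments
            commit_offset(offset)                  # checkpoint only after success
            buffer = []
    return flushed, buffer  # persisted segments, plus the in-memory tail
```

If the process dies before `commit_offset`, the events since the last checkpoint are simply re-consumed, which matches the at-least-once behavior described for Kafka later in this document.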
Realtime segments also have retention configured according to the use case's requirements. Unlike offline segments, whose retention can be months, realtime segment retention is only a few days (typically 3 days). The reason is that realtime segments, created by consuming data from Kafka, cannot have all the optimizations that we can apply on Hadoop. Moreover, data from Kafka eventually lands on Hadoop, where daily segments can be re-generated with advanced optimization techniques applied during segment creation, such as better compression and sorting the data on a leading key. Another thing to note is that Kafka does not guarantee exactly-once delivery. This can result in a temporary (mostly unnoticeable) loss of accuracy in results, but the Hadoop flow allows us to fix such issues.
Pinot Cluster Management
All Pinot Servers and Brokers are managed by Apache Helix. Apache Helix is a generic cluster management framework to manage partitions and replicas in a distributed system. See helix.apache.org for additional information.
Helix divides nodes into 3 logical components based on their responsibilities:
- Participant: The nodes that actually host the distributed resources
- Spectator: The nodes that simply observe the current state of each Participant and route requests accordingly. Routers, for example, need to know the instance on which a partition is hosted and its state in order to route the request to the appropriate endpoint
- Controller: The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability
Pinot terminology maps to Helix concepts as follows (see Pinot Core Concepts and Terminology):
- Pinot Segment: This is modeled as Helix Partition. Each Pinot Segment can have multiple copies referred to as Replicas.
- Pinot Table: Multiple Pinot Segments are grouped into a logical entity referred to as a Pinot Table. All segments belonging to a Pinot Table have the same schema.
- Pinot Server: This is modeled as a Helix Participant. Pinot Servers host the segments (Helix Partitions) belonging to one or more Pinot Tables (Helix Resources).
- Pinot Broker: This is modeled as a Helix Spectator that observes the cluster for changes in the state of segments and Pinot Servers. In order to support multi-tenancy, Pinot Brokers are also modeled as Helix Participants.
Zookeeper: Zookeeper is used to store the state of the cluster. It is also used to store configuration needed for Helix and Pinot; only dynamic, use-case-specific configuration such as the table schema, number of segments, and other metadata is stored there. Zookeeper is also used by the Helix Controller to communicate with the Participants and Spectators. Zookeeper is strongly consistent and fault tolerant; we typically run 3 or 5 Zookeeper nodes in production. A single Zookeeper ensemble is shared across all Pinot clusters.
**Pinot Controller:** All admin commands, such as allocating Pinot Servers and Brokers for a use case, creating a new table, or uploading new segments, go through the Pinot Controller. The Pinot Controller wraps the Helix Controller within the same process. All Pinot admin commands internally get translated into Helix commands via the Helix Admin APIs.
- Allocate Pinot Server/Broker: This command is run when we onboard a new use case or allocate additional capacity to an existing one. It takes the use case name X, the number of Pinot Servers S, and the number of Brokers B needed for the use case. The Pinot Controller uses the Helix tagging API to tag S server instances and B broker instances in the cluster as X. This means all subsequent tables that belong to use case X will be assigned to the same set of S Pinot instances. The B brokers receive a Helix state transition indicating they have been allocated to serve all queries for use case X; all queries for use case X will go through these brokers.
- Create Table: This creates an empty IdealState for a table. The table is also tagged as X, which means all segments of this table will be allocated to instances with the same tag X. Additional metadata such as table retention and allocation strategy is stored in Zookeeper using the Helix Property Store API.
- Upload Segment: The Pinot Controller adds a segment entry to the table's IdealState. The number of entries added corresponds to the number of replicas configured for the table T. While it is possible to let Helix decide the assignment of segments to Pinot Server instances by using the AUTO IdealState mode, the current version uses the CUSTOM IdealState mode (see Helix IdealState Modes for additional information). The Pinot Controller has its own assignment strategy: by default, a segment and its replicas are assigned to the servers with the fewest segments allocated, while ensuring that replicas are never assigned to the same node.
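The default assignment strategy above (least-loaded server, replicas on distinct nodes) might be sketched roughly like this; the function name and data shapes are illustrative, not the controller's actual code.

```python
# Toy sketch: assign each replica of a segment to the server currently
# holding the fewest segments, never reusing a server for the same segment.
def assign_segment(segment, servers, assignments, num_replicas):
    """assignments: dict server -> set of segment names (mutated in place)."""
    chosen = []
    for _ in range(num_replicas):
        candidates = [s for s in servers if s not in chosen]
        target = min(candidates, key=lambda s: len(assignments[s]))  # least loaded
        assignments[target].add(segment)
        chosen.append(target)
    return chosen
```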
Helix Controller: As explained in the previous section, all Pinot admin commands simply get translated into Helix admin commands, which in turn update the metadata stored in Zookeeper. The Helix Controller acts as the brain of the system: it translates all metadata changes into a set of actions and is responsible for executing those actions on the respective participants. This is achieved via state transitions (see Helix Architecture for additional info). The Helix Controller is also responsible for monitoring the Pinot Servers. If a Pinot Server starts up or goes down, the Helix Controller detects it and updates the external view accordingly. The Pinot Broker observes these changes and updates its routing table dynamically (this feature is provided by the Helix library). All queries are routed according to this routing table.
See Multi tenancy in Pinot 2.0 for more info on how Multi tenancy is solved in Pinot 2.0.
The responsibility of the Broker is to route a given query to the appropriate Pinot Server instances, collect the responses, merge them into the final result, and send it back to the client. The two main steps involved are:
Service discovery: Service discovery is the mechanism of knowing which tables are hosted in the cluster, the location of each table's segments, and the time range for each segment. As explained in the previous section, this is derived from the information stored in Zookeeper via the Helix library. The Broker uses this information not only to compute the subset of nodes to send a request to, but also to prune the number of segments to be queried. This is achieved by taking the time range in the query and filtering out segments whose metadata time range does not overlap it. Since multiple replicas host the same segments, the Broker has the flexibility to choose any replica to route a query to, and Pinot has implemented multiple strategies to pick servers. A few possible approaches are: #1 uniformly distribute the segments across the nodes; #2 a greedy algorithm that maximizes/minimizes the number of servers a query gets routed to; #3 random selection of a server for every segment. While #1 and #2 optimize for a given query, they may perform poorly (uneven load on servers) across all queries; in other words, a local optimum does not necessarily mean a global optimum. The current default algorithm uses #3, i.e. random selection of a server for every segment.
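Strategy #3 above (a random replica per segment) could be sketched as follows. The function name and the segment-to-replica map shape are assumptions for illustration only.

```python
import random

# Toy sketch of random replica selection: for each segment, pick one server
# hosting a replica, yielding a server -> segments routing plan.
def route(segments_to_replicas, rng=random):
    plan = {}
    for segment, replicas in segments_to_replicas.items():
        server = rng.choice(sorted(replicas))  # any replica is valid
        plan.setdefault(server, []).append(segment)
    return plan  # server -> list of segments that server should scan
```

Because each segment independently picks a random replica, load averages out across many queries even though any single query's plan may be uneven.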
Scatter-gather: Once the broker computes the set of nodes to route the query to, the requests are sent to the respective Pinot Server nodes. Each server processes the query and returns its response; the broker merges the responses from the individual servers and returns the result to the client. The merging logic depends on the query type: selection with limit, aggregation, group-by with top K, etc. If any of the servers fails to process the query or times out, the broker returns a partial result. We may support additional modes in the future, where the broker retries only the failed requests; another approach is to always send the query to multiple servers and use the result that comes back fastest.
Pinot Index Segment
A Pinot Index Segment is the columnar representation of the raw data. Raw data is generally represented in a row-oriented format such as AVRO, JSON, or CSV. Converting a row-oriented format into a columnar one can reduce storage space and allow fast scans over specific columns. A row-oriented format is efficient when a query updates or reads a specific row in the data; this is typically the case in OLTP use cases, where relational databases such as Oracle or MySQL are used. Columnar databases, on the other hand, are more efficient when a query needs to go over many rows to produce its result, which is typical in analytics use cases. The difference in speed is magnified when the number of rows goes into the thousands and millions.
The columnar format also provides storage-related benefits. Some columns contain values that are repetitive. For example, if a column is of type country, storing it in a row-oriented format requires space proportional to varchar(100) times the number of rows. The columnar format can apply encodings such as fixed-bit and, on top of that, apply a compression algorithm to compress the data further. While these techniques can be applied to row-oriented storage as well, encoding and compression are much more effective when the data is stored column-wise. Some techniques, such as run-length encoding, are easy to apply in columnar formats. For example, if all values for a column in a segment are the same, all we need to store is the value and the number of times it occurs (RLE encoding); row-oriented databases cannot capitalize on such data patterns.
Columnar formats definitely have their downsides: creating efficient columnar formats typically takes time, and once created they cannot be mutated easily. While this might be a problem for OLTP workloads, typical OLAP use cases consist of time-series data, which is immutable.
Anatomy of Index Segment
Pinot splits the entire data into multiple segments for manageability and efficiency. See Pinot Core Concepts and Terminology for more info. As described in previous section, raw data is split into columnar data. This section describes the columnar data format for each of the possible data types.
Segment Metadata (metadata.properties)
This file contains metadata about the segment, such as:

|Property|Description|
|---|---|
|segment.name|Name of the segment, e.g. cars_daily_2015-04-11_2015-04-11_1|
|segment.resource.name|Name of the resource, e.g. cars|
|segment.table.name|Name of the table|
|segment.dimension.column.names|Dimension column names, e.g. Model, tags|
|segment.metric.column.names|Metric column names, e.g. price, count|
|segment.time.column.name|Column that represents time, e.g. Year|
|segment.time.interval|Time interval of the segment; can be hourly, daily, weekly, monthly|
|segment.start.time|Min time value in the data corresponding to this segment|
|segment.end.time|Max time value in the data corresponding to this segment|
|segment.time.unit|Unit of time for start and end time|
Some of the values in the metadata are used during query processing. For example, a segment can be pruned if the query time range does not overlap with the segment's min-max time range.
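The pruning rule just described amounts to an interval-overlap test against the segment metadata keys above; the sketch below is illustrative only.

```python
# Sketch: keep a segment only if its [start, end] time range overlaps the
# query's [query_start, query_end] range, using the metadata keys above.
def prune(segments, query_start, query_end):
    """segments: list of dicts carrying 'segment.start.time' / 'segment.end.time'."""
    return [
        s for s in segments
        if s["segment.start.time"] <= query_end
        and s["segment.end.time"] >= query_start
    ]
```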
Apart from the above segment-level metadata, we also store metadata for each column. The following properties are stored on a per-column basis:

|Column Property Name|Description|
|---|---|
|column.&lt;columnName&gt;.cardinality|Number of unique values for this column in this segment|
|column.&lt;columnName&gt;.totalDocs|Number of documents in the segment|
|column.&lt;columnName&gt;.dataType|Data type of this column: INT, FLOAT, STRING, etc.|
|column.&lt;columnName&gt;.bitsPerElement|If dictionary encoding is applied, how many bits are needed to store each value|
|column.&lt;columnName&gt;.lengthOfEachEntry|If the column type is String, the max number of characters needed to store a value (similar to varchar(100))|
|column.&lt;columnName&gt;.columnType|Type of the column: Dimension, Metric, or Time|
|column.&lt;columnName&gt;.isSorted|Whether the values for this column appear in sorted order in the segment|
|column.&lt;columnName&gt;.hasNullValue|Whether there can be a null value for this column|
|column.&lt;columnName&gt;.hasDictionary|Whether a dictionary was used to encode the data|
|column.&lt;columnName&gt;.hasInvertedIndex|Whether this column has an inverted index|
|column.&lt;columnName&gt;.isSingleValues|Whether this column is single-valued or multi-valued|
|column.&lt;columnName&gt;.maxNumberOfMultiValues|Applicable to multi-value columns: max number of values per document|
|column.&lt;columnName&gt;.totalNumberOfEntries|Applicable to multi-value columns: total number of entries across all documents in the segment|
Creation Metadata (creation.meta)
A dictionary is used to encode the values in a column. Applying dictionary encoding can significantly reduce the data size, especially when the cardinality of the column is low (up to thousands). One option is to always use an integer data type to encode the values, but in some cases the number of unique values is small enough that fixed-bit encoding can be used instead. For example, if the number of unique values is 3, we need just 2 bits to represent each value. The dictionary file stores the mapping between dictionary id and actual value.
While dictionary encoding saves space, it introduces additional lookup cost during query processing: we need to convert dictionary ids back to actual values at run time, which is known to cause significant overhead when the number of lookups is high. The lookup structure cannot be a hash map, since a hash map would require additional memory, and a linear scan of the dictionary would be very slow; one simple optimization is to sort the values and perform a binary search for the lookup. Because of these properties, we cannot blindly apply dictionary encoding to all columns. That said, we do apply a dictionary to all columns in the current version of Pinot 2.0; in the future we will apply dictionary encoding only when the cardinality is &lt; 10k and only for dimension columns.
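The sorted-dictionary approach above can be illustrated as follows. This is a toy sketch, not Pinot's implementation; the function names are assumptions.

```python
import bisect
import math

# Illustrative dictionary encoding: sorted unique values, ids assigned by
# position, value -> id lookup via binary search, plus the fixed-bit width
# needed to store each id.
def build_dictionary(column):
    values = sorted(set(column))
    bits = max(1, math.ceil(math.log2(len(values))))  # bits per dictionary id
    return values, bits

def encode(values, v):
    i = bisect.bisect_left(values, v)  # binary search, no hash map needed
    assert values[i] == v
    return i

values, bits = build_dictionary(["b", "a", "c", "a"])
# 3 unique values -> 2 bits per encoded entry; encode(values, "b") == 1
```

Decoding id back to value is just an array index (`values[i]`), which is the run-time lookup cost the paragraph above refers to.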
The dictionary also allows us to speed up query processing: since it tells us all the unique values for a column in a segment, we can skip processing a segment during predicate evaluation if the right-hand side of the predicate is absent from the dictionary.
Currently, we generate one dictionary per segment. In the future, we will explore maintaining a global dictionary across all segments, which would further reduce the dictionary overhead and allow us to convert dictionary lookups into hash map lookups.
Forward Index
Single Value Sorted Forward Index (.sv.sorted.fwd)
Single Value unsorted Forward Index (.sv.unsorted.fwd)
If the values in a column are not sorted, we have the following possible optimizations:
- Dictionary encoding if feasible. See previous section on when we apply dictionary encoding
- Snappy or LZO or LZ4 or ZLIB compression
In the current version of Pinot, we only apply dictionary encoding, which allows us to compress the data using fixed-bit encoding. In subsequent versions, we will evaluate other compression techniques such as Snappy. While these techniques save space, there is additional overhead to decompress the data on the fly; the challenge is to find the right trade-off between compressing the data and query latency.
Multi Value Forward Index (.mv.fwd)
In some cases, columns are multi-valued, such as the skill set of a member. While dictionary encoding can be applied to multi-value columns, the forward index is not as straightforward as in the single-value case, since the number of values per document can be arbitrary. The challenge is to retrieve the values corresponding to a given document id without scanning the entire data. To achieve that, we create an additional header section that stores the start and end offset for each document, followed by a data section where the encoded values are stored linearly. This is described in the picture below. The header section needs additional storage space; if the overhead becomes significant, we resort to a skip-list-based implementation, which is similar to the previous idea but maintains start and end offsets only for every N documents (a chunk) instead of for every doc id. Retrieving a value then requires first locating the chunk and then performing a linear scan within the chunk to locate the values. An additional bitmap is maintained to mark the start and end offsets per doc id.
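The header-plus-data layout described above can be sketched as below; names and data shapes are illustrative, not the on-disk format.

```python
# Toy sketch of the multi-value forward index: a header of (start, end)
# offsets per document, plus a flat data section holding all values.
def build_mv_index(docs):
    header, data = [], []
    for values in docs:
        start = len(data)
        data.extend(values)
        header.append((start, len(data)))  # offsets into the data section
    return header, data

def read_doc(header, data, doc_id):
    """Fetch one document's values without scanning the whole data section."""
    start, end = header[doc_id]
    return data[start:end]

header, data = build_mv_index([[1, 2], [3], [4, 5, 6]])
# read_doc(header, data, 2) -> [4, 5, 6]
```

The skip-list variant mentioned above would keep one `(start, end)` entry per chunk of N documents instead of per document, trading a short scan inside the chunk for a smaller header.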
Query Execution Phases
- Query Parsing: Pinot supports a slightly modified version of SQL, which we refer to as PQL. PQL supports only a subset of SQL; for example, Pinot does not support joins or nested subqueries. We use Antlr to parse the query into a parse tree. In this phase, all syntax validations are performed and default values are set for missing elements.
- Logical Plan Phase: This phase takes the query parse tree and outputs a logical plan tree. It is single-threaded and simple: it constructs the appropriate logical plan operator tree based on the query type (selection, aggregation, group by, etc.) and the metadata provided by the data source.
- Physical Plan Phase: This phase further optimizes the plan on a per-segment basis; the optimizations applied in this phase can differ across segments.
- Executor Service: Once we have the per-segment physical operator trees, the executor service takes responsibility for scheduling the query processing tasks on each segment.
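The four phases above could be caricatured as a pipeline of callables. This is a toy rendering only; real Pinot uses Antlr parse trees and per-segment operator trees, and the executor service schedules work concurrently rather than in a simple loop.

```python
# Toy sketch of the query execution phases: parse once, plan logically once,
# specialize the plan per segment, then execute each per-segment plan.
def execute(query, parse, plan_logical, plan_physical, segments, run):
    tree = parse(query)                      # 1. parsing + syntax validation
    logical = plan_logical(tree)             # 2. single logical plan tree
    physical = [plan_physical(logical, s)    # 3. per-segment physical plans
                for s in segments]
    return [run(p) for p in physical]        # 4. executor runs each plan
```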
For additional details on Query processing see Pinot Query Execution