## Segmentation in Pinot

Table contents in Pinot are expected to grow indefinitely and thus need to be distributed across multiple nodes. The dataset is split into segments, which are comparable to shards/partitions in a classical RDBMS. Segmentation is time-based, meaning that the rows in a given segment are close to each other in time.
Segments store all columns of a table and organize the data in a columnar layout for high encoding efficiency and optional pre-aggregation of metrics. In addition to values, segments store indices and other lookup-related data structures such as dictionaries. By default, values are stored using dictionary encoding, meaning that each value is represented by a dictionary ID that references the corresponding dictionary entry. This way, values can be stored with the minimum number of bits required, which depends on the cardinality of the column.

In [None]:
# all imports
import requests
import json
import io
import re
import os
import shutil
import fileinput
import tarfile
import pandas as pd
from kafka import KafkaConsumer

In [None]:
# some helpers
def server_name_from_instance(instance):
    """Return the ``pinot-server-<number>`` substring of an instance identifier.

    Raises AttributeError when *instance* contains no such substring, because
    the failed search yields ``None``.
    """
    match = re.search('pinot-server-[0-9]+', instance)
    return match.group()

def query_sql(query):
    """Send *query* to the broker's SQL endpoint with tracing switched on.

    The query text is echoed to stdout before the request is made. Returns the
    JSON-decoded response body.
    """
    print("query: " + query)
    request_params = {"sql": query, "trace": "true"}
    response = requests.get('http://pinot-broker.pinot:8099/query/sql', params=request_params)
    return response.json()

def query_result_to_dataframe(result):
    """Build a DataFrame from the ``resultTable`` part of a query response.

    Column labels come from ``resultTable.dataSchema.columnNames`` and the
    records from ``resultTable.rows``.
    """
    table = result['resultTable']
    column_names = table['dataSchema']['columnNames']
    rows = table['rows']
    return pd.DataFrame(rows, columns=column_names)

def extract_query_statistics_from_result(result):
    """Pick the execution statistics out of a query response.

    Returns a new dict holding only the statistic fields listed below, in that
    order. Raises KeyError if *result* lacks any of them.
    """
    statistic_names = (
        "numServersQueried",
        "numServersResponded",
        "numSegmentsQueried",
        "numSegmentsProcessed",
        "numSegmentsMatched",
        "numConsumingSegmentsQueried",
        "numDocsScanned",
        "numEntriesScannedInFilter",
        "numEntriesScannedPostFilter",
        "numGroupsLimitReached",
        "totalDocs",
        "timeUsedMs",
    )
    statistics = {}
    for name in statistic_names:
        statistics[name] = result[name]
    return statistics

def extract_query_statistics_from_result_dataframe(result):
    """Return the query statistics of *result* as a one-column DataFrame.

    The single column is named ``value``; the statistic names become the index.
    """
    statistics = extract_query_statistics_from_result(result)
    return pd.DataFrame({"value": statistics})

# Matches "__<digits>__<digits>__" inside a segment name; group 1 captures the
# second number, which is used as the segment's sort ordinal.
ordinal_pattern = re.compile(r'__[0-9]+__([0-9]+)__')


def sort_by_ascending_ordinal(segments):
    """Sort *segments* in place by ascending ordinal, ties broken by name.

    Returns None. Raises AttributeError for a name that does not match
    ``ordinal_pattern``.
    """
    def ordinal_then_name(segment_name):
        ordinal = int(ordinal_pattern.search(segment_name).group(1))
        return ordinal, segment_name

    segments.sort(key=ordinal_then_name)

def segment_metadata_for_table(table):
    """Collect the controller's metadata for every segment of *table*.

    The segment listing is iterated as a list of mappings from table type to
    segment names. One metadata request is issued per segment. Returns a dict
    keyed by ``"<segment>_<table type>"`` whose values are the JSON-decoded
    metadata responses.
    """
    listing = requests.get(f'http://pinot-controller.pinot:9000/segments/{table}').json()

    metadata_by_segment = {}
    for entry in listing:
        for table_type in entry:
            for segment in entry[table_type]:
                response = requests.get(f'http://pinot-controller.pinot:9000/segments/{table}/{segment}/metadata')
                metadata_by_segment[f"{segment}_{table_type}"] = response.json()

    return metadata_by_segment

def segment_metadata_of_nth_segment(segment_metadata, n, table_type="REALTIME"):
    """Return the metadata of the n-th segment (0-based) of one table type.

    Only keys of *segment_metadata* ending in ``"_<table_type>"`` are
    considered; they are ordered by ascending ordinal before indexing.
    Raises IndexError when fewer than ``n + 1`` such segments exist.
    """
    suffix = "_" + table_type
    matching_keys = [key for key in segment_metadata if key.endswith(suffix)]
    sort_by_ascending_ordinal(matching_keys)
    return segment_metadata[matching_keys[n]]


def start_time_of_nth_segment(segment_metadata, n, table_type="REALTIME"):
    """Return the ``segment.start.time`` metadata entry of the n-th segment (0-based)."""
    nth_metadata = segment_metadata_of_nth_segment(segment_metadata, n, table_type)
    return nth_metadata["segment.start.time"]

In [None]:
# consumer = KafkaConsumer(group_id='test', bootstrap_servers=['pinot-kafka.pinot:9092'])
# consumer.topics()

In [None]:
# requests.get('http://pinot-controller.pinot:9000/schemas/trips').json()

In [None]:
# Template for a REALTIME table that consumes the "trips" Kafka topic.
# "tableName" is left empty here and filled in before each POST below.
table_config = {
  "tableName": "",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "trip_start_time_millis",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "60",
    "schemaName": "trips",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": [
        "rider_name",
        "driver_name",
        "start_location",
        "end_location"
    ],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "simple",
      "stream.kafka.topic.name": "trips",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "pinot-kafka-zookeeper:2181",
      "stream.kafka.broker.list": "pinot-kafka:9092",
      "realtime.segment.flush.threshold.time": "12h",
      "realtime.segment.flush.threshold.size": "80000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

# first table: template values as-is (replication 1, flush threshold size 80000)
table_config["tableName"] = "trips_segmentation_1"
print(requests.post('http://pinot-controller.pinot:9000/tables', json=table_config).json())

# second table: the same dict is mutated in place, so these overrides (replication 3,
# flush threshold size 50000) stay in effect for any later use of table_config
table_config["tableName"] = "trips_segmentation_2"
table_config["segmentsConfig"]["replication"] = "3"
table_config["segmentsConfig"]["replicasPerPartition"] = "3"
table_config["tableIndexConfig"]["streamConfigs"]["realtime.segment.flush.threshold.size"] = "50000"
print(requests.post('http://pinot-controller.pinot:9000/tables', json=table_config).json())

In [None]:
# response = requests.get('http://pinot-controller.pinot:9000/segments/trips_segmentation_1').json()
# segments_1 = response[0]['REALTIME']
# sort_by_ascending_ordinal(segments_1)
# pd.DataFrame(segments_1, columns=['trips_segmentation_1'])

In [None]:
# response = requests.get('http://pinot-controller.pinot:9000/segments/trips_segmentation_2').json()
# segments_2 = response[0]['REALTIME']
# sort_by_ascending_ordinal(segments_2)
# pd.DataFrame(segments_2, columns=['trips_segmentation_2'])

In [None]:
# fetch the metadata of every segment of trips_segmentation_1 (kept for the queries below)
# and render it as a DataFrame
segment_metadata_1 = segment_metadata_for_table("trips_segmentation_1")
pd.DataFrame(segment_metadata_1)

In [None]:
# fetch the metadata of every segment of trips_segmentation_2 (kept for the queries below)
# and render it as a DataFrame
segment_metadata_2 = segment_metadata_for_table("trips_segmentation_2")
pd.DataFrame(segment_metadata_2)

In [None]:
# get data from first 2 segments
# NOTE(review): the filter keeps rows older than the start time of the REALTIME segment
# at index 1 (0-based, ordered by ordinal); confirm the number of segments actually hit
# via numSegmentsMatched in the statistics displayed below.
query_for_trips_segmentation_1 = f"select driver_name, sum(count) as trips_count from trips_segmentation_1 where trip_start_time_millis < {start_time_of_nth_segment(segment_metadata_1, 1)} group by driver_name order by trips_count desc limit 5"

# run the query, then show the result rows followed by the execution statistics
query_result = query_sql(query_for_trips_segmentation_1)
display(query_result_to_dataframe(query_result))
display(extract_query_statistics_from_result_dataframe(query_result))

In [None]:
# get data from first 3 segments
# NOTE(review): the filter keeps rows older than the start time of the REALTIME segment
# at index 2 (0-based, ordered by ordinal); confirm the number of segments actually hit
# via numSegmentsMatched in the statistics displayed below.
query_for_trips_segmentation_2 = f"select driver_name, sum(count) as trips_count from trips_segmentation_2 where trip_start_time_millis < {start_time_of_nth_segment(segment_metadata_2, 2)} group by driver_name order by trips_count desc limit 5"

# run the query, then show the result rows followed by the execution statistics
query_result = query_sql(query_for_trips_segmentation_2)
display(query_result_to_dataframe(query_result))
display(extract_query_statistics_from_result_dataframe(query_result))

## Query Routing / Processing

Brokers are responsible for maintaining routing tables, which map the segments of a table to the servers that host them. This allows brokers to efficiently scatter incoming queries across servers.

In [None]:
# some helpers
def routing_table_for_query(query):
    """Ask the broker's debug endpoint for the routing table of *query*.

    The query text is echoed to stdout first. Returns the JSON-decoded
    response body.
    """
    print("query: " + query)
    request_params = {"query": query}
    response = requests.get('http://pinot-broker.pinot:8099/debug/routingTable/sql', params=request_params)
    return response.json()

def routing_table_for_table(table):
    """Return the broker's JSON-decoded debug routing table for *table*."""
    response = requests.get(f'http://pinot-broker.pinot:8099/debug/routingTable/{table}')
    return response.json()

def external_view_for_table(table):
    """Return the controller's JSON-decoded external view for *table*."""
    response = requests.get(f'http://pinot-controller.pinot:9000/tables/{table}/externalview')
    return response.json()

def routing_table_for_query_dataframe(query):
    """Return the routing table of *query* as a DataFrame of segment/server rows.

    The broker's routing table is read as a mapping from server instance to
    the segments routed to it. Each segment becomes one row with the columns
    ``segment`` and ``server`` (short server name). Rows are ordered by
    ascending segment ordinal, then by segment name.
    """
    rt = routing_table_for_query(query)

    server_by_segment = {}
    for server, server_segments in rt.items():
        server_name = server_name_from_instance(server)
        for segment in server_segments:
            server_by_segment[segment] = server_name

    rows = [{"segment": segment, "server": server} for segment, server in server_by_segment.items()]

    # Tie-break on the segment name, not on the row dict: dicts are not
    # orderable, so two segments sharing an ordinal would raise TypeError.
    rows.sort(key=lambda row: (int(ordinal_pattern.search(row["segment"]).group(1)), row["segment"]))
    return pd.DataFrame(rows)

def routing_table_for_table_dataframe(table):
    """Return the routing table of *table* as a DataFrame, one row per segment.

    The broker's response is read as a mapping from typed table name to a
    mapping from server instance to segments. Each row has a ``segment``
    column plus one column per table type found (``REALTIME`` / ``OFFLINE``)
    holding the short server name. Rows are ordered by ascending segment
    ordinal, then by segment name.
    """
    rt = routing_table_for_table(table)

    servers_by_segment = {}
    for table_name_type, table_rt in rt.items():
        table_type = re.search('REALTIME|OFFLINE', table_name_type).group()
        for server, server_segments in table_rt.items():
            server_name = server_name_from_instance(server)
            for segment in server_segments:
                servers_by_segment.setdefault(segment, {})[table_type] = server_name

    rows = [{"segment": segment, **type_server} for segment, type_server in servers_by_segment.items()]

    # Tie-break on the segment name, not on the row dict: dicts are not
    # orderable, so two segments sharing an ordinal would raise TypeError.
    rows.sort(key=lambda row: (int(ordinal_pattern.search(row["segment"]).group(1)), row["segment"]))
    return pd.DataFrame(rows)

def external_view_for_table_dataframe(table):
    """Return the external view of *table* as a DataFrame indexed by segment.

    The controller's response is read as a mapping from table type to either
    ``None`` or a mapping from segment to its servers. The result has one row
    per segment and one column per table type; each cell holds the list of
    short server names that appear for that segment.
    """
    ev = external_view_for_table(table)
    ev_data = {}

    for table_type, ev_per_type in ev.items():
        # a table type without an external view is reported as None
        if ev_per_type is None:
            continue

        for segment, segment_servers in ev_per_type.items():
            servers_by_type = ev_data.setdefault(segment, {})
            # only the server keys are needed; the per-server state is ignored
            for server in segment_servers:
                servers_by_type.setdefault(table_type, []).append(server_name_from_instance(server))

    return pd.DataFrame(ev_data).transpose()

In [None]:
# show, per segment of trips_segmentation_1, the servers listed in the table's external view
external_view_for_table_dataframe("trips_segmentation_1")

In [None]:
# show, per segment of trips_segmentation_2, the servers listed in the table's external view
external_view_for_table_dataframe("trips_segmentation_2")

In [None]:
# show which server each segment is routed to for the query above; the table name in
# the query text is rewritten to its "_REALTIME"-suffixed form before asking the broker
routing_table_for_query_dataframe(query_for_trips_segmentation_1.replace("trips_segmentation_1", "trips_segmentation_1_REALTIME"))

In [None]:
# show which server each segment is routed to for the query above; the table name in
# the query text is rewritten to its "_REALTIME"-suffixed form before asking the broker
routing_table_for_query_dataframe(query_for_trips_segmentation_2.replace("trips_segmentation_2", "trips_segmentation_2_REALTIME"))

## Batch Ingestion and Hybrid Tables

Segments are transferred as tar archives and can be downloaded from the controller.

In [None]:
# Config for the OFFLINE half of the hybrid table "trips_hybrid".
# The same dict is turned into the REALTIME half further down.
table_config = {
  "tableName": "trips_hybrid",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "trip_start_time_millis",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "60",
    "schemaName": "trips",
    "replication": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": [
        "rider_name",
        "driver_name",
        "start_location",
        "end_location"
    ]
  },
  "metadata": {
    "customConfigs": {}
  }
}

# create offline table
print(requests.post('http://pinot-controller.pinot:9000/tables', json=table_config).json())

# create realtime table
# (same table name; the dict is mutated in place: table type switched, replicasPerPartition
# added, and Kafka stream settings for the "trips" topic attached)
table_config["tableType"] = "REALTIME"
table_config["segmentsConfig"]["replicasPerPartition"] = "1"
table_config["tableIndexConfig"]["streamConfigs"] = {
  "streamType": "kafka",
  "stream.kafka.consumer.type": "simple",
  "stream.kafka.topic.name": "trips",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.zk.broker.url": "pinot-kafka-zookeeper:2181",
  "stream.kafka.broker.list": "pinot-kafka:9092",
  "realtime.segment.flush.threshold.time": "12h",
  "realtime.segment.flush.threshold.size": "50000",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
print(requests.post('http://pinot-controller.pinot:9000/tables', json=table_config).json())

In [None]:
# show, per segment of trips_hybrid, the servers listed in the external view for each table type
external_view_for_table_dataframe("trips_hybrid")

In [None]:
# helpers for transforming realtime segments to offline segment
# Scratch directory holding the downloaded, extracted and re-packed segment files.
tmp_hybrid_basedir = "/tmp/trips_hybrid"
# exist_ok=True tolerates an already existing directory but still raises
# FileExistsError when the path exists and is not a directory.
os.makedirs(tmp_hybrid_basedir, exist_ok=True)

def path_for_realtime_tar(segment_name):
    """Return the scratch-directory path of the downloaded realtime tar for *segment_name*."""
    file_name = f"{segment_name}.tar.gz"
    return f"{tmp_hybrid_basedir}/{file_name}"

def path_for_offline_dir(segment_name):
    """Return the scratch-directory path into which *segment_name* is extracted."""
    dir_name = f"{segment_name}_offline"
    return f"{tmp_hybrid_basedir}/{dir_name}"

def path_for_offline_tar(segment_name):
    """Return the scratch-directory path of the re-packed offline tar for *segment_name*."""
    file_name = f"{segment_name}_offline.tar.gz"
    return f"{tmp_hybrid_basedir}/{file_name}"

def download_segment(segment_metadata):
    """Download a realtime segment tar into the scratch directory.

    Reads ``segment.name`` and ``segment.realtime.download.url`` from
    *segment_metadata*, removes any previous download of the same segment and
    streams the response body to disk. Returns the path of the downloaded tar.

    Raises requests.HTTPError when the download URL answers with an error
    status, so that an error body is never saved as a segment tar.
    """
    segment_name = segment_metadata["segment.name"]
    download_url = segment_metadata["segment.realtime.download.url"]
    segment_realtime_tar = path_for_realtime_tar(segment_name)

    # cleanup old downloads (best effort)
    try:
        os.remove(segment_realtime_tar)
    except OSError:
        pass

    # download realtime segment tar; the context manager releases the streamed
    # connection even if writing the file fails
    with requests.get(download_url, stream=True) as response:
        response.raise_for_status()
        with open(segment_realtime_tar, 'wb') as out_file:
            # raw stream: the bytes are written as received, without content decoding
            shutil.copyfileobj(response.raw, out_file)

    print(f"segment {segment_name} downloaded from {download_url} to {segment_realtime_tar}")
    return segment_realtime_tar

def untar_segment(segment_metadata):
    """Extract the downloaded realtime segment tar into its offline working directory.

    Reads ``segment.name`` from *segment_metadata*. Any earlier extraction of
    the same segment is removed first. Returns the directory the archive was
    extracted into.
    """
    segment_name = segment_metadata["segment.name"]
    segment_offline_basedir = path_for_offline_dir(segment_name)
    segment_realtime_tar = path_for_realtime_tar(segment_name)

    # cleanup old artifacts if any
    shutil.rmtree(segment_offline_basedir, ignore_errors=True)

    # extract downloaded segment tar
    with tarfile.open(segment_realtime_tar, 'r:gz') as tar:
        # The archive was fetched over the network: where the interpreter
        # supports extraction filters, use the 'data' filter so members cannot
        # escape the destination directory. Older interpreters fall back to
        # the unfiltered extraction.
        if hasattr(tarfile, 'data_filter'):
            tar.extractall(path=segment_offline_basedir, filter='data')
        else:
            tar.extractall(path=segment_offline_basedir)

    print(f"segment {segment_name} untarred to {segment_offline_basedir}")
    return segment_offline_basedir

def transform_segment(segment_metadata):
    """Rewrite an extracted realtime segment as an offline segment and re-pack it.

    Reads ``segment.table.name`` and ``segment.name`` from *segment_metadata*.
    Every occurrence of the realtime table name in the segment's
    ``v3/metadata.properties`` is replaced by the offline table name, then the
    segment directory is packed into a new gzip tar whose top-level entry is
    the segment name. Returns the path of the new tar.
    """
    realtime_table_name = segment_metadata["segment.table.name"]
    # Same "_REALTIME" -> "_OFFLINE" rule as upload_segment_to_offline_table, so
    # the name written into the metadata always matches the upload target.
    offline_table_name = realtime_table_name.replace("_REALTIME", "_OFFLINE")
    segment_name = segment_metadata["segment.name"]
    segment_offline_basedir = path_for_offline_dir(segment_name)

    # modify metadata.properties of segment
    segment_offline_dir = segment_offline_basedir + "/" + segment_name
    metadata_file = segment_offline_dir + "/v3/metadata.properties"
    with open(metadata_file, 'r') as file:
        metadata_contents = file.read()

    metadata_contents = metadata_contents.replace(realtime_table_name, offline_table_name)

    with open(metadata_file, 'w') as file:
        file.write(metadata_contents)

    # create new offline segment tar
    segment_offline_tar = path_for_offline_tar(segment_name)
    with tarfile.open(segment_offline_tar, 'w:gz') as tar:
        tar.add(segment_offline_dir, arcname=segment_name)

    print(f"segment {segment_name} transformed to offline segment to {segment_offline_tar}")
    return segment_offline_tar

def upload_segment_to_offline_table(segment_metadata):
    """Upload the re-packed offline segment tar to the controller.

    Reads ``segment.table.name`` and ``segment.name`` from *segment_metadata*.
    The target table name is the realtime table name with ``_REALTIME``
    replaced by ``_OFFLINE``. The response object and its JSON body are
    printed; nothing is returned.
    """
    table_name = segment_metadata["segment.table.name"].replace("_REALTIME", "_OFFLINE")
    tar_path = path_for_offline_tar(segment_metadata["segment.name"])
    upload_url = f'http://pinot-controller.pinot:9000/v2/segments?table={table_name}'

    # POST segment as multipart/form-data for key 'segment'
    with open(tar_path, 'rb') as tar:
        response = requests.post(upload_url, files={'segment': tar})
        print(response)
        print(response.json())

def transform_and_upload_nth_segment_to_offline_table(segment_metadata, n):
    """Copy the n-th REALTIME segment (0-based, by ordinal) into the offline table.

    Runs the four steps in order on that segment's metadata: download, untar,
    transform, upload.
    """
    nth_meta = segment_metadata_of_nth_segment(segment_metadata, n, table_type="REALTIME")

    # each step picks up the files left behind by the previous one
    pipeline = (
        download_segment,
        untar_segment,
        transform_segment,
        upload_segment_to_offline_table,
    )
    for step in pipeline:
        step(nth_meta)

In [None]:
# fetch the metadata of every segment of the hybrid table, then copy the REALTIME
# segments at index 0 and 1 (0-based, ordered by ordinal) into the offline table
segment_metadata_hybrid = segment_metadata_for_table("trips_hybrid")

transform_and_upload_nth_segment_to_offline_table(segment_metadata_hybrid, 0)
transform_and_upload_nth_segment_to_offline_table(segment_metadata_hybrid, 1)

### Segment URI Push
Let the controller fetch the segment tar from a blob store ([docs](https://docs.pinot.apache.org/basics/data-import/batch-ingestion#segment-uri-push)).

In [None]:
# response = requests.post('http://pinot-controller.pinot:9000/v2/segments?table=trips_hybrid', headers={
#     'UPLOAD_TYPE': 'URI',
#     'DOWNLOAD_URI': download_url
# })
# print(response)
# print(response.json())

### Segment Tar Push
Push the segment tar to the controller ([docs](https://docs.pinot.apache.org/basics/data-import/batch-ingestion#segment-tar-push)).

In [None]:
# upload transformed segment
# upload_segment_to_offline_table(segment_metadata_hybrid)

### Show external view for hybrid table

In [None]:
# show, per segment of trips_hybrid, the servers listed in the external view for each table type
external_view_for_table_dataframe("trips_hybrid")

In [None]:
# segment_metadata_hybrid = segment_metadata_for_table("trips_hybrid")
# pd.DataFrame(segment_metadata_hybrid)

### Example Query: Top 5 drivers

In [None]:
# Top 5 drivers by number of trips across the hybrid table.
# ORDER BY is required for "top": without it LIMIT 5 returns an arbitrary five
# of the drivers that pass the HAVING filter.
query_for_hybrid = """
    SELECT driver_name, sum(count) as trips
    FROM trips_hybrid
    GROUP BY driver_name
    HAVING trips > 1
    ORDER BY trips DESC
    LIMIT 5
    """

query_result_to_dataframe(query_sql(query_for_hybrid))