In [1]:
#
# Copyright 2021 Rovio Entertainment Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# PRE-REQUISITES

Set the following shell variables; the commands below refer to them:

        AWS_PROFILE=smoke
        JAR_BUCKET=<REPLACE THIS>

## IF TESTING PYTHON CHANGES MANUALLY

1. Build a zip of the python wrapper:

        cd python \
          && zip --exclude='*.pyc' --exclude='*__pycache__*' --exclude='*~' --exclude='.pytest_cache' \
            -FSr ../target/rovio_ingest.zip rovio_ingest ; cd ..

2. Copy the zip to s3:

        aws s3 --profile $AWS_PROFILE cp \
          target/rovio_ingest.zip \
          s3://$JAR_BUCKET/tmp/vivek/druid/python/rovio_ingest.zip

Then invert the boolean in the cell below to use it in spark_conf.
And skip the cell that would call install_pypi_package.

## IF TESTING JAR CHANGES MANUALLY:

1. Build the package (shaded jar) on command line:

        mvn package -DskipTests

2. A) Copy the shaded jar to s3:

        aws s3 --profile $AWS_PROFILE cp \
          target/rovio-ingest-1.0.5_spark_3.0.1-SNAPSHOT.jar \
          s3://$JAR_BUCKET/tmp/vivek/druid/jars/rovio-ingest-1.0.5_spark_3.0.1-SNAPSHOT.jar

2. B) Copy the plain jar to s3: 

        aws s3 --profile $AWS_PROFILE cp \
          target/original-rovio-ingest-1.0.5_spark_3.0.1-SNAPSHOT.jar \
          s3://$JAR_BUCKET/tmp/vivek/druid/jars/original-rovio-ingest-1.0.5_spark_3.0.1-SNAPSHOT.jar

Then invert the boolean in the cell below to use it in spark_conf.

In [2]:
# Load the sparkmagic extension; the %spark / %%spark magics used throughout this notebook rely on it.
%load_ext sparkmagic.magics

In [3]:
# AWS profile name used to create the local boto3 session (see the SSM client below).
ENV = "cloud"
# Key prefix inside the packages bucket under which the manually uploaded jars live.
PREFIX = "tmp/vivek/"

In [4]:
import boto3

# SSM client for the local kernel, bound to the AWS profile named by ENV.
ssm_client = boto3.session.Session(profile_name=ENV).client(service_name="ssm")


# secrets can be added at
# https://console.aws.amazon.com/systems-manager/parameters/?region=us-east-1
def get_param(secret_name: str) -> str:
    """Return the value of the SSM parameter stored under ``/dataengineering/``.

    :param secret_name: parameter path relative to the ``/dataengineering/`` prefix.
    :return: the ``Value`` field of the ``Parameter`` entry in the SSM response.
    """
    parameter_name = "/dataengineering/" + secret_name
    response = ssm_client.get_parameter(Name=parameter_name)
    return response["Parameter"]["Value"]

In [5]:
import json
from IPython import get_ipython

def set_spark_config(conf_dict):
    """Apply a session configuration through the ``%%spark config`` cell magic.

    :param conf_dict: JSON-serialisable configuration; it is passed to the magic as a JSON string.
    """
    serialized_conf = json.dumps(conf_dict)
    shell = get_ipython()
    shell.run_cell_magic('spark', 'config', serialized_conf)

def create_spark_session_with_host(host):
    """Create a Python Spark session via the ``%spark add`` line magic.

    :param host: host name of the endpoint; the session is requested from ``http://<host>:8998``.
    """
    endpoint_url = 'http://{}:8998'.format(host)
    magic_line = 'add -l python -u ' + endpoint_url
    get_ipython().run_line_magic('spark', magic_line)

In [None]:
# Bucket holding the rovio-ingest jars, looked up from SSM.
packages_bucket = get_param("rovio-ingest/packages_bucket")

# Base Spark configuration passed to sparkmagic before the session is created.
spark_conf = {
  "conf": {
    "spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive": "true",
    "spark.sql.hive.caseSensitiveInferenceMode": "NEVER_INFER",
    "spark.pyspark.python": "python3",
    "spark.sql.session.timeZone": "UTC",
  }
}

# Assuming AWS EMR
if True:
    # This has to be an f-string: without the prefix the literal text
    # "{packages_bucket}/{PREFIX}" would end up in every jar path.
    jars_base_path = f"s3://{packages_bucket}/{PREFIX}druid/jars"
    # Comma-separated list of jars for spark.jars: the rovio-ingest jar plus the
    # datasketches jars used for the Hive sketch UDFs later in this notebook.
    jars = (
        f"{jars_base_path}/rovio-ingest-1.0.5_spark_3.0.1-SNAPSHOT.jar,"
        f"{jars_base_path}/datasketches-hive-1.2.0.jar,"
        f"{jars_base_path}/datasketches-java-4.1.0.jar,"
        f"{jars_base_path}/datasketches-memory-2.0.0.jar"
    )

    # "spark.pyspark.python" is already set to "python3" in the base configuration above.
    spark_conf["conf"]["spark.pyspark.virtualenv.enabled"] = "true"
    spark_conf["conf"]["spark.pyspark.virtualenv.type"] = "native"
    spark_conf["conf"]["spark.pyspark.virtualenv.bin.path"] = "/usr/bin/virtualenv"
    spark_conf["conf"]["spark.jars"] = jars

set_spark_config(spark_conf)
create_spark_session_with_host(get_param("spark3/shared/host"))

# to debug problems in session creation, see livy session logs at http://{host}:8998/ui

In [None]:
%%spark

# Install the rovio-ingest Python package into the remote Spark session.
# install_pypi_package is an extension provided by AWS EMR.
# If not on EMR, either:
#    A) install the module with pip on the cluster before creating the spark session, or
#    B) build a zip & use it with spark.submit.pyFiles
# No version is pinned here: use the latest stable release from PyPI.
spark.sparkContext.install_pypi_package("rovio-ingest")
# To install a specific version (e.g. a pre-release) from PyPI, pin it explicitly:
#spark.sparkContext.install_pypi_package("rovio-ingest==0.0.1.dev14")

In [8]:
%%spark

import boto3

# SSM client for the remote (%%spark) session; the region is fixed to us-east-1.
ssm_client = boto3.session.Session(region_name="us-east-1").client(service_name="ssm")


def get_param(secret_name: str) -> str:
    """Return the value of the SSM parameter stored under ``/dataengineering/``.

    :param secret_name: parameter path relative to the ``/dataengineering/`` prefix.
    :return: the ``Value`` field of the ``Parameter`` entry in the SSM response.
    """
    # NOTE(review): WithDecryption is not passed; confirm that none of the
    # parameters read through this helper are stored as SecureString.
    parameter_name = "/dataengineering/" + secret_name
    response = ssm_client.get_parameter(Name=parameter_name)
    return response["Parameter"]["Value"]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
%%spark
from datetime import datetime
from pyspark.sql import functions as f, types as t, SparkSession

from py4j.java_gateway import java_import
from rovio_ingest import DRUID_SOURCE
from rovio_ingest.extensions.dataframe_extension import ConfKeys, add_dataframe_druid_extension

# fix df.explain on EMR 6: import the org.apache.spark.sql.api.python classes into the py4j JVM view
java_import(spark._sc._jvm, "org.apache.spark.sql.api.python.*")

# Enable the rovio_ingest DataFrame extension.
# NOTE(review): presumably this is what makes repartition_by_druid_segment_size
# (called in a later cell) available on DataFrame - confirm against rovio_ingest.
add_dataframe_druid_extension()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Sketches build

In [10]:
%%spark

# Sample purchase events for one app: four players on 2023-01-01, two on 2023-01-02.
# Every field uses the same "name:TYPE" form.
schema = 'revenue:DOUBLE, purchase_count:bigint, app_id:STRING, player_id:STRING, event_date:TIMESTAMP'
df = spark.createDataFrame([[30.0, 10, 'testclient', "p1", datetime(2023, 1, 1)],
                            [15.0, 1, 'testclient', "p2", datetime(2023, 1, 1)],
                            [0.0, 0, 'testclient', "p3", datetime(2023, 1, 1)],
                            [0.0, 0, 'testclient', "p4", datetime(2023, 1, 1)],
                            [5.0, 2, 'testclient', "p1", datetime(2023, 1, 2)],
                            [0.0, 0, 'testclient', "p2", datetime(2023, 1, 2)]],
                            schema)
# Constant 1 on every row; it is summed later by the "dau" longSum metric.
df = df.withColumn('dau', f.lit(1))
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------+----------+---------+-------------------+---+
|revenue|purchase_count|    app_id|player_id|         event_date|dau|
+-------+--------------+----------+---------+-------------------+---+
|   30.0|            10|testclient|       p1|2023-01-01 00:00:00|  1|
|   15.0|             1|testclient|       p2|2023-01-01 00:00:00|  1|
|    0.0|             0|testclient|       p3|2023-01-01 00:00:00|  1|
|    0.0|             0|testclient|       p4|2023-01-01 00:00:00|  1|
|    5.0|             2|testclient|       p1|2023-01-02 00:00:00|  1|
|    0.0|             0|testclient|       p2|2023-01-02 00:00:00|  1|
+-------+--------------+----------+---------+-------------------+---+

In [11]:
%%spark

# The rovio_ingest imports, the EMR 6 java_import fix and add_dataframe_druid_extension()
# were already run in an earlier %%spark cell of this session, so they are not repeated here.

# Repartition by DAY segments on event_date; the result carries the extra
# __PARTITION_TIME__ and __PARTITION_NUM__ columns shown in the plan and schema below.
df_prepared = df.repartition_by_druid_segment_size('event_date', segment_granularity='DAY')
df_prepared.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
Project [revenue#0, purchase_count#1L, app_id#2, player_id#3, event_date#4, dau#10, __PARTITION_TIME__#50, __PARTITION_NUM__#71]
+- RepartitionByExpression [__PARTITION_TIME__#50, __PARTITION_NUM__#71]
   +- Project [revenue#0, purchase_count#1L, app_id#2, player_id#3, event_date#4, dau#10, __PARTITION_TIME__#50, __num_rows__#62, cast((cast((__num_rows__#62 - 1) as double) / cast(5000000 as double)) as int) AS __PARTITION_NUM__#71]
      +- Project [revenue#0, purchase_count#1L, app_id#2, player_id#3, event_date#4, dau#10, __PARTITION_TIME__#50, __num_rows__#62]
         +- Project [revenue#0, purchase_count#1L, app_id#2, player_id#3, event_date#4, dau#10, __PARTITION_TIME__#50, __num_rows__#62, __num_rows__#62]
            +- Window [row_number() windowspecdefinition(__PARTITION_TIME__#50, __PARTITION_TIME__#50 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __num_rows__#62], [__PARTITION_TIME__#50], [__PARTITION_TIME

In [12]:
%%spark
# Show the schema after repartitioning, including the added partition columns.
df_prepared.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- revenue: double (nullable = true)
 |-- purchase_count: long (nullable = true)
 |-- app_id: string (nullable = true)
 |-- player_id: string (nullable = true)
 |-- event_date: timestamp (nullable = true)
 |-- dau: integer (nullable = false)
 |-- __PARTITION_TIME__: timestamp (nullable = true)
 |-- __PARTITION_NUM__: integer (nullable = true)

In [13]:
%%spark
# Preview the repartitioned rows.
df_prepared.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------+----------+---------+-------------------+---+-------------------+-----------------+
|revenue|purchase_count|    app_id|player_id|         event_date|dau| __PARTITION_TIME__|__PARTITION_NUM__|
+-------+--------------+----------+---------+-------------------+---+-------------------+-----------------+
|    0.0|             0|testclient|       p3|2023-01-01 00:00:00|  1|2023-01-01 00:00:00|                0|
|   30.0|            10|testclient|       p1|2023-01-01 00:00:00|  1|2023-01-01 00:00:00|                0|
|   15.0|             1|testclient|       p2|2023-01-01 00:00:00|  1|2023-01-01 00:00:00|                0|
|    0.0|             0|testclient|       p4|2023-01-01 00:00:00|  1|2023-01-01 00:00:00|                0|
|    0.0|             0|testclient|       p2|2023-01-02 00:00:00|  1|2023-01-02 00:00:00|                0|
|    5.0|             2|testclient|       p1|2023-01-02 00:00:00|  1|2023-01-02 00:00:00|                0|
+-------+--------------+----

In [14]:
%%spark
import json

# Metrics spec for the "sketches build" case: the sketches are built from the raw
# player_id / revenue / purchase_count columns of the source DataFrame.
metrics_spec = [
    # Plain sum aggregators.
    {"type": "longSum", "name": "dau", "fieldName": "dau"},
    {"type": "longSum", "name": "purchase_count", "fieldName": "purchase_count"},
    {"type": "doubleSum", "name": "revenue", "fieldName": "revenue"},
    # Theta sketch over player_id; the input column holds raw values, not sketches.
    {
        "type": "thetaSketch",
        "name": "player_id_sketch",
        "fieldName": "player_id",
        "isInputThetaSketch": False,
        "size": 4096
    },
    # HLL sketch over player_id.
    {"type": "HLLSketchBuild", "name": "player_id_hll", "fieldName": "player_id"},
    # Quantiles sketches over the two numeric columns.
    {"type": "quantilesDoublesSketch", "name": "revenue_quantile", "fieldName": "revenue"},
    {"type": "quantilesDoublesSketch", "name": "purchase_count_quantile", "fieldName": "purchase_count"}
]
print(json.dumps(metrics_spec))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[{"type": "longSum", "name": "dau", "fieldName": "dau"}, {"type": "longSum", "name": "purchase_count", "fieldName": "purchase_count"}, {"type": "doubleSum", "name": "revenue", "fieldName": "revenue"}, {"type": "thetaSketch", "name": "player_id_sketch", "fieldName": "player_id", "isInputThetaSketch": false, "size": 4096}, {"type": "HLLSketchBuild", "name": "player_id_hll", "fieldName": "player_id"}, {"type": "quantilesDoublesSketch", "name": "revenue_quantile", "fieldName": "revenue"}, {"type": "quantilesDoublesSketch", "name": "purchase_count_quantile", "fieldName": "purchase_count"}]

In [15]:
%%spark
import json

# Target Druid datasource for the "sketches build" example.
DATA_SOURCE_NAME = "tmp_vivek_sketch_build_test"

# Write the prepared DataFrame with the rovio-ingest Druid source. Connection
# details for the metadata DB and the deep-storage bucket are read from SSM.
(
    df_prepared
    .write
    .mode("overwrite")
    .format(DRUID_SOURCE)
    .option(ConfKeys.DATA_SOURCE, DATA_SOURCE_NAME)
    .option(ConfKeys.TIME_COLUMN, "event_date")
    .option(ConfKeys.METADATA_DB_URI, get_param("druid/metadata_db/uri"))
    .option(ConfKeys.METADATA_DB_USERNAME, get_param("druid/metadata_db/username"))
    .option(ConfKeys.METADATA_DB_PASSWORD, get_param("druid/metadata_db/password"))
    .option(ConfKeys.DEEP_STORAGE_S3_BUCKET, get_param("druid/deep_storage/bucket"))
    .option(ConfKeys.DEEP_STORAGE_S3_BASE_KEY, "druid/segments")
    .option(ConfKeys.METRICS_SPEC, json.dumps(metrics_spec))
    .option("druid.segment_storage.s3.disableacl", "true")
    .save()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

To list the written data you can run:

    aws s3 --profile $AWS_PROFILE ls --recursive s3://{druid-deep-storage-bucket}/druid/segments/tmp_vivek_sketch_build_test/

To see something like:


    2023-07-18 20:41:04       1514 druid/segments/tmp_vivek_sketch_build_test/2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z/2023-07-18T17:40:43.280Z/0/index.zip
    2023-07-18 20:35:42       1385 druid/segments/tmp_vivek_sketch_build_test/2023-01-02T00:00:00.000Z_2023-01-
    2023-07-18 20:41:04       1466 druid/segments/tmp_vivek_sketch_build_test/2023-01-02T00:00:00.000Z_2023-01-03T00:00:00.000Z/2023-07-18T17:40:43.280Z/0/index.zip


And run the following in druid-sql (JDBC)


1.  raw data


    SELECT * FROM tmp_vivek_sketch_build_test

```
    __time app_id dau player_id_hll player_id_sketch purchase_count purchase_count_quantile revenue revenue_quantile
    2023-01-01T00:00:00.000Z testclient 4 "AgEHDAMIBAAOmR4KEjEsCG3Y2QaJOGoG" "AgMDAAAazJMEAAAAAACAPxM98wEU890HhRF7xjua2g9FFIa9XWbPMMoiRAt1/IA3" 11 "AgMIGoAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAJEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA8D8AAAAAAAAkQA==" 45.0 "AgMIGoAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAPkAAAAAAAAAAAAAAAAAAAAAAAAAAAAAALkAAAAAAAAA+QA=="
    2023-01-02T00:00:00.000Z testclient 2 "AgEHDAMIAgCJOGoGbdjZBg==" "AgMDAAAazJMCAAAAAACAPxM98wEU890HyiJEC3X8gDc=" 2 "AgMIGoAAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAABA" 5.0 "AgMIGoAAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAAFEAAAAAAAAAAAAAAAAAAABRA"
```


2.  aggregation with sketches
  -- https://druid.apache.org/docs/latest/querying/sql-aggregations.html#sketch-functions


    SELECT __time,
      app_id,
      sum(dau) dau,
      APPROX_COUNT_DISTINCT_DS_HLL(player_id_hll) approx_hll_ds_players,
      APPROX_COUNT_DISTINCT_DS_THETA(player_id_sketch) approx_theta_ds_players,
      sum(revenue) revenue,
      APPROX_QUANTILE_DS(revenue_quantile, 0.25) revenue_quantile_25,
      APPROX_QUANTILE_DS(revenue_quantile, 0.50) revenue_quantile_50,
      APPROX_QUANTILE_DS(revenue_quantile, 0.75) revenue_quantile_75,
      APPROX_QUANTILE_DS(revenue_quantile, 0.99) revenue_quantile_99,
      sum(purchase_count) purchase_count,
      APPROX_QUANTILE_DS(purchase_count_quantile, 0.25) purchase_count_quantile_25,
      APPROX_QUANTILE_DS(purchase_count_quantile, 0.50) purchase_count_quantile_50,
      APPROX_QUANTILE_DS(purchase_count_quantile, 0.75) purchase_count_quantile_75,
      APPROX_QUANTILE_DS(purchase_count_quantile, 0.99) purchase_count_quantile_99
    FROM tmp_vivek_sketch_build_test
    group by 1,2
    order by 1,2

```
    __time app_id dau approx_hll_ds_players approx_theta_ds_players revenue revenue_quantile_25 revenue_quantile_50 revenue_quantile_75 revenue_quantile_99 purchase_count purchase_count_quantile_25 purchase_count_quantile_50 purchase_count_quantile_75 purchase_count_quantile_99
    2023-01-01T00:00:00.000Z testclient 4 4 4 45.0 0.0 15.0 30.0 30.0 11 0.0 1.0 10.0 10.0
    2023-01-02T00:00:00.000Z testclient 2 2 2 5.0 0.0 5.0 5.0 5.0 2 0.0 2.0 2.0 2.0
```

## Sketches merge (when source df contains sketches)

In [16]:
%%spark
# Register the datasketches Hive theta-sketch classes as temporary Spark SQL functions.
# based on https://datasketches.apache.org/docs/Theta/ThetaHiveUDFs.html
# https://spark.apache.org/docs/latest/sql-ref-functions-udf-hive.html
# data2sketch is used in the aggregation query of a later cell.
spark.sql("create temporary function data2sketch as 'org.apache.datasketches.hive.theta.DataToSketchUDAF'")
spark.sql("create temporary function unionSketches as 'org.apache.datasketches.hive.theta.UnionSketchUDAF'")
spark.sql("create temporary function estimate as 'org.apache.datasketches.hive.theta.EstimateSketchUDF'")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[]

In [17]:
%%spark
# Expose the sample DataFrame to Spark SQL as the temporary view "tmp_df" and preview it.
df.createOrReplaceTempView("tmp_df")
spark.sql("select * from tmp_df").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------+----------+---------+-------------------+---+
|revenue|purchase_count|    app_id|player_id|         event_date|dau|
+-------+--------------+----------+---------+-------------------+---+
|   30.0|            10|testclient|       p1|2023-01-01 00:00:00|  1|
|   15.0|             1|testclient|       p2|2023-01-01 00:00:00|  1|
|    0.0|             0|testclient|       p3|2023-01-01 00:00:00|  1|
|    0.0|             0|testclient|       p4|2023-01-01 00:00:00|  1|
|    5.0|             2|testclient|       p1|2023-01-02 00:00:00|  1|
|    0.0|             0|testclient|       p2|2023-01-02 00:00:00|  1|
+-------+--------------+----------+---------+-------------------+---+

In [18]:
%%spark
# Aggregate per (event_date, app_id) and build a theta sketch of player_id with the
# data2sketch UDAF. The sketch column is then converted from BinaryType to a Base64
# encoded string in the same chained expression.
df2 = spark.sql(
 """
 select
     event_date,
     app_id,
     sum(revenue) as revenue,
     sum(dau) as dau,
     sum(purchase_count) as purchase_count,
     data2sketch(player_id) as player_id_theta
  from tmp_df
  group by 1,2
 """
).withColumn(
    "player_id_theta",
    f.base64(f.col("player_id_theta")),
)
df2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+----------+-------+---+--------------+--------------------+
|         event_date|    app_id|revenue|dau|purchase_count|     player_id_theta|
+-------------------+----------+-------+---+--------------+--------------------+
|2023-01-02 00:00:00|testclient|    5.0|  2|             2|AgMDAAAazJMCAAAAA...|
|2023-01-01 00:00:00|testclient|   45.0|  4|            11|AgMDAAAazJMEAAAAA...|
+-------------------+----------+-------+---+--------------+--------------------+

In [19]:
%%spark
# Show the schema of the aggregated DataFrame; player_id_theta is a string after the Base64 conversion.
df2.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- event_date: timestamp (nullable = true)
 |-- app_id: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- dau: long (nullable = true)
 |-- purchase_count: long (nullable = true)
 |-- player_id_theta: string (nullable = true)

In [20]:
%%spark
# Metrics spec for the "sketches merge" case: player_id_theta already holds a
# theta sketch in the source DataFrame, hence isInputThetaSketch is True.
metrics_spec = [
    # Plain sum aggregators.
    {"type": "longSum", "name": "dau", "fieldName": "dau"},
    {"type": "longSum", "name": "purchase_count", "fieldName": "purchase_count"},
    {"type": "doubleSum", "name": "revenue", "fieldName": "revenue"},
    # Theta sketch aggregator fed with the pre-built sketch column.
    {
        "type": "thetaSketch",
        "name": "player_id_theta",
        "fieldName": "player_id_theta",
        "isInputThetaSketch": True,
        "size": 4096
    }
]
json.dumps(metrics_spec)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'[{"type": "longSum", "name": "dau", "fieldName": "dau"}, {"type": "longSum", "name": "purchase_count", "fieldName": "purchase_count"}, {"type": "doubleSum", "name": "revenue", "fieldName": "revenue"}, {"type": "thetaSketch", "name": "player_id_theta", "fieldName": "player_id_theta", "isInputThetaSketch": true, "size": 4096}]'

In [21]:
%%spark
# Repartition the pre-aggregated DataFrame by DAY segments on event_date and print the full query plan.
df_prepared2 = df2.repartition_by_druid_segment_size('event_date', segment_granularity='DAY')
df_prepared2.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
Project [event_date#4, app_id#2, revenue#161, dau#162L, purchase_count#163L, player_id_theta#175, __PARTITION_TIME__#231, __PARTITION_NUM__#252]
+- RepartitionByExpression [__PARTITION_TIME__#231, __PARTITION_NUM__#252]
   +- Project [event_date#4, app_id#2, revenue#161, dau#162L, purchase_count#163L, player_id_theta#175, __PARTITION_TIME__#231, __num_rows__#243, cast((cast((__num_rows__#243 - 1) as double) / cast(5000000 as double)) as int) AS __PARTITION_NUM__#252]
      +- Project [event_date#4, app_id#2, revenue#161, dau#162L, purchase_count#163L, player_id_theta#175, __PARTITION_TIME__#231, __num_rows__#243]
         +- Project [event_date#4, app_id#2, revenue#161, dau#162L, purchase_count#163L, player_id_theta#175, __PARTITION_TIME__#231, __num_rows__#243, __num_rows__#243]
            +- Window [row_number() windowspecdefinition(__PARTITION_TIME__#231, __PARTITION_TIME__#231 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentr

In [22]:
%%spark
# Preview the rows without truncation so the full Base64 sketch strings are visible.
df_prepared2.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+----------+-------+---+--------------+----------------------------------------------------------------+-------------------+-----------------+
|event_date         |app_id    |revenue|dau|purchase_count|player_id_theta                                                 |__PARTITION_TIME__ |__PARTITION_NUM__|
+-------------------+----------+-------+---+--------------+----------------------------------------------------------------+-------------------+-----------------+
|2023-01-01 00:00:00|testclient|45.0   |4  |11            |AgMDAAAazJMEAAAAAACAPxM98wEU890HhRF7xjua2g9FFIa9XWbPMMoiRAt1/IA3|2023-01-01 00:00:00|0                |
|2023-01-02 00:00:00|testclient|5.0    |2  |2             |AgMDAAAazJMCAAAAAACAPxM98wEU890HyiJEC3X8gDc=                    |2023-01-02 00:00:00|0                |
+-------------------+----------+-------+---+--------------+----------------------------------------------------------------+-------------------+-----------------+

In [23]:
%%spark

# Target Druid datasource for the "sketches merge" example.
DATA_SOURCE_NAME = "tmp_vivek_sketch_build_test2"

# Write the pre-aggregated DataFrame with the rovio-ingest Druid source. Connection
# details for the metadata DB and the deep-storage bucket are read from SSM.
(
    df_prepared2
    .write
    .mode("overwrite")
    .format(DRUID_SOURCE)
    .option(ConfKeys.DATA_SOURCE, DATA_SOURCE_NAME)
    .option(ConfKeys.TIME_COLUMN, "event_date")
    .option(ConfKeys.METADATA_DB_URI, get_param("druid/metadata_db/uri"))
    .option(ConfKeys.METADATA_DB_USERNAME, get_param("druid/metadata_db/username"))
    .option(ConfKeys.METADATA_DB_PASSWORD, get_param("druid/metadata_db/password"))
    .option(ConfKeys.DEEP_STORAGE_S3_BUCKET, get_param("druid/deep_storage/bucket"))
    .option(ConfKeys.DEEP_STORAGE_S3_BASE_KEY, "druid/segments")
    .option(ConfKeys.METRICS_SPEC, json.dumps(metrics_spec))
    .option("druid.segment_storage.s3.disableacl", "true")
    .save()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

To list the written data you can run:

    aws s3 --profile $AWS_PROFILE ls --recursive s3://{druid-deep-storage-bucket}/druid/segments/tmp_vivek_sketch_build_test2/

To see something like:


    2023-07-19 10:53:45       1274 druid/segments/tmp_vivek_sketch_build_test2/2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z/2023-07-19T07:53:41.372Z/0/index.zip
    2023-07-19 10:53:45       1247 druid/segments/tmp_vivek_sketch_build_test2/2023-01-02T00:00:00.000Z_2023-01-03T00:00:00.000Z/2023-07-19T07:53:41.372Z/0/index.zip

And run this in druid-sql (JDBC)

1. raw data

    SELECT * FROM tmp_vivek_sketch_build_test2

```
    time app_id dau player_id_theta purchase_count revenue
    2023-01-01 00:00 testclient 4 "AgMDAAAazJMEAAAAAACAPxM98wEU890HhRF7xjua2g9FFIa9XWbPMMoiRAt1/IA3" 11 45	
    2023-01-02 00:00 testclient 2 "AgMDAAAazJMCAAAAAACAPxM98wEU890HyiJEC3X8gDc=" 2 5	
```

2.  aggregation with sketches

    Based on https://druid.apache.org/docs/latest/querying/sql-aggregations.html#sketch-functions

    SELECT __time,
      app_id,
      sum(dau) dau,
      APPROX_COUNT_DISTINCT_DS_THETA(player_id_theta) approx_theta_ds_players,
      sum(revenue) revenue
    FROM tmp_vivek_sketch_build_test2
    group by 1,2
    order by 1,2
  
```
    time             app_id     dau approx_theta_ds_players revenue
    2023-01-01 00:00 testclient 4 4 45	
    2023-01-02 00:00 testclient 2 2 5
```

In [24]:
# Clean up at the end of the notebook.
# NOTE(review): presumably this deletes the Livy session(s) created via `%spark add` above - confirm in the sparkmagic docs.
%spark cleanup