### Parameters

In [None]:
BOOTSTRAP_SERVERS = 'b-2.emartai-streamhub-d.bnlyhx.c2.kafka.ap-northeast-2.amazonaws.com:9094,b-1.emartai-streamhub-d.bnlyhx.c2.kafka.ap-northeast-2.amazonaws.com:9094,b-3.emartai-streamhub-d.bnlyhx.c2.kafka.ap-northeast-2.amazonaws.com:9094'
CONSUMER_CONFIGS = 'security.protocol=SSL,ssl.truststore.location=kafka-to-bigquery_certs_emartai-streamhub-datalake-cluster-stg_kafka.client.truststore.jks,ssl.truststore.password=changeit,ssl.keystore.location=kafka-to-bigquery_certs_emartai-streamhub-datalake-cluster-stg_kafka.client.keystore.jks,ssl.keystore.password=changeit,ssl.key.password=changeit'
INPUT_TOPICS = 'tfactory-checkin'
CONSUMER_GROUP = 'test-group-1'
OUTPUT_TABLE = 'test_db.test_table'
OUTPUT_VIEW = 'test_db.test_view'
OUTPUT_PATH = 'hdfs:///tmp/test/test_table/'
DT = '20210605'
JOB = 'kafka-to-ye-hive-tfactory-checkin-test_db.test_table-20210605'

In [None]:
import os
os.environ['HADOOP_USER_NAME'] = 'airflow'

In [None]:
consumer_config_list = []
remote_file_list = []
spark_file_list = []

for config in CONSUMER_CONFIGS.split(','):
    tokens = config.split('=')
    key = tokens[0]
    value = tokens[1]
    if value.startswith('hdfs://'):
        remote_file_list.append(value)
        
        filename = value.split('/').pop()
        spark_file_list.append(filename)
        
        consumer_config_list.append(f'{key}={filename}')
    else:
        consumer_config_list.append(f'{key}={value}')
        
CONSUMER_CONFIGS = ','.join(consumer_config_list)
print(CONSUMER_CONFIGS)

REMOTE_FILES = ','.join(remote_file_list)
print(REMOTE_FILES)
%env REMOTE_FILES={REMOTE_FILES}

SPARK_FILES = ','.join(spark_file_list)

['security.protocol', 'SSL']
['ssl.truststore.location', 'kafka-to-bigquery_certs_emartai-streamhub-datalake-cluster-stg_kafka.client.truststore.jks']
['ssl.truststore.password', 'changeit']
['ssl.keystore.location', 'kafka-to-bigquery_certs_emartai-streamhub-datalake-cluster-stg_kafka.client.keystore.jks']
['ssl.keystore.password', 'changeit']
['ssl.key.password', 'changeit']

env: REMOTE_FILES=



In [4]:
%%bash -x

set -ex

str=$REMOTE_FILES

readarray -d , -t strarr <<<"$str"
for (( n=0; n<${#strarr[*]}; n++ ))  
do
    echo ${strarr[n]}
    hadoop fs -copyToLocal -f ${strarr[n]} .
done

+ set -ex
+ str=
+ readarray -d , -t strarr
+ (( n=0 ))
+ (( n<1 ))
+ echo
+ hadoop fs -copyToLocal -f .





+ (( n++  ))
+ (( n<1 ))


In [8]:
from pydatafabric.vault_utils import get_secrets

def get_spark_for_kafka(scale=0, queue=None):
    import os
    import uuid
    import tempfile
    from pyspark.sql import SparkSession
    from pydatafabric.vault_utils import get_secrets

    tmp_uuid = str(uuid.uuid4())
    app_name = f"emart-{os.environ.get('USER', 'default')}-{tmp_uuid}"
    if not queue:
        if "JUPYTERHUB_USER" in os.environ:
            queue = "dmig_eda"
        else:
            queue = "airflow_job"
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

    key = get_secrets(mount_point="datafabric",path="gcp/emart-datafabric/datafabric")["config"]    
    key_file_name = tempfile.mkstemp()[1]
    with open(key_file_name, "wb") as key_file:
        key_file.write(key.encode())
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_file.name

    if scale in [1, 2, 3, 4]:
        spark = (
            SparkSession.builder.config("spark.app.name", app_name)
            .config("spark.driver.memory", f"{scale*8}g")
            .config("spark.executor.memory", f"{scale*3}g")
            .config("spark.executor.instances", f"{scale*8}")
            .config("spark.driver.maxResultSize", f"{scale*4}g")
            .config("spark.rpc.message.maxSize", "1024")
            .config("spark.yarn.queue", queue)
            .config("spark.ui.enabled", "false")
            .config("spark.port.maxRetries", "128")
            .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
            .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
            .config(
                "spark.jars",
                "hdfs:///jars/spark-bigquery-with-dependencies_2.11-0.17.3.jar,hdfs:///jars/spark-sql-kafka/net.jpountz.lz4_lz4-1.3.0.jar,hdfs:///jars/spark-sql-kafka/org.apache.kafka_kafka-clients-0.10.0.1.jar,hdfs:///jars/spark-sql-kafka/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.1.jar,hdfs:///jars/spark-sql-kafka/org.slf4j_slf4j-api-1.7.16.jar,hdfs:///jars/spark-sql-kafka/org.spark-project.spark_unused-1.0.0.jar,hdfs:///jars/spark-sql-kafka/org.xerial.snappy_snappy-java-1.1.2.6.jar",
            )
            .config("spark.files", SPARK_FILES)
            .enableHiveSupport()
            .getOrCreate()
        )
    else:
        spark = (
            SparkSession.builder.config("spark.app.name", app_name)
            .config("spark.driver.memory", "6g")
            .config("spark.executor.memory", "8g")
            .config("spark.shuffle.service.enabled", "true")
            .config("spark.dynamicAllocation.enabled", "true")
            .config("spark.dynamicAllocation.maxExecutors", "200")
            .config("spark.driver.maxResultSize", "6g")
            .config("spark.rpc.message.maxSize", "1024")
            .config("spark.yarn.queue", queue)
            .config("spark.ui.enabled", "false")
            .config("spark.port.maxRetries", "128")
            .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
            .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
            .config(
                "spark.jars",
                "gs://external_libs/spark/jars/spark-bigquery-with-dependencies_2.11-0.16.1.jar,gs://external_libs/spark/jars/spark-sql-kafka/net.jpountz.lz4_lz4-1.3.0.jar,gs://external_libs/spark/jars/spark-sql-kafka/org.apache.kafka_kafka-clients-0.10.0.1.jar,gs://external_libs/spark/jars/spark-sql-kafka/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.1.jar,gs://external_libs/spark/jars/spark-sql-kafka/org.slf4j_slf4j-api-1.7.16.jar,gs://external_libs/spark/jars/spark-sql-kafka/org.spark-project.spark_unused-1.0.0.jar,gs://external_libs/spark/jars/spark-sql-kafka/org.xerial.snappy_snappy-java-1.1.2.6.jar",
            )
            .config("spark.files", SPARK_FILES)
            .enableHiveSupport()
            .getOrCreate()
        )
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    return spark

spark = get_spark_for_kafka()

Exception in thread "main" java.io.IOException: Error accessing gs://external_libs/spark/jars/spark-bigquery-with-dependencies_2.11-0.16.1.jar
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:2140)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:2032)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1091)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1065)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:955)
	at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:65)
	at org.apache.hadoop.fs.Globber.doGlob(Globber.java:281)
	at org.apache.hadoop.fs

Exception: Java gateway process exited before sending its port number

In [None]:
input_topics_set = "('" + "','".join(INPUT_TOPICS.split(',')) + "')"
print(input_topics_set)

query = f"""
SELECT topic, partition, max(offset) as max_offset
FROM {OUTPUT_TABLE}
WHERE 1 = 1
  AND topic IN {input_topics_set}
  AND dt < '{DT}'
group by topic, partition
"""
print(query)

# get sink max offset
max_offsets_df = spark.sql(query)
max_offsets_pdf = max_offsets_df.toPandas()
max_offsets_pdf

In [None]:
max_offsets_pdf['starting_offset'] = max_offsets_pdf['max_offset'].map(lambda x: x+1)
starting_offsets_pdf = max_offsets_pdf.drop(columns=['max_offset'])
print(starting_offsets_pdf)

starting_offsets = None

if len(starting_offsets_pdf.index) > 0:
    starting_offsets = starting_offsets_pdf.values.tolist()
    print(starting_offsets)

    from itertools import groupby
    starting_offsets_dict = dict([(k, dict(map(lambda x: x[1:], list(g)))) for k, g in groupby(starting_offsets, lambda x: x[0])])
    print(starting_offsets_dict)

    import json
    starting_offsets = json.dumps(starting_offsets_dict)

print(starting_offsets)

In [None]:
df_reader = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
  .option("subscribe", INPUT_TOPICS) \
  .option("includeHeaders", "true") \
  .option("enable.auto.commit", "false")

if starting_offsets is not None:
    print(starting_offsets)
    df_reader = df_reader \
        .option("startingOffsets", starting_offsets)\
        .option("auto.offset.reset", "earliest") \
        .option("failOnDataLoss", "false")
    
if CONSUMER_CONFIGS != '':
    config_tokens = CONSUMER_CONFIGS.split(',')
    print(config_tokens)
    
    for token in config_tokens:
        key_value = token.split('=')
        print(key_value)
        
        df_reader = df_reader.option(f"kafka.{key_value[0]}", key_value[1])
        
df = df_reader.load()
df.printSchema()
df.show()

In [None]:
df.registerTempTable("kafka")

query = f"""
INSERT OVERWRITE TABLE {OUTPUT_TABLE}
PARTITION (dt = "{DT}")
SELECT
    topic
  , CAST(partition AS BIGINT) AS partition
  , CAST(offset AS BIGINT) AS offset
  , CAST(CAST(timestamp AS DOUBLE) * 1000 AS BIGINT) AS timestamp
  , CAST(timestampType AS BIGINT) AS timestamp_type
  , CAST(key AS STRING) AS key
  , CAST(value AS STRING) AS value
  , CAST(NULL AS STRING) AS headers
  , "{JOB}" AS job
FROM kafka
"""
print(query)

spark.sql(query).show()

In [None]:
df_output = spark.sql(f"""
SELECT * FROM {OUTPUT_TABLE} WHERE dt = '{DT}'
""")
df_output.show()
print(f"df_output.count() = {df_output.count()}")

In [None]:
df_output = spark.sql(f"""
SELECT * FROM {OUTPUT_VIEW} WHERE dt = '{DT}'
""")
df_output.show()
print(f"df_output.count() = {df_output.count()}")

In [None]:
path = f'{OUTPUT_PATH}/dt={DT}'
df_output.write.mode("overwrite").parquet(path);

!hadoop fs -ls {OUTPUT_PATH}
!hadoop fs -ls {path}

In [None]:
spark.stop()