In [5]:
import json
# Load the S3 connection settings (a JSON object) and unpack the fields used below.
with open('s3_params.txt') as f:
    s3_params = json.load(f)

s3_key, s3_secret = s3_params['access-key'], s3_params['secret-key']
s3_url = s3_params['endpoint_url']
s3_binance_cred_path = s3_params['cred_path']

In [6]:
# Avro reader schema (raw text) handed to the Kafka value deserializer.
with open('schema.txt') as f:
    schema_str = f.read()

In [7]:
from s3fs.core import S3FileSystem
# s3fs client for the S3-compatible endpoint: authenticated, plain HTTP, explicit endpoint URL.
s3 = S3FileSystem(anon=False, key=s3_key, secret=s3_secret, use_ssl=False,
                  client_kwargs=dict(endpoint_url=s3_url))

In [8]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Spark-on-Kubernetes settings (namespace, image, one 1-core/1g executor, fixed driver
# ports/host) plus S3A access using the endpoint and keys read from s3_params.txt
# (SSL disabled, path-style URLs) and the S3A "directory" output committer.
config = {
    "spark.kubernetes.namespace": "spark",
    "spark.kubernetes.container.image": "localhost:32000/spark:3.1.2-hadoop-3.2.0-aws",
    "spark.executor.instances": "1",
    "spark.executor.memory": "1g",
    "spark.executor.cores": "1",
    "spark.driver.blockManager.port": "7777",
    "spark.driver.port": "2222",
    "spark.driver.host": "jupyter.spark.svc.cluster.local",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.hadoop.fs.s3a.endpoint": s3_url,
    "spark.hadoop.fs.s3a.access.key": s3_key,
    "spark.hadoop.fs.s3a.secret.key": s3_secret,
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.com.amazonaws.services.s3.enableV4": "true",
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "spark.hadoop.fs.s3a.committer.name": "directory",
    "spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    "spark.sql.parquet.output.committer.class": "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
}

def get_spark_session(app_name: str, conf: SparkConf = None,
                      master: str = "k8s://https://kubernetes.default.svc.cluster.local"):
    """Build (or reuse) a SparkSession configured with the module-level ``config``.

    Args:
        app_name: Spark application name.
        conf: Optional base SparkConf. When omitted, a fresh ``SparkConf()`` is
            created. A passed-in conf is modified in place (master plus every
            entry of ``config``).
        master: Spark master URL. Defaults to the in-cluster Kubernetes API server.

    Returns:
        The session from ``SparkSession.builder...getOrCreate()``. An already
        active session is reused.
    """
    if conf is None:
        conf = SparkConf()
    conf.setMaster(master)
    for key, value in config.items():
        conf.set(key, value)
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()

In [9]:
# Pass a new SparkConf explicitly: no module-level `conf` is defined in this notebook,
# so referencing `conf` here raises NameError on a fresh kernel.
spark = get_spark_session("aws_localstack", SparkConf())

21/11/01 16:34:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [10]:
# Show the Spark version of this session (the executor image tag in `config` is 3.1.2).
spark.version

'3.1.2'

In [None]:
# Smoke test: read a CSV over S3A, show its schema and row count, then stop the session.
try:
    df = spark.read.csv('s3a://data/test/BTCUSDT_MinuteBars.csv', header=True)
    df.printSchema()
    print(df.count())
except Exception as exp:
    # Report read failures (e.g. S3A connectivity or credentials) without raising.
    print(exp)
finally:
    # Stop even on KeyboardInterrupt so the Kubernetes executor is released.
    spark.stop()

In [37]:
# Pass a new SparkConf explicitly; no module-level `conf` exists in this notebook.
spark = get_spark_session("localstack", SparkConf())

try:
    df = spark.read.parquet('s3a://data/test/test.parquet/')
    df.printSchema()
    print(df.count())
except Exception as exp:
    # NOTE(review): a "Found duplicate column(s)" error here is consistent with
    # flattened keys that differ only by letter case (Spark resolves column names
    # case-insensitively unless spark.sql.caseSensitive=true) or with key
    # collisions introduced by the flattening step — TODO confirm against the data.
    print(exp)
finally:
    # Stop the session so the Kubernetes executor is not left running.
    spark.stop()

[Stage 8:>                                                          (0 + 1) / 1]

Found duplicate column(s) in the data schema: `e`, `k_l`, `k_q`, `k_t`, `k_v`


21/10/15 21:17:38 WARN DataSource: Found duplicate column(s) in the data schema and the partition schema: `e`, `k_l`, `k_q`, `k_t`, `k_v`


In [11]:
from typing import Any, Dict, List
from s3fs.core import S3FileSystem
from pyarrow import Table, parquet as pq
from pandas import DataFrame, Series

def to_df(data: List[Dict[str, Any]]) -> DataFrame:
    """Build a DataFrame with one row per record.

    Args:
        data: An iterable of dicts (one per row). A single dict is also accepted
            and treated as one record. The columns are the union of all keys in
            first-seen order. A record missing a key gets NaN in that column.

    Returns:
        A new DataFrame, which is empty when ``data`` is empty.
    """
    # Construct the frame in one call instead of DataFrame.append per row:
    # per-row append is quadratic, was deprecated in pandas 1.4 and removed in 2.0.
    records = [data] if isinstance(data, dict) else list(data)
    return DataFrame(records)

# s3fs handle used by pyarrow for the write below (same settings as the `s3` client).
fs = S3FileSystem(anon=False, key=s3_key, secret=s3_secret, use_ssl=False,
                  client_kwargs=dict(endpoint_url=s3_url))

path_to_s3_object = "s3://data/test/test.parquet"

# Two sample records with disjoint keys.
data = [
    dict(hoge=1, foo="blah"),
    dict(boo="test", bar=123),
]
df = to_df(data)

# Append the frame to the Parquet dataset rooted at path_to_s3_object.
pq.write_to_dataset(
    table=Table.from_pandas(df),
    root_path=path_to_s3_object,
    filesystem=fs,
    use_dictionary=True,
    version="2.0",
)

In [12]:
import argparse

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

In [None]:
def main(args):
    """Consume Avro-encoded messages from a Kafka topic and print their values.

    Args:
        args: Namespace providing ``topic``, ``schema_registry``,
            ``bootstrap_servers`` and ``group`` attributes.

    Uses the module-level ``schema_str`` (read from schema.txt) as the Avro reader
    schema. Polls until interrupted (Ctrl-C / kernel interrupt). The consumer is
    always closed on exit.
    """
    topic = args.topic

    schema_registry_client = SchemaRegistryClient({'url': args.schema_registry})
    avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)
    string_deserializer = StringDeserializer('utf_8')

    consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
                     'key.deserializer': string_deserializer,
                     'value.deserializer': avro_deserializer,
                     'group.id': args.group,
                     'auto.offset.reset': "earliest"}

    consumer = DeserializingConsumer(consumer_conf)
    consumer.subscribe([topic])

    try:
        while True:
            # SIGINT can't be handled when polling, limit timeout to 1 second.
            msg = consumer.poll(1.0)
            # poll() returns None when nothing arrived within the timeout;
            # calling .value() on it would raise AttributeError.
            if msg is None:
                continue
            print(msg.value())
    except KeyboardInterrupt:
        pass
    finally:
        # Leave the consumer group cleanly even if polling/deserialization raises.
        consumer.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="AvroDeserializingConsumer")
    parser.add_argument('-b', dest="bootstrap_servers",
                        default="kafka.kafka.svc.cluster.local:9092", help="Bootstrap servers")
    parser.add_argument('-r', dest="schema_registry",
                        default="http://schema-registry.kafka.svc.cluster.local:8085", help="Schema registry url")
    parser.add_argument('-t', dest="topic", default="test", help="Topic")
    # NOTE(review): symbol is parsed but main() never reads it.
    parser.add_argument('-s', dest="symbol", default="BTCUSDT", help="Symbol")
    parser.add_argument('-g', dest="group", default="test", help="Consumer group")
    # Inside a notebook kernel, sys.argv typically carries the launcher's own flags
    # (e.g. "-f <connection file>"). parse_args() would exit with an error on
    # them, so unrecognised arguments are ignored instead.
    known_args, _unknown_args = parser.parse_known_args()
    main(known_args)

In [4]:
from typing import Any, Dict, List
from s3fs.core import S3FileSystem
from pyarrow import Table, parquet as pq
from pandas import DataFrame, Series
import argparse
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
import json

# S3 connection settings (JSON) unpacked into individual names.
with open('s3_params.txt') as f:
    s3_params = json.load(f)
s3_key, s3_secret = s3_params['access-key'], s3_params['secret-key']
s3_url = s3_params['endpoint_url']
s3_binance_cred_path = s3_params['cred_path']

# Avro reader schema (raw text) for the value deserializer.
with open('schema.txt') as f:
    schema_str = f.read()

def to_df(data: Dict[str, Any]) -> DataFrame:
    """Build a DataFrame from a single flat record or from a list of records.

    Args:
        data: One dict, e.g. a flattened Kafka message, which becomes a one-row
            frame. An iterable of dicts is also accepted and produces one row each.
            The columns are the union of keys in first-seen order.

    Returns:
        A new DataFrame.
    """
    # One constructor call replaces DataFrame.append, which was deprecated in
    # pandas 1.4 and removed in 2.0.
    records = [data] if isinstance(data, dict) else list(data)
    return DataFrame(records)

# s3fs handle that pyarrow uses to write each consumed record to S3.
fs = S3FileSystem(anon=False, key=s3_key, secret=s3_secret, use_ssl=False,
                  client_kwargs=dict(endpoint_url=s3_url))
path_to_s3_object = "s3://data/test/test.parquet"

topic = "test"

# Schema Registry client and deserializers: keys are UTF-8 strings, values are
# Avro records decoded with the reader schema from schema.txt.
schema_registry_conf = dict(url="http://schema-registry.kafka.svc.cluster.local:8085")
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)
string_deserializer = StringDeserializer('utf_8')

# 'auto.offset.reset' is intentionally left unset here, so the client default applies.
consumer_conf = {
    'bootstrap.servers': "kafka.kafka.svc.cluster.local:9092",
    'key.deserializer': string_deserializer,
    'value.deserializer': avro_deserializer,
    'group.id': "test",
}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])

# Poll up to three times. Each received message is flattened into one row and
# written to the Parquet dataset at path_to_s3_object.
# NOTE(review): flatten_json is defined in a later cell. It must already have been
# executed for this cell to work (a top-to-bottom "Run All" fails here).
try:
    for _attempt in range(3):
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        if msg is None:
            print("none")
            continue

        print(hash(str(msg.value())))
        data = flatten_json(msg.value())
        print(type(data), data)

        df = to_df(data)
        print(type(df), df)
        pq.write_to_dataset(
            Table.from_pandas(df),
            path_to_s3_object,
            filesystem=fs,
            use_dictionary=True,
            #compression="snappy",
            version="2.0",
        )
except KeyboardInterrupt:
    pass
finally:
    # Leave the consumer group even if deserialization or the S3 write fails.
    consumer.close()

# NOTE(review): this bare triple-quoted string is dead code — nothing in it runs.
# Because it is the cell's last expression, Jupyter echoes it as the cell's output.
"""
df = to_df(data)
pq.write_to_dataset(
    Table.from_pandas(df),
    path_to_s3_object,
    filesystem=fs,
    use_dictionary=True,
    #compression="snappy",
    version="2.0",
)
"""

none
none
none


%5|1635615963.227|REQTMOUT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out LeaveGroupRequest in flight (after 5003ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 295009ms
%4|1635615963.227|REQTMOUT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1635615963.227|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: kafka-0.kafka-headless.kafka.svc.cluster.local:9092: 1 request(s) timed out: disconnect (after 7993ms in state UP)


'\ndf = to_df(data)\npq.write_to_dataset(\n    Table.from_pandas(df),\n    path_to_s3_object,\n    filesystem=fs,\n    use_dictionary=True,\n    #compression="snappy",\n    version="2.0",\n)\n'

In [3]:
def flatten_json(nested_json, separator='_'):
    """Flatten a nested JSON-like structure into a single-level dict.

    Dict keys and list indices along each path are joined with ``separator``.
    For example, ``{"k": {"t": 1}, "x": [5, 6]}`` becomes
    ``{"k_t": 1, "x_0": 5, "x_1": 6}``.

    Args:
        nested_json: Nested dicts (including dict subclasses), lists and scalar
            leaves, e.g. a deserialized Avro/JSON record.
        separator: String placed between path components. Defaults to ``'_'``.

    Returns:
        A new flat dict mapping joined paths to leaf values. It never returns None.
        Empty nested dicts/lists contribute no entries. A bare scalar input yields
        ``{'': value}``. Paths that join to the same string (e.g. key ``"a_b"`` vs.
        ``{"a": {"b": ...}}``) collide, and the later one wins.
    """
    out = {}

    def _walk(node, path):
        if isinstance(node, dict):
            for key, child in node.items():
                _walk(child, path + (str(key),))
        elif isinstance(node, list):
            for index, child in enumerate(node):
                _walk(child, path + (str(index),))
        else:
            out[separator.join(path)] = node

    _walk(nested_json, ())
    return out

In [22]:
import collections
import collections.abc

crumbs = False  # set True to print a trace of flatten()'s decisions


def flatten(dictionary, parent_key=False, separator='.'):
    """
    Turn a nested dictionary into a flattened dictionary.

    Nested mappings and list elements are expanded into keys joined by
    ``separator`` (list positions become the index as a string). Empty nested
    mappings/lists are kept as a single key with value None.

    :param dictionary: The dictionary to flatten
    :param parent_key: The string to prepend to dictionary's keys (falsy = none)
    :param separator: The string used to separate flattened keys, also applied
        to list indices
    :return: A flattened dictionary
    """

    items = []
    for key, value in dictionary.items():
        if crumbs: print('checking:', key)
        new_key = str(parent_key) + separator + key if parent_key else key
        # collections.MutableMapping (the old alias) was removed in Python 3.10.
        if isinstance(value, collections.abc.MutableMapping):
            if crumbs: print(new_key, ': dict found')
            if not value.items():
                if crumbs: print('Adding key-value pair:', new_key, None)
                items.append((new_key, None))
            else:
                items.extend(flatten(value, new_key, separator).items())
        elif isinstance(value, list):
            if crumbs: print(new_key, ': list found')
            if len(value):
                for k, v in enumerate(value):
                    # Forward the separator; it was previously dropped here, so list
                    # indices always used '.' regardless of the caller's choice.
                    items.extend(flatten({str(k): v}, new_key, separator).items())
            else:
                if crumbs: print('Adding key-value pair:', new_key, None)
                items.append((new_key, None))
        else:
            if crumbs: print('Adding key-value pair:', new_key, value)
            items.append((new_key, value))
    return dict(items)