In [1]:
import time
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

In [2]:
# Load the Avro schema text used by every AvroDeserializer in this notebook.
with open('schema3.txt') as schema_file:
    schema_str = schema_file.read()

In [4]:
print(time.time())
topic = "tb-kafka-avro8"

# NOTE(review): hardcoded cluster IPs. The later consumer cells use the DNS names
# schema-registry.kafka.svc.cluster.local / kafka.kafka.svc.cluster.local instead;
# confirm these IPs still point at the intended services.
schema_registry_conf = {"url": "http://10.105.144.163:8085"}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Message values are Avro-decoded with the schema read from schema3.txt;
# message keys are decoded as UTF-8 strings.
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

string_deserializer = StringDeserializer('utf_8')

consumer_conf = {'bootstrap.servers': "10.102.117.121:9092",
                 'key.deserializer': string_deserializer,
                 'value.deserializer': avro_deserializer,
                 'group.id': "test",
                 'auto.offset.reset': "latest"
                }

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])

# Sample the topic: at most 10 polls of up to 1 s each, printing each decoded
# value, or "none" when a poll times out. The consumer is closed in `finally`
# so it is released even if polling or deserialization raises.
try:
    for _ in range(10):
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        if msg is None:
            print("none")
            continue
        print(msg.value())
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

1672313302.0463
{'ts': 1672313289395, 'data_key': 'dummy_humidity', 'data_value': '67.36'}
{'ts': 1672313289395, 'data_key': 'dummy_location', 'data_value': 'xjbm84nkttaknwfdpldi'}
{'ts': 1672313289394, 'data_key': 'dummy_temp', 'data_value': '20'}
{'ts': 1672313289396, 'data_key': 'dummy_bool', 'data_value': 'false'}
{'ts': 1672313294393, 'data_key': 'dummy_bool', 'data_value': 'false'}
{'ts': 1672313294393, 'data_key': 'dummy_location', 'data_value': 'hpgj4fciimna3pvnp14i'}
{'ts': 1672313294392, 'data_key': 'dummy_temp', 'data_value': '21'}
{'ts': 1672313294392, 'data_key': 'dummy_temp', 'data_value': '21'}
{'ts': 1672313299402, 'data_key': 'dummy_bool', 'data_value': 'true'}
{'ts': 1672313299402, 'data_key': 'dummy_location', 'data_value': 'qk0d0sfbkiragow7y14i'}


In [14]:
def hello():
    """Write the greeting ``hello`` (plus a newline) to stdout and return None."""
    greeting = "hello"
    print(greeting)

In [16]:
hello()

hello


In [None]:
import json
# S3 connection parameters: a JSON object whose fields feed the s3fs clients
# and the Spark S3A settings further down.
with open('s3_params.txt') as params_file:
    s3_params = json.load(params_file)

s3_key = s3_params['access-key']
s3_secret = s3_params['secret-key']
s3_url = s3_params['endpoint_url']
s3_binance_cred_path = s3_params['cred_path']

In [None]:
from s3fs.core import S3FileSystem
# s3fs client for the S3-compatible endpoint: explicit credentials, no TLS.
s3 = S3FileSystem(
    key=s3_key,
    secret=s3_secret,
    anon=False,
    use_ssl=False,
    client_kwargs=dict(endpoint_url=s3_url),
)

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Spark settings applied by get_spark_session(), in two groups.
config = {
    # Kubernetes placement and executor sizing
    "spark.kubernetes.namespace": "spark",
    "spark.kubernetes.container.image": "localhost:32000/spark:3.1.2-hadoop-3.2.0-aws",
    "spark.executor.instances": "1",
    "spark.executor.memory": "1g",
    "spark.executor.cores": "1",
    # Driver networking: fixed ports, advertised host name, bind address
    "spark.driver.blockManager.port": "7777",
    "spark.driver.port": "2222",
    "spark.driver.host": "jupyter.spark.svc.cluster.local",
    "spark.driver.bindAddress": "0.0.0.0",
}

# S3A connector settings for the endpoint/credentials loaded from s3_params.txt
# (plain HTTP, path-style addressing, static key/secret credentials).
config.update({
    "spark.hadoop.fs.s3a.endpoint": s3_url,
    "spark.hadoop.fs.s3a.access.key": s3_key,
    "spark.hadoop.fs.s3a.secret.key": s3_secret,
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.com.amazonaws.services.s3.enableV4": "true",
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "spark.hadoop.fs.s3a.committer.name": "directory",
    "spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    "spark.sql.parquet.output.committer.class": "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
})

def get_spark_session(app_name: str, conf: "SparkConf | None" = None,
                      master: str = "k8s://https://kubernetes.default.svc.cluster.local") -> SparkSession:
    """Build (or reuse) a SparkSession configured from the module-level ``config`` dict.

    Args:
        app_name: Spark application name.
        conf: Base configuration. When omitted, a fresh ``SparkConf`` is used,
            so callers do not need a pre-existing ``conf`` variable. A
            ``SparkConf`` that is passed in is modified in place: its master and
            every ``config`` entry are set on it.
        master: Spark master URL; defaults to the in-cluster Kubernetes API server.

    Returns:
        The result of ``SparkSession.builder.getOrCreate()``, which may be an
        already-active session rather than a new one.
    """
    if conf is None:
        conf = SparkConf()
    conf.setMaster(master)
    conf.setAll(config.items())
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()

In [None]:
# `conf` is not defined in any earlier cell; pass a fresh SparkConf so this
# works on a clean kernel.
spark = get_spark_session("aws_localstack", SparkConf())

In [None]:
# Smoke test: read the sample CSV over S3A and report its schema and row count.
# Errors are printed rather than raised so the notebook keeps going; the
# session is stopped in `finally` so it is released even on interrupt.
try:
    df = spark.read.csv('s3a://data/test/BTCUSDT_MinuteBars.csv', header=True)
    df.printSchema()
    print(df.count())
except Exception as exp:
    print(exp)
finally:
    spark.stop()

In [None]:
# `conf` is not defined in any earlier cell; pass a fresh SparkConf so this
# works on a clean kernel.
spark = get_spark_session("localstack", SparkConf())

# Read back the Parquet dataset that a later cell writes to
# s3://data/test/test.parquet. NOTE(review): on Restart & Run All this cell
# runs before that write, so the dataset must already exist from a prior run.
try:
    df = spark.read.parquet('s3a://data/test/test.parquet/')
    df.printSchema()
    print(df.count())
except Exception as exp:
    print(exp)
finally:
    # Release the session (and its executor) once the check is done.
    spark.stop()

In [None]:
from typing import Any, Dict, List
from s3fs.core import S3FileSystem
from pyarrow import Table, parquet as pq
from pandas import DataFrame, Series

def to_df(data: List[Dict[str, Any]]) -> DataFrame:
    """Build a DataFrame with one row per record in ``data``.

    Columns are the union of all record keys, in order of first appearance;
    a record missing a key gets NaN in that column.

    Args:
        data: Records, each a dict mapping column name to value. May be empty.

    Returns:
        A new DataFrame with a default RangeIndex; an empty DataFrame when
        ``data`` is empty.
    """
    # Build the frame in a single constructor call instead of appending row by
    # row: DataFrame.append is quadratic and was removed in pandas 2.0.
    return DataFrame(list(data))

# s3fs client for the S3-compatible endpoint: explicit credentials, no TLS.
fs = S3FileSystem(
    key=s3_key,
    secret=s3_secret,
    anon=False,
    use_ssl=False,
    client_kwargs=dict(endpoint_url=s3_url),
)

path_to_s3_object = "s3://data/test/test.parquet"

# Two sample records with disjoint keys.
data = [
    dict(hoge=1, foo="blah"),
    dict(boo="test", bar=123),
]
df = to_df(data)

# Write the frame as a Parquet dataset under path_to_s3_object.
pq.write_to_dataset(
    Table.from_pandas(df),
    path_to_s3_object,
    filesystem=fs,
    use_dictionary=True,
    version="2.0",
)

In [None]:
topic = "avro-test"

schema_registry_conf = {'url': 'http://schema-registry.kafka.svc.cluster.local:8085'}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Values are Avro-decoded with the schema read from schema3.txt; keys as UTF-8.
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

string_deserializer = StringDeserializer('utf_8')

consumer_conf = {'bootstrap.servers': 'kafka.kafka.svc.cluster.local:9092',
                 'key.deserializer': string_deserializer,
                 'value.deserializer': avro_deserializer,
                 'group.id': "test",
                 'auto.offset.reset': "earliest"}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])

# Consume until interrupted (Kernel -> Interrupt raises KeyboardInterrupt).
# The consumer is closed in `finally` so it is released on any exit path.
try:
    while True:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        # poll() returns None when nothing arrives within the timeout.
        if msg is None:
            continue
        print(msg.value())
except KeyboardInterrupt:
    pass
finally:
    consumer.close()


    parser = argparse.ArgumentParser(description="AvroDeserializingProducer")
    parser.add_argument('-b', dest="bootstrap_servers",
                        default="kafka.kafka.svc.cluster.local:9092", help="Bootstrap servers")
    parser.add_argument('-r', dest="schema_registry",
                        default="http://schema-registry.kafka.svc.cluster.local:8085", help="Schema registry url")
    parser.add_argument('-t', dest="topic", default="test", help="Topic")
    parser.add_argument('-s', dest="symbol", default="BTCUSDT", help="Symbol")
    parser.add_argument('-g', dest="group", default="test", help="Consumer group")


In [None]:
topic = "test"

schema_registry_conf = {"url": "http://schema-registry.kafka.svc.cluster.local:8085"}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Values are Avro-decoded with the schema read from schema3.txt; keys as UTF-8.
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

string_deserializer = StringDeserializer('utf_8')

consumer_conf = {'bootstrap.servers': "kafka.kafka.svc.cluster.local:9092",
                 'key.deserializer': string_deserializer,
                 'value.deserializer': avro_deserializer,
                 'group.id': "test",
                 'auto.offset.reset': "earliest"}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])

# Consume until interrupted (Kernel -> Interrupt raises KeyboardInterrupt).
# The consumer is closed in `finally` so it is released on any exit path.
try:
    while True:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        # poll() returns None when nothing arrives within the timeout.
        if msg is None:
            continue
        print(msg.value())
except KeyboardInterrupt:
    pass
finally:
    consumer.close()


from typing import Any, Dict, List
from s3fs.core import S3FileSystem
from pyarrow import Table, parquet as pq
from pandas import DataFrame, Series

def to_df(data: List[Dict[str, Any]]) -> DataFrame:
    """Build a DataFrame with one row per record in ``data``.

    Columns are the union of all record keys, in order of first appearance;
    a record missing a key gets NaN in that column.

    Args:
        data: Records, each a dict mapping column name to value. May be empty.

    Returns:
        A new DataFrame with a default RangeIndex; an empty DataFrame when
        ``data`` is empty.
    """
    # Build the frame in a single constructor call instead of appending row by
    # row: DataFrame.append is quadratic and was removed in pandas 2.0.
    return DataFrame(list(data))

# s3fs client for the S3-compatible endpoint: explicit credentials, no TLS.
fs = S3FileSystem(
    key=s3_key,
    secret=s3_secret,
    anon=False,
    use_ssl=False,
    client_kwargs=dict(endpoint_url=s3_url),
)

path_to_s3_object = "s3://data/test/test.parquet"

# Two sample records with disjoint keys.
data = [
    dict(hoge=1, foo="blah"),
    dict(boo="test", bar=123),
]
df = to_df(data)

# Write the frame as a Parquet dataset under path_to_s3_object.
pq.write_to_dataset(
    Table.from_pandas(df),
    path_to_s3_object,
    filesystem=fs,
    use_dictionary=True,
    version="2.0",
)