In [None]:
from pyspark import *
from pyspark.sql import SparkSession

import os
import socket

# Resolve this host's IP; it is handed to Spark as the driver host below
# (presumably so executors can connect back to the driver in client mode).
pod_ip = socket.gethostbyname(socket.gethostname())
os.environ['PYSPARK_PYTHON'] = 'python3' # Needs to be explicitly provided as env. Otherwise workers run Python 2.7
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'  # Same

# S3 credentials come from the environment instead of being committed in the notebook.
# NOTE(review): any key pair that was previously hardcoded here should be rotated.
s3_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
s3_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
if not s3_access_key or not s3_secret_key:
    raise RuntimeError('Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before running this notebook')

# All JVM flags go into ONE string per role: a second conf.set() on the same key
# replaces the earlier value, which would silently drop the netty flag.
jvm_options = '-Dio.netty.tryReflectionSetAccessible=true -Duser.timezone=Asia/Seoul'

conf = SparkConf()
conf.setAppName('rawfile_to_parquet')
conf.set('spark.driver.extraJavaOptions', jvm_options)
conf.set('spark.executor.extraJavaOptions', jvm_options)

# Kubernetes cluster / deployment
conf.set('spark.kubernetes.container.image', 'mccho8865/spark-py:3.0.2')
# conf.set('spark.master', 'k8s://https://kubernetes.default.svc')
conf.set('spark.master', 'k8s://https://192.168.0.2:6443')
conf.set('spark.submit.deployMode', 'client')
conf.set('spark.kubernetes.file.upload.path', 's3a://spark')
conf.set('spark.kubernetes.node.selector.spark', '')

# S3A object store access
conf.set('spark.hadoop.fs.s3a.fast.upload', 'true')
# conf.set('spark.hadoop.fs.s3a.endpoint', 'rook-ceph-rgw-my-store.rook-ceph.svc')
conf.set('spark.hadoop.fs.s3a.endpoint', '192.168.0.2:30229')
conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
conf.set('spark.hadoop.fs.s3a.access.key', s3_access_key)
conf.set('spark.hadoop.fs.s3a.secret.key', s3_secret_key)
conf.set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

# Time zone and event logging
conf.set('spark.sql.session.timeZone', 'Asia/Seoul')
conf.set('spark.eventLog.enabled', 'True')
conf.set('spark.eventLog.dir', 's3a://logs/spark-hs/')

# Jars and resources
conf.set('spark.jars', '/jars/hadoop-aws-3.2.0.jar,/jars/aws-java-sdk-bundle-1.12.103.jar')
conf.set('spark.executor.instances', '1')
conf.set('spark.driver.memory', '4g')
conf.set('spark.executor.memory', '16g')
conf.set('spark.driver.host', pod_ip)
conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

sc = SparkContext.getOrCreate(conf=conf)

spark = SparkSession(sc)

In [None]:
# Read the raw ticker dump as plain text lines and keep only the rows whose `dt` equals the requested value.
# NOTE(review): `dt` is not defined in any cell visible here — presumably injected
# from outside (e.g. a notebook parameter); confirm before running on a fresh kernel.
raw_df = (
    spark.read
    .option("mode", "DROPMALFORMED")
    .text('s3a://coin-bucket/warehouse/raw/ticker/')
    .where(f"dt = '{dt}'")
)
# Alternative range filter:
# raw_df = raw_df.where("20210513 <= dt and dt < 20210610")

In [None]:
# Print the query plan of the filtered raw read.
raw_df.explain()

In [None]:
# Print a sample of the raw rows for a visual check.
raw_df.show()

In [None]:
# Same plan dump as the earlier explain() cell; raw_df is not reassigned in between.
raw_df.explain()

In [None]:
# Row count per `dt` value — presumably a sanity check that the filter kept the expected partition(s).
raw_df.groupby('dt').count().show()

In [None]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *
# Each raw text line is parsed as a JSON envelope; only its `data` and `status`
# members are extracted, both as strings.
value_schema = StructType([StructField('data', StringType()),
                           StructField('status', StringType())])
value_df = (
    raw_df
    .withColumn('value_json', from_json('value', value_schema))
    .select(col('value_json.data').alias('data'),
            col('value_json.status').alias('status'))
)
# `status` is a string column, so compare it with the string literal '0000'.
# The previous unquoted `status = 0000` relied on an implicit string->int cast
# and would also have accepted values such as '0' or '00'.
value_df = value_df.filter(col('status') == '0000').select('data')
value_df.cache()
# value_df.limit(10).toPandas()['data'][0]

In [None]:
# Print a sample of the extracted `data` payloads.
value_df.show()

In [None]:
# Names of the per-coin record fields, in schema order. Every field is declared
# as a nullable string; numeric casting happens later in the pipeline.
ticker_field_names = (
    'acc_trade_value',
    'acc_trade_value_24H',
    'closing_price',
    'fluctate_24H',
    'fluctate_rate_24H',
    'max_price',
    'min_price',
    'opening_price',
    'prev_closing_price',
    'units_traded',
    'units_traded_24H',
    'coin',
    'timestamp',
)
ticker_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in ticker_field_names]
)
# Return type for a UDF that emits a list of per-coin records.
ticker_array_schema = ArrayType(ticker_schema, True)

In [None]:
import json
from pyspark.sql.functions import udf, explode, from_unixtime, to_date
@udf(ticker_array_schema)
def parse_raw_ticker(data):
    """Split one raw ticker payload into a list of per-coin records.

    Args:
        data: JSON text of an object that holds a ``date`` entry plus one
            entry per coin, or ``None`` for a null input row.

    Returns:
        A list with one dict per coin entry. Each dict is the coin's own
        object extended with ``coin`` (the entry's key) and ``timestamp``
        (the payload's ``date`` value). An empty list is returned when
        ``data`` is ``None``.

    Raises:
        json.JSONDecodeError: if ``data`` is not valid JSON.
        KeyError: if the payload has no ``date`` entry.
    """
    # A null input would make json.loads raise TypeError and fail the whole job;
    # an empty array simply yields no rows once exploded.
    if data is None:
        return []
    payload = json.loads(data)
    # Remove `date` so that only the per-coin entries remain in the payload.
    timestamp = payload.pop('date')
    # Skip any top-level entry that is not an object: it cannot be a coin record.
    return [
        {**coin_item, 'coin': coin, 'timestamp': timestamp}
        for coin, coin_item in payload.items()
        if isinstance(coin_item, dict)
    ]

In [None]:
# Parse every payload into an array of per-coin structs, emit one row per
# struct, then flatten the struct fields into top-level columns.
parsed_df = (
    value_df
    .withColumn('parsed_arr', explode(parse_raw_ticker('data')))
    .select('parsed_arr.*')
)

In [None]:
# Print the first 10 flattened per-coin rows for a visual check.
parsed_df.show(10)

In [None]:
# Columns cast from string to double, listed in output order.
double_column_names = [
    'opening_price',
    'closing_price',
    'min_price',
    'max_price',
    'units_traded',
    'acc_trade_value',
    'prev_closing_price',
    'units_traded_24H',
    'acc_trade_value_24H',
    'fluctate_24H',
    'fluctate_rate_24H',
]
# `timestamp` is divided by 1000 before from_unixtime — presumably it holds epoch
# milliseconds (TODO confirm). `dt` is the same instant formatted as yyyy-MM-dd
# and is used as the partition column when writing.
out_df = (
    parsed_df
    .select(
        col('coin').cast(StringType()),
        from_unixtime(col('timestamp')/1000).alias('timestamp'),
        *[col(column_name).cast(DoubleType()) for column_name in double_column_names],
        from_unixtime(col('timestamp')/1000, 'yyyy-MM-dd').alias('dt'),
    )
    .distinct()  # drop exact duplicate rows
)

In [None]:
# Write the result as parquet partitioned by `dt`. The frame is repartitioned on
# `dt` first, and the write uses overwrite mode.
(
    out_df
    .repartition('dt')
    .write
    .partitionBy(['dt'])
    .mode('overwrite')
    .parquet('s3a://coin-bucket/warehouse/data/ticker')
)

In [None]:
# Stop the Spark session created in the setup cell.
spark.stop()