In [1]:
# Initiate Spark Application

import findspark
findspark.init()

import pandas as pd
pd.set_option('display.max_colwidth', None)

from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
.appName("binanceAnalysis")
.config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
.getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
# Read Data
import pyspark.sql.functions as F
spark.conf.set("spark.sql.caseSensitive", "true")

df = (spark.read
    .option("inferSchema", "true") 
    .option("recursiveFileLookup", "true")
    .option("multiLine", "true")
    .json("hdfs://localhost:9000/datalake/raw/binance/bitcoin_klines/"))

df = df.select(
    F.timestamp_seconds(df["E"]/1000).alias('eventTime'), 
    df["s"].alias('symbol'), 
    F.timestamp_seconds(df["k.t"]/1000).alias('startTime'), 
    F.timestamp_seconds(df["k.T"]/1000).alias('closeTime'), 
    df["k.i"].alias('interval'),
    df["k.o"].cast("double").alias('openPrice'), 
    df["k.c"].cast("double").alias('closePrice'), 
    df["k.h"].cast("double").alias('highPrice'),
    df["k.l"].cast("double").alias('lowPrice'), 
    df["k.v"].cast("integer").alias('baseVolume'),
    df["k.n"].cast("integer").alias('numberTrades'), 
    df["k.x"].alias('klineClosed'), 
    df["k.q"].cast("double").alias('quoteVolume'), 
    df["k.V"].cast("integer").alias('takerBuyBaseVolume'), 
    df["k.Q"].cast("double").alias('takerBuyQuoteVolume'),
)

df = df.withColumn("date",F.to_date("eventTime"))

df.toPandas()

[Stage 7:>                                                          (0 + 1) / 1]                                                                                

Unnamed: 0,eventTime,symbol,startTime,closeTime,interval,openPrice,closePrice,highPrice,lowPrice,baseVolume,numberTrades,klineClosed,quoteVolume,takerBuyBaseVolume,takerBuyQuoteVolume,date
0,2022-07-13 10:35:00.002,BTCUSDT,2022-07-13 10:34:00,2022-07-13 10:34:59.999,1m,19815.42,19825.59,19858.56,19803.91,226,5117,True,4.492710e+06,106,2.120155e+06,2022-07-13
1,2022-07-13 10:36:00.001,BTCUSDT,2022-07-13 10:35:00,2022-07-13 10:35:59.999,1m,19825.59,19830.34,19853.47,19812.08,212,4168,True,4.209448e+06,119,2.372130e+06,2022-07-13
2,2022-07-13 10:37:00.001,BTCUSDT,2022-07-13 10:36:00,2022-07-13 10:36:59.999,1m,19830.33,19853.23,19892.57,19827.79,395,6763,True,7.864629e+06,226,4.504673e+06,2022-07-13
3,2022-07-13 10:38:00.001,BTCUSDT,2022-07-13 10:37:00,2022-07-13 10:37:59.999,1m,19853.23,19875.28,19882.11,19841.11,257,4884,True,5.111216e+06,147,2.924248e+06,2022-07-13
4,2022-07-13 10:39:00.001,BTCUSDT,2022-07-13 10:38:00,2022-07-13 10:38:59.999,1m,19875.28,19896.87,19908.88,19848.86,292,6181,True,5.819311e+06,162,3.227757e+06,2022-07-13
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
149,2022-07-13 13:34:00.001,BTCUSDT,2022-07-13 13:33:00,2022-07-13 13:33:59.999,1m,19810.65,19807.62,19814.12,19805.99,33,1506,True,6.589258e+05,22,4.552730e+05,2022-07-13
150,2022-07-13 13:43:00.001,BTCUSDT,2022-07-13 13:42:00,2022-07-13 13:42:59.999,1m,19831.12,19825.49,19836.66,19825.48,42,1798,True,8.385730e+05,18,3.733190e+05,2022-07-13
151,2022-07-13 13:46:00.001,BTCUSDT,2022-07-13 13:45:00,2022-07-13 13:45:59.999,1m,19826.10,19813.29,19828.93,19810.00,44,1886,True,8.892718e+05,19,3.814693e+05,2022-07-13
152,2022-07-13 13:51:00.001,BTCUSDT,2022-07-13 13:50:00,2022-07-13 13:50:59.999,1m,19847.03,19837.73,19850.00,19832.32,32,1951,True,6.391479e+05,17,3.476668e+05,2022-07-13


In [21]:
# Save to Parquet

(df.coalesce(1)
    .write
    .partitionBy("symbol","date")
    .mode("overwrite")
    .parquet("hdfs://localhost:9000/datalake/std/binance/"))