In [1]:
import os
# Arguments PySpark hands to spark-submit when it launches the JVM gateway.
# Two details matter here:
#   * `pyspark-shell` is the application resource and must come LAST; any
#     option placed after it is treated as an application argument and is
#     not parsed by spark-submit (so a trailing `--files` is ignored).
#   * the value is not run through a shell, so `$SPARK_HOME` is never
#     expanded; the path is therefore resolved here in Python.
_spark_home = os.environ.get('SPARK_HOME')
_submit_args = ['--packages', 'com.hortonworks.shc:shc-core:1.1.0.3.1.2.8-3']
if _spark_home:
    _hbase_site = os.path.join(_spark_home, 'conf', 'hbase-site.xml')
    # Only ship the HBase client config when it actually exists, so a missing
    # file does not make spark-submit fail at start-up.
    if os.path.isfile(_hbase_site):
        _submit_args += ['--files', _hbase_site]
_submit_args.append('pyspark-shell')
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(_submit_args)

In [2]:
from pyspark.sql import SQLContext, SparkSession

In [3]:
# Obtain the notebook's SparkSession (an existing one is reused if present),
# registered under the application name 'test'.
_session_builder = SparkSession.builder.appName('test')
spark = _session_builder.getOrCreate()

In [4]:
# SparkContext belonging to the session; it is passed to SQLContext below.
sc = spark.sparkContext

In [5]:
# SQLContext built on `sc`; later cells use `sqlc.read` to load the HBase table.
sqlc = SQLContext(sc)

In [6]:
import json

# Catalog (JSON string) handed to the HBase data source: it maps the `coins`
# table in the `default` namespace onto DataFrame columns. The row key is
# exposed as the string column `id`; the four price columns all live in
# column family `p` and are read as floats.
_column_map = {"id": {"cf": "rowkey", "col": "key", "type": "string"}}
for _price_field in ("open", "high", "low", "close"):
    _column_map[_price_field] = {"cf": "p", "col": _price_field, "type": "float"}

_catalog_spec = {
    "table": {"namespace": "default", "name": "coins"},
    "rowkey": "key",
    "columns": _column_map,
}
catalog = json.dumps(_catalog_spec)

In [19]:
from datetime import datetime, timedelta

import pyspark.sql.functions as F
import pyspark.sql.types as t

# Load the HBase table described by `catalog` as a Spark DataFrame.
_hbase_reader = sqlc.read.options(catalog=catalog)
_hbase_reader = _hbase_reader.format('org.apache.spark.sql.execution.datasources.hbase')
df = _hbase_reader.load()

@F.udf(
    t.StructType([
        t.StructField("exchange", t.StringType(), True),
        t.StructField("symbol", t.StringType(), True),
        t.StructField("time", t.TimestampType(), True),
    ])
)
def split_cols(array):
    """Turn the split row-key parts into an (exchange, symbol, time) struct.

    array: indexable whose first three items are the exchange, the symbol and
        an integer-parsable timestamp. Callers in this notebook pass the row
        key split on the newline character.

    Returns a 3-tuple matching the declared struct. The third item is
    ``datetime.fromtimestamp(int(array[2]))``: the value is read as epoch
    seconds and returned as a naive datetime in the process's local time zone.
    """
    exchange = array[0]
    symbol = array[1]
    epoch_seconds = int(array[2])
    return (exchange, symbol, datetime.fromtimestamp(epoch_seconds))

# Replace the composite row key with its parsed struct, flatten that struct
# into exchange/symbol/time columns, and print the first rows.
_key_parts = F.split('id', '\n')
(
    df.withColumn('id', split_cols(_key_parts))
    .select('id.*', 'open', 'high', 'low', 'close')
    .show()
)

+--------+-------+-------------------+--------+--------+--------+--------+
|exchange| symbol|               time|    open|    high|     low|   close|
+--------+-------+-------------------+--------+--------+--------+--------+
|Coinbase|BTC-USD|2020-07-31 09:10:00|11122.24| 11152.9|11121.97| 11152.9|
|Coinbase|BTC-USD|2020-07-31 09:15:00| 11153.4|11160.75|11149.05|11159.81|
|Coinbase|BTC-USD|2020-07-31 09:20:00| 11159.8| 11159.8|11145.72|11147.45|
|Coinbase|BTC-USD|2020-07-31 09:25:00|11147.45|11157.13|11147.44|11155.94|
|Coinbase|BTC-USD|2020-07-31 09:30:00|11155.93|11155.94|11134.48|11137.29|
|Coinbase|BTC-USD|2020-07-31 09:35:00|11137.29|11152.34|11137.17|11149.96|
|Coinbase|BTC-USD|2020-07-31 09:40:00|11151.22|11151.23|11119.59|11125.41|
|Coinbase|BTC-USD|2020-07-31 09:45:00|11125.41|11125.41|11120.13|11120.14|
|Coinbase|BTC-USD|2020-07-31 09:50:00|11120.13|11123.55|11115.67|11118.46|
|Coinbase|BTC-USD|2020-07-31 09:55:00|11118.45|11123.74|11115.82|11117.06|
|Coinbase|BTC-USD|2020-07

In [8]:
# Row-key range scan: keep only Coinbase / LTC-USD rows whose key falls
# between the two fixed timestamps, then parse the key and print the result.
_ltc_rows = df.filter("id >= 'Coinbase\nLTC-USD\n1596185000' AND id <= 'Coinbase\nLTC-USD\n1596187000'")
_ltc_rows = _ltc_rows.withColumn('id', split_cols(F.split('id', '\n')))
_ltc_rows = _ltc_rows.select('id.*', 'open', 'high', 'low', 'close')
_ltc_rows.show()

+--------+-------+-------------------+-----+-----+-----+-----+
|exchange| symbol|               time| open| high|  low|close|
+--------+-------+-------------------+-----+-----+-----+-----+
|Coinbase|LTC-USD|2020-07-31 09:10:00|56.78|56.95|56.78|56.94|
|Coinbase|LTC-USD|2020-07-31 09:15:00|56.95|56.97|56.94|56.96|
+--------+-------+-------------------+-----+-----+-----+-----+



In [20]:
# Query window: the most recent hour, expressed as epoch seconds.
# `datetime.now` is a classmethod (calling `datetime()` with no arguments
# raises TypeError), and `timedelta` is a separate class in the `datetime`
# module whose keyword is `hours`, not `hour`.
# NOTE: the window moves with the wall clock, so the rows returned depend on
# when the cell is run and on the table holding data for the last hour.
_now = datetime.now()
t0 = int(_now.timestamp())                           # window end: now
t1 = int((_now - timedelta(hours=1)).timestamp())    # window start: one hour ago

# Row keys look like "<exchange>\n<symbol>\n<epoch seconds>", so a range over
# the ETH-USD keys is a string comparison on `id`. This relies on both bounds
# and the stored keys having the same number of timestamp digits.
_eth_key_range = "id >= 'Coinbase\nETH-USD\n{start}' AND id <= 'Coinbase\nETH-USD\n{end}'".format(
    start=t1, end=t0)

eth_df = df.filter(_eth_key_range) \
  .withColumn('id', split_cols(F.split('id', '\n'))) \
  .select('id.*', 'open', 'high', 'low', 'close')

eth_df.show()

+--------+-------+-------------------+------+------+------+------+
|exchange| symbol|               time|  open|  high|   low| close|
+--------+-------+-------------------+------+------+------+------+
|Coinbase|ETH-USD|2020-07-31 09:10:00|338.97| 339.0|338.81| 339.0|
|Coinbase|ETH-USD|2020-07-31 09:15:00|339.23|339.61| 338.9|339.55|
|Coinbase|ETH-USD|2020-07-31 09:20:00|339.55|339.56|339.08|339.23|
|Coinbase|ETH-USD|2020-07-31 09:25:00|339.24|339.94|339.23|339.93|
|Coinbase|ETH-USD|2020-07-31 09:30:00|339.94|339.94|339.12|339.25|
|Coinbase|ETH-USD|2020-07-31 09:35:00|339.26| 339.4|338.94|339.38|
|Coinbase|ETH-USD|2020-07-31 09:40:00|339.38|339.38| 336.9|337.36|
|Coinbase|ETH-USD|2020-07-31 09:45:00|337.42|338.05|337.39|337.83|
|Coinbase|ETH-USD|2020-07-31 09:50:00|337.76|337.76|337.15| 337.5|
|Coinbase|ETH-USD|2020-07-31 09:55:00| 337.5|337.74| 337.5|337.74|
|Coinbase|ETH-USD|2020-07-31 10:00:00|337.89|337.89|337.63|337.85|
|Coinbase|ETH-USD|2020-07-31 10:05:00|337.71|337.71|337.27|337

In [10]:
import plotly.graph_objs as go
from plotly.offline import plot
import pandas as pd

In [21]:
# Convert the Spark DataFrame to pandas; the plotting cell reads `eth_pd`.
eth_pd = eth_df.toPandas()
# Show the first 10 rows as the cell output.
eth_pd.head(10)

Unnamed: 0,exchange,symbol,time,open,high,low,close
0,Coinbase,ETH-USD,2020-07-31 09:10:00,338.970001,339.0,338.809998,339.0
1,Coinbase,ETH-USD,2020-07-31 09:15:00,339.230011,339.609985,338.899994,339.549988
2,Coinbase,ETH-USD,2020-07-31 09:20:00,339.549988,339.559998,339.079987,339.230011
3,Coinbase,ETH-USD,2020-07-31 09:25:00,339.23999,339.940002,339.230011,339.929993
4,Coinbase,ETH-USD,2020-07-31 09:30:00,339.940002,339.940002,339.119995,339.25
5,Coinbase,ETH-USD,2020-07-31 09:35:00,339.26001,339.399994,338.940002,339.380005
6,Coinbase,ETH-USD,2020-07-31 09:40:00,339.380005,339.380005,336.899994,337.359985
7,Coinbase,ETH-USD,2020-07-31 09:45:00,337.420013,338.049988,337.390015,337.829987
8,Coinbase,ETH-USD,2020-07-31 09:50:00,337.76001,337.76001,337.149994,337.5
9,Coinbase,ETH-USD,2020-07-31 09:55:00,337.5,337.73999,337.5,337.73999


In [22]:
# Candlestick chart of the bars held in `eth_pd`: time on the x axis, with the
# open/high/low/close columns feeding the matching trace attributes.
_ohlc_trace = go.Candlestick(
    x=eth_pd['time'],
    open=eth_pd['open'],
    high=eth_pd['high'],
    low=eth_pd['low'],
    close=eth_pd['close'],
)
fig = go.Figure(data=[_ohlc_trace])
# The value returned by plot() is the cell output.
plot(fig)

'temp-plot.html'