### Import libraries

In [1]:
# get data
from binance.spot import Spot

# handle data
import pyspark.sql as ps
from pyspark.sql.functions import from_unixtime,date_format,from_utc_timestamp
from pyspark.sql.types import DateType

# train data
import pandas as pd
import numpy as np
from datetime import datetime,timedelta,timezone

# environment
import os
from dotenv import load_dotenv
load_dotenv("../env/app.env")

True

### Set up the Binance and Spark clients

In [2]:
# Binance credentials come from ../env/app.env; os.environ.get yields None when unset.
# NOTE(review): presumably the market-data endpoints used here (klines, exchange_info)
# work without keys — confirm before relying on an empty env file.
BINANCE_API_KEY = os.environ.get("BINANCE_API_KEY")
BINANCE_API_SECRET = os.environ.get("BINANCE_API_SECRET")
client = Spot(key=BINANCE_API_KEY, secret=BINANCE_API_SECRET)

spark = (
    ps.SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "4096m")
    # Pin the SQL session time zone to UTC so from_unixtime/date_format turn epoch
    # timestamps into calendar dates independently of the cluster JVM's default zone.
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

22/11/17 15:06:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# How many days of history to download, counted back from today.
DAY_TO_QUERY = 1800
# Length of one day in milliseconds (the unit of the *1000 timestamps sent to client.klines).
DURATION_EACH_DAY = 1000 * (24 * 60 * 60)

### Fetch daily (1d) kline data

**Declare the kline period, symbol and column names**

In [10]:
# Kline interval requested from Binance; also used as the Parquet file name in HDFS.
PERIOD = "1d"
# Trading pair fetched by the single-symbol cell.
symbol = "BTCUSDT"
# Names for the 12 positional fields of each kline row returned by client.klines.
columns = [
    "open_time",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "close_time",
    "quote_asset_volume",
    "number_of_trades",
    "taker_buy_base_asset_volume",
    "taker_buy_quote_asset_volume",
    "ignore",
]

**Function to store klines in HDFS**

In [11]:
# write dataframe to hdfs with partitions
def writeToHFDS(df, symbol):
    df = df \
        .withColumn("open_time", df["open_time"] / 1000) \
        .withColumn("date", from_unixtime('open_time').cast(DateType()))
    df = df \
        .withColumn("day", date_format(df['date'], 'dd')) \
        .withColumn("month", date_format(df['date'], 'MM')) \
        .withColumn("year", date_format(df['date'], 'yyyy'))

    df.write.option("header",True) \
         .mode("overwrite") \
         .partitionBy("day","month","year") \
         .parquet(f"hdfs://hadoop-namenode:9000/crypto/{symbol}/{PERIOD}.parquet")


**Fetch data points for a single symbol**

In [12]:
# 00:00 UTC of the current UTC day, in epoch milliseconds. An aware datetime is
# required: .timestamp() on a naive value (as from utcnow()) assumes host-local time,
# which is why the old replace(hour=7) hit midnight UTC only on a UTC+7 machine.
today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
today_timestamp = int(today.timestamp() * 1000)

start_timestamp = today_timestamp - DAY_TO_QUERY * DURATION_EACH_DAY
end_timestamp = today_timestamp

# Fetch in windows of at most BATCH_DAYS candles and collect the raw rows in Python.
# A single createDataFrame at the end avoids a long union lineage and does not
# crash when one window comes back empty.
BATCH_DAYS = 100
rows = []
for timestamp in range(start_timestamp, end_timestamp, DURATION_EACH_DAY * BATCH_DAYS):
    # End 1 ms before the next window (and before today's open) so consecutive
    # windows can never return the same candle, regardless of the limit.
    batch_end = min(timestamp + DURATION_EACH_DAY * BATCH_DAYS, end_timestamp) - 1
    rows.extend(client.klines(symbol, PERIOD, limit=BATCH_DAYS, startTime=timestamp, endTime=batch_end))

if not rows:
    raise ValueError(f"No {PERIOD} klines returned for {symbol}")

df = spark.createDataFrame(rows, schema=columns)
writeToHFDS(df, symbol)

                                                                                

In [13]:
# Every symbol listed in Binance's exchange info.
exchanges = client.exchange_info()
symbols = [item.get('symbol') for item in exchanges.get('symbols')]

# 00:00 UTC of the current UTC day, in epoch milliseconds (aware datetime so
# .timestamp() does not reinterpret the value in the host's local time zone).
today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
today_timestamp = int(today.timestamp() * 1000)

start_timestamp = today_timestamp - DAY_TO_QUERY * DURATION_EACH_DAY
end_timestamp = today_timestamp

BATCH_DAYS = 100
for symbol in symbols:
    # Fresh buffer per symbol: nothing can leak from the previous symbol's data.
    rows = []
    for timestamp in range(start_timestamp, end_timestamp, DURATION_EACH_DAY * BATCH_DAYS):
        # End 1 ms before the next window so consecutive windows never share a candle.
        batch_end = min(timestamp + DURATION_EACH_DAY * BATCH_DAYS, end_timestamp) - 1
        rows.extend(client.klines(symbol, PERIOD, limit=BATCH_DAYS, startTime=timestamp, endTime=batch_end))

    if not rows:
        # createDataFrame cannot infer a schema from an empty list; presumably these are
        # symbols with no candles in the requested range (e.g. listed too recently).
        print(f"skip {symbol}: no {PERIOD} klines in range")
        continue

    df = spark.createDataFrame(rows, schema=columns)
    writeToHFDS(df, symbol)

[Stage 2:>                                                        (0 + 8) / 144]

KeyboardInterrupt: 



In [15]:
# Shut down the Spark session created in the setup cell once all writes have finished;
# re-run the setup cell before using `spark` again in this kernel.
spark.stop()