From 34163f8c3ec5f7d819ac7e45c0ef59c06aeaeeab Mon Sep 17 00:00:00 2001 From: Denis Romanov Date: Tue, 16 May 2023 20:07:45 +0300 Subject: [PATCH] Add importer for Kraken downloadable historical data --- actions/generic.py | 4 +-- actions/high.py | 12 ++++---- actions/kraken.py | 75 ++++++++++++++++++++++++++++++++++++++++++++++ common.py | 3 +- conf.py | 23 ++++++++++---- fs.py | 4 +-- 6 files changed, 105 insertions(+), 16 deletions(-) create mode 100644 actions/kraken.py diff --git a/actions/generic.py b/actions/generic.py index 5a61053..ba9dbc0 100644 --- a/actions/generic.py +++ b/actions/generic.py @@ -5,12 +5,12 @@ class Status(Cmd): def run(self, *args: str, **kwargs: Any) -> Tuple[int | None, str | Exception | None]: - raise NotImplementedError + return 1, "not implemented" class GC(Cmd): def run(self, *args: str, **kwargs: Any) -> Tuple[int | None, str | Exception | None]: - raise NotImplementedError + return 1, "not implemented" def getcmd(name: str) -> Tuple[Cmd | None, str | None]: diff --git a/actions/high.py b/actions/high.py index a486011..79969f5 100644 --- a/actions/high.py +++ b/actions/high.py @@ -42,15 +42,15 @@ def __parse_arg(arg: str) -> Tuple[str, str | Exception | None]: @staticmethod def __process_arg(arg: str, symbol: Symbol, store: Store) -> str | Exception | None: try: - df = pd.read_csv(arg, header=0, usecols=range(5)) - except Exception as e: + df = pd.read_csv(arg, skiprows=1, header=None, usecols=range(5)) + except OSError as e: return e if symbol.time is None: - df["Date"] = pd.to_datetime(df["Date"], format="%Y%m%d %H:%M", utc=True) + df[0] = pd.to_datetime(df[0], format="%Y%m%d %H:%M", utc=True) else: - df["Date"] = pd.to_datetime(df["Date"], format="%Y%m%d %H:%M") - df["Date"] = df["Date"].dt.tz_localize(tzinfo(symbol.time)) - for date, gf in df.groupby(df["Date"].dt.date): + df[0] = pd.to_datetime(df[0], format="%Y%m%d %H:%M") + df[0] = df[0].dt.tz_localize(tzinfo(symbol.time)) + for date, gf in df.groupby(df[0].dt.date): err = store.put(Block(symbol.name, symbol.market, cast(datetime.date, date), gf)) if err is not None: return err diff --git a/actions/kraken.py b/actions/kraken.py new file mode 100644 index 0000000..019141c --- /dev/null +++ b/actions/kraken.py @@ -0,0 +1,75 @@ +import datetime +import os +import zipfile +from typing import IO, Any, Tuple, cast + +import pandas as pd + +from common import Cmd, Symbol, p, tzinfo +from fs import Block, Store + + +class Import(Cmd): + def run(self, *args: str, **kwargs: Any) -> Tuple[int | None, str | Exception | None]: + symbols: dict[str, Symbol] | None = kwargs.get("symbols") + if not symbols: + return None, None + path: str | os.PathLike[str] | None = kwargs.get("path") + path = path if path is not None else "" + store = Store(path) + for arg in args: + p(f"Processing {arg}... ", end="") + err = self.__process_arg(arg, symbols, store) + if isinstance(err, FileNotFoundError): + p("file not found") + return 1, None + if err is not None: + p() + return 2, err + p("done.") + return None, None + + def __process_arg(self, arg: str, symbols: dict[str, Symbol], store: Store) -> str | Exception | None: + try: + with zipfile.ZipFile(arg, "r") as z: + for filename in z.namelist(): + sym, ok = self.__parse_filename(filename) + if not ok: + continue + if sym.lower() not in symbols: + continue + with z.open(filename, "r") as f: + err = self.__process_file(f, symbols[sym.lower()], store) + if err is not None: + return err + except (OSError, zipfile.BadZipFile) as e: + return e + return None + + @staticmethod + def __process_file(file: IO[bytes], symbol: Symbol, store: Store) -> str | Exception | None: + df = pd.read_csv(file, header=None, usecols=range(6)) + start = int(symbol.start.timestamp()) if symbol.start is not None else 0 + df = df[df[0] >= start] + df[0] = pd.to_datetime(df[0], unit="s", utc=True) + if symbol.time is not None: + df[0] = df[0].dt.tz_convert(tzinfo(symbol.time)) + for date, gf in df.groupby(df[0].dt.date): + err = store.put(Block(symbol.name, symbol.market, cast(datetime.date, date), gf)) + if err is not None: + return err + return None + + @staticmethod + def __parse_filename(filename: str) -> Tuple[str, bool]: + if not filename.endswith("_1.csv"): + return "", False + return filename.removesuffix("_1.csv"), True + + +def getcmd(name: str) -> Tuple[Cmd | None, str | None]: + match name: + case "import": + return Import(), None + case _: + return None, f"command {name} not found in module {__name__}" diff --git a/common.py b/common.py index 8a92277..575e341 100644 --- a/common.py +++ b/common.py @@ -1,3 +1,4 @@ +import datetime import sys from dataclasses import dataclass from functools import lru_cache @@ -10,7 +11,7 @@ class Symbol: name: str market: str | None = None time: str | None = None - start: str | None = None + start: datetime.datetime | None = None @dataclass diff --git a/conf.py b/conf.py index a30cd15..d7ee546 100644 --- a/conf.py +++ b/conf.py @@ -1,5 +1,6 @@ from __future__ import annotations +import datetime import os from collections.abc import Iterable, Iterator, Mapping from dataclasses import replace @@ -73,21 +74,33 @@ def _walk_symbols( case "name" | "symbols": pass case "market": - if not isinstance(v, str): + if type(v) != str: raise TypeError symbol.market = v case "time": - if not isinstance(v, str): + if type(v) != str: raise TypeError if tzinfo(v) is None: raise Error(f"unknown time zone: {v}") symbol.time = v case "start": - if not isinstance(v, str): + if type(v) == int: + symbol.start = datetime.datetime(v, 1, 1) + elif type(v) == str: + symbol.start = datetime.datetime.fromisoformat(v) + elif type(v) == datetime.date: + symbol.start = datetime.datetime.combine(v, datetime.time.min) + elif type(v) == datetime.datetime: + symbol.start = v + else: raise TypeError - symbol.start = v case _: raise Error(f"unexpected key: {k}") + if symbol.start is not None and symbol.start.tzinfo is None: + if symbol.time is not None: + symbol.start = symbol.start.replace(tzinfo=tzinfo(symbol.time)) + else: + symbol.start = symbol.start.replace(tzinfo=datetime.timezone.utc) if name is not None: yield replace(symbol, name=name) elif symbols is not None: @@ -113,7 +126,7 @@ def _walk_actions( case "name" | "actions": pass case "using": - if not isinstance(v, str): + if type(v) != str: raise TypeError action.using = v case _: diff --git a/fs.py b/fs.py index 46c9a0c..3fe945b 100644 --- a/fs.py +++ b/fs.py @@ -30,13 +30,13 @@ def put(self, block: Block) -> str | Exception | None: df = block.records.copy() def ts(dt: datetime.datetime) -> str: - if dt.tzinfo is None or dt.tzinfo == self._utc: + if dt.tzinfo is None or dt.tzinfo is datetime.timezone.utc or dt.tzinfo is self._utc: return dt.strftime("%Y-%m-%dT%H:%M:%SZ") return dt.isoformat("T", "seconds") df[df.columns[0]] = df.iloc[:, 0].map(ts) if block.market: - df.insert(0, "Symbol", f"{block.symbol}:{block.market}") + df.insert(0, "Symbol", f"{block.market}:{block.symbol}") else: df.insert(0, "Symbol", block.symbol) return self.__store(block, df)